#STEP 1

## 1. **Data Loading and Database Design:**

- **Normalization: Carefully design a normalized database schema to house the sales data effectively. Consider tables for orders, products, and potentially more.**
- **Efficient Loading: Create a script to load the CSV data into your database. Ensure proper data validation and transformation throughout the process.**

In [1]:
import pandas as pd
import sqlite3
import numpy
from datetime import datetime

In [2]:
csv_file_path = '/content/sample_data/sample_csv.csv'

In [3]:
def transform(df):
    df['Date of Sale'] = pd.to_datetime(df['Date of Sale'], errors='coerce')
    # Convert numeric fields
    df['Quantity Sold'] = pd.to_numeric(df['Quantity Sold'], errors='coerce')
    df['Unit Price'] = pd.to_numeric(df['Unit Price'], errors='coerce')
    df['Discount'] = pd.to_numeric(df['Discount'], errors='coerce')
    df['Shipping Cost'] = pd.to_numeric(df['Shipping Cost'], errors='coerce')

    # Drop rows with missing essential fields
    df.dropna(subset=['Order ID', 'Product ID', 'Customer ID', 'Date of Sale', 'Quantity Sold', 'Unit Price'], inplace=True)

    # Fill missing values for non-essential fields if necessary
    df['Discount'].fillna(0, inplace=True)
    df['Shipping Cost'].fillna(0, inplace=True)

    # Remove duplicates
    df.drop_duplicates(subset=['Order ID', 'Product ID'], keep='first', inplace=True)

    df['Total Price'] = df['Quantity Sold'] * df['Unit Price'] - df['Discount']

    return df

chunksize = 100000
for data in pd.read_csv(csv_file_path, chunksize=chunksize):
    # Validate and transform data
    data = transform(data)
    # Insert data into the Orders table
    orders_chunk = data[['Order ID', 'Customer ID', 'Date of Sale', 'Shipping Cost', 'Payment Method', 'Discount']].drop_duplicates()
    # Insert data into the Products table
    products_chunk = data[['Product ID', 'Product Name', 'Category', 'Unit Price']].drop_duplicates()
    # Insert data into the OrderDetails table
    order_details_chunk = data[['Order ID', 'Product ID', 'Quantity Sold', 'Total Price']].drop_duplicates()
    # Insert data into the Customers table
    customers_chunk = data[['Customer ID', 'Customer Name', 'Customer Email', 'Customer Address', 'Region']].drop_duplicates()


In [4]:

conn = sqlite3.connect('lumel.db')

cursor = conn.cursor()

In [5]:
orders_chunk.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 6 columns):
 #   Column          Non-Null Count  Dtype         
---  ------          --------------  -----         
 0   Order ID        6 non-null      int64         
 1   Customer ID     6 non-null      object        
 2   Date of Sale    6 non-null      datetime64[ns]
 3   Shipping Cost   6 non-null      int64         
 4   Payment Method  6 non-null      object        
 5   Discount        6 non-null      float64       
dtypes: datetime64[ns](1), float64(1), int64(2), object(2)
memory usage: 416.0+ bytes


In [6]:
products_chunk.info()

<class 'pandas.core.frame.DataFrame'>
Index: 4 entries, 0 to 4
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   Product ID    4 non-null      object 
 1   Product Name  4 non-null      object 
 2   Category      4 non-null      object 
 3   Unit Price    4 non-null      float64
dtypes: float64(1), object(3)
memory usage: 160.0+ bytes


In [7]:
order_details_chunk.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 4 columns):
 #   Column         Non-Null Count  Dtype  
---  ------         --------------  -----  
 0   Order ID       6 non-null      int64  
 1   Product ID     6 non-null      object 
 2   Quantity Sold  6 non-null      int64  
 3   Total Price    6 non-null      float64
dtypes: float64(1), int64(2), object(1)
memory usage: 320.0+ bytes


In [8]:
customers_chunk.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 5 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   Customer ID       6 non-null      object
 1   Customer Name     6 non-null      object
 2   Customer Email    6 non-null      object
 3   Customer Address  6 non-null      object
 4   Region            6 non-null      object
dtypes: object(5)
memory usage: 368.0+ bytes


In [9]:
#we have convert similar to this for all the table, due to timing constraint
orders_chunk = orders_chunk.rename(columns={
    'Order ID': 'OrderID',
      "Customer ID":"CustomerID" ,
      'Date of Sale':'DateOfSale',
      'Shipping Cost':'ShippingCost' ,
      'Payment Method': 'PaymentMethod',
       'Discount':'Discount'
})


In [10]:
#we have convert similar to this for all the table, due to timing constraint
customers_chunk = customers_chunk.rename(columns={
    'Order ID': 'OrderID',
      "Customer ID":"CustomerID" ,
      'Date of Sale':'DateOfSale',
      'Shipping Cost':'ShippingCost' ,
      'Payment Method': 'PaymentMethod',
       'Discount':'Discount'
})

products_chunk = products_chunk.rename(columns={
    'Product ID': 'ProductID',
    'Product Name': 'ProductName',
    'Category': 'Category',
    'Unit Price': 'UnitPrice',
})

order_details_chunk = order_details_chunk.rename(columns={
    'Order ID': 'OrderID',
    'Product ID': 'ProductID',
    'Quantity Sold': 'QuantitySold',
    'Total Price': 'TotalPrice',
})

customers_chunk = customers_chunk.rename(columns={

    'Customer ID': 'CustomerID',
    'Customer Name': 'CustomerName',
    'Customer Email': 'CustomerEmail',
    'Customer Address': 'CustomerAddress',
    'Region': 'Region'
})


In [11]:
orders_chunk.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6 entries, 0 to 5
Data columns (total 6 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   OrderID        6 non-null      int64         
 1   CustomerID     6 non-null      object        
 2   DateOfSale     6 non-null      datetime64[ns]
 3   ShippingCost   6 non-null      int64         
 4   PaymentMethod  6 non-null      object        
 5   Discount       6 non-null      float64       
dtypes: datetime64[ns](1), float64(1), int64(2), object(2)
memory usage: 416.0+ bytes


In [12]:
products_chunk.isna().sum()

ProductID      0
ProductName    0
Category       0
UnitPrice      0
dtype: int64

In [13]:
orders_chunk.isna().sum()

OrderID          0
CustomerID       0
DateOfSale       0
ShippingCost     0
PaymentMethod    0
Discount         0
dtype: int64

In [14]:
customers_chunk.isna().sum()

CustomerID         0
CustomerName       0
CustomerEmail      0
CustomerAddress    0
Region             0
dtype: int64

In [15]:
order_details_chunk.isna().sum()

OrderID         0
ProductID       0
QuantitySold    0
TotalPrice      0
dtype: int64

In [16]:

cursor.execute('''
CREATE TABLE IF NOT EXISTS Customers (
    CustomerID INTEGER PRIMARY KEY,
    CustomerName VARCHAR(255),
    CustomerEmail VARCHAR(255),
    CustomerAddress TEXT,
    Region VARCHAR(255)
)
''')

cursor.execute('''
CREATE TABLE IF NOT EXISTS Orders (
    OrderID INTEGER PRIMARY KEY,
    CustomerID INTEGER,
    DateOfSale DATE,
    ShippingCost DECIMAL(10, 2),
    PaymentMethod VARCHAR(50),
    Discount DECIMAL(10, 2),
    FOREIGN KEY (CustomerID) REFERENCES Customers(CustomerID)
)
''')

cursor.execute('''
CREATE TABLE IF NOT EXISTS OrderDetails (
    OrderDetailID INTEGER PRIMARY KEY,
    OrderID INTEGER,
    ProductID INTEGER,
    QuantitySold INTEGER,
    TotalPrice DECIMAL(10, 2),
    FOREIGN KEY (OrderID) REFERENCES Orders(OrderID),
    FOREIGN KEY (ProductID) REFERENCES Products(ProductID)
)
''')

cursor.execute('''
CREATE TABLE IF NOT EXISTS Products (
    ProductID INTEGER PRIMARY KEY,
    ProductName VARCHAR(255) NOT NULL,
    Category VARCHAR(255),
    UnitPrice DECIMAL(10, 2),
    ProductDescription TEXT
)
''')

conn.commit()

In [17]:
customers_chunk.to_sql('Customers', conn, if_exists='replace', index =False)
order_details_chunk.to_sql('OrdersDetails', conn, if_exists='replace', index =False)
orders_chunk.to_sql('Orders', conn, if_exists='replace', index =False)
products_chunk.to_sql('Products', conn, if_exists='replace', index =False)

4

In [18]:
query = 'SELECT * FROM Orders'


In [19]:
conn.execute(query)

<sqlite3.Cursor at 0x7a17144173c0>

In [20]:
results = cursor.fetchall()

# STEP - 2

## **2. Data Refresh Mechanism:**

- **Periodic Refresh: Set up a mechanism to refresh the database data daily or on-demand. This could involve overwriting existing data or appending new data while managing duplicates.**
- **Optional: Implement the data refresh mechanism as a background job or thread instead of handling in the API server application.**
- **Logging: Maintain logs of data refresh activities (successful and failed) to facilitate troubleshooting.**

In [21]:
import pandas as pd
import sqlite3
import numpy as np
import logging
from datetime import datetime
from sqlalchemy import create_engine

In [22]:
logging.basicConfig(filename='data_refresh.log', level=logging.INFO, format='%(asctime)s %(message)s')

# By this point we have already created the database in the previous step
db_path = 'lumel.db'
engine = create_engine(f'sqlite:///{db_path}')


In [23]:
# CSV file path
csv_file_path = '/content/sample_data/sample_csv.csv'

def transform(df):
    df['Date of Sale'] = pd.to_datetime(df['Date of Sale'], errors='coerce')
    df['Quantity Sold'] = pd.to_numeric(df['Quantity Sold'], errors='coerce')
    df['Unit Price'] = pd.to_numeric(df['Unit Price'], errors='coerce')
    df['Discount'] = pd.to_numeric(df['Discount'], errors='coerce')
    df['Shipping Cost'] = pd.to_numeric(df['Shipping Cost'], errors='coerce')
    df.dropna(subset=['Order ID', 'Product ID', 'Customer ID', 'Date of Sale', 'Quantity Sold', 'Unit Price'], inplace=True)
    df['Discount'].fillna(0, inplace=True)
    df['Shipping Cost'].fillna(0, inplace=True)
    df.drop_duplicates(subset=['Order ID', 'Product ID'], keep='first', inplace=True)
    df['Total Price'] = df['Quantity Sold'] * df['Unit Price'] - df['Discount']
    return df

In [24]:
def load_data(csv_path):
    chunksize = 100000
    try:
        for data in pd.read_csv(csv_path, chunksize=chunksize):
            data = transform(data)
            orders_chunk = data[['Order ID', 'Customer ID', 'Date of Sale', 'Shipping Cost', 'Payment Method', 'Discount']].drop_duplicates()
            products_chunk = data[['Product ID', 'Product Name', 'Category', 'Unit Price']].drop_duplicates()
            order_details_chunk = data[['Order ID', 'Product ID', 'Quantity Sold', 'Total Price']].drop_duplicates()
            customers_chunk = data[['Customer ID', 'Customer Name', 'Customer Email', 'Customer Address', 'Region']].drop_duplicates()

            orders_chunk = orders_chunk.rename(columns={
                'Order ID': 'OrderID',
                'Customer ID': 'CustomerID',
                'Date of Sale': 'DateOfSale',
                'Shipping Cost': 'ShippingCost',
                'Payment Method': 'PaymentMethod',
                'Discount': 'Discount'
            })

            products_chunk = products_chunk.rename(columns={
                'Product ID': 'ProductID',
                'Product Name': 'ProductName',
                'Category':'Category',
                'Unit Price': 'UnitPrice',
            })

            order_details_chunk = order_details_chunk.rename(columns={
                'Order ID': 'OrderID',
                'Product ID': 'ProductID',
                'Quantity Sold':'QuantitySold',
                'Total Price': 'TotalPrice',
            })

            customers_chunk = customers_chunk.rename(columns={

                'Customer ID': 'CustomerID',
                'Customer Name': 'CustomerName',
                'Customer Email': 'CustomerEmail',
                'Customer Address': 'CustomerAddress',
                'Region': 'Region'
            })

            orders_chunk.to_sql('Orders', engine, if_exists='append', index=False)
            products_chunk.to_sql('Products', engine, if_exists='append', index=False)
            order_details_chunk.to_sql('OrderDetails', engine, if_exists='append', index=False)
            customers_chunk.to_sql('Customers', engine, if_exists='append', index=False)

            logging.info(f"Loaded {len(data)} records to the database.")
    except Exception as e:
        logging.error(f"Data refresh failed: {e}")

In [25]:
def refresh_data():
    try:
        load_data(csv_file_path)
        logging.info('Data refresh completed successfully.')
    except Exception as e:
        logging.error(f"Data refresh failed: {e}")

#STEP-3
**3. RESTful API for Analysis:**

- API Design: Craft a well-structured RESTful API with endpoints to trigger and retrieve the results of various calculations (detailed below).**
- API to trigger the data refresh on demand

In [26]:
#Adding API END Points - FLASK

from flask import Flask, jsonify, request
import pandas as pd
import sqlite3
import logging
from datetime import datetime
from sqlalchemy import create_engine

In [27]:
app = Flask(__name__)

# Configure logging
logging.basicConfig(filename='data_refresh.log', level=logging.INFO, format='%(asctime)s %(message)s')

# Database connection
db_path = 'lumel.db'
engine = create_engine(f'sqlite:///{db_path}')

# CSV file path
csv_file_path = '/content/sample_data/sample_csv.csv'

In [28]:
def transform(df):
    df['Date of Sale'] = pd.to_datetime(df['Date of Sale'], errors='coerce')
    df['Quantity Sold'] = pd.to_numeric(df['Quantity Sold'], errors='coerce')
    df['Unit Price'] = pd.to_numeric(df['Unit Price'], errors='coerce')
    df['Discount'] = pd.to_numeric(df['Discount'], errors='coerce')
    df['Shipping Cost'] = pd.to_numeric(df['Shipping Cost'], errors='coerce')
    df.dropna(subset=['Order ID', 'Product ID', 'Customer ID', 'Date of Sale', 'Quantity Sold', 'Unit Price'], inplace=True)
    df['Discount'].fillna(0, inplace=True)
    df['Shipping Cost'].fillna(0, inplace=True)
    df.drop_duplicates(subset=['Order ID', 'Product ID'], keep='first', inplace=True)
    df['Total Price'] = df['Quantity Sold'] * df['Unit Price'] - df['Discount']
    return df

In [29]:
def load_data(csv_path):
    chunksize = 100000
    try:
        for data in pd.read_csv(csv_path, chunksize=chunksize):
            data = transform(data)
            orders_chunk = data[['Order ID', 'Customer ID', 'Date of Sale', 'Shipping Cost', 'Payment Method', 'Discount']].drop_duplicates()
            products_chunk = data[['Product ID', 'Product Name', 'Category', 'Unit Price']].drop_duplicates()
            order_details_chunk = data[['Order ID', 'Product ID', 'Quantity Sold', 'Total Price']].drop_duplicates()
            customers_chunk = data[['Customer ID', 'Customer Name', 'Customer Email', 'Customer Address', 'Region']].drop_duplicates()

            orders_chunk = orders_chunk.rename(columns={
                'Order ID': 'OrderID',
                'Customer ID': 'CustomerID',
                'Date of Sale': 'DateOfSale',
                'Shipping Cost': 'ShippingCost',
                'Payment Method': 'PaymentMethod',
                'Discount': 'Discount'
            })

            products_chunk = products_chunk.rename(columns={
                'Product ID': 'ProductID',
                'Product Name': 'ProductName',
                'Category':'Category',
                'Unit Price': 'UnitPrice',
            })

            order_details_chunk = order_details_chunk.rename(columns={
                'Order ID': 'OrderID',
                'Product ID': 'ProductID',
                'Quantity Sold':'QuantitySold',
                'Total Price': 'TotalPrice',
            })

            customers_chunk = customers_chunk.rename(columns={

                'Customer ID': 'CustomerID',
                'Customer Name': 'CustomerName',
                'Customer Email': 'CustomerEmail',
                'Customer Address': 'CustomerAddress',
                'Region': 'Region'
            })

            orders_chunk.to_sql('Orders', engine, if_exists='append', index=False)
            products_chunk.to_sql('Products', engine, if_exists='append', index=False)
            order_details_chunk.to_sql('OrderDetails', engine, if_exists='append', index=False)
            customers_chunk.to_sql('Customers', engine, if_exists='append', index=False)

            logging.info(f"Loaded {len(data)} records to the database.")
    except Exception as e:
        logging.error(f"Data refresh failed: {e}")

In [30]:
def refresh_data():
    try:
        load_data(csv_file_path)
        logging.info('Data refresh completed successfully.')
        return "Data refresh completed successfully."
    except Exception as e:
        logging.error(f"Data refresh failed: {e}")
        return f"Data refresh failed: {e}"

In [31]:
@app.route('/refresh', methods=['POST'])
def trigger_refresh():
    message = refresh_data()
    return jsonify({'message': message})

@app.route('/total_sales_by_product', methods=['GET'])
def total_sales_by_product():
    conn = sqlite3.connect(db_path)
    query = '''
    SELECT ProductName, SUM(TotalPrice) as TotalSales
    FROM OrderDetails
    JOIN Products ON OrderDetails.ProductID = Products.ProductID
    GROUP BY ProductName
    '''
    df = pd.read_sql_query(query, conn)
    result = df.to_dict(orient='records')
    conn.close()
    return jsonify(result)

In [32]:

if __name__ == "__main__":
    app.run(debug=True)

 * Serving Flask app '__main__'
 * Debug mode: on


 * Running on http://127.0.0.1:5000
INFO:werkzeug:[33mPress CTRL+C to quit[0m
INFO:werkzeug: * Restarting with stat


# STEP - 4
## 4. Core Analysis (Triggered via API):

In [None]:
#Queries to execute:
#Top N Products
@app.route('/refresh', methods=['POST'])
def trigger_refresh():
    message = refresh_data()
    return jsonify({'message': message})

@app.route('/total_sales_by_product', methods=['GET'])
def total_sales_by_product():
    conn = sqlite3.connect(db_path)
    query = '''
    SELECT c.Region, p.ProductName, SUM(od.QuantitySold) AS TotalQuantitySold
    FROM OrderDetails od
    JOIN Orders o ON od.OrderID = o.OrderID
    JOIN Products p ON od.ProductID = p.ProductID
    JOIN Customers c ON o.CustomerID = c.CustomerID
    WHERE o.DateOfSale BETWEEN 'start_date' AND 'end_date'
    GROUP BY 1, 2
    ORDER BY 1, TotalQuantitySold DESC
    LIMIT N
    '''
    df = pd.read_sql_query(query, conn)
    result = df.to_dict(orient='records')
    conn.close()
    return jsonify(result)
