In [1]:
!pip install pandas pymongo openpyxl "pymongo[srv]"



In [2]:
from google.colab import files
import io

print("Please upload the Online_Retail.xlsx file.")
uploaded = files.upload()
# `uploaded` maps each uploaded filename to its raw bytes. Take the first key
# without indexing so that an empty mapping (presumably what a cancelled upload
# yields -- confirm) reaches the explicit error below instead of an IndexError.
uploaded_filename = next(iter(uploaded), None)
if uploaded_filename:
    print(f"\nFile '{uploaded_filename}' uploaded successfully!")
    # Raw workbook bytes; later cells read this module-level name.
    DATASET_BYTES = uploaded[uploaded_filename]
else:
    print("\nError: 'Online Retail.xlsx' not found. Please upload the correct file.")
    # Stop the cell explicitly (an `assert` is stripped under `python -O`).
    raise FileNotFoundError("No file was uploaded; re-run this cell and choose 'Online Retail.xlsx'.")

Please upload the Online_Retail.xlsx file.


Saving Online Retail.xlsx to Online Retail.xlsx

File 'Online Retail.xlsx' uploaded successfully!


In [5]:
import functools
import getpass
import logging
import os
import sqlite3
import time

import pandas as pd
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, OperationFailure

LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# force=True (Python 3.8+) replaces any handlers already attached to the root
# logger. Notebook hosts commonly pre-install one, in which case a plain
# basicConfig() call is a silent no-op and the INFO messages are never shown.
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, force=True)
RECORDS_TO_LOAD = 1500 # Using 1500 to ensure we have enough clean data

# --- SQL Configuration ---
DB_FILE = "retail.db"

# --- MongoDB Configuration ---
# The connection string carries the database username and password, so it must
# never be stored in the notebook. It is read from the MONGO_ATLAS_URI
# environment variable, falling back to a hidden interactive prompt.
# NOTE: the credential that used to be hard-coded here should be rotated.
MONGO_ATLAS_URI = os.environ.get("MONGO_ATLAS_URI") or getpass.getpass("MongoDB Atlas connection URI: ")
MONGO_DB_NAME = "retail_db"

# --- Shared Data Loading and Cleaning Function ---
def load_and_clean_data():
    """Load the uploaded workbook and return the first cleaned rows.

    Reads the Excel bytes held in the module-level ``DATASET_BYTES`` and keeps
    only rows that
      * have a CustomerID that parses as a number,
      * have an InvoiceNo that does not start with 'C', and
      * have a Quantity greater than zero.
    CustomerID is cast to int and InvoiceDate is parsed to datetime.

    Returns:
        pandas.DataFrame: at most ``RECORDS_TO_LOAD`` cleaned rows, keeping the
        original row index. The frame is an independent copy.
    """
    logging.info("Loading and cleaning data...")
    raw = pd.read_excel(io.BytesIO(DATASET_BYTES))

    # Coerce once and reuse: NaN marks both missing and non-numeric ids.
    customer_ids = pd.to_numeric(raw['CustomerID'], errors='coerce')
    # 'C'-prefixed invoice numbers are excluded (presumably cancellations in
    # this dataset -- confirm against the data dictionary).
    c_prefixed = raw['InvoiceNo'].astype(str).str.startswith('C')
    keep = customer_ids.notna() & ~c_prefixed & (raw['Quantity'] > 0)

    # .copy() yields an independent frame, so the column assignments below do
    # not write into a slice of `raw` (avoids SettingWithCopyWarning).
    cleaned = raw.loc[keep].copy()
    cleaned['CustomerID'] = customer_ids.loc[keep].astype(int)
    cleaned['InvoiceDate'] = pd.to_datetime(cleaned['InvoiceDate'])

    logging.info(f"Data cleaned. Shape: {cleaned.shape}")
    return cleaned.head(RECORDS_TO_LOAD).copy()

In [6]:
# PART 1: SQL SETUP

def create_sql_schema(cursor):
    """Drop and recreate the four relational tables used by the SQL side.

    Args:
        cursor: a sqlite3 cursor on the target database.

    Any existing InvoiceDetails, Invoices, Customers and Products tables are
    dropped first, so calling this repeatedly starts from an empty schema.
    """
    logging.info("Creating SQL database schema...")

    # Referencing tables are dropped before the tables they point at.
    for table_name in ("InvoiceDetails", "Invoices", "Customers", "Products"):
        cursor.execute(f"DROP TABLE IF EXISTS {table_name};")

    ddl_script = '''
    CREATE TABLE Customers (CustomerID INTEGER PRIMARY KEY, Country TEXT);
    CREATE TABLE Products (StockCode TEXT PRIMARY KEY, Description TEXT, UnitPrice REAL);
    CREATE TABLE Invoices (InvoiceNo TEXT PRIMARY KEY, InvoiceDate TEXT, CustomerID INTEGER, FOREIGN KEY (CustomerID) REFERENCES Customers(CustomerID));
    CREATE TABLE InvoiceDetails (InvoiceNo TEXT, StockCode TEXT, Quantity INTEGER, PRIMARY KEY (InvoiceNo, StockCode), FOREIGN KEY (InvoiceNo) REFERENCES Invoices(InvoiceNo), FOREIGN KEY (StockCode) REFERENCES Products(StockCode));
    '''
    cursor.executescript(ddl_script)

    logging.info("SQL Schema created.")

def insert_sql_data(conn, df):
    """Populate the four relational tables from the flat line-item frame.

    Args:
        conn: an open sqlite3 connection whose schema already exists.
        df: frame with the columns CustomerID, Country, StockCode, Description,
            UnitPrice, InvoiceNo, InvoiceDate and Quantity.

    Parent tables (Customers, Products, Invoices) are written before
    InvoiceDetails. Each parent table is de-duplicated on its primary-key
    column only, keeping the first occurrence: de-duplicating on every column
    would keep two rows for a customer listed under two countries, or for an
    invoice whose lines carry different timestamps, and the insert would then
    fail with a PRIMARY KEY violation. The transaction is committed at the end.
    """
    logging.info("Inserting data into SQL database...")

    # Insert data into parent tables first, one row per primary key.
    customers = df[['CustomerID', 'Country']].drop_duplicates(subset=['CustomerID'])
    products = df[['StockCode', 'Description', 'UnitPrice']].drop_duplicates(subset=['StockCode'])
    invoices = df[['InvoiceNo', 'InvoiceDate', 'CustomerID']].drop_duplicates(subset=['InvoiceNo'])

    customers.to_sql('Customers', conn, if_exists='append', index=False)
    products.to_sql('Products', conn, if_exists='append', index=False)
    invoices.to_sql('Invoices', conn, if_exists='append', index=False)

    # The same product can appear on several lines of one invoice; sum the
    # quantities so (InvoiceNo, StockCode) is unique before inserting.
    logging.info("Aggregating invoice details to handle duplicates...")
    invoice_details = df.groupby(['InvoiceNo', 'StockCode']).agg(
        Quantity=('Quantity', 'sum')
    ).reset_index()

    invoice_details.to_sql('InvoiceDetails', conn, if_exists='append', index=False)

    conn.commit()
    logging.info("SQL data insertion complete.")


def run_sql_setup(df):
    """Create the SQLite database at DB_FILE and load `df` into it.

    Args:
        df: cleaned line-item frame, passed through to insert_sql_data.

    sqlite3 errors are logged rather than raised, so the notebook keeps
    running; the connection is always closed.
    """
    # Bind the name before `try` so that `finally` can test it even when
    # sqlite3.connect() itself raises (previously that path hit a NameError).
    conn = None
    try:
        conn = sqlite3.connect(DB_FILE)
        cursor = conn.cursor()
        create_sql_schema(cursor)
        insert_sql_data(conn, df)

        # Verification
        record_count = cursor.execute("SELECT COUNT(*) FROM InvoiceDetails;").fetchone()[0]
        logging.info(f"SQL Verification: {record_count} records inserted into InvoiceDetails.")
    except sqlite3.Error as e:
        logging.error(f"SQL Database error: {e}")
    finally:
        if conn is not None:
            conn.close()
# Build the cleaned sample once; df_retail is reused by the later cells
# (MongoDB load and benchmark sample values).
df_retail = load_and_clean_data()
run_sql_setup(df_retail)

In [7]:
# PART 2: MONGODB ATLAS SETUP

def setup_transaction_centric(db, df):
    """Rebuild the 'transactions' collection with one document per row of `df`.

    Args:
        db: database handle; ``db["transactions"]`` must return the collection.
        df: frame whose rows become the documents (column name -> value).

    The collection is dropped first. An empty frame leaves it empty:
    insert_many() rejects an empty document list, so the insert is skipped.
    """
    logging.info("Setting up MongoDB transaction-centric collection...")
    collection = db["transactions"]
    collection.drop() # Clear old data
    records = df.to_dict('records')
    if records:
        collection.insert_many(records)
    else:
        logging.warning("No rows to insert into 'transactions' collection.")
    logging.info(f"Inserted {collection.count_documents({})} documents into 'transactions' collection.")

def setup_customer_centric(db, df):
    """Rebuild the 'customers' collection with one nested document per customer.

    Args:
        db: database handle; ``db["customers"]`` must return the collection.
        df: line-item frame with the columns CustomerID, Country, InvoiceNo,
            InvoiceDate, StockCode, Description, Quantity and UnitPrice.

    Each document has the shape
        {'_id': CustomerID, 'Country': ..., 'invoices': [
            {'InvoiceNo': ..., 'InvoiceDate': ..., 'products': [
                {'StockCode', 'Description', 'Quantity', 'UnitPrice'}, ...]}, ...]}
    Country and InvoiceDate come from the first row seen for that customer /
    invoice. Customers, invoices and products keep first-appearance order.
    The collection is dropped first; an empty frame leaves it empty.
    """
    logging.info("Setting up MongoDB customer-centric collection...")
    collection = db["customers"]
    collection.drop() # Clear old data

    # Invoices are keyed by InvoiceNo while grouping and flattened to a list
    # afterwards. Rows are read with to_dict('records'), the same conversion
    # setup_transaction_centric uses, rather than the much slower iterrows().
    customers_by_id = {}
    for row in df.to_dict('records'):
        cust_id = row['CustomerID']
        customer = customers_by_id.get(cust_id)
        if customer is None:
            customer = {'_id': cust_id, 'Country': row['Country'], 'invoices': {}}
            customers_by_id[cust_id] = customer

        inv_no = row['InvoiceNo']
        invoice = customer['invoices'].get(inv_no)
        if invoice is None:
            invoice = {
                'InvoiceNo': inv_no,
                'InvoiceDate': row['InvoiceDate'],
                'products': []
            }
            customer['invoices'][inv_no] = invoice

        invoice['products'].append({
            'StockCode': row['StockCode'], 'Description': row['Description'],
            'Quantity': row['Quantity'], 'UnitPrice': row['UnitPrice']
        })

    final_docs = [
        {**customer, 'invoices': list(customer['invoices'].values())}
        for customer in customers_by_id.values()
    ]

    # insert_many() rejects an empty document list, so skip it when there is
    # nothing to write.
    if final_docs:
        collection.insert_many(final_docs)
    else:
        logging.warning("No rows to insert into 'customers' collection.")
    logging.info(f"Inserted {collection.count_documents({})} documents into 'customers' collection.")

def run_mongo_setup(df):
    """Connect to MongoDB using MONGO_ATLAS_URI and rebuild both collections.

    Args:
        df: cleaned line-item frame, passed to both collection builders.

    Connection and operation failures are logged rather than raised; the
    client is always closed if it was created.
    """
    # Bind the name before `try` so `finally` can test it directly instead of
    # inspecting locals().
    client = None
    try:
        client = MongoClient(MONGO_ATLAS_URI)
        client.admin.command('ping') # Check connection
        logging.info("MongoDB Atlas connection successful.")
        db = client[MONGO_DB_NAME]

        setup_transaction_centric(db, df)
        setup_customer_centric(db, df)

    except (ConnectionFailure, OperationFailure) as e:
        logging.error(f"MongoDB connection/operation failed: {e}")
    finally:
        if client is not None:
            client.close()
# Load the same cleaned sample (df_retail) into both MongoDB collections.
run_mongo_setup(df_retail)

In [8]:
# PART 3: PERFORMANCE COMPARISON

# Sample data for the CRUD operations: take the keys of the first cleaned row,
# so every benchmark targets a record that was actually loaded.
EXISTING_INVOICE_NO = df_retail['InvoiceNo'].iloc[0]
# .iloc[0] on the integer CustomerID column yields a numpy integer scalar.
# sqlite3 does not bind that as an INTEGER parameter and BSON cannot encode it,
# so store a native Python int for both drivers.
EXISTING_CUSTOMER_ID = int(df_retail['CustomerID'].iloc[0])
EXISTING_STOCK_CODE = df_retail['StockCode'].iloc[0]

# Timing Decorator
def time_operation(func):
    """Decorator that prints how long each call to `func` takes.

    The wrapper forwards all arguments, prints one line with the function name
    and the elapsed wall-clock seconds, and returns whatever `func` returned.
    functools.wraps keeps the wrapped function's name and docstring.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        end_time = time.perf_counter()
        print(f"{func.__name__:<35} | Execution Time: {end_time - start_time:.6f} seconds")
        # Hand the result back instead of discarding it.
        return result
    return wrapper

# SQL CRUD Operations
@time_operation
def sql_read_customer_invoices(conn):
    """Timed read: fetch every Invoices row for EXISTING_CUSTOMER_ID."""
    # sqlite3 only binds native Python types. A numpy integer scalar is not
    # bound as an INTEGER, so the comparison would match no rows and the
    # benchmark would time an empty result. Cast first, as the MongoDB
    # counterparts already do.
    customer_id_int = int(EXISTING_CUSTOMER_ID)
    conn.execute("SELECT * FROM Invoices WHERE CustomerID = ?", (customer_id_int,)).fetchall()

# MongoDB CRUD Operations
@time_operation
def mongo_tx_read_customer_invoices(collection):
    """Timed read: fetch all documents in `collection` for EXISTING_CUSTOMER_ID."""
    # The id is cast to a native int before it goes into the query document.
    query = {"CustomerID": int(EXISTING_CUSTOMER_ID)}
    matches = collection.find(query)
    # Materialise the cursor so the fetch itself falls inside the timed call.
    list(matches)

@time_operation
def mongo_customer_read_by_id(collection):
    """Timed read: look up the document whose _id is EXISTING_CUSTOMER_ID."""
    # The id is cast to a native int before it is used as the _id filter.
    id_filter = {"_id": int(EXISTING_CUSTOMER_ID)}
    collection.find_one(id_filter)

@time_operation
def sql_update_quantity(conn):
    """Timed update: set Quantity to 99 on the sample InvoiceDetails row and commit."""
    update_sql = "UPDATE InvoiceDetails SET Quantity = 99 WHERE InvoiceNo = ? AND StockCode = ?"
    row_key = (EXISTING_INVOICE_NO, EXISTING_STOCK_CODE)
    conn.execute(update_sql, row_key)
    # The commit is part of the measured time.
    conn.commit()

@time_operation
def mongo_tx_update_quantity(collection):
    """Timed update: set Quantity to 99 on one document matching the sample
    InvoiceNo and StockCode.

    update_one() changes at most one matching document. The customer id is not
    part of the filter, so the previously unused local holding it was removed.
    """
    collection.update_one(
        {"InvoiceNo": EXISTING_INVOICE_NO, "StockCode": EXISTING_STOCK_CODE},
        {"$set": {"Quantity": 99}}
    )

@time_operation
def mongo_customer_update_quantity(collection):
    """Timed update: set Quantity to 99 on the sample product nested inside the
    sample invoice of the sample customer's document."""
    # The id is cast to a native int before it is used as the _id filter.
    selector = {"_id": int(EXISTING_CUSTOMER_ID), "invoices.InvoiceNo": EXISTING_INVOICE_NO}
    # $[inv] and $[prod] are bound by the array filters below: the invoice
    # with the sample InvoiceNo and, inside it, the product with the sample
    # StockCode.
    change = {"$set": {"invoices.$[inv].products.$[prod].Quantity": 99}}
    element_filters = [{"inv.InvoiceNo": EXISTING_INVOICE_NO}, {"prod.StockCode": EXISTING_STOCK_CODE}]
    collection.update_one(selector, change, array_filters=element_filters)


def run_performance_tests():
    """Run the timed read/update operations against SQLite and both MongoDB
    collections, printing one timing line per operation.

    Each connection is closed in a `finally` block, so a failing operation no
    longer leaves the SQLite file handle or the MongoDB client open.
    """
    print("PERFORMANCE TEST RESULTS")

    # --- SQL Connection and Tests ---
    print("\n--- SQL (Relational) ---")
    conn_sql = sqlite3.connect(DB_FILE)
    try:
        sql_read_customer_invoices(conn_sql)
        sql_update_quantity(conn_sql)
    finally:
        conn_sql.close()

    # --- MongoDB Atlas Connection and Tests ---
    client_mongo = MongoClient(MONGO_ATLAS_URI)
    try:
        db_mongo = client_mongo[MONGO_DB_NAME]

        print("\n--- MongoDB (Transaction-Centric) ---")
        collection_tx = db_mongo['transactions']
        mongo_tx_read_customer_invoices(collection_tx)
        mongo_tx_update_quantity(collection_tx)

        print("\n--- MongoDB (Customer-Centric) ---")
        collection_customer = db_mongo['customers']
        mongo_customer_read_by_id(collection_customer)
        mongo_customer_update_quantity(collection_customer)
    finally:
        client_mongo.close()

# Run the benchmark; each timed operation prints one line with its duration.
run_performance_tests()

PERFORMANCE TEST RESULTS

--- SQL (Relational) ---
sql_read_customer_invoices          | Execution Time: 0.000284 seconds
sql_update_quantity                 | Execution Time: 0.008581 seconds

--- MongoDB (Transaction-Centric) ---
mongo_tx_read_customer_invoices     | Execution Time: 0.110477 seconds
mongo_tx_update_quantity            | Execution Time: 0.005220 seconds

--- MongoDB (Customer-Centric) ---
mongo_customer_read_by_id           | Execution Time: 0.002836 seconds
mongo_customer_update_quantity      | Execution Time: 0.004952 seconds
