
# Walmart Polyglot Persistence using SQL and MongoDB 
 
This notebook demonstrates a polyglot persistence approach to handling Walmart-style retail data, using SQL (for transactional consistency) and MongoDB (for flexibility and nested document storage).
 
## SQL Database (Relational Model)
The SQL portion of the project models core transactional data using a normalized schema with proper foreign key relationships. Key entities include:

- Store, Product, Customer, Supplier

- Inventory (which links Store, Product, and Supplier)

- Sales (which records transactions, linked to Time, Customer, and Inventory)

- Time (used for time-series analysis)

The ERD reflects best practices in relational modeling and normalization. SQL is used here to ensure data integrity, ACID compliance, and support for complex joins.
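
As a small, self-contained illustration of the integrity guarantees mentioned above, the sketch below uses Python's built-in `sqlite3` module instead of SQL Server (an assumption made purely so the example runs without a server). The two-table schema is a simplified stand-in for Store and Inventory, not the project's actual DDL, and `add_inventory` is a hypothetical helper.

```python
import sqlite3

# In-memory SQLite database: a stand-in for SQL Server, used only for illustration.
conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces foreign keys only when enabled

conn.execute("CREATE TABLE Store (StoreID INTEGER PRIMARY KEY, StoreName TEXT)")
conn.execute(
    """
    CREATE TABLE Inventory (
        InventoryID INTEGER PRIMARY KEY,
        StoreID INTEGER NOT NULL REFERENCES Store(StoreID),
        StockQuantity INTEGER
    )
    """
)
conn.execute("INSERT INTO Store (StoreID, StoreName) VALUES (1, 'Store 1')")
conn.commit()

def add_inventory(store_id, quantity):
    """Insert an inventory row; return False if the foreign key check rejects it."""
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute(
                "INSERT INTO Inventory (StoreID, StockQuantity) VALUES (?, ?)",
                (store_id, quantity),
            )
        return True
    except sqlite3.IntegrityError:
        return False

valid_insert = add_inventory(1, 50)     # StoreID 1 exists
orphan_insert = add_inventory(999, 10)  # StoreID 999 does not exist
inventory_rows = conn.execute("SELECT COUNT(*) FROM Inventory").fetchone()[0]
print(valid_insert, orphan_insert, inventory_rows)
```

The rejected insert leaves no partial row behind, which is the behaviour the relational side of this project relies on for Inventory and Sales.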

## MongoDB (NoSQL Document Model)
MongoDB is used to model two domains where document-based storage provides flexibility:

- Product Catalogue: Contains product details along with an embedded Specifications document.

- Customer Reviews: Each review includes embedded Feedback (e.g., helpful votes, comments).

These are modeled in MongoDB because:

- Product Specifications (e.g., dimensions, warranty) are intrinsic to a product and rarely queried independently.

- Review Feedback benefits from being embedded for atomic access and performance.
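
To make the embedding rationale concrete, here is a minimal sketch that contrasts an embedded product document with its relational equivalent, using plain Python dictionaries. The field values and the `SpecName`/`SpecValue` columns are illustrative assumptions, and no database connection is needed to run it.

```python
# Hypothetical product document: specifications are embedded, not stored in a separate table.
product_doc = {
    "product_id": 1,
    "name": "Product 1",
    "specifications": {"size": "M", "color": "Red"},
}

# One lookup returns the product together with its specifications (no join required).
color = product_doc["specifications"]["color"]

# The relational equivalent needs two row sets linked by a key.
product_row = {"ProductID": 1, "ProductName": "Product 1"}
spec_rows = [
    {"ProductID": 1, "SpecName": "size", "SpecValue": "M"},
    {"ProductID": 1, "SpecName": "color", "SpecValue": "Red"},
]
joined_specs = {
    row["SpecName"]: row["SpecValue"]
    for row in spec_rows
    if row["ProductID"] == product_row["ProductID"]
}
print(color, joined_specs == product_doc["specifications"])
```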

#### Acknowledgement: Inspired by resources from ele.exeter.ac.uk.


## SQL 

### Install pyodbc
This package is used for connecting to SQL Server databases using ODBC.

In [1]:
# Install pyodbc package
import subprocess
import sys

def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])
    
install("pyodbc")

### Install pymongo
This package is used to interact with MongoDB from Python.

In [2]:
# Install pymongo package
import subprocess
import sys

def install(package):
    subprocess.check_call([sys.executable, "-m", "pip", "install", package])
    
install("pymongo")

In [3]:
import random
import string
from datetime import datetime, timedelta
import pandas as pd
import pyodbc
import matplotlib.pyplot as plt
import numpy as np
import pymongo
from pymongo import MongoClient
import sqlite3
import dash
from dash import dcc, html, Input, Output, State, ctx
import dash_bootstrap_components as dbc
import plotly.express as px
import warnings
warnings.filterwarnings("ignore")

### SQL Server Credentials


In [4]:
server = 'tcp:mcruebs04.isad.isadroot.ex.ac.uk' 
database = 'BEMM459_GroupAA'
username = 'GroupAA' 
password = 'OazY789*Oc'

### SQL Server Connection
This establishes a connection to the SQL Server using `pyodbc`. Make sure the correct driver is installed.

In [5]:
# Driver for own machine.  Comment out when on windows machine.
cnxn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password+';TrustServerCertificate=yes;Encrypt=no;')
cursor = cnxn.cursor()
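
The cells above keep the credentials inline. One possible alternative, sketched below, is to read them from environment variables so they stay out of the notebook. The variable names (`SQL_SERVER`, `SQL_DATABASE`, `SQL_USERNAME`, `SQL_PASSWORD`) and the helper `build_connection_string` are hypothetical and not part of the original project; the connection options are copied from the cell above.

```python
import os

def build_connection_string(env=os.environ):
    """Assemble a pyodbc-style connection string from environment variables.

    The variable names used here are hypothetical; choose whatever names
    suit your own setup.
    """
    required = ["SQL_SERVER", "SQL_DATABASE", "SQL_USERNAME", "SQL_PASSWORD"]
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={env['SQL_SERVER']};"
        f"DATABASE={env['SQL_DATABASE']};"
        f"UID={env['SQL_USERNAME']};"
        f"PWD={env['SQL_PASSWORD']};"
        "TrustServerCertificate=yes;Encrypt=no;"
    )

# Example with placeholder values passed explicitly instead of real credentials.
example = build_connection_string({
    "SQL_SERVER": "tcp:example-host",
    "SQL_DATABASE": "ExampleDB",
    "SQL_USERNAME": "example_user",
    "SQL_PASSWORD": "example_password",
})
print(example)
```

With the variables set in the environment, the connection could then be opened with `pyodbc.connect(build_connection_string())`.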

In [None]:
# -------------------------------------------------------------------
# Step 1: Drop tables if they exist in the dbo schema.
# -------------------------------------------------------------------
tables = ['Sales', 'Inventory', 'Time', 'Customer', 'Supplier', 'Store', 'Product']
for table in tables:
    tsql = f"IF OBJECT_ID(N'dbo.{table}', N'U') IS NOT NULL DROP TABLE dbo.{table};"
    cursor.execute(tsql)
cnxn.commit()

# -------------------------------------------------------------------
# Step 2: Create Tables in the dbo schema.
# -------------------------------------------------------------------

# Create Product table
tsql = '''
CREATE TABLE dbo.Product (
    ProductID INT IDENTITY(1,1) PRIMARY KEY,
    ProductName NVARCHAR(100),
    Category NVARCHAR(50),
    Price REAL
)
'''
cursor.execute(tsql)

# Create Store table
tsql = '''
CREATE TABLE dbo.Store (
    StoreID INT IDENTITY(1,1) PRIMARY KEY,
    StoreName NVARCHAR(100),
    Location NVARCHAR(100)
)
'''
cursor.execute(tsql)

# Create Supplier table
tsql = '''
CREATE TABLE dbo.Supplier (
    SupplierID INT IDENTITY(1,1) PRIMARY KEY,
    SupplierName NVARCHAR(100),
    ContactName NVARCHAR(100),
    ContactEmail NVARCHAR(100)
)
'''
cursor.execute(tsql)

# Create Customer table
tsql = '''
CREATE TABLE dbo.Customer (
    CustomerID INT IDENTITY(1,1) PRIMARY KEY,
    CustomerName NVARCHAR(100),
    Address NVARCHAR(255),
    City NVARCHAR(50),
    State NVARCHAR(50)
)
'''
cursor.execute(tsql)

# Create Time table
tsql = '''
CREATE TABLE dbo.Time (
    TimeID INT IDENTITY(1,1) PRIMARY KEY,
    Date DATE,
    Day INT,
    Month INT,
    Year INT,
    Quarter INT
)
'''
cursor.execute(tsql)

# Create Inventory table (linking Store, Product, and Supplier)
tsql = '''
CREATE TABLE dbo.Inventory (
    InventoryID INT IDENTITY(1,1) PRIMARY KEY,
    StoreID INT,
    ProductID INT,
    SupplierID INT,
    StockQuantity INT,
    FOREIGN KEY (StoreID) REFERENCES dbo.Store(StoreID),
    FOREIGN KEY (ProductID) REFERENCES dbo.Product(ProductID),
    FOREIGN KEY (SupplierID) REFERENCES dbo.Supplier(SupplierID)
)
'''
cursor.execute(tsql)

# Create Sales table (fact table)
tsql = '''
CREATE TABLE dbo.Sales (
    SalesID INT IDENTITY(1,1) PRIMARY KEY,
    TimeID INT,
    CustomerID INT,
    InventoryID INT,
    Quantity INT,
    TotalAmount REAL,
    FOREIGN KEY (TimeID) REFERENCES dbo.Time(TimeID),
    FOREIGN KEY (CustomerID) REFERENCES dbo.Customer(CustomerID),
    FOREIGN KEY (InventoryID) REFERENCES dbo.Inventory(InventoryID)
)
'''
cursor.execute(tsql)

cnxn.commit()

# -------------------------------------------------------------------
# Step 3: Insert Sample Data
# -------------------------------------------------------------------

# Insert sample data into Product (100 rows)
categories = ['Electronics', 'Clothing', 'Books', 'Toys', 'Home']
for i in range(1, 101):
    product_name = f"Product {i}"
    category = random.choice(categories)
    price = round(random.uniform(10.0, 200.0), 2)
    tsql = '''
    INSERT INTO dbo.Product (ProductName, Category, Price)
    VALUES (?, ?, ?)
    '''
    cursor.execute(tsql, (product_name, category, price))

# Insert sample data into Store (100 rows)
locations = ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix']
for i in range(1, 101):
    store_name = f"Store {i}"
    location = random.choice(locations)
    tsql = '''
    INSERT INTO dbo.Store (StoreName, Location)
    VALUES (?, ?)
    '''
    cursor.execute(tsql, (store_name, location))

# Insert sample data into Supplier (100 rows)
for i in range(1, 101):
    supplier_name = f"Supplier {i}"
    contact_name = f"Contact {i}"
    contact_email = f"contact{i}@supplier.com"
    tsql = '''
    INSERT INTO dbo.Supplier (SupplierName, ContactName, ContactEmail)
    VALUES (?, ?, ?)
    '''
    cursor.execute(tsql, (supplier_name, contact_name, contact_email))

# Insert sample data into Customer (100 rows)
cities = ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix']
states = ['NY', 'CA', 'IL', 'TX', 'AZ']
for i in range(1, 101):
    customer_name = f"Customer {i}"
    address = f"{i} Example St"
    city = random.choice(cities)
    state = random.choice(states)
    tsql = '''
    INSERT INTO dbo.Customer (CustomerName, Address, City, State)
    VALUES (?, ?, ?, ?)
    '''
    cursor.execute(tsql, (customer_name, address, city, state))

# Insert sample data into Time (100 rows)
start_date = datetime(2025, 1, 1)
for i in range(0, 100):
    current_date = start_date + timedelta(days=i)
    day = current_date.day
    month = current_date.month
    year = current_date.year
    quarter = ((month - 1) // 3) + 1
    tsql = '''
    INSERT INTO dbo.Time (Date, Day, Month, Year, Quarter)
    VALUES (?, ?, ?, ?, ?)
    '''
    cursor.execute(tsql, (current_date.date(), day, month, year, quarter))

# Insert sample data into Inventory (100 rows)
for i in range(1, 101):
    store_id = random.randint(1, 100)
    product_id = random.randint(1, 100)
    supplier_id = random.randint(1, 100)
    stock_quantity = random.randint(1, 100)
    tsql = '''
    INSERT INTO dbo.Inventory (StoreID, ProductID, SupplierID, StockQuantity)
    VALUES (?, ?, ?, ?)
    '''
    cursor.execute(tsql, (store_id, product_id, supplier_id, stock_quantity))

# Insert sample data into Sales (100 rows)
for i in range(1, 101):
    time_id = random.randint(1, 100)
    customer_id = random.randint(1, 100)
    inventory_id = random.randint(1, 100)
    quantity = random.randint(1, 10)
    # Calculate total amount: quantity multiplied by the price of the product from Inventory.
    tsql = 'SELECT ProductID FROM dbo.Inventory WHERE InventoryID = ?'
    cursor.execute(tsql, (inventory_id,))
    prod_row = cursor.fetchone()
    if prod_row:
        product_id = prod_row[0]
        tsql = 'SELECT Price FROM dbo.Product WHERE ProductID = ?'
        cursor.execute(tsql, (product_id,))
        price_row = cursor.fetchone()
        if price_row:
            price = price_row[0]
            total_amount = round(quantity * price, 2)
        else:
            total_amount = 0
    else:
        total_amount = 0
    tsql = '''
    INSERT INTO dbo.Sales (TimeID, CustomerID, InventoryID, Quantity, TotalAmount)
    VALUES (?, ?, ?, ?, ?)
    '''
    cursor.execute(tsql, (time_id, customer_id, inventory_id, quantity, total_amount))

cnxn.commit()

# -------------------------------------------------------------------
# Step 4: Verification - Print row counts for each table in dbo schema.
# -------------------------------------------------------------------
for table in ["Product", "Store", "Supplier", "Customer", "Time", "Inventory", "Sales"]:
    tsql = f"SELECT COUNT(*) FROM dbo.{table}"
    cursor.execute(tsql)
    count = cursor.fetchone()[0]
    print(f"dbo.{table} has {count} rows.")

#cnxn.close()


dbo.Product has 100 rows.
dbo.Store has 100 rows.
dbo.Supplier has 100 rows.
dbo.Customer has 100 rows.
dbo.Time has 100 rows.
dbo.Inventory has 100 rows.
dbo.Sales has 100 rows.
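
The Sales loop above looks up the product and then its price with two separate queries per row. As an alternative sketch, the same total can be derived with a single join. The example uses an in-memory `sqlite3` database with a cut-down schema (an assumption so it runs without SQL Server); the table and column names mirror the ones above, and `total_amount` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE Product (ProductID INTEGER PRIMARY KEY, Price REAL);
    CREATE TABLE Inventory (InventoryID INTEGER PRIMARY KEY, ProductID INTEGER);
    INSERT INTO Product VALUES (1, 10.0), (2, 25.5);
    INSERT INTO Inventory VALUES (1, 2), (2, 1);
    """
)

def total_amount(inventory_id, quantity):
    """Return quantity * unit price for the product behind an inventory row."""
    row = conn.execute(
        """
        SELECT p.Price
        FROM Inventory AS i
        JOIN Product AS p ON p.ProductID = i.ProductID
        WHERE i.InventoryID = ?
        """,
        (inventory_id,),
    ).fetchone()
    # Mirror the notebook's fallback: an unknown inventory row yields 0.
    return round(quantity * row[0], 2) if row else 0

print(total_amount(1, 3))   # inventory 1 holds product 2, priced 25.5
print(total_amount(99, 3))  # no such inventory row
```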


## The following code connects to MongoDB using `pymongo`, creates the collections, generates sample documents, and populates the collections.


# 1. Connectivity


## MongoDB Connection Setup

### Make sure your MongoDB instance is running and update the connection URI as needed.
### For a local instance, the default URI is typically "mongodb://localhost:27017/".


In [6]:
# Note: This cell connects on port 27017, MongoDB's default. If the connection is refused, check which port your server is listening on.
# To change the port, open "mongod_use this.bat" (on the S:/ drive), set a different port number, and then start the server again.
# Update the shell (client) batch file "mongo_use this.bat" (also on S:/) with the same number, and change the URI below to match.
mongoclient = pymongo.MongoClient("mongodb://localhost:27017/")

#Check what databases exist - the output is a list of database names
print(mongoclient.list_database_names())


['Walmart_db', 'admin', 'config', 'local']


In [7]:
#You can also check databases that presently exist using a loop
dblist = mongoclient.list_database_names()
for x in dblist:
    print(x)    

Walmart_db
admin
config
local


### Defining user-defined functions (..for later use)

In [8]:
#Defining a user function to check if database exists - In MongoDB, a database is not created until it gets content. 
def check_DatabaseExists(argDBName):
    local_dblist = mongoclient.list_database_names()
    if argDBName in local_dblist:
        print("The database ", argDBName, " exists.")
    else:
        print("The database ", argDBName, " does not exist.")

#Defining a user function to check if a collection exists - In MongoDB, a collection is not created until it gets content. 
def check_CollectionExists(argDBName, argCollName, local_mydb):
    local_collist = local_mydb.list_collection_names()
    if argCollName in local_collist:
        print("The collection ",  argCollName, "exists in database ", argDBName)
    else:
        print("The collection ", argCollName, " does not exist in database ", argDBName)

# 2. Create new MongoDB database

In [9]:
# Create/use a database for our Walmart project      
mydb = mongoclient["Walmart_db"]
print(type(mydb))


<class 'pymongo.synchronous.database.Database'>


In [10]:
#Check if database exists by calling function check_DatabaseExists with name of database as the argument
check_DatabaseExists("Walmart_db")

'''
#Without a function the code will be as follows
dblist = mongoclient.list_database_names()
if "Walmart_db" in dblist:
    print("The database 'Walmart_db' exists.")
else:
    print("The database 'Walmart_db' does not exist.")
'''

print()

The database  Walmart_db  exists.



# 3. Create new Collection 

Creating two collections: **Product_catalogue** and **Customer_reviews**.

**Note:** In MongoDB, collections are created automatically when you first insert a document.


In [12]:
# Define or create the collections
product_collection = mydb['Product_catalogue']
print(type(product_collection))
review_collection = mydb['Customer_reviews']
print(type(review_collection))

<class 'pymongo.synchronous.collection.Collection'>
<class 'pymongo.synchronous.collection.Collection'>


# 4. Add documents

### Generate Sample Documents for the Product Catalogue


In [None]:
# Read the product CSV file to get product IDs
df_products = pd.read_csv("Product.csv")
# Assume the CSV has a 'ProductID' column; use up to 40 entries.
product_ids = df_products['ProductID'].tolist()[:40]

def create_sample_products_from_csv(product_ids):
    """
    Generate a list of product documents keyed to the RDBMS Product table.

    Only product_id comes from the CSV; every other field is randomly
    generated sample data. Each document includes:
      - product_id (from CSV)
      - name
      - category
      - price
      - description
      - specifications: an embedded document of various attributes.
      - availability: product stock status.
      - updatedAt: timestamp for the last update.
      - customAttributes: additional metadata (warranty, manufacturer).
      
    Parameters:
        product_ids (List): List of product IDs read from the CSV.
        
    Returns:
        List[dict]: A list of product documents.
    """
    products = []
    categories = ["Electronics", "Home", "Clothing", "Toys"]
    descriptions = [
        "High quality product with excellent performance.",
        "A great addition to your home.",
        "Fashionable and comfortable.",
        "Fun and engaging for all ages."
    ]
    availabilities = ["in stock", "pre-order", "out of stock"]
    
    specifications_options = [
        {"size": "M", "color": "Red", "technical_details": "Specs A"},
        {"size": "L", "color": "Blue", "technical_details": "Specs B"},
        {"size": "S", "color": "Green", "technical_details": "Specs C"}
    ]
    
    for pid in product_ids:
        product = {
            "product_id": pid,  # Use the product id from the CSV.
            "name": f"Product {pid}",
            "category": random.choice(categories),
            "price": round(random.uniform(10, 500), 2),
            "description": random.choice(descriptions),
            "specifications": random.choice(specifications_options),
            "availability": random.choice(availabilities),
            "updatedAt": datetime.now(),
            "customAttributes": {
                "warranty": f"{random.choice([1, 2, 3])} years",
                "manufacturer": f"Manufacturer {random.randint(1, 5)}"
            }
        }
        products.append(product)
    return products

# Create product documents using product_ids from the CSV.
sample_products = create_sample_products_from_csv(product_ids)

result = product_collection.insert_many(sample_products)
print(f"Inserted {len(result.inserted_ids)} documents into the product_catalogue collection.")


Inserted 40 documents into the product_catalogue collection.


### Generate Sample Documents for Customer Reviews
Each document groups the reviews for one product and includes:
- The product ID, name, and category (linking to a product in the catalogue).
- An embedded `reviews` array with two entries, each holding the reviewer's customer ID, name, city, and state, plus a `synthetic_review` sub-document (rating, comment, and date).
- `customer_cities` and `customer_states` lists of the distinct reviewer locations, and an `updatedAt` timestamp.

This design keeps a product's reviews together in a single document, simplifying access and updates.

In [14]:
def generate_synthetic_review(product_name, product_category):
    """
    Generate a single synthetic review for a product.
    
    The review's comment varies based on a random rating:
      - Negative if rating < 3.
      - Neutral if rating == 3.
      - Positive if rating > 3.
    
    Returns:
        dict: Synthetic review with rating, comment, and date.
    """
    rating = random.randint(1, 5)
    if rating < 3:
        comment = (f"I was disappointed with {product_name} in the {product_category} category. "
                   f"It did not meet my expectations.")
    elif rating == 3:
        comment = (f"{product_name} from {product_category} was just okay, neither good nor bad.")
    else:
        comment = (f"I really enjoyed {product_name} from the {product_category} category! "
                   f"It exceeded my expectations.")
    return {
        "rating": rating,
        "comment": comment,
        "date": datetime.now() - timedelta(days=random.randint(0, 365))
    }

# Load Customer table from CSV.
# Expected columns: CustomerID, CustomerName, Address, City, State
df_customers = pd.read_csv("Customer.csv")
# Limit to 40 rows.
df_customers = df_customers.head(40)

# Load Product table from CSV.
# Expected columns: ProductID, ProductName, Category, Price
df_products = pd.read_csv("Product.csv")
# Use only the first 40 products for consistency.
df_products = df_products.head(40)

# Create a list of dictionaries for product information.
products_info = df_products[['ProductID', 'ProductName', 'Category']].to_dict('records')

def create_grouped_product_review_documents(df_customers, products_info):
    """
    For each product, randomly select two distinct customers and generate one synthetic review per customer.
    Then create a grouped document that includes:
      - product_id, product_name, product_category
      - reviews: an array containing two review sub-documents (each with customer_id, customer_name, city, state, and synthetic_review)
      - customer_cities: a list of unique cities from the selected reviews
      - customer_states: a list of unique states from the selected reviews
      - updatedAt: timestamp for the last update.
      
    Returns:
        List[dict]: A list of grouped product review documents.
    """
    grouped_docs = []
    for prod in products_info:
        product_id = prod["ProductID"]
        product_name = prod["ProductName"]
        product_category = prod["Category"]
        
        # Randomly select two distinct customers.
        selected_customers = df_customers.sample(n=2)
        reviews = []
        cities = []
        states = []
        for _, cust_row in selected_customers.iterrows():
            synthetic_review = generate_synthetic_review(product_name, product_category)
            review_doc = {
                "customer_id": cust_row["CustomerID"],
                "customer_name": cust_row["CustomerName"],
                "city": cust_row["City"],
                "state": cust_row["State"],
                "synthetic_review": synthetic_review
            }
            reviews.append(review_doc)
            cities.append(cust_row["City"])
            states.append(cust_row["State"])
        
        doc = {
            "product_id": product_id,
            "product_name": product_name,
            "product_category": product_category,
            "reviews": reviews,
            "customer_cities": list(set(cities)),
            "customer_states": list(set(states)),
            "updatedAt": datetime.now()
        }
        grouped_docs.append(doc)
    return grouped_docs

# Create grouped product review documents.
grouped_product_review_docs = create_grouped_product_review_documents(df_customers, products_info)

result = review_collection.insert_many(grouped_product_review_docs)
print(f"Inserted {len(result.inserted_ids)} documents into the review_collection.")


Inserted 40 documents into the review_collection.
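
Because each grouped document carries its reviews as an array, per-product statistics can be computed without touching another collection. The sketch below works on plain dictionaries shaped like the documents generated above (the sample values are made up), so it runs without a MongoDB connection. `average_ratings` is a hypothetical helper, and the `avg_rating_pipeline` list shows how the same calculation might be expressed as an aggregation pipeline; it is defined but not executed here.

```python
from statistics import mean

# Two hand-written documents shaped like the grouped review documents above.
sample_docs = [
    {"product_id": 1, "reviews": [
        {"customer_id": 15, "synthetic_review": {"rating": 3}},
        {"customer_id": 6, "synthetic_review": {"rating": 4}},
    ]},
    {"product_id": 2, "reviews": [
        {"customer_id": 28, "synthetic_review": {"rating": 5}},
        {"customer_id": 3, "synthetic_review": {"rating": 5}},
    ]},
]

def average_ratings(docs):
    """Map product_id to the mean rating across its embedded reviews."""
    return {
        doc["product_id"]: mean(r["synthetic_review"]["rating"] for r in doc["reviews"])
        for doc in docs
        if doc["reviews"]  # skip products without reviews
    }

ratings = average_ratings(sample_docs)
# Product IDs ordered from highest to lowest average rating.
top_products = sorted(ratings, key=ratings.get, reverse=True)
print(ratings, top_products)

# Equivalent aggregation pipeline (not run here); it could be passed to
# review_collection.aggregate(avg_rating_pipeline) against a live database.
avg_rating_pipeline = [
    {"$unwind": "$reviews"},
    {"$group": {"_id": "$product_id",
                "avg_rating": {"$avg": "$reviews.synthetic_review.rating"}}},
    {"$sort": {"avg_rating": -1}},
    {"$limit": 10},
]
```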


In [13]:
#Check if database exists .. In MongoDB, a database is not created until it gets content
#Note that data has now been added
check_DatabaseExists("Walmart_db")

#Check if collection exists .. In MongoDB, a collection is not created until it gets content
#Note that data has now been added
check_CollectionExists("Walmart_db", "Customer_reviews", mydb)

The database  Walmart_db  exists.
The collection  Customer_reviews exists in database  Walmart_db


# 5. Query documents

### Verification: Query the Collections

Query for a few documents from each collection to verify that the data has been inserted correctly.


#### The find_one() method returns the first occurrence in the selection.

In [14]:
print("Sample Product Document:")
print(product_collection.find_one())

Sample Product Document:
{'_id': ObjectId('67eafbfe1b4fd380e3080ebe'), 'product_id': 1, 'name': 'Product 1', 'category': 'Clothing', 'price': 276.49, 'description': 'Fashionable and comfortable.', 'specifications': {'size': 'L', 'color': 'Blue', 'technical_details': 'Specs B'}, 'availability': 'pre-order', 'updatedAt': datetime.datetime(2025, 3, 31, 21, 33, 2, 332000), 'customAttributes': {'warranty': '2 years', 'manufacturer': 'Manufacturer 5'}}


In [15]:
#Python MongoDB Limit
#To limit the result in MongoDB, we use the limit() method. The limit() method takes one parameter, a number defining how many documents to return.
#Limit the result to only return 2 documents:
myresult = review_collection.find().limit(2)

#print the result:
for x in myresult:
    print(x)

{'_id': ObjectId('67eafc001b4fd380e3080ee6'), 'product_id': 1, 'product_name': 'Product 1', 'product_category': 'Electronics', 'reviews': [{'customer_id': 15, 'customer_name': 'Customer 15', 'city': 'Chicago', 'state': 'NY', 'synthetic_review': {'rating': 3, 'comment': 'Product 1 from Electronics was just okay, neither good nor bad.', 'date': datetime.datetime(2024, 10, 18, 21, 33, 4, 288000)}}, {'customer_id': 6, 'customer_name': 'Customer 6', 'city': 'Chicago', 'state': 'IL', 'synthetic_review': {'rating': 4, 'comment': 'I really enjoyed Product 1 from the Electronics category! It exceeded my expectations.', 'date': datetime.datetime(2024, 5, 24, 21, 33, 4, 288000)}}], 'customer_cities': ['Chicago'], 'customer_states': ['IL', 'NY'], 'updatedAt': datetime.datetime(2025, 3, 31, 21, 33, 4, 288000)}
{'_id': ObjectId('67eafc001b4fd380e3080ee7'), 'product_id': 2, 'product_name': 'Product 2', 'product_category': 'Clothing', 'reviews': [{'customer_id': 28, 'customer_name': 'Customer 28', 'ci

# 9. Drop collection (if needed)

In [44]:
#Python MongoDB Drop Collection - You can delete a collection by using the drop() method. Dropping a collection also removes its indexes.
#Run this cleanup only when you are finished with the dashboard further down, which reads from Product_catalogue.
product_collection.drop()

#Check if the collection exists by calling the user function defined earlier
check_CollectionExists("Walmart_db", "Product_catalogue", mydb)

The collection  Product_catalogue  does not exist in database  Walmart_db


In [45]:
#Python MongoDB Drop Collection - You can delete a collection by using the drop() method. Dropping a collection also removes its indexes.
review_collection.drop()

#Check if the collection exists by calling the user function defined earlier
check_CollectionExists("Walmart_db", "Customer_reviews", mydb)

The collection  Customer_reviews  does not exist in database  Walmart_db


# 10. Drop database (if needed)

In [46]:
#Drop database using instance of MongoClient
mongoclient.drop_database("Walmart_db")

#Check to see if database exists by calling the user function defined earlier 
check_DatabaseExists("Walmart_db")

The database  Walmart_db  does not exist.


## 11. Visualizations

# Dashboard for Analytics 

In [16]:

# Query 1: Total Sales per Product Category
category_query = '''
SELECT Category, SUM(TotalAmount) AS TotalSales
FROM Sales
JOIN Inventory ON Sales.InventoryID = Inventory.InventoryID
JOIN Product ON Inventory.ProductID = Product.ProductID
GROUP BY Category;
'''
category_df = pd.read_sql(category_query, cnxn)

# Query 2: Sales Trend Over Time
trend_query = '''
SELECT t.Date, SUM(s.TotalAmount) AS TotalSales
FROM Sales s
JOIN Time t ON s.TimeID = t.TimeID
GROUP BY t.Date
ORDER BY t.Date;
'''
trend_df = pd.read_sql(trend_query, cnxn)
trend_df['Date'] = pd.to_datetime(trend_df['Date'])

# Query 3: Average CLV by City
clv_query = '''
SELECT 
    c.City, 
    COUNT(DISTINCT c.CustomerID) AS NumCustomers,
    SUM(s.TotalAmount) AS TotalCLV,
    SUM(s.TotalAmount) / COUNT(DISTINCT c.CustomerID) AS AvgCLV  -- total spend per customer, not per sale
FROM Sales s
JOIN Customer c ON s.CustomerID = c.CustomerID
GROUP BY c.City;
'''
clv_df = pd.read_sql(clv_query, cnxn)

# Query 4: Inventory Value per Store Location (stock levels from SQL, prices from MongoDB)
inventory_query = '''
SELECT 
    st.StoreID,
    st.StoreName,
    st.Location,
    i.ProductID,
    i.StockQuantity
FROM Store st
JOIN Inventory i ON st.StoreID = i.StoreID;
'''
inventory_df = pd.read_sql(inventory_query, cnxn)
mongo_docs = list(mydb.Product_catalogue.find())
mongo_df = pd.DataFrame(mongo_docs)

# Ensure the product id in MongoDB is an integer for merging.
mongo_df['product_id'] = mongo_df['product_id'].astype(int)

merged_df = pd.merge(inventory_df, mongo_df[['product_id', 'price']], 
                     left_on='ProductID', right_on='product_id', how='left')

# Calculate the inventory value for each record.
merged_df['InventoryValue'] = merged_df['StockQuantity'] * merged_df['price']
# Aggregate inventory value by store location
inventory_agg_df = merged_df.groupby(['Location'])['InventoryValue'].sum().reset_index()




# Query 5 (MongoDB): Product availability by category
mongo_data = list(mydb.Product_catalogue.find({}, {"category": 1, "availability": 1, "_id": 0}))
availability_df = pd.DataFrame(mongo_data)
availability_df = availability_df.groupby(["category", "availability"]).size().reset_index(name="ProductCount")

# Create visualizations
category_fig = px.bar(category_df, x='Category', y='TotalSales', title='Total Sales per Product Category')
trend_fig = px.line(trend_df, x='Date', y='TotalSales', markers=True, title='Sales Trend Over Time')
clv_fig = px.bar(clv_df, x='City', y='AvgCLV', title='Average Customer Lifetime Value by City')
inventory_fig = px.bar(inventory_agg_df, x='Location', y='InventoryValue', title='Inventory Value per Store Location')
availability_fig = px.bar(availability_df, x='category', y='ProductCount', color='availability', barmode='group', title='Product Availability by Category')

# KPIs
total_sales = category_df['TotalSales'].sum()
avg_clv = clv_df['AvgCLV'].mean()
total_inventory_value = merged_df['InventoryValue'].sum()
total_products = len(mongo_df)

# Dash App setup
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])

# Layout with CRUD form and visualizations
app.layout = dbc.Container([
    html.H1("Walmart Data Insights Dashboard", className="text-center mt-3 mb-4"),

    # CRUD Form for ProductCatalogue
    dbc.Row([
        dbc.Col([
            html.H5("Manage Product (MongoDB + SQL Sync)"),
            dbc.Input(id="prod-id", placeholder="Product ID", type="number", className="mb-2"),
            dbc.Input(id="prod-category", placeholder="Category", type="text", className="mb-2"),
            dbc.Input(id="prod-price", placeholder="Price", type="number", className="mb-2"),
            dbc.Input(id="prod-status", placeholder="Availability Status", type="text", className="mb-2"),
            dbc.Button("Add/Update Product", id="add-btn", color="success", className="me-2"),
            dbc.Button("Delete Product", id="delete-btn", color="danger"),
            html.Div(id="crud-output", className="mt-3")
        ], md=6)
    ], className="mb-4"),

    dbc.Row([
        dbc.Col(dcc.Graph(figure=category_fig), md=6),
        dbc.Col(dcc.Graph(figure=clv_fig), md=6)
    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(figure=trend_fig), md=6),
        dbc.Col(dcc.Graph(figure=inventory_fig), md=6)
    ]),
    dbc.Row([
        dbc.Col(dcc.Graph(figure=availability_fig), md=12)
    ])
], fluid=True)

# Callback for Create, Update, and Delete actions with validation and SQL sync
@app.callback(
    Output("crud-output", "children"),
    Input("add-btn", "n_clicks"),
    Input("delete-btn", "n_clicks"),
    State("prod-id", "value"),
    State("prod-category", "value"),
    State("prod-price", "value"),
    State("prod-status", "value"),
    prevent_initial_call=True  # do not run the callback (and show a warning) on page load
)
def manage_product(add_clicks, del_clicks, pid, category, price, status):
    if pid is None:
        return "⚠️ Product ID is required."

    if ctx.triggered_id == "add-btn":
        if not category or price is None or not status:
            return "⚠️ All fields must be filled to add or update."

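        existing = mydb.Product_catalogue.find_one({"product_id": int(pid)})
        # Use the same field names as the generated catalogue documents
        # ("availability", "updatedAt") so the dashboard queries see the change.
        data = {
            "product_id": int(pid),
            "category": category,
            "price": float(price),
            "availability": status,
            "updatedAt": datetime.now()
        }

        if existing:
            mydb.Product_catalogue.update_one({"product_id": int(pid)}, {"$set": data})
            message = f"🔄 Product {pid} updated in MongoDB."
        else:
            mydb.Product_catalogue.insert_one(data)
            message = f"✅ Product {pid} added to MongoDB."

        try:
            # ProductID is an IDENTITY column, so an explicit value can only be
            # inserted while IDENTITY_INSERT is switched on for the table.
            cursor.execute("""
                IF EXISTS (SELECT 1 FROM dbo.Product WHERE ProductID = ?)
                    UPDATE dbo.Product SET Category = ?, Price = ? WHERE ProductID = ?
                ELSE
                BEGIN
                    SET IDENTITY_INSERT dbo.Product ON;
                    INSERT INTO dbo.Product (ProductID, Category, Price) VALUES (?, ?, ?);
                    SET IDENTITY_INSERT dbo.Product OFF;
                END
            """, int(pid), category, float(price), int(pid), int(pid), category, float(price))
            cnxn.commit()
            message += " Synced with SQL."
        except Exception as e:
            cnxn.rollback()  # discard the failed statement so the connection stays usable
            message += f" ❌ SQL Sync Failed: {str(e)}"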

        return message

    elif ctx.triggered_id == "delete-btn":
        result = mydb.Product_catalogue.delete_one({"product_id": int(pid)})
        if result.deleted_count:
            try:
                cursor.execute("DELETE FROM Product WHERE ProductID = ?", pid)
                cnxn.commit()
                return f"🗑️ Product {pid} deleted from MongoDB and SQL."
            except Exception as e:
                return f"⚠️ Mongo deleted, SQL failed: {str(e)}"
        else:
            return f"⚠️ Product {pid} not found in MongoDB."

    return ""

if __name__ == '__main__':
    # app.run replaces the deprecated app.run_server in recent Dash releases.
    app.run(jupyter_mode='external', debug=True)


Dash app running on http://127.0.0.1:8050/


### Close SQL Connection

In [None]:
cnxn.close()