## Step 2: Denormalization for analytical workloads
The relational data is reshaped into a document-oriented structure
optimized for MongoDB analytics.

In [None]:
from flask import Flask,Request,jsonify
from pymongo import MongoClient
from bson.objectid import ObjectId
import pyodbc
import pandas as pd 
import numpy as np
import threading
import time  
import requests
from dotenv import load_dotenv
import os
from pymongo import MongoClient




In [2]:
app=Flask(__name__)

In [None]:
load_dotenv()

Client = MongoClient(os.getenv("MONGO_URI"))

db = Client[os.getenv("MONGO_DB_NAME")]

coll = db[os.getenv("MONGO_COLLECTION")]

In [None]:
load_dotenv()

sql_conn = pyodbc.connect(
    f"DRIVER={{{os.getenv('DB_DRIVER')}}};"
    f"SERVER={os.getenv('DB_SERVER')};"
    f"DATABASE={os.getenv('DB_NAME')};"
    f"Trusted_Connection={os.getenv('DB_TRUSTED_CONNECTION')};"
)


In [None]:
sales_df = pd.read_sql("SELECT * FROM Sales", sql_conn)
customers_df = pd.read_sql("SELECT * FROM Customers", sql_conn)
branches_df = pd.read_sql("SELECT * FROM Branches", sql_conn)
products_df = pd.read_sql("SELECT * FROM Products", sql_conn)
salesdetails_df=pd.read_sql("SELECT * From SalesDetails",sql_conn)

# STEP 3: Loading denormalized documents into MongoDB for analytics

In [6]:
@app.route("/retail", methods=["POST"])
def get_data():
    try:
        # Step 1️⃣: Merge Sales with SalesDetails
        retail_df = sales_df.merge(
            salesdetails_df,
            on="SaleID",
            how="inner"
        )

        # Step 2️⃣: Merge with Customers
        retail_df = retail_df.merge(
            customers_df,
            on="CustomerID",
            how="left",
            suffixes=("", "_customer")
        )

        # Step 3️⃣: Merge with Branches
        retail_df = retail_df.merge(
            branches_df,
            on="BranchID",
            how="left",
            suffixes=("", "_branch")
        )

        # Step 4️⃣: Merge with Products
        retail_df = retail_df.merge(
            products_df,
            on="ProductID",
            how="left",
            suffixes=("", "_product")
        )

        # Step 5️⃣: Convert to JSON records (denormalized)
        retail = []
        for _, row in retail_df.iterrows():
            doc = {
                "SaleID": int(row["SaleID"]),
                "Date": str(row["SaleDate"]),
                "Customer": {
                    "CustomerID": int(row["CustomerID"]),
                    "Name": row["Name"],
                    "Gender": row.get("Gender"),
                    "City": row.get("City")
                },
                "Branch": {
                    "BranchID": int(row["BranchID"]),
                    "BranchName": row["BranchName"],
                    "City": row.get("City_branch")
                },
                "Product": {
                    "ProductID": int(row["ProductID"]),
                    "ProductName": row["ProductName"],
                    "Category": row.get("Category"),
                    "Unitprice": float(row["Unitprice"])
                },
                "Quantity": int(row["Quantity"]),
                "TotalPrice": float(row["TotalPrice"])
            }
            retail.append(doc)

        # Step 6️⃣: Insert all records into MongoDB
        db["retail"].insert_many(retail)

        return jsonify({"message": f"{len(retail)} records inserted successfully"}), 201

    except Exception as e:
        return jsonify({"error": str(e)}), 404



In [8]:
def run_flask():
    app.run(port=5000,debug=False,use_reloader=False)
threading.Thread(target=run_flask).start()
time.sleep(1)    

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit


127.0.0.1 - - [02/Nov/2025 11:47:21] "POST /retail HTTP/1.1" 201 -
127.0.0.1 - - [02/Nov/2025 11:47:29] "GET /retail HTTP/1.1" 405 -


In [9]:
import requests
res=requests.post("http://127.0.0.1:5000/retail")
print("Post",res.text)

Post {"message":"200000 records inserted successfully"}

