In [1]:
from flask import Flask , request , jsonify
import pyodbc
import pymongo
from pymongo import MongoClient
from bson.objectid import ObjectId
import threading
import time
from datetime import datetime, date
from decimal import Decimal
import numpy as np
import pandas as pd 
import matplotlib.pyplot as plt
import seaborn as sns
import requests
import random
from faker import Faker

In [None]:
# SQL Server connection
def connect_db():
    return pyodbc.connect(
        "DRIVER={SQL Server};"
        'SERVER=localhost\\SQLEXPRESS;'
        'DATABASE=RetailSales;'       
        "Trusted_Connection=yes;"
    )

In [None]:
from pymongo import MongoClient

def get_mongo_collection(collection_name="Transactions"):
    """
    Connect to MongoDB and return a specific collection.
    Default = Transactions.
    """
    client = MongoClient("mongodb://localhost:27017/")
    db = client["RetailML"] 
    return db[collection_name]


In [4]:
app = Flask(__name__)

In [None]:
# ---------- Sync Endpoint ----------
@app.route('/sync-to-mongo', methods=['POST'])
def sync_to_mongo():
    try:
        # ==========================
        # Transactions Collection
        # ==========================
        transactions_query = """
        SELECT 
            c.CustomerID,
            c.Name AS CustomerName,
            c.Gender,
            c.Age,
            c.City AS CustomerCity,
            b.BranchID,
            b.BranchName,
            b.City AS BranchCity,
            s.SaleID,
            s.SaleDate,
            s.TotalAmount,
            sd.SaleDetailID,
            sd.Quantity,
            sd.UnitPrice,
            (sd.Quantity * sd.UnitPrice) AS TotalPrice,
            p.ProductID,
            p.ProductName,
            p.Category,
            p.Brand
        FROM Customers c
        JOIN Sales s ON c.CustomerID = s.CustomerID
        JOIN Branches b ON s.BranchID = b.BranchID
        JOIN SaleDetails sd ON s.SaleID = sd.SaleID
        JOIN Products p ON sd.ProductID = p.ProductID
        """

        conn = connect_db()
        df_tx = pd.read_sql(transactions_query, conn)

        # convert SaleDate to datetime
        df_tx["SaleDate"] = pd.to_datetime(df_tx["SaleDate"], errors="coerce")

        # convert to dict
        tx_records = df_tx.to_dict(orient="records")

        mongo_col_tx = get_mongo_collection("Transactions")
        mongo_col_tx.delete_many({})
        if tx_records:
            mongo_col_tx.insert_many(tx_records)

        # ==========================
        # Reviews Collection
        # ==========================
        reviews_query = """
        SELECT 
            r.ReviewID,
            r.CustomerID,
            c.Name AS CustomerName,
            r.ProductID,
            p.ProductName,
            r.SaleID,
            r.Rating,
            r.ReviewText,
            r.ReviewDate
        FROM Reviews r
        JOIN Customers c ON r.CustomerID = c.CustomerID
        JOIN Products p ON r.ProductID = p.ProductID
        """

        df_rev = pd.read_sql(reviews_query, conn)
        conn.close()

        positive_reviews = [
            "Absolutely loved this product, highly recommend!",
            "Exceeded my expectations, great quality.",
            "Fast delivery and amazing packaging.",
            "Worth every penny, I will buy again.",
            "Fantastic experience overall!"
        ]

        neutral_reviews = [
            "The product is okay, nothing special.",
            "Average experience, could be better.",
            "Not bad, but not great either.",
            "It works fine, but I expected more.",
            "Neutral feelings, it just does the job."
        ]

        negative_reviews = [
            "Terrible quality, broke after one use.",
            "Not worth the price at all.",
            "Very disappointing experience.",
            "The product did not meet expectations.",
            "Customer service was unhelpful and slow."
        ]

        fake = Faker()

        for index, row in df_rev.iterrows():
            rating = int(row["Rating"])
            if  rating >= 4:
                text = random.choice(positive_reviews) + " " + fake.sentence(nb_words=6)
            elif rating == 3:
                text = random.choice(neutral_reviews) + " " + fake.sentence(nb_words=6)
            else:
                text = random.choice(negative_reviews) + " " + fake.sentence(nb_words=6)
            
            df_rev.loc[index, "ReviewText"] = text


        df_rev["ReviewDate"] = pd.to_datetime(df_rev["ReviewDate"], errors="coerce")

        # very simple sentiment label: rating >= 4 → positive, 3 → neutral, else → negative
        def label_sentiment(rating):
            if rating >= 4:
                return "positive"
            elif rating == 3:
                return "neutral"
            else:
                return "negative"

        df_rev["Sentiment"] = df_rev["Rating"].apply(label_sentiment)

        rev_records = df_rev.to_dict(orient="records")

        mongo_col_rev = get_mongo_collection("Reviews")
        mongo_col_rev.delete_many({})
        if rev_records:
            mongo_col_rev.insert_many(rev_records)

        return jsonify({
            "message": f"{len(rev_records)} reviews into 'Reviews' collection"
        }), 201

    except Exception as e:
        return jsonify({"error": str(e)}), 500

# Code to Run the Flask API 
def run_flask():
    app.run(port=5000, debug=False, use_reloader=False)

# Code to Run the Flask API in a separate thread in the background  
threading.Thread(target=run_flask).start()
time.sleep(1)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on http://127.0.0.1:5000
Press CTRL+C to quit


  df_rev = pd.read_sql(reviews_query, conn)
127.0.0.1 - - [27/Sep/2025 22:14:44] "POST /sync-to-mongo HTTP/1.1" 201 -


In [6]:
# URL of your Flask API endpoint
url = "http://127.0.0.1:5000/sync-to-mongo"

try:
    # Make POST request to trigger the sync
    response = requests.post(url)

    # Print status and JSON response
    print("Status Code:", response.status_code)
    print("Response JSON:", response.json())

except requests.exceptions.RequestException as e:
    print("❌ Request failed:", e)


Status Code: 201
Response JSON: {'message': "10000 reviews into 'Reviews' collection"}
