In [14]:
import os
import sys
import time
import json
import pymongo
import pymssql
import numpy as np
import pandas as pd
from tqdm import tqdm
from bson import ObjectId
from datetime import datetime
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()


True

## Set the start and end date for the output

In [15]:
start_date = None
end_date = None
limit = None

## Database Connection

In [16]:
# ------------------------ Connect to Botit-Production Database ---------------------- #
mongo_connection_string = os.getenv("botit_mongo_connection_string")
if mongo_connection_string:
    mongo_client = pymongo.MongoClient(mongo_connection_string)
    mongo_db = mongo_client["botitprod"]
    print("MongoDB connection string retrieved successfully.")
else:
    raise ValueError("MongoDB connection string is missing. Check environment variables.")

MongoDB connection string retrieved successfully.


## Helper Functions

In [None]:

def aggregate_mongo(collection_name, pipeline):
    print(f"Fetching data from collection: {collection_name}...")
    data = list(mongo_db[collection_name].aggregate(pipeline))
    print(f"Fetched {len(data)} records.")
    return data

def load_pipeline(json_file, key_name):
    with open(json_file, "r", encoding="utf-8") as file:
        aggregation_data = json.load(file)
    pipeline_info = aggregation_data.get(key_name, {})
    return pipeline_info.get("collection", ""), pipeline_info.get("aggregation_pipeline", [])

def convert_objectid_to_str(df):
    print("Converting ObjectId fields to string...")
    for col in tqdm(df.columns, desc="Processing columns"):
        if df[col].apply(lambda x: isinstance(x, ObjectId)).any():
            df[col] = df[col].astype(str)
    return df

def add_date_filter(pipeline, start_date=None, end_date=None):
    if start_date and end_date:
        print(f"Filtering data from {start_date} to {end_date}...")
        return [{"$match": {"createdAt": {"$gte": start_date, "$lt": end_date}}}] + pipeline
    return pipeline

def fetch_and_process_data(json_file, key_name, df_name, start_date=None, end_date=None, limit=None):
    collection_name, pipeline = load_pipeline(json_file, key_name)
    if collection_name and pipeline:
        pipeline = add_date_filter(pipeline, start_date, end_date)

        print(f"Starting aggregation for: {key_name}")
        data = aggregate_mongo(collection_name, pipeline)

        if limit:
            data = data[:limit] 

        df = pd.DataFrame(data)
        if not df.empty:
            df = convert_objectid_to_str(df)
        os.makedirs("CSVs", exist_ok=True)
        csv_path = os.path.join("CSVs", f"{df_name}.csv")
        df.to_csv(csv_path, index=False)
        print(f"Saved CSV at: {csv_path}")

        globals()[df_name] = df
    else:
        print(f"Skipping {key_name}: No collection or pipeline found.")

print("ETL Process Started...")
fetch_and_process_data("aggregation_pipelines.json", "bestselleritems", "bestseller_df", start_date, end_date, limit)

ETL Process Started...
Starting aggregation for: bestselleritems
Fetching data from collection: Orders...


In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

file_path = "CSVs/bestseller_df.csv"
bestseller_df = pd.read_csv(file_path)


bestseller_df.fillna("Unknown", inplace=True)

bestseller_df["price"] = pd.to_numeric(bestseller_df["price"], errors='coerce')

bestseller_df["total_sales"] = bestseller_df["totalQuantitySold"] * bestseller_df["price"]

bestseller_df = bestseller_df.reset_index(drop=True)

In [None]:
bestseller_df

Unnamed: 0,_id,totalQuantitySold,price,itemName,vendorId,vendorName,vendorShoppingCategory,itemCategory,itemSubcategory,total_sales
0,66162e63abfbffc7370229d6,394,17.04,Falafel Original Sandwich,621b49651090b1b7de045163,Tabali,restaurants,restaurants,middle eastern,6713.76
1,65534212c0838afd080f3e66,264,29.00,Elite Still 500 ml,65534034dd9ae512e9018c42,Quanta Egypt,specialityfood,groceries,specialty foods,7656.00
2,66d034f59edc66c3f207a18a,263,106.12,Spanish Latte (Ice Cubes),655e2702434f29ba1d0787a2,Social Specialty Coffee,restaurants,restaurants,cafes,27909.56
3,663cab993c939e6abf0807ef,239,80.00,"Avocado, Kiwi & Apple",663ca48568d17a713d0965d4,Little Chefo,specialityfood,kids,baby care,19120.00
4,66162e42abfbffc737022963,227,15.90,Foul Original Sandwich,621b49651090b1b7de045163,Tabali,restaurants,restaurants,middle eastern,3609.30
...,...,...,...,...,...,...,...,...,...,...
32910,64f9c0cad3b97abc570ece29,1,829.00,LA ROCHE LIPIKAR BAUME AP+M T.A.BALM200M,617e56a11d70a776319e8b31,Elezaby,pharmacies,beauty,skincare,829.00
32911,65e82c3c8c57eae6050e01ba,1,2500.00,Pink Flower Lamp,65e8278d9f6be6b0b107d606,Rabbit Hole,home_garden,home and garden,lighting,2500.00
32912,65797af7dfa7cc66e009649c,1,30.00,Snowflake Cookies Pkt,623ae7e62c677968d145090a,Nola Bakery,restaurants,Unknown,Unknown,30.00
32913,641c0d626a8b7f054c0fc7e8,1,1650.00,Apple USB-C to Magsafe 3 Cable (2 m),641c07c7f659bc633406a212,Tradeline,electronics,Unknown,Unknown,1650.00


### Best-selling item per Vendor

In [None]:
best_selling_per_vendor = bestseller_df.loc[bestseller_df.groupby("vendorId")["totalQuantitySold"].idxmax()]
best_selling_per_vendor = best_selling_per_vendor.sort_values(by="totalQuantitySold", ascending=False).reset_index(drop=True)
best_selling_per_vendor


Unnamed: 0,_id,totalQuantitySold,price,itemName,vendorId,vendorName,vendorShoppingCategory,itemCategory,itemSubcategory,total_sales
0,66162e63abfbffc7370229d6,394,17.04,Falafel Original Sandwich,621b49651090b1b7de045163,Tabali,restaurants,restaurants,middle eastern,6713.76
1,65534212c0838afd080f3e66,264,29.00,Elite Still 500 ml,65534034dd9ae512e9018c42,Quanta Egypt,specialityfood,groceries,specialty foods,7656.00
2,66d034f59edc66c3f207a18a,263,106.12,Spanish Latte (Ice Cubes),655e2702434f29ba1d0787a2,Social Specialty Coffee,restaurants,restaurants,cafes,27909.56
3,663cab993c939e6abf0807ef,239,80.00,"Avocado, Kiwi & Apple",663ca48568d17a713d0965d4,Little Chefo,specialityfood,kids,baby care,19120.00
4,6639ecf9d095ff25380ec079,186,158.46,Brown Sugar Milk Tea,658187c46ba9c21fc500a978,Pao,restaurants,restaurants,juices and drinks,29473.56
...,...,...,...,...,...,...,...,...,...,...
1325,64cf8eed3f8aa0f8b90cf1e6,1,969.00,Skin Transformation Routine,64cf6c1b9e3f4e4916041c63,Flawless Cosmetics,beauty,beauty,skincare,969.00
1326,64ca4251a6d41c968503d743,1,2500.00,Cotton Set,64ca347519dddec6e3076f72,Rama Haute Couture,fashion,fashion,designer wear,2500.00
1327,64e4796a7fa60ebabc0ef1ea,1,,Looli Earrings,64ca341bcca9ec39c3021a62,Sandbox Jewelry,fashion,fashion,jewelry,
1328,64c7999f03c941c1a00b3218,1,800.00,Boho Beige Pants,64c6560479ce9700720a7092,Boho Stitch,fashion,fashion,casual wear,800.00


### Best-selling item per items Category

In [None]:
best_selling_per_category = bestseller_df.loc[bestseller_df.groupby("itemCategory")["totalQuantitySold"].idxmax()].reset_index(drop=True)
best_selling_per_category = best_selling_per_category.sort_values(by="totalQuantitySold", ascending=False).reset_index(drop=True)
best_selling_per_category

Unnamed: 0,_id,totalQuantitySold,price,itemName,vendorId,vendorName,vendorShoppingCategory,itemCategory,itemSubcategory,total_sales
0,66162e63abfbffc7370229d6,394,17.04,Falafel Original Sandwich,621b49651090b1b7de045163,Tabali,restaurants,restaurants,middle eastern,6713.76
1,65534212c0838afd080f3e66,264,29.0,Elite Still 500 ml,65534034dd9ae512e9018c42,Quanta Egypt,specialityfood,groceries,specialty foods,7656.0
2,663cab993c939e6abf0807ef,239,80.0,"Avocado, Kiwi & Apple",663ca48568d17a713d0965d4,Little Chefo,specialityfood,kids,baby care,19120.0
3,65d761826e3a025d9b096bed,142,108.29,Spanish Latte (Ice Cubes),655e2702434f29ba1d0787a2,Social Specialty Coffee,restaurants,Unknown,Unknown,15377.18
4,655b5555578bed1f0b0eb9de,120,349.0,Deep Conditioning Hair Mask,625be2e57e23d29dfcd48bec,Braes Hair & Skin,beauty,beauty,haircare,41880.0
5,65534245c0838afd080f4133,102,103.0,Z - Pump + Racks: Pump 1 l for Glass Bottle,65534034dd9ae512e9018c42,Quanta Egypt,specialityfood,home and garden,drinkware,10506.0
6,66cd9008682d240d4376be1b,73,750.0,Clogs - Coffe Latte,63a438f8d5e32028840e9107,Bou Eg,fashion,fashion,footwear,54750.0
7,6542573b8f9b2d96162f7e9e,43,9.0,MINALAX 10/TAB(NEW),617e56a11d70a776319e8b31,Elezaby,pharmacies,pharmacies,medicine,387.0
8,670e31cc616416cc8cb47192,30,40.0,Best Pet Adult Cat Beef - Pouch 85G,65364ef1facd736dff05e602,Zima Pets Center,pet_care,pet care,cats,1200.0
9,66c1ab221fdb5b1de70c3925,27,45.0,"Clay Modeling Dough, Kl-500b, White, 500 gr, R...",66829d33575844dae90f53be,Bakier Stationery,stationary,stationary,arts and crafts,1215.0


### Best-selling item per items Subcategory

In [None]:
best_selling_per_subcategory = bestseller_df.loc[bestseller_df.groupby("itemSubcategory")["totalQuantitySold"].idxmax()].reset_index(drop=True)
best_selling_per_subcategory = best_selling_per_subcategory.sort_values(by="totalQuantitySold", ascending=False).reset_index(drop=True)
best_selling_per_subcategory

Unnamed: 0,_id,totalQuantitySold,price,itemName,vendorId,vendorName,vendorShoppingCategory,itemCategory,itemSubcategory,total_sales
0,66162e63abfbffc7370229d6,394,17.04,Falafel Original Sandwich,621b49651090b1b7de045163,Tabali,restaurants,restaurants,middle eastern,6713.76
1,65534212c0838afd080f3e66,264,29.00,Elite Still 500 ml,65534034dd9ae512e9018c42,Quanta Egypt,specialityfood,groceries,specialty foods,7656.00
2,66d034f59edc66c3f207a18a,263,106.12,Spanish Latte (Ice Cubes),655e2702434f29ba1d0787a2,Social Specialty Coffee,restaurants,restaurants,cafes,27909.56
3,663cab993c939e6abf0807ef,239,80.00,"Avocado, Kiwi & Apple",663ca48568d17a713d0965d4,Little Chefo,specialityfood,kids,baby care,19120.00
4,6639ecf9d095ff25380ec079,186,158.46,Brown Sugar Milk Tea,658187c46ba9c21fc500a978,Pao,restaurants,restaurants,juices and drinks,29473.56
...,...,...,...,...,...,...,...,...,...,...
113,6775e6277c14e56bd4b707da,1,2300.00,Ballon Comfy Women Set - Burgundy,65c8a409f804343c070f7e84,NAS Trends,fashion,fashion,outerwear,2300.00
114,66019e0b5d39131f530fcfe8,1,200.00,Canped 90*60 cm 30 Pcs Bed Protection,653137768a57f4ac2201ac36,Tag Eldin Pharmacy,pharmacies,pharmacies,incontinence,200.00
115,65e8711648cb9b3ad1044c24,1,135.00,Creamy Chicken Curry & White Rice,65e86a2eda7252e4dd0c289e,Insta Chef,groceries,restaurants,indian,135.00
116,646a2e5cae0ee4af78423181,1,899.00,Embroidered Pants Set,62f5013e3736e3635ce1b425,Potato Head,kids,fashion,kids wear,899.00


## Orders Gross Revenue

In [None]:
orders_collection = mongo_db["Orders"]

In [None]:
def get_revenue(start_date=None, end_date=None, match_filter=None):
    pipeline = [
        {"$addFields": {"lastStatus": {"$arrayElemAt": ["$status.name", -1]}}}
    ]

    if start_date and end_date:
        pipeline.append({"$match": {"createdAt": {"$gte": start_date, "$lt": end_date}}})

    if match_filter:
        pipeline.append({"$match": match_filter})

    pipeline += [
        {
            "$project": {
                "price": {
                    "$cond": {
                        "if": {"$regexMatch": {"input": {"$ifNull": ["$price.total", ""]}, "regex": "^[0-9]+(\\.[0-9]+)?$"}},
                        "then": {"$toDouble": "$price.total"},
                        "else": 0,
                    }
                }
            }
        },
        {"$group": {"_id": None, "totalRevenue": {"$sum": "$price"}}},
    ]

    result = list(orders_collection.aggregate(pipeline))
    return result[0]["totalRevenue"] if result else 0

def print_revenue(start_date=None, end_date=None):
    delivered_revenue = get_revenue(start_date, end_date, {"lastStatus": "Delivered"})
    total_revenue = get_revenue(start_date, end_date)
    undelivered_revenue = total_revenue - delivered_revenue
    undelivered_percentage = (undelivered_revenue / total_revenue) * 100 if total_revenue else 0
    delivered_percentage = 100 - undelivered_percentage

    date_range_str = f" ({start_date.strftime('%d/%m/%Y')} - {end_date.strftime('%d/%m/%Y')})" if start_date and end_date else ""

    print(f"Total Revenue (Delivered Orders){date_range_str}: {delivered_revenue:,.0f} EGP")
    print("—" * 100)
    print(f"Total Revenue (All Orders){date_range_str}: {total_revenue:,.0f} EGP")
    print("—" * 100)
    print(f"Revenue Gap (Undelivered Orders): {undelivered_revenue:,.0f} EGP ({undelivered_percentage:.1f}% of total revenue)")
    print(f"Delivery Success Rate: {delivered_percentage:.1f}% of total revenue comes from delivered orders, indicating that nearly {undelivered_percentage:.1f}% of revenue is tied to undelivered orders.")


In [None]:
orders_gross_revenue = print_revenue(start_date, end_date)
orders_gross_revenue

Total Revenue (Delivered Orders): 21,177,065 EGP
————————————————————————————————————————————————————————————————————————————————————————————————————
Total Revenue (All Orders): 31,258,241 EGP
————————————————————————————————————————————————————————————————————————————————————————————————————
Revenue Gap (Undelivered Orders): 10,081,176 EGP (32.3% of total revenue)
Delivery Success Rate: 67.7% of total revenue comes from delivered orders, indicating that nearly 32.3% of revenue is tied to undelivered orders.


## Order Status Distribution

In [None]:
def get_order_status_distribution(start_date=None, end_date=None):
    pipeline = []

    if start_date and end_date:
        pipeline.append({"$match": {"createdAt": {"$gte": start_date, "$lt": end_date}}})

    pipeline += [
        {
            "$set": {
                "lastStatus": {"$arrayElemAt": ["$status.name", -1]}
            }
        },
        {
            "$group": {
                "_id": "$lastStatus",
                "count": {"$sum": 1}
            }
        },
        {
            "$setWindowFields": {
                "output": {
                    "total": {"$sum": "$count", "window": {}}
                }
            }
        },
        {
            "$project": {
                "Status": "$_id",
                "Count": "$count",
                "Percentage": {
                    "$multiply": [{"$divide": ["$count", "$total"]}, 100]
                }
            }
        },
        {
            "$sort": {"Count": -1}
        }
    ]

    result = list(orders_collection.aggregate(pipeline))
    df = pd.DataFrame(result)
    df = df[["Status", "Count", "Percentage"]]
    df["Count"] = df["Count"].fillna(0).astype(int)
    df["Percentage"] = df["Percentage"].round(2)
    
    return df


In [None]:
df_order_status_distribution = get_order_status_distribution(start_date, end_date)
df_order_status_distribution

## Average Order Value (AOV)

In [None]:
def get_order_statistics(start_date=None, end_date=None):
    pipeline = []

    if start_date and end_date:
        pipeline.append({"$match": {"createdAt": {"$gte": start_date, "$lt": end_date}}})

    pipeline += [
        {
            "$addFields": {
                "lastStatus": {"$arrayElemAt": ["$status.name", -1]}
            }
        },
        {
            "$match": {
                "lastStatus": "Delivered"
            }
        },
        {
            "$lookup": {
                "from": "Vendors",
                "localField": "_vendor",
                "foreignField": "_id",
                "as": "vendor"
            }
        },
        {
            "$unwind": "$vendor"
        },
        {
            "$project": {
                "_id": 0,
                "device_platform": "$device.platform",
                "shoppingCategory": "$vendor.shoppingCategory",
                "price": {
                    "$cond": {
                        "if": {
                            "$regexMatch": {
                                "input": {"$ifNull": ["$price.total", ""]},
                                "regex": "^[0-9]+(\\.[0-9]+)?$"
                            }
                        },
                        "then": {"$toDouble": "$price.total"},
                        "else": 0
                    }
                }
            }
        },
        {
            "$group": {
                "_id": "$shoppingCategory",
                "totalOrders": {"$sum": 1},
                "totalRevenue": {"$sum": "$price"},
                "iosRevenue": {
                    "$sum": {
                        "$cond": [{"$eq": ["$device_platform", "ios"]}, "$price", 0]
                    }
                },
                "androidRevenue": {
                    "$sum": {
                        "$cond": [{"$eq": ["$device_platform", "android"]}, "$price", 0]
                    }
                }
            }
        },
        {
            "$sort": {"totalOrders": -1}
        }
    ]

    result = list(orders_collection.aggregate(pipeline))
    df = pd.DataFrame(result)
    df.rename(columns={"_id": "Shopping Category"}, inplace=True)
    df = df[["Shopping Category", "totalOrders", "totalRevenue"]]
    df["totalOrders"] = df["totalOrders"].astype(int)
    df["totalRevenue"] = df["totalRevenue"].round(2)
    df["AOV"] = (df["totalRevenue"] / df["totalOrders"]).round(2)
    return df


In [None]:
df_orders_statistics = get_order_statistics(start_date, end_date)
df_orders_statistics