In [5]:
from db.helpers import new_sales_collection
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta
from helpers.tables import industry_table, area_table
import numpy as np

In [6]:
# Names of the four sales measures used throughout the notebook, ordered
# day-type first (Weekday, Weekend) and channel second (Store, Delivery).
keys = [
    f"{day_type}_{channel}_Sales"
    for day_type in ("Weekday", "Weekend")
    for channel in ("Store", "Delivery")
]


def calculate_growth(value1, value2):
    """Return the relative change from ``value1`` to ``value2``.

    The result is ``(value2 - value1) / value1``. When ``value1`` equals
    zero the ratio is undefined and ``None`` is returned instead.
    """
    return None if value1 == 0 else (value2 - value1) / value1


def group_sales(group_id, match):
    """Aggregate Kuwait sales rows per group and calendar month.

    Args:
        group_id: Extra entries merged into the ``$group`` stage's ``_id``
            (alongside ``year`` and ``month``).
        match: Extra filter conditions merged into the ``$match`` stage.

    Returns:
        Whatever ``new_sales_collection.aggregate`` returns for the pipeline.
        Each grouped row carries the sum of the four sales measures and
        ``numberOfOutlets``, the count of matched documents in the group;
        rows are sorted by year, then month, ascending.
    """
    # A document qualifies when at least one of the measures is not null.
    non_null_candidates = (
        "Weekday_Store_Sales",
        "Weekend_Store_Sales",
        "Weekday_Delivery_Sales",
        "Weekend_Delivery_Sales",
    )
    summed_measures = (
        "Weekday_Store_Sales",
        "Weekday_Delivery_Sales",
        "Weekend_Store_Sales",
        "Weekend_Delivery_Sales",
    )

    match_stage = {
        **match,
        "Level_1_Area": "Kuwait",
        "$or": [{measure: {"$ne": None}} for measure in non_null_candidates],
    }
    group_stage = {
        "_id": {**group_id, "year": "$Sales_Year", "month": "$Sales_Month"},
        **{measure: {"$sum": f"${measure}"} for measure in summed_measures},
        "numberOfOutlets": {"$sum": 1},
    }
    sort_stage = {"_id.year": 1, "_id.month": 1}

    return new_sales_collection.aggregate(
        [{"$match": match_stage}, {"$group": group_stage}, {"$sort": sort_stage}]
    )


def generate_seasonality_record(base, data):
    """Build one seasonality record with month-over-month growth per sales key.

    Args:
        base: Mapping of identifying fields; its entries are copied into the
            result and ``base`` itself is not modified.
        data: Sequence of aggregated rows (each holding the four sales sums
            and ``numberOfOutlets``). Growth is computed from ``data[0]`` to
            ``data[1]``, so callers must pass the earlier month first.

    Returns:
        A new dict containing ``base`` plus, for each sales key:
          * ``None`` when ``data`` does not hold exactly two rows;
          * the growth of the per-document average when it is strictly
            between -1 and 2;
          * no entry at all when the growth is undefined (previous average
            is zero) or falls outside that range.
    """
    sales_keys = (
        "Weekday_Store_Sales",
        "Weekday_Delivery_Sales",
        "Weekend_Store_Sales",
        "Weekend_Delivery_Sales",
    )
    result = {**base}

    # Without exactly two months there is nothing to compare; mark every
    # key as explicitly missing. The check does not depend on the key, so it
    # is done once up front.
    if len(data) != 2:
        for key in sales_keys:
            result[key] = None
        return result

    previous, current = data[0], data[1]
    for key in sales_keys:
        growth = calculate_growth(
            previous[key] / previous["numberOfOutlets"],
            current[key] / current["numberOfOutlets"],
        )
        # Compare against None explicitly: a growth of exactly 0.0 is a valid
        # value and must not be discarded by a truthiness test.
        if growth is not None and -1 < growth < 2:
            result[key] = growth
    return result


def getDates(
    start_date: datetime = datetime(2018, 1, 1),
    end_date: datetime = datetime(2023, 12, 1),
):
    """Yield datetimes one calendar month apart, starting at ``start_date``.

    Iteration stops as soon as the next value would be later than
    ``end_date``; nothing is yielded when ``start_date`` is already past it.
    """
    cursor = start_date
    while True:
        if not cursor <= end_date:
            return
        yield cursor
        cursor = cursor + relativedelta(months=1)


def filter_sales(data: list, date_1: datetime, date_2: datetime):
    """Keep the records whose ``_id`` year/month equals that of either date.

    Args:
        data: Records carrying ``record["_id"]["year"]`` and
            ``record["_id"]["month"]``.
        date_1: First date whose year and month are accepted.
        date_2: Second date whose year and month are accepted.

    Returns:
        A new list with the matching records, in their original order.
    """

    def same_month(record_id, when):
        return record_id["year"] == when.year and record_id["month"] == when.month

    selected = []
    for record in data:
        if same_month(record["_id"], date_1) or same_month(record["_id"], date_2):
            selected.append(record)
    return selected

In [7]:
# remove for loop for years and months and use getDates

Generate Location Type Seasonality


In [8]:
# Month-over-month seasonality per Location_Type.
# Pass 1 computes the growth for every (location type, month) pair.
# Pass 2 fills the keys that pass 1 left missing/None with the growth of the
# all-Kuwait monthly average (group_sales with an empty group id).
location_types = new_sales_collection.distinct(
    "Location_Type", {"Location_Type": {"$nin": [0, None]}}
)
print(location_types)
# NOTE(review): the recorded output of this cell lists near-duplicate
# categories ('Mixed Use Development' / 'Mixed use Development',
# 'Restail Strip' / 'Retail Strip'); they are treated as distinct types here.
_id = {"Location_Type": "$Location_Type"}
result = []
for location_type in location_types:
    for date in getDates():
        last_month = date - relativedelta(months=1)
        # The $in filters can also match unwanted year/month combinations
        # (e.g. December of the current year); filter_sales keeps only the
        # two months of interest.
        data = filter_sales(
            list(
                group_sales(
                    _id,
                    {
                        "Location_Type": location_type,
                        "Sales_Month": {"$in": [date.month, last_month.month]},
                        "Sales_Year": {"$in": [date.year, last_month.year]},
                    },
                )
            ),
            date,
            last_month,
        )
        result.append(
            generate_seasonality_record(
                {
                    "Location_Type": location_type,
                    "Sales_Year": date.year,
                    "Sales_Month": date.month,
                },
                data,
            )
        )

# The fallback rows depend only on the month, so each month is queried once
# and reused for every record/key that needs it.
overall_by_month = {}
for record in result:
    for key in keys:
        if record.get(key) is not None:
            continue
        current_date = datetime(record["Sales_Year"], record["Sales_Month"], 1)
        month_id = (current_date.year, current_date.month)
        if month_id not in overall_by_month:
            last_month = current_date - relativedelta(months=1)
            overall_by_month[month_id] = filter_sales(
                list(
                    group_sales(
                        {},
                        {
                            "Sales_Month": {
                                "$in": [current_date.month, last_month.month]
                            },
                            "Sales_Year": {"$in": [current_date.year, last_month.year]},
                        },
                    )
                ),
                current_date,
                last_month,
            )
        all_locations_growth = overall_by_month[month_id]
        if len(all_locations_growth) != 2:
            # One of the two months has no data; leave the key unset.
            continue
        first_month = (
            all_locations_growth[0][key] / all_locations_growth[0]["numberOfOutlets"]
        )
        second_month = (
            all_locations_growth[1][key] / all_locations_growth[1]["numberOfOutlets"]
        )
        # Check next month, add all_locations_growth to next month growth
        record[key] = calculate_growth(first_month, second_month)
location_type_df = pd.DataFrame(result)

['Airport', 'Bus Station', 'Business', 'Car Dealer', 'Community Services', 'Cooperative', 'Cultural', 'Education', 'Entertainment', 'Exhibition', 'Food and Beverage (F&B)', 'Gas Station', 'Government', 'Healthcare', 'Hotel', 'Hypermarket', 'Industrial', 'Military', 'Mixed Use Development', 'Mixed use Development', 'Palace', 'Park', 'Residential', 'Resort', 'Rest Area', 'Restail Strip', 'Retail Complex', 'Retail Mall', 'Retail Strip', 'Sports', 'Standalone Location', 'Supermarket', 'Warehouse']
Airport
Bus Station
Business
Car Dealer
Community Services
Cooperative
Cultural
Education
Entertainment
Exhibition
Food and Beverage (F&B)
Gas Station
Government
Healthcare
Hotel
Hypermarket
Industrial
Military
Mixed Use Development
Mixed use Development
Palace
Park
Residential
Resort
Rest Area
Restail Strip
Retail Complex
Retail Mall
Retail Strip
Sports
Standalone Location
Supermarket
Warehouse


Generate Products Seasonality


In [9]:
# Month-over-month seasonality per Product_Focus.
# Pass 1 computes the growth for every (product focus, month) pair.
# Pass 2 fills the keys that pass 1 left missing/None with the growth of the
# all-Kuwait monthly average (group_sales with an empty group id).
products_types = new_sales_collection.distinct(
    "Product_Focus", {"Level_1_Area": "Kuwait", "Product_Focus": {"$ne": 0}}
)
_id = {"Product_Focus": "$Product_Focus"}
result = []
for product_focus in products_types:
    for date in getDates():
        last_month = date - relativedelta(months=1)
        # The $in filters can also match unwanted year/month combinations;
        # filter_sales keeps only the two months of interest.
        data = filter_sales(
            list(
                group_sales(
                    _id,
                    {
                        "Product_Focus": product_focus,
                        "Sales_Month": {"$in": [date.month, last_month.month]},
                        "Sales_Year": {"$in": [date.year, last_month.year]},
                    },
                )
            ),
            date,
            last_month,
        )
        result.append(
            generate_seasonality_record(
                {
                    "Product_Focus": product_focus,
                    "Sales_Year": date.year,
                    "Sales_Month": date.month,
                },
                data,
            )
        )

# The fallback rows depend only on the month, so each month is queried once
# and reused for every record/key that needs it.
overall_by_month = {}
for record in result:
    for key in keys:
        if record.get(key) is not None:
            continue
        current_date = datetime(record["Sales_Year"], record["Sales_Month"], 1)
        month_id = (current_date.year, current_date.month)
        if month_id not in overall_by_month:
            last_month = current_date - relativedelta(months=1)
            overall_by_month[month_id] = filter_sales(
                list(
                    group_sales(
                        {},
                        {
                            "Sales_Month": {
                                "$in": [current_date.month, last_month.month]
                            },
                            "Sales_Year": {"$in": [current_date.year, last_month.year]},
                        },
                    )
                ),
                current_date,
                last_month,
            )
        all_locations_growth = overall_by_month[month_id]
        if len(all_locations_growth) != 2:
            # One of the two months has no data; leave the key unset.
            continue
        first_month = (
            all_locations_growth[0][key] / all_locations_growth[0]["numberOfOutlets"]
        )
        second_month = (
            all_locations_growth[1][key] / all_locations_growth[1]["numberOfOutlets"]
        )
        # Check next month, add all_locations_growth to next month growth
        record[key] = calculate_growth(first_month, second_month)
product_focus_df = pd.DataFrame(result)

Generate Area Seasonality


In [None]:
# Month-over-month seasonality per Level_3_Area.
# Pass 1 computes the growth for every (level-3 area, month) pair.
# Pass 2 fills the keys that pass 1 left missing/None with the growth of the
# Level_2_Area that area_table maps the level-3 area to.
areas = new_sales_collection.distinct("Level_3_Area", {"Level_1_Area": "Kuwait"})
_id = {"Level_3_Area": "$Level_3_Area"}
result = []
for level_3_area in areas:
    for date in getDates():
        last_month = date - relativedelta(months=1)
        # The $in filters can also match unwanted year/month combinations;
        # filter_sales keeps only the two months of interest.
        data = filter_sales(
            list(
                group_sales(
                    _id,
                    {
                        "Level_3_Area": level_3_area,
                        "Sales_Month": {"$in": [date.month, last_month.month]},
                        "Sales_Year": {"$in": [date.year, last_month.year]},
                    },
                )
            ),
            date,
            last_month,
        )
        result.append(
            generate_seasonality_record(
                {
                    "Level_3_Area": level_3_area,
                    "Sales_Year": date.year,
                    "Sales_Month": date.month,
                },
                data,
            )
        )

# check growth for level 2 area
# The fallback rows depend only on (level-2 area, month), so each combination
# is queried once and reused. Assumes area_table values are hashable.
level_2_by_month = {}
for record in result:
    for key in keys:
        if record.get(key) is not None:
            continue
        # Raises KeyError when the level-3 area is absent from area_table.
        area_level_2 = area_table[record["Level_3_Area"]]
        current_date = datetime(record["Sales_Year"], record["Sales_Month"], 1)
        cache_id = (area_level_2, current_date.year, current_date.month)
        if cache_id not in level_2_by_month:
            last_month = current_date - relativedelta(months=1)
            level_2_by_month[cache_id] = filter_sales(
                list(
                    group_sales(
                        {"Level_2_Area": "$Level_2_Area"},
                        {
                            "Level_2_Area": area_level_2,
                            "Sales_Month": {
                                "$in": [current_date.month, last_month.month]
                            },
                            "Sales_Year": {"$in": [current_date.year, last_month.year]},
                        },
                    )
                ),
                current_date,
                last_month,
            )
        all_locations_growth = level_2_by_month[cache_id]
        if len(all_locations_growth) != 2:
            # One of the two months has no data; leave the key unset.
            continue
        first_month = (
            all_locations_growth[0][key] / all_locations_growth[0]["numberOfOutlets"]
        )
        second_month = (
            all_locations_growth[1][key] / all_locations_growth[1]["numberOfOutlets"]
        )
        # Check next month, add all_locations_growth to next month growth
        record[key] = calculate_growth(first_month, second_month)
area_df = pd.DataFrame(result)

Generate Industry Type Seasonality


In [None]:
# Distinct Industry_Level_2 values among Kuwait rows, excluding the value 0.
industry = new_sales_collection.distinct(
    "Industry_Level_2", {"Level_1_Area": "Kuwait", "Industry_Level_2": {"$ne": 0}}
)
# The "$" prefix makes MongoDB group by the field's value; without it the
# group _id would carry the literal string "Industry_Level_2".
_id = {"Industry_Level_2": "$Industry_Level_2"}
result = []


def group_sales_2(group_id, match, industry):
    """Aggregate Kuwait sales per month for brands of one Industry_Level_1.

    Args:
        group_id: Extra entries merged into the ``$group`` stage's ``_id``
            (alongside ``year`` and ``month``).
        match: Extra filter conditions merged into the first ``$match`` stage.
        industry: Value the joined ``Brands`` document must have in
            ``Industry_Level_1``.
            NOTE(review): passing ``None`` is presumably matched by MongoDB
            against brands with a null or missing ``Industry_Level_1``;
            confirm that this is intended by the caller.

    Returns:
        Whatever ``new_sales_collection.aggregate`` returns for the pipeline.
        Each grouped row carries the sum of the four sales measures and
        ``numberOfOutlets``, the count of matched documents in the group;
        rows are sorted by year, then month, ascending.

    Raises:
        Exception: ``"group_sales_2 error"`` when the aggregation fails; the
            original error is attached as ``__cause__``.
    """
    pipeline = [
        {
            "$match": {
                **match,
                "Level_1_Area": "Kuwait",
                "Monthly_Sales": {"$nin": [None, 0]},
            }
        },
        {
            # Join each sales row with its brand, keeping only brands of the
            # requested industry.
            "$lookup": {
                "from": "Brands",
                "localField": "Brand",
                "foreignField": "Brand_Name_English",
                "as": "brand",
                "pipeline": [
                    {
                        "$match": {
                            "Industry_Level_1": industry,
                        },
                    },
                ],
            }
        },
        # Drop rows for which the lookup found no brand.
        {"$match": {"brand.0": {"$exists": True}}},
        {
            "$group": {
                "_id": {
                    **group_id,
                    "year": "$Sales_Year",
                    "month": "$Sales_Month",
                },
                "Weekday_Store_Sales": {"$sum": "$Weekday_Store_Sales"},
                "Weekday_Delivery_Sales": {"$sum": "$Weekday_Delivery_Sales"},
                "Weekend_Store_Sales": {"$sum": "$Weekend_Store_Sales"},
                "Weekend_Delivery_Sales": {"$sum": "$Weekend_Delivery_Sales"},
                "numberOfOutlets": {"$sum": 1},
            }
        },
        {"$sort": {"_id.year": 1, "_id.month": 1}},
    ]
    try:
        return new_sales_collection.aggregate(pipeline)
    except Exception as exc:
        # Catch Exception rather than everything so KeyboardInterrupt and
        # SystemExit still propagate, and chain the cause so the real error
        # stays visible in the traceback.
        print(pipeline)
        raise Exception("group_sales_2 error") from exc


# Pass 1: month-over-month growth for every (Industry_Level_2, month) pair.
for industry_level_2 in industry:
    for date in getDates():
        last_month = date - relativedelta(months=1)
        # The $in filters can also match unwanted year/month combinations;
        # filter_sales keeps only the two months of interest.
        data = filter_sales(
            list(
                group_sales(
                    _id,
                    {
                        "Industry_Level_2": industry_level_2,
                        "Sales_Month": {"$in": [date.month, last_month.month]},
                        "Sales_Year": {"$in": [date.year, last_month.year]},
                    },
                )
            ),
            date,
            last_month,
        )
        result.append(
            generate_seasonality_record(
                {
                    "Industry_Level_2": industry_level_2,
                    "Sales_Year": date.year,
                    "Sales_Month": date.month,
                },
                data,
            )
        )

# Pass 2: fill the keys that pass 1 left missing/None with the growth of the
# Industry_Level_1 that industry_table maps the record's industry to.
# The fallback rows depend only on (level-1 industry, month), so each
# combination is queried once and reused. Assumes industry_table values are
# hashable.
level_1_by_month = {}
for record in result:
    for key in keys:
        if record.get(key) is not None:
            continue
        # Use a dedicated name: rebinding `industry` here would replace the
        # notebook-level list and break a re-run of the loop above.
        industry_level_1 = industry_table.get(
            record.get("Industry_Level_2", None), None
        )
        current_date = datetime(record["Sales_Year"], record["Sales_Month"], 1)
        cache_id = (industry_level_1, current_date.year, current_date.month)
        if cache_id not in level_1_by_month:
            last_month = current_date - relativedelta(months=1)
            level_1_by_month[cache_id] = filter_sales(
                list(
                    group_sales_2(
                        {},
                        {
                            "Sales_Month": {
                                "$in": [current_date.month, last_month.month]
                            },
                            "Sales_Year": {"$in": [current_date.year, last_month.year]},
                        },
                        industry_level_1,
                    )
                ),
                current_date,
                last_month,
            )
        all_locations_growth = level_1_by_month[cache_id]
        if len(all_locations_growth) != 2:
            # One of the two months has no data; leave the key unset.
            continue
        first_month = (
            all_locations_growth[0][key] / all_locations_growth[0]["numberOfOutlets"]
        )
        second_month = (
            all_locations_growth[1][key] / all_locations_growth[1]["numberOfOutlets"]
        )
        # Check next month, add all_locations_growth to next month growth
        record[key] = calculate_growth(first_month, second_month)
industry_df = pd.DataFrame(result)

In [None]:
# Write every seasonality frame to its own sheet of a single workbook,
# without the DataFrame index.
with pd.ExcelWriter("seasonalities.xlsx", engine="xlsxwriter") as workbook:
    sheets = (
        ("location_type", location_type_df),
        ("product_focus", product_focus_df),
        ("area", area_df),
        ("industry", industry_df),
    )
    for sheet_name, frame in sheets:
        frame.to_excel(workbook, sheet_name=sheet_name, index=False)