In [1]:
# 1. Imports
import pandas as pd
import numpy as np
import dask.dataframe as dd
import os

# 2. Paths
# Input CSV and output locations; all are relative paths, so they resolve
# against the current working directory.
raw_path = "../data/raw/BigBasket_Products.csv"
processed_dir = "../data/processed"
split_dir = os.path.join(processed_dir, "products")

# Create both output folders up front; exist_ok=True leaves existing ones alone.
for _out_dir in (processed_dir, split_dir):
    os.makedirs(_out_dir, exist_ok=True)

# 3. Load raw data with Dask (explicit dtypes)
# Both price columns are pinned to float64. The lazy Dask frame is then
# materialised immediately into an in-memory pandas DataFrame.
price_dtypes = {'sale_price': 'float64', 'market_price': 'float64'}
lazy_products = dd.read_csv(raw_path, dtype=price_dtypes, assume_missing=True)
products = lazy_products.compute()

# 4. Add product_id
# Sequential IDs "BB0001", "BB0002", ... in row order. The number is padded
# to a minimum of 4 digits, so sequence numbers above 9999 produce longer IDs.
n_products = len(products)
products['product_id'] = [f"BB{seq:04d}" for seq in range(1, n_products + 1)]

# 5. Select essential columns
essential_cols = [
    'product_id', 'product', 'category', 'sub_category',
    'brand', 'sale_price', 'market_price', 'type', 'rating',
]
products = products[essential_cols]

# 6. Create 2 years of daily dates
dates = pd.date_range(start="2023-01-01", end="2024-12-31", freq='D')
n_dates = len(dates)

# 7. Simulate daily sales (vectorized)
# The whole product x date grid is generated in one shot instead of looping
# over products with iterrows() and building one small DataFrame per product.
np.random.seed(42)

# Multiplier in [1, 3]: three full sine cycles spread across the date range.
seasonal_factor = np.sin(np.linspace(0, 6*np.pi, n_dates)) + 2

product_cols = ['product_id', 'product', 'category', 'sub_category',
                'brand', 'sale_price', 'market_price', 'type', 'rating']
n_products = len(products)

# One row of random base demand (integers 10..79) per product. The matrix is
# filled in product-major order, i.e. the order the per-product loop drew in.
# NOTE(review): expected to reproduce the earlier seeded values exactly;
# confirm against an existing output if bit-for-bit reproducibility matters.
base_sales = np.random.randint(10, 80, size=(n_products, n_dates))

# seasonal_factor (length n_dates) broadcasts across every product row;
# astype(int) truncates the scaled value toward zero.
units_sold = (base_sales * seasonal_factor).astype(int)

# 8. Assemble the long-format sales table
# Repeat each product row n_dates times by position (safe even if the index
# has duplicates), then attach the tiled calendar and the flattened sales.
# Rows are ordered product by product, dates ascending within each product.
# An empty `products` frame yields an empty table rather than an error.
row_positions = np.repeat(np.arange(n_products), n_dates)
sales_df = products[product_cols].iloc[row_positions].reset_index(drop=True)
sales_df.insert(0, 'date', np.tile(dates.to_numpy(), n_products))
sales_df['units_sold'] = units_sold.ravel()

# 9. Save single merged CSV (for FastAPI/Streamlit)
merged_csv_path = os.path.join(processed_dir, "historical_sales.csv")
sales_df.to_csv(merged_csv_path, index=False)

# 10. Save partitioned per-product CSV (fast groupby)
# One file per product_id, named "<product_id>.csv" inside split_dir.
sales_by_product = sales_df.groupby("product_id")
for pid, product_rows in sales_by_product:
    product_rows.to_csv(f"{split_dir}/{pid}.csv", index=False)

# The reported count is the number of rows in `products`, not a count of
# files actually written.
print(f"Saved merged file and {len(products)} per-product files to {processed_dir}")


Saved merged file and 27555 per-product files to ../data/processed
