# Instacart-Style Data Preparation

This notebook follows the same staged approach as the Instacart Basket Analysis scripts: systematic ingestion, cleaning, unification, and export of curated parquet datasets. Source files live in `Data/` and include store, web, and promo extracts.

**What this notebook does**
- Inspect XLSB files (sheets, headers) to align schemas.
- Load raw promo, store, and web sales data.
- Clean and standardize columns (dates, codes, strings, numeric types).
- Merge sales across channels and optionally align with promo periods.
- Persist clean parquet/CSV outputs for downstream analytics or ML.

In [1]:
from pathlib import Path
import pandas as pd
import numpy as np
import duckdb
from pyxlsb import open_workbook
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

pd.set_option("display.max_columns", 50)
pd.set_option("display.width", 120)

# Project paths
PROJECT_ROOT = Path.cwd().parent  # notebook is stored in notebooks/
DATA_DIR = PROJECT_ROOT / "Data"
OUTPUT_DIR = PROJECT_ROOT / "tmp" / "processed"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

DATA_DIR, OUTPUT_DIR

(PosixPath('/Users/Glebazzz/MMS/Data'),
 PosixPath('/Users/Glebazzz/MMS/tmp/processed'))

In [2]:
def excel_serial_to_datetime(series: pd.Series) -> pd.Series:
    """Convert Excel serial dates to pandas datetime, handling pre-parsed datetimes."""
    if pd.api.types.is_datetime64_any_dtype(series):
        return pd.to_datetime(series, errors="coerce")
    numeric = pd.to_numeric(series, errors="coerce")
    return pd.to_datetime(numeric, unit="D", origin="1899-12-30", errors="coerce")


def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize column names to snake_case and strip spaces."""
    df = df.copy()
    df.columns = (
        df.columns.str.strip()
        .str.lower()
        .str.replace(" ", "_", regex=False)
        .str.replace("/", "_", regex=False)
    )
    return df


def read_xlsb(path: Path, sheet: str | None = None) -> pd.DataFrame:
    """Read an XLSB file via pandas/pyxlsb."""
    return pd.read_excel(path, sheet_name=sheet or 0, engine="pyxlsb")


def inspect_xlsb(path: Path) -> dict:
    """Quickly inspect sheet names, headers, and row count without loading all data."""
    with open_workbook(path) as wb:
        sheets = wb.sheets
        first = sheets[0]
        with wb.get_sheet(first) as sh:
            headers = [c.v for c in next(sh.rows())]
            row_count = sum(1 for _ in sh.rows())
    return {"path": path.name, "sheets": sheets, "headers": headers, "rows": row_count}


def clean_sales_df(df: pd.DataFrame, channel: str, has_b2b: bool = False) -> pd.DataFrame:
    """Standardize sales data across store and web feeds."""
    df = standardize_columns(df)
    # Harmonize column names that differ between sources
    rename_map = {
        "sales_qty_anon": "sales_qty",
        "sales_value_anon": "sales_value",
        "margin_value_anon": "margin_value",
    }
    df = df.rename(columns=rename_map)
    # Convert dates from Excel serials
    df["date"] = excel_serial_to_datetime(df["date"])
    if "date_start_promo" in df.columns:
        df["date_start_promo"] = excel_serial_to_datetime(df["date_start_promo"])
    if "date_end_promo" in df.columns:
        df["date_end_promo"] = excel_serial_to_datetime(df["date_end_promo"])

    # Trim strings and unify types
    str_cols = [
        "channel",
        "department",
        "desc_group",
        "desc_subgroup",
        "desc_1_sku",
        "desc_2_sku",
        "desc_1_supplier",
        "desc_2_supplier",
        "brand",
    ]
    for col in str_cols:
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()

    df["channel"] = channel  # enforce channel label
    if not has_b2b:
        df["b2b"] = "NO"

    numeric_cols = [
        "code_department",
        "code_group",
        "code_subgroup",
        "cod_sku",
        "cod_supplier",
        "sales_qty",
        "sales_value",
        "margin_value",
    ]
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    return df


def clean_promo_df(df: pd.DataFrame) -> pd.DataFrame:
    """Prepare promo feed for merges."""
    df = standardize_columns(df)
    df["date"] = excel_serial_to_datetime(df["date"])
    df["date_start_promo"] = excel_serial_to_datetime(df.get("date_start_promo"))
    df["date_end_promo"] = excel_serial_to_datetime(df.get("date_end_promo"))

    string_like = [
        "channel",
        "code_promo",
        "desc_promo",
        "type_of_promo",
        "department",
        "desc_group",
        "desc_subgroup",
        "desc_1_sku",
        "desc_2_sku",
        "desc_1_supplier",
        "desc_2_supplier",
        "brand",
    ]
    for col in string_like:
        if col in df.columns:
            df[col] = df[col].astype(str).str.strip()

    numeric_cols = [
        "code_department",
        "code_group",
        "code_subgroup",
        "cod_sku",
        "cod_supplier",
        "sales_qty_anon",
        "sales_value_anon",
        "margin_value_anon",
    ]
    for col in numeric_cols:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    # Drop potential header repeats / empty rows
    df = df[df["date"].notna()].reset_index(drop=True)
    return df

In [3]:
# Stream XLSB to Parquet to avoid RAM blow-ups

def _stream_xlsb_chunks(path: Path, chunk_size: int = 50_000):
    """Yield DataFrame chunks from an XLSB sheet."""
    with open_workbook(path) as wb:
        sheet_name = wb.sheets[0]
        with wb.get_sheet(sheet_name) as sh:
            rows_iter = sh.rows()
            header = [c.v for c in next(rows_iter)]
            batch = []
            for row in rows_iter:
                batch.append([c.v for c in row])
                if len(batch) >= chunk_size:
                    yield pd.DataFrame(batch, columns=header)
                    batch = []
            if batch:
                yield pd.DataFrame(batch, columns=header)


def stream_sales_xlsb_to_parquet(path: Path, out_path: Path, channel: str, has_b2b: bool = False, chunk_size: int = 50_000) -> int:
    """Stream a sales XLSB file to Parquet in chunks; return rows written."""
    writer = None
    total = 0
    for raw_df in _stream_xlsb_chunks(path, chunk_size):
        cleaned = clean_sales_df(raw_df, channel=channel, has_b2b=has_b2b)
        table = pa.Table.from_pandas(cleaned, preserve_index=False)
        if writer is None:
            writer = pq.ParquetWriter(out_path, table.schema)
        writer.write_table(table)
        total += len(cleaned)
    if writer:
        writer.close()
    return total


def stream_promo_xlsb_to_parquet(path: Path, out_path: Path, chunk_size: int = 50_000) -> int:
    """Stream a promo XLSB file to Parquet in chunks; return rows written."""
    writer = None
    total = 0
    for raw_df in _stream_xlsb_chunks(path, chunk_size):
        cleaned = clean_promo_df(raw_df)
        table = pa.Table.from_pandas(cleaned, preserve_index=False)
        if writer is None:
            writer = pq.ParquetWriter(out_path, table.schema)
        writer.write_table(table)
        total += len(cleaned)
    if writer:
        writer.close()
    return total

In [4]:
# Stream all raw files to Parquet (idempotent; run once, then reuse Parquet)
parquet_dir = OUTPUT_DIR / "raw_parquet"
parquet_dir.mkdir(parents=True, exist_ok=True)

promo_parquet = parquet_dir / "promo_full.parquet"
store_parquets = {
    "Stores_October-January_FY25.xlsb": parquet_dir / "stores_oct_jan.parquet",
    "Stores_February-June_FY25.xlsb": parquet_dir / "stores_feb_jun.parquet",
    "Stores_July-September_FY25.xlsb": parquet_dir / "stores_jul_sep.parquet",
}
web_parquets = {
    "Web_October-January_FY25.xlsb": parquet_dir / "web_oct_jan.parquet",
    "Web_February-August_FY25.xlsb": parquet_dir / "web_feb_aug.parquet",
    "Web_September_FY25.xlsb": parquet_dir / "web_sep.parquet",
}

# Promo
rows_promo = stream_promo_xlsb_to_parquet(DATA_DIR / "Promo_October-September_FY25.xlsb", promo_parquet)
print(f"Promo rows written: {rows_promo:,}")

# Stores
rows_store = 0
for fname, out_pq in store_parquets.items():
    rows_store += stream_sales_xlsb_to_parquet(DATA_DIR / fname, out_pq, channel="STORES")
print(f"Store rows written total: {rows_store:,}")

# Web
rows_web = 0
for fname, out_pq in web_parquets.items():
    rows_web += stream_sales_xlsb_to_parquet(DATA_DIR / fname, out_pq, channel="WEB", has_b2b=True)
print(f"Web rows written total: {rows_web:,}")

Promo rows written: 481,477
Store rows written total: 2,617,146
Web rows written total: 2,094,259


In [5]:
# Load from Parquet (fast, memory-friendly scanning) and build combined sales
sales_parquet_paths = list(store_parquets.values()) + list(web_parquets.values())

# Arrow dataset for lazy scans
sales_ds = ds.dataset(sales_parquet_paths, format="parquet")
promo_ds = ds.dataset([promo_parquet], format="parquet")

# Materialize to pandas only when needed; otherwise keep as Arrow/Parquet.
sales_df = sales_ds.to_table().to_pandas()
promo_df = promo_ds.to_table().to_pandas()

# Continue with existing logic
sales_df["date_year"] = sales_df["date"].dt.year
sales_df["date_month"] = sales_df["date"].dt.to_period("M")

print("rows", len(sales_df))
print(sales_df.groupby("channel").size())
sales_df.head()

rows 4711405
channel
STORES    2617146
WEB       2094259
dtype: int64


Unnamed: 0,channel,date,code_department,department,code_group,desc_group,code_subgroup,desc_subgroup,cod_sku,desc_1_sku,desc_2_sku,cod_supplier,desc_1_supplier,desc_2_supplier,brand,sales_qty,sales_value,margin_value,b2b,date_year,date_month
0,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,173237.0,LG LCD24 24TQ510S BK,LED HD 5HDMI,20477.0,LG ELECTRONICS IT SPA,MONITOR TV,LG,15.0,2230.65,-123.33,NO,2024,2024-10
1,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,177909.0,LG LCD27 27TQ615S BK,LED FHD 2HDMI,20477.0,LG ELECTRONICS IT SPA,MONITOR TV,LG,12.0,2177.41,237.16,NO,2024,2024-10
2,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,178664.0,PHILIPS LCD24 24PHS5537,LED HD 2HDMI,21141.0,TECHNOIT SPA/PHILIPS,PHILIPS,PHILIPS,1.0,132.78,-13.02,NO,2024,2024-10
3,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,178665.0,PHILIPS LCD24 24PHS5507,LED HD 2HDMI,21141.0,TECHNOIT SPA/PHILIPS,PHILIPS,PHILIPS,0.0,-9.43,-9.43,NO,2024,2024-10
4,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,180463.0,LG LCD27 27TQ615S WH,LED FHD 2HDMI,20477.0,LG ELECTRONICS IT SPA,MONITOR TV,LG,10.0,1809.79,196.99,NO,2024,2024-10


In [6]:
# DuckDB aggregation directly on Arrow datasets (no full pandas load)
con = duckdb.connect()
con.register("sales_ds", sales_ds)
con.register("promo_ds", promo_ds)

# Row counts (lazy)
duckdb.query("SELECT COUNT(*) AS sales_rows FROM sales_ds").df()

# Monthly channel-level rollup (limited for display)
duckdb.query(
    """
    SELECT
      channel,
      date_part('year', date) AS year,
      date_trunc('month', date) AS month,
      SUM(sales_qty) AS qty,
      SUM(sales_value) AS sales_value,
      SUM(margin_value) AS margin
    FROM sales_ds
    GROUP BY 1, 2, 3
    ORDER BY 1, 2, 3
    LIMIT 50
    """
).df()

Unnamed: 0,channel,year,month,qty,sales_value,margin
0,STORES,2024,2024-10-01,1745154.0,146499000.0,-4336592.0
1,STORES,2024,2024-11-01,2205392.0,209429500.0,-18464300.0
2,STORES,2024,2024-12-01,2835758.0,228536400.0,-11689490.0
3,STORES,2025,2025-01-01,1678843.0,132795900.0,-2891506.0
4,STORES,2025,2025-02-01,1466751.0,116654500.0,-3072959.0
5,STORES,2025,2025-03-01,1581958.0,119087100.0,-3756544.0
6,STORES,2025,2025-04-01,1420352.0,110277300.0,-3580728.0
7,STORES,2025,2025-05-01,1528863.0,122678800.0,-7275515.0
8,STORES,2025,2025-06-01,1659073.0,131096100.0,-5880098.0
9,STORES,2025,2025-07-01,1762480.0,141755900.0,-7194380.0


In [7]:
# Join promos via DuckDB without loading all rows into pandas
sales_with_promo_out = OUTPUT_DIR / "sales_with_promo.parquet"

con.execute(
    f"""
    COPY (
      SELECT
        s.*,
        p.code_promo,
        p.desc_promo,
        p.type_of_promo,
        p.date_start_promo,
        p.date_end_promo,
        p.sales_qty_anon   AS promo_sales_qty,
        p.sales_value_anon AS promo_sales_value,
        p.margin_value_anon AS promo_margin_value
      FROM sales_ds s
      LEFT JOIN promo_ds p
        ON s.channel = p.channel
       AND s.cod_sku = p.cod_sku
       AND s.date BETWEEN p.date_start_promo AND p.date_end_promo
    ) TO '{sales_with_promo_out}' (FORMAT PARQUET);
    """
)

# Lightweight preview (no full load)
con.execute(f"SELECT * FROM read_parquet('{sales_with_promo_out}') LIMIT 20").df()

Unnamed: 0,channel,date,code_department,department,code_group,desc_group,code_subgroup,desc_subgroup,cod_sku,desc_1_sku,desc_2_sku,cod_supplier,desc_1_supplier,desc_2_supplier,brand,sales_qty,sales_value,margin_value,b2b,code_promo,desc_promo,type_of_promo,date_start_promo,date_end_promo,promo_sales_qty,promo_sales_value,promo_margin_value
0,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,173237.0,LG LCD24 24TQ510S BK,LED HD 5HDMI,20477.0,LG ELECTRONICS IT SPA,MONITOR TV,LG,15.0,2230.65,-123.33,NO,,,,NaT,NaT,,,
1,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,177909.0,LG LCD27 27TQ615S BK,LED FHD 2HDMI,20477.0,LG ELECTRONICS IT SPA,MONITOR TV,LG,12.0,2177.41,237.16,NO,,,,NaT,NaT,,,
2,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,178664.0,PHILIPS LCD24 24PHS5537,LED HD 2HDMI,21141.0,TECHNOIT SPA/PHILIPS,PHILIPS,PHILIPS,1.0,132.78,-13.02,NO,,,,NaT,NaT,,,
3,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,178665.0,PHILIPS LCD24 24PHS5507,LED HD 2HDMI,21141.0,TECHNOIT SPA/PHILIPS,PHILIPS,PHILIPS,0.0,-9.43,-9.43,NO,,,,NaT,NaT,,,
4,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,180463.0,LG LCD27 27TQ615S WH,LED FHD 2HDMI,20477.0,LG ELECTRONICS IT SPA,MONITOR TV,LG,10.0,1809.79,196.99,NO,,,,NaT,NaT,,,
5,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,186940.0,SAMSUNG LCD24 24N4300AD,LED HD 2HDMI,20072.0,SAMSUNG ELECTRONIC SPA CE,SAMSUNG CE,SAMSUNG,4.0,636.44,-31.7,NO,,,,NaT,NaT,,,
6,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,189981.0,PANA LCD24 24MS350E,LED WXGA 2HDMI,20424.0,PANASONIC MARKET.EU GMBH,PANASONIC BRUNO,PANASONIC,0.0,0.0,0.0,NO,,,,NaT,NaT,,,
7,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,217962.0,PEAQ LCD24 24GH-5024C,,21616.0,IMTRON ITALIA SRL,OK KOENIC PEAQ,PEAQ,2.0,179.76,38.66,NO,,,,NaT,NaT,,,
8,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,222170.0,PEAQ LCD24 24H-5024E,Q-LED HD 3HDMI,21616.0,IMTRON ITALIA SRL,OK KOENIC PEAQ,PEAQ,41.0,3212.45,606.41,NO,,,,NaT,NaT,,,
9,STORES,2024-10-01,3.0,TV,3.0,LCD,122.0,TVC LCD FINO A 27P,339058.0,LG LCD24 24TQ510S WH,LED HD 2HDMI,20477.0,LG ELECTRONICS IT SPA,MONITOR TV,LG,11.0,1593.32,-83.64,NO,,,,NaT,NaT,,,


In [8]:
# Quick file catalogue
xlsb_files = sorted(DATA_DIR.glob("*.xlsb"))
summary = pd.DataFrame([inspect_xlsb(path) for path in xlsb_files])
summary

Unnamed: 0,path,sheets,headers,rows
0,Promo_October-September_FY25.xlsb,[Anon Data],"[CHANNEL, CODE PROMO, DESC PROMO, DATE START P...",481478
1,Stores_February-June_FY25.xlsb,[Anon Data],"[CHANNEL, DATE, CODE DEPARTMENT, DEPARTMENT, C...",1046578
2,Stores_July-September_FY25.xlsb,[Anon Data],"[CHANNEL, DATE, CODE DEPARTMENT, DEPARTMENT, C...",655363
3,Stores_October-January_FY25.xlsb,[Anon Data],"[CHANNEL, DATE, CODE DEPARTMENT, DEPARTMENT, C...",915208
4,Web_February-August_FY25.xlsb,[Anon Data],"[CHANNEL, DATE, B2B, CODE DEPARTMENT, DEPARTME...",973258
5,Web_October-January_FY25.xlsb,[Anon Data],"[CHANNEL, DATE, B2B, CODE DEPARTMENT, DEPARTME...",964184
6,Web_September_FY25.xlsb,[Anon Data],"[CHANNEL, DATE, B2B, CODE DEPARTMENT, DEPARTME...",156820


In [9]:
# Load promo data
promo_path = DATA_DIR / "Promo_October-September_FY25.xlsb"
promo_raw = read_xlsb(promo_path)
promo_df = clean_promo_df(promo_raw)

promo_df.head()

Unnamed: 0,channel,code_promo,desc_promo,date_start_promo,date_end_promo,type_of_promo,date,code_department,department,code_group,desc_group,code_subgroup,desc_subgroup,cod_sku,desc_1_sku,desc_2_sku,cod_supplier,desc_1_supplier,desc_2_supplier,brand,sales_qty_anon,sales_value_anon,margin_value_anon
0,STORES,1-250003,SPECIALE LA CASA PER ME,2024-10-14,2024-10-31,NO PEAK,2024-10-18,32,UTENSILI DA CUCINA,127,CUCINA,637,PADELLE,228050,LAGO SET PADEL C.NATURA,5 PEZZI,21621,GROUPE SEB IT/LAGOSTINA,LAGOSTINA,LAGOSTINA,8,399,52.53
1,STORES,1-250003,SPECIALE LA CASA PER ME,2024-10-14,2024-10-31,NO PEAK,2024-10-18,32,UTENSILI DA CUCINA,127,CUCINA,645,PENTOLE A PRESSIONE,228136,LAGO P/PRESS AMICA 5LT,,21621,GROUPE SEB IT/LAGOSTINA,LAGOSTINA,LAGOSTINA,8,317,-85.64
2,STORES,1-250003,SPECIALE LA CASA PER ME,2024-10-14,2024-10-31,NO PEAK,2024-10-18,32,UTENSILI DA CUCINA,127,CUCINA,651,PENTOLE,127755,LAGOSTINA BATTERIA SMART,SET 9PZ BATTERIA SMART,21621,GROUPE SEB IT/LAGOSTINA,LAGOSTINA,LAGOSTINA,3,273,30.74
3,STORES,1-250003,SPECIALE LA CASA PER ME,2024-10-14,2024-10-31,NO PEAK,2024-10-18,33,PICCOLI ELETTRODOMESTICI,121,ACCESSORI P.E.,629,BEVANDE IN CAPSULE/CIALDE,119353,LAVAZZA CREMA&GUSTO,108 CAPSULE,20349,LAVAZZA SPA,LAVAZZA CAFFÈ,LAVAZZA,34,775,80.45
4,STORES,1-250003,SPECIALE LA CASA PER ME,2024-10-14,2024-10-31,NO PEAK,2024-10-18,33,PICCOLI ELETTRODOMESTICI,121,ACCESSORI P.E.,629,BEVANDE IN CAPSULE/CIALDE,119359,LAVAZZA ESPRES.PASSIONAL,108 CAPSULE,20349,LAVAZZA SPA,LAVAZZA CAFFÈ,LAVAZZA,17,393,39.73


In [None]:
# Load store sales (three fiscal windows)
store_files = [
    "Stores_October-January_FY25.xlsb",
    "Stores_February-June_FY25.xlsb",
    "Stores_July-September_FY25.xlsb",
]
store_frames = []
for fname in store_files:
    path = DATA_DIR / fname
    raw = read_xlsb(path)
    cleaned = clean_sales_df(raw, channel="STORES")
    store_frames.append(cleaned)

stores_df = pd.concat(store_frames, ignore_index=True)
stores_df.head()

In [None]:
# Load web sales
web_files = [
    "Web_October-January_FY25.xlsb",
    "Web_February-August_FY25.xlsb",
    "Web_September_FY25.xlsb",
]
web_frames = []
for fname in web_files:
    path = DATA_DIR / fname
    raw = read_xlsb(path)
    cleaned = clean_sales_df(raw, channel="WEB", has_b2b=True)
    web_frames.append(cleaned)

web_df = pd.concat(web_frames, ignore_index=True)
web_df.head()

Unnamed: 0,channel,date,b2b,code_department,department,code_group,desc_group,code_subgroup,desc_subgroup,cod_sku,desc_1_sku,desc_2_sku,cod_supplier,desc_1_supplier,desc_2_supplier,brand,sales_qty,sales_value,margin_value
0,WEB,2024-10-01,NO,33,PICCOLI ELETTRODOMESTICI,121,ACCESSORI P.E.,629,BEVANDE IN CAPSULE/CIALDE,365253,SODA CON.LIMONE ZERO 440,,20577,SODASTREAM INT.BVIT.BRANC,SODASTREAM WASSERMAXX,SODASTREAM,1,0.99,-3.75
1,WEB,2024-10-01,NO,33,PICCOLI ELETTRODOMESTICI,125,PED CUCINA,677,MIXER,614318,ARIETE MIXER PIMMY,200W,20286,DE LONGHI APPLIANCES SRL,ARIETE,ARIETE,1,0.87,-15.51
2,WEB,2024-10-01,NO,30,ACCESSORI,78,CAVI,461,ACC.ELETTR.,187192,POLYP ADATT 433 BIANCO,BLISTER SPINA 10A-16A,22875,POLYPOOL SPA,POLYPOOL QUALITY,POLYPOOL,1,1.76,0.41
3,WEB,2024-10-01,NO,30,ACCESSORI,78,CAVI,461,ACC.ELETTR.,187192,POLYP ADATT 433 BIANCO,BLISTER SPINA 10A-16A,22875,POLYPOOL SPA,POLYPOOL QUALITY,POLYPOOL,1,1.92,0.45
4,WEB,2024-10-01,NO,30,ACCESSORI,78,CAVI,461,ACC.ELETTR.,187192,POLYP ADATT 433 BIANCO,BLISTER SPINA 10A-16A,22875,POLYPOOL SPA,POLYPOOL QUALITY,POLYPOOL,1,1.7,0.4


In [None]:
# Combine sales across channels
sales_df = pd.concat([stores_df, web_df], ignore_index=True)

# Basic sanity checks
sales_df["date_year"] = sales_df["date"].dt.year
sales_df["date_month"] = sales_df["date"].dt.to_period("M")

print("rows", len(sales_df))
print(sales_df.groupby("channel").size())
sales_df.head()

rows 4711405
channel
STORES    2617146
WEB       2094259
dtype: int64


Unnamed: 0,channel,date,code_department,department,code_group,desc_group,code_subgroup,desc_subgroup,cod_sku,desc_1_sku,desc_2_sku,cod_supplier,desc_1_supplier,desc_2_supplier,brand,sales_qty,sales_value,margin_value,b2b,date_year,date_month
0,STORES,2024-10-01,3,TV,3,LCD,122,TVC LCD FINO A 27P,173237,LG LCD24 24TQ510S BK,LED HD 5HDMI,20477,LG ELECTRONICS IT SPA,MONITOR TV,LG,15,2230.65,-123.33,NO,2024,2024-10
1,STORES,2024-10-01,3,TV,3,LCD,122,TVC LCD FINO A 27P,177909,LG LCD27 27TQ615S BK,LED FHD 2HDMI,20477,LG ELECTRONICS IT SPA,MONITOR TV,LG,12,2177.41,237.16,NO,2024,2024-10
2,STORES,2024-10-01,3,TV,3,LCD,122,TVC LCD FINO A 27P,178664,PHILIPS LCD24 24PHS5537,LED HD 2HDMI,21141,TECHNOIT SPA/PHILIPS,PHILIPS,PHILIPS,1,132.78,-13.02,NO,2024,2024-10
3,STORES,2024-10-01,3,TV,3,LCD,122,TVC LCD FINO A 27P,178665,PHILIPS LCD24 24PHS5507,LED HD 2HDMI,21141,TECHNOIT SPA/PHILIPS,PHILIPS,PHILIPS,0,-9.43,-9.43,NO,2024,2024-10
4,STORES,2024-10-01,3,TV,3,LCD,122,TVC LCD FINO A 27P,180463,LG LCD27 27TQ615S WH,LED FHD 2HDMI,20477,LG ELECTRONICS IT SPA,MONITOR TV,LG,10,1809.79,196.99,NO,2024,2024-10


: 

In [None]:
# Optional: promo preview without full pandas merge (avoid OOM)
promo_join_parquet = OUTPUT_DIR / "sales_with_promo.parquet"

if promo_join_parquet.exists():
    # Sample from precomputed DuckDB join
    sales_with_promo = pd.read_parquet(promo_join_parquet)
    sales_with_promo.head(20)
else:
    preview = duckdb.query(
        """
        SELECT
          s.*,
          p.code_promo,
          p.desc_promo,
          p.type_of_promo,
          p.date_start_promo,
          p.date_end_promo
        FROM sales_ds s
        LEFT JOIN promo_ds p
          ON s.channel = p.channel
         AND s.cod_sku = p.cod_sku
         AND s.date BETWEEN p.date_start_promo AND p.date_end_promo
        LIMIT 20
        """
    ).to_df()
    preview

In [None]:
# Persist curated outputs
promo_out = OUTPUT_DIR / "promo_clean.parquet"
sales_out = OUTPUT_DIR / "sales_clean.parquet"
sales_with_promo_out = OUTPUT_DIR / "sales_with_promo.parquet"

promo_df.to_parquet(promo_out, index=False)
sales_df.to_parquet(sales_out, index=False)
sales_with_promo.to_parquet(sales_with_promo_out, index=False)

# Lightweight CSVs for quick inspection (first 50k rows to keep size manageable)
(promo_df.head(50_000)).to_csv(OUTPUT_DIR / "promo_clean_sample.csv", index=False)
(sales_df.head(50_000)).to_csv(OUTPUT_DIR / "sales_clean_sample.csv", index=False)

promo_out, sales_out, sales_with_promo_out

**How to run**
1. Install deps (inside project env): `pip install pandas pyxlsb pyarrow`.
2. Open this notebook from the repo root with Jupyter or VS Code and run all cells.
3. Cleaned parquet/CSV files will appear in `tmp/processed/` for downstream use (e.g., DuckDB, analytics, or model training).

Unnamed: 0,channel,month,qty,sales_value,margin
0,STORES,2024-10-01,1745154.0,146499000.0,-4336592.0
1,WEB,2024-10-01,186846.0,43305610.0,-4604644.0
2,STORES,2024-11-01,2205392.0,209429500.0,-18464300.0
3,WEB,2024-11-01,333084.0,78253550.0,-10560310.0
4,STORES,2024-12-01,2835758.0,228536400.0,-11689490.0
5,WEB,2024-12-01,378545.0,68427910.0,-7560274.0
6,STORES,2025-01-01,1678843.0,132795900.0,-2891506.0
7,WEB,2025-01-01,188881.0,40257550.0,-4190211.0
8,STORES,2025-02-01,1466751.0,116654500.0,-3072959.0
9,WEB,2025-02-01,161242.0,34594340.0,-4125690.0


In [18]:
from pathlib import Path
import duckdb, sqlite3, os

PROJECT_ROOT = Path("/Users/Glebazzz/MMS")
duck_db_path = PROJECT_ROOT / "tmp" / "mms_data.duckdb"

duck = duckdb.connect(duck_db_path)  # створить файл, якщо його ще немає
print("DB at:", duck_db_path)

DB at: /Users/Glebazzz/MMS/tmp/mms_data.duckdb
