# 01 - Data Ingestion Pipeline

This notebook demonstrates the ingestion pipeline for the **Retail Multi-Brand Data Project**.  
We ingest raw CSV extracts (provided as a zipped dataset) into **DuckDB**, and export them into **Parquet** format for efficient downstream analytics.

**Steps in this notebook:**
1. Install libraries and mount Google Drive  
2. Define paths  
3. Extract raw CSV files  
4. Load CSVs into **DuckDB tables**  
5. Export tables into **Parquet (staging layer)**  
6. Verify row counts & samples  
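
Before extracting, it can help to confirm that the archive actually contains the CSV files the load step expects. The following is a minimal sketch and not part of the original notebook: the helper name `missing_csvs` is hypothetical, and it assumes the CSVs sit at the root of the archive, as the extraction output further down suggests.

```python
import zipfile

# File names taken from the notebook's file_map (Step 4).
EXPECTED_CSVS = [
    "brands.csv",
    "customers.csv",
    "final_inventory.csv",
    "online_channels.csv",
    "products.csv",
    "promotions.csv",
    "sales_transactions_simulated.csv",
    "stores_enhanced.csv",
]


def missing_csvs(zip_path, expected=EXPECTED_CSVS):
    """Return the expected file names that are absent from the archive.

    Hypothetical helper: assumes the files are stored at the archive root.
    """
    with zipfile.ZipFile(zip_path, "r") as z:
        present = set(z.namelist())
    return sorted(set(expected) - present)
```

In the notebook this could be called as `missing_csvs(ZIP_PATH)` right before the extraction step; an empty list means every expected file is present.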

In [4]:
# ==========================
# STEP 0: Install libraries
# ==========================
!pip install duckdb pandas pyarrow

# ==========================
# STEP 1: Mount Google Drive
# ==========================
from google.colab import drive
drive.mount('/content/drive')

import os
import zipfile

import duckdb

# ==========================
# STEP 2: Define Paths
# ==========================
ZIP_PATH = '/content/drive/My Drive/Dataset/Retail Multi-Brand Datasets.zip'
EXTRACT_DIR = '/content/drive/My Drive/Dataset/Retail_Multibrand_extracted'
# Keep the database file and the Parquet exports under the extraction directory
DUCKDB_PATH = os.path.join(EXTRACT_DIR, 'multibrand_retail.duckdb')
EXPORT_DIR = os.path.join(EXTRACT_DIR, 'Parquet')

# ==========================
# STEP 3: Extract ZIP
# ==========================
os.makedirs(EXTRACT_DIR, exist_ok=True)  # no-op if the directory already exists

with zipfile.ZipFile(ZIP_PATH, 'r') as z:
    z.extractall(EXTRACT_DIR)

print("Extracted files:", os.listdir(EXTRACT_DIR))

# ==========================
# STEP 4: Load into DuckDB
# ==========================
con = duckdb.connect(DUCKDB_PATH)

file_map = {
    "brands.csv": "brands",
    "customers.csv": "customers",
    "final_inventory.csv": "inventory",
    "online_channels.csv": "channels",
    "products.csv": "products",
    "promotions.csv": "promotions",
    "sales_transactions_simulated.csv": "sales",
    "stores_enhanced.csv": "stores"
}

for fname, tname in file_map.items():
    fpath = os.path.join(EXTRACT_DIR, fname)
    print(f"Loading {fname} -> {tname}")
    # read_csv_auto infers the delimiter, header, and column types from the file
    con.execute(f"CREATE OR REPLACE TABLE {tname} AS SELECT * FROM read_csv_auto('{fpath}')")
    # Report the row count as a quick sanity check on the load
    cnt = con.execute(f"SELECT COUNT(*) FROM {tname}").fetchone()[0]
    print(f" -> {tname}: {cnt} rows")

# ==========================
# STEP 5: Export to Parquet
# ==========================
os.makedirs(EXPORT_DIR, exist_ok=True)  # no-op if the directory already exists

for tname in file_map.values():
    out_path = os.path.join(EXPORT_DIR, f"{tname}.parquet")
    print(f"Exporting {tname} -> {out_path}")
    con.execute(f"COPY {tname} TO '{out_path}' (FORMAT PARQUET)")

# ==========================
# STEP 6: Verification
# ==========================
print("\nTables in DuckDB:")
print(con.execute("SHOW TABLES").fetchdf())

print("\nSample from sales:")
print(con.execute("SELECT * FROM sales LIMIT 5").fetchdf())

print("\nParquet files created:", os.listdir(EXPORT_DIR))

con.close()


Mounted at /content/drive
Extracted files: ['customers.csv', 'brands.csv', 'products.csv', 'stores_enhanced.csv', 'online_channels.csv', 'promotions.csv', 'sales_transactions_simulated.csv', 'final_inventory.csv']
Loading brands.csv -> brands
 -> brands: 123 rows
Loading customers.csv -> customers
 -> customers: 112473 rows
Loading final_inventory.csv -> inventory
 -> inventory: 3507 rows
Loading online_channels.csv -> channels
 -> channels: 4 rows
Loading products.csv -> products
 -> products: 645 rows
Loading promotions.csv -> promotions
 -> promotions: 296 rows
Loading sales_transactions_simulated.csv -> sales
 -> sales: 4852414 rows
Loading stores_enhanced.csv -> stores
 -> stores: 663 rows
Exporting brands -> /content/drive/My Drive/Dataset/Retail_Multibrand_extracted/Parquet/brands.parquet
Exporting customers -> /content/drive/My Drive/Dataset/Retail_Multibrand_extracted/Parquet/customers.parquet
Exporting inventory -> /content/drive/My Drive/Dataset/Retail_Multibrand_extracted/Parquet/inventory.parquet
Exporting channels -> /content/drive/My Drive/Dataset/Retail_Multibrand_extracted/Parquet/channels.parquet
Exporting products -> /content/drive/My Drive/Dataset/Retail_Multibrand_extracted/Parquet/products.parquet
Exporting promotions -> /content/drive/My Drive/Dataset/Retail_Multibrand_extracted/Parquet/promotions.parquet
Exporting sales -> /content/drive/My Drive/Dataset/Retail_Multibrand_extracted/Parquet/sales.parquet
Exporting stores -> /content/drive/My Drive/Dataset/Retail_Multibrand_extracted/Parquet/stores.parquet

Tables in DuckDB:
         name
0      brands
1    channels
2   customers
3   inventory
4    products
5  promotions
6       sales
7      stores

Sample from sales:
                  date  store_id  product_id  units_sold subsidiary_company  \
0  2021-01-01 00:00:00  STR-0002  PROD-00002           1          Gourmetia   
1  2021-01-01 00:00:00  STR-0002  PROD-00003           3          Gourmetia   
2  2021-01-01 00:00:00  STR-0002  PROD-00004           0          Gourmetia   
3  2021-01-01 00:00:00  STR-0002  PROD-00005           2          Gourmetia   
4  2021-01-01 00:00:00  STR-0002  PROD-00006           2          Gourmetia   

   total_amount promotion_id  discounted_amount customer_id  
0       50000.0         None                0.0  CUST-71671  
1      138000.0         None                0.0  CUST-13598  
2      195000.0         None                0.0  CUST-40071  
3       9