In [3]:
!pip install pyarrow

Collecting pyarrow
  Using cached pyarrow-21.0.0-cp311-cp311-macosx_12_0_arm64.whl.metadata (3.3 kB)
Using cached pyarrow-21.0.0-cp311-cp311-macosx_12_0_arm64.whl (31.2 MB)
Installing collected packages: pyarrow
Successfully installed pyarrow-21.0.0


In [1]:
import pandas as pd

# Smoke test: write a tiny frame to Parquet and read it straight back.
parquet_path = "test.parquet"
df = pd.DataFrame(data={"a": [1, 2, 3]})
df.to_parquet(parquet_path, index=False)
pd.read_parquet(parquet_path)

Unnamed: 0,a
0,1
1,2
2,3


In [1]:
from pathlib import Path
import os
import zipfile
import sqlite3
import pandas as pd

# PATHS
# All locations are relative, so they resolve against the current working
# directory of the kernel/process.
_DATA_ROOT = Path("data")
RAW_DIR = _DATA_ROOT / "raw"              # downloaded archive + extracted CSV
PROCESSED = _DATA_ROOT / "processed"      # Parquet output
DB_FILE = "supermarket.db"                # SQLite database file

# 1. DOWNLOAD DATA FROM KAGGLE
def download_from_kaggle():
    """
    Downloads the dataset from Kaggle using the CLI and extracts it into RAW_DIR.

    Requires the ``kaggle`` command-line tool on PATH and kaggle.json to be
    configured in ~/.kaggle.

    Raises:
        FileNotFoundError: if the ``kaggle`` executable cannot be started, if
            the download command exits with a non-zero status, or if no ZIP
            archive is present in RAW_DIR afterwards.
    """
    import subprocess  # local import: only this function spawns a process

    dataset = "lovishbansal123/sales-of-a-supermarket"

    RAW_DIR.mkdir(parents=True, exist_ok=True)
    print("Downloading dataset...")

    # Argument list (no shell) and the real RAW_DIR rather than a duplicated
    # "data/raw" literal, so the download target always matches the folder
    # searched below.
    result = subprocess.run(
        ["kaggle", "datasets", "download", "-d", dataset, "-p", str(RAW_DIR)]
    )
    if result.returncode != 0:
        # Without this check a failed download would go unnoticed and a stale
        # archive left over from an earlier run would be extracted instead.
        raise FileNotFoundError(
            f"kaggle download failed with exit code {result.returncode}."
        )

    # Prefer the archive named after the dataset slug; fall back to any ZIP
    # in RAW_DIR in case the CLI used a different file name.
    expected_zip = RAW_DIR / f"{dataset.split('/')[-1]}.zip"
    if expected_zip.exists():
        zip_path = expected_zip
    else:
        zip_path = next(RAW_DIR.glob("*.zip"), None)
    if not zip_path:
        raise FileNotFoundError("ZIP not found. Download may have failed.")

    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(RAW_DIR)
    print(f"Downloaded and extracted: {zip_path.name}")

# 2. TRANSFORM TO STAR SCHEMA
def build_dimensions(df: pd.DataFrame):
    """
    Derive the product and store dimension tables from the raw sales frame.

    The four source columns used by the dimensions are renamed to snake_case
    first. Each dimension holds the distinct combinations of its columns plus
    a 1-based surrogate key (``product_id`` / ``store_id``).

    Returns:
        (dim_product, dim_store, renamed_df) -- the renamed frame is returned
        as well so the fact table can be joined on the new column names.
    """
    snake_case_names = {
        "Product line": "product_line",
        "Unit price": "unit_price",
        "Branch": "branch",
        "City": "city",
    }
    renamed = df.rename(columns=snake_case_names)

    def distinct_with_key(columns, key_name):
        # Unique rows over `columns`, renumbered from 0, with key = position + 1.
        distinct = renamed[columns].drop_duplicates().reset_index(drop=True)
        distinct[key_name] = distinct.index + 1
        return distinct

    dim_product = distinct_with_key(["product_line", "unit_price"], "product_id")
    dim_store = distinct_with_key(["branch", "city"], "store_id")

    return dim_product, dim_store, renamed

def build_fact(df, dim_product, dim_store):
    """
    Build the sales fact table by attaching dimension keys to each sale.

    ``df`` is joined to ``dim_product`` on (product_line, unit_price) and to
    ``dim_store`` on (branch, city) using pandas' default merge, then given a
    1-based ``sale_id`` taken from the row position in the merged frame.

    Returns:
        DataFrame with columns sale_id, date, customer_type, gender,
        product_id, store_id, total, quantity, payment.
    """
    snake_case_names = {
        "Date": "date",
        "Customer type": "customer_type",
        "Gender": "gender",
        "Total": "total",
        "Quantity": "quantity",
        "Payment": "payment",
    }

    with_product = df.merge(dim_product, on=["product_line", "unit_price"])
    with_keys = with_product.merge(dim_store, on=["branch", "city"])
    with_keys["sale_id"] = with_keys.index + 1

    fact = with_keys.rename(columns=snake_case_names)
    return fact[[
        "sale_id",
        "date",
        "customer_type",
        "gender",
        "product_id",
        "store_id",
        "total",
        "quantity",
        "payment",
    ]]


# 3. LOAD TO SQLITE + SAVE TO PARQUET
def write_to_sqlite_and_parquet(dim_product, dim_store, fact_sales):
    """
    Persist the star schema to the SQLite file DB_FILE and to Parquet files.

    Each frame is written to a SQLite table (replacing any existing table of
    the same name) and to ``<table name>.parquet`` inside PROCESSED, which is
    created if it does not exist.

    Args:
        dim_product: product dimension, stored as ``dim_product``.
        dim_store: store dimension, stored as ``dim_store``.
        fact_sales: sales fact table, stored as ``fact_sales``.
    """
    tables = {
        "dim_product": dim_product,
        "dim_store": dim_store,
        "fact_sales": fact_sales,
    }

    # Save to SQLite
    conn = sqlite3.connect(DB_FILE)
    try:
        for table_name, frame in tables.items():
            frame.to_sql(table_name, conn, index=False, if_exists="replace")
    finally:
        # Close the connection even when a write fails, so the database file
        # handle is not leaked.
        conn.close()
    print(f"SQLite DB created: {DB_FILE}")

    # Save to Parquet
    PROCESSED.mkdir(parents=True, exist_ok=True)
    for table_name, frame in tables.items():
        frame.to_parquet(PROCESSED / f"{table_name}.parquet", index=False)
    print(f"Parquet files written to: {PROCESSED.resolve()}")

# 4. HELPER FUNCTION TO QUERY DB

def run_query(q, db_file=None):
    """
    Executes a SQL query on the SQLite DB and returns a DataFrame.

    Args:
        q: SQL text to execute.
        db_file: optional path of the SQLite database to query; when omitted
            the module-level DB_FILE is used.

    Returns:
        pandas.DataFrame holding the query result.
    """
    conn = sqlite3.connect(DB_FILE if db_file is None else db_file)
    try:
        return pd.read_sql(q, conn)
    finally:
        # `with sqlite3.connect(...)` only commits or rolls back the
        # transaction; it does not close the connection, so close explicitly.
        conn.close()


def show_tables():
    """
    List every table and view registered in the database's sqlite_master
    catalog, as a DataFrame with ``name`` and ``type`` columns (via run_query()).
    """
    return run_query(
        "SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view')"
    )


# 5. EXECUTE PIPELINE

if __name__ == "__main__":
    download_from_kaggle()

    # Load raw CSV (extracted filename may vary slightly, so match by extension)
    csv_file = next(RAW_DIR.glob("*.csv"), None)
    if csv_file is None:
        # A bare next() on an empty glob would surface as an opaque StopIteration.
        raise FileNotFoundError(f"No CSV file found in {RAW_DIR} after extraction.")
    df = pd.read_csv(csv_file)

    # Transform
    dim_product, dim_store, df_renamed = build_dimensions(df)
    fact_sales = build_fact(df_renamed, dim_product, dim_store)

    # Load
    write_to_sqlite_and_parquet(dim_product, dim_store, fact_sales)

Downloading dataset...
Dataset URL: https://www.kaggle.com/datasets/lovishbansal123/sales-of-a-supermarket
License(s): apache-2.0
Downloading sales-of-a-supermarket.zip to data/raw

Downloaded and extracted: sales-of-a-supermarket.zip
SQLite DB created: supermarket.db
Parquet files written to: /Users/faridun/66degrees-supermarket-pipeline/notebooks/data/processed


100%|██████████| 35.9k/35.9k [00:00<00:00, 65.1MB/s]


## Show tables in the database

In [2]:
# List the tables and views currently present in the SQLite database.
show_tables()

Unnamed: 0,name,type
0,dim_product,table
1,dim_store,table
2,fact_sales,table


## Row count per table

In [3]:
# Row count of every star-schema table, one row per table.
# UNION ALL: the literal table_name labels already make each row distinct, so
# the de-duplication done by plain UNION is unnecessary. ORDER BY makes the
# row order explicit instead of relying on how the database evaluates UNION.
q = """
SELECT 'dim_product' AS table_name, COUNT(*) AS row_count FROM dim_product
UNION ALL
SELECT 'dim_store', COUNT(*) FROM dim_store
UNION ALL
SELECT 'fact_sales', COUNT(*) FROM fact_sales
ORDER BY table_name;
"""
run_query(q)

Unnamed: 0,table_name,row_count
0,dim_product,993
1,dim_store,3
2,fact_sales,1000
