# End-to-End Pipeline Walkthrough (Kaggle → SQLite)

This notebook walks through the full pipeline in a clear sequence: **extract → load (bronze) → model (silver) → validate**.

## What this pipeline does
- Downloads the latest dataset from Kaggle (no committed CSVs).
- Loads the raw dataset into SQLite as `bronze_sales_raw` (staging).
- Builds modeled tables in SQLite:
  - `silver_dim_product_line` (Type 1 dimension)
  - `silver_dim_branch` (SCD Type 2 dimension)
  - `silver_fact_sales` (fact table at transaction grain)
- Runs lightweight validation checks to confirm correctness (tables exist, non-zero row counts, key constraints).
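
To make the stage order concrete, here is a minimal sketch of the same load (bronze) → model (silver) → validate flow against an in-memory SQLite database. The table names `bronze`, `dim_line`, and `fact` and the two sample rows are illustrative assumptions, not the project's schema or data.

```python
import sqlite3

# Illustrative only: toy tables and rows, not the project's real schema.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE bronze (invoice_id TEXT, product_line TEXT, total REAL);
    CREATE TABLE dim_line (line_key INTEGER PRIMARY KEY, name TEXT UNIQUE);
    CREATE TABLE fact (invoice_id TEXT, line_key INTEGER, total REAL);
    """
)

# 1) Load (bronze): land the raw rows as they arrive.
raw_rows = [("A-1", "Food", 10.0), ("A-2", "Health", 25.5)]
conn.executemany("INSERT INTO bronze VALUES (?,?,?)", raw_rows)

# 2) Model (silver): build the dimension first, then the fact that points at it.
conn.execute("INSERT INTO dim_line(name) SELECT DISTINCT product_line FROM bronze")
conn.execute(
    """
    INSERT INTO fact(invoice_id, line_key, total)
    SELECT b.invoice_id, d.line_key, b.total
    FROM bronze b JOIN dim_line d ON d.name = b.product_line
    """
)

# 3) Validate: every table must hold at least one row.
counts = {
    table: conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    for table in ("bronze", "dim_line", "fact")
}
assert all(n > 0 for n in counts.values()), counts
print(counts)
```

The dimension is populated before the fact because the fact rows need the dimension's surrogate keys; the real pipeline follows the same ordering.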

## Where outputs go
- Raw files: `./data/raw/`
- SQLite database: `./db/supermarket_sales.sqlite` (default)

## 0) Prerequisites



### 0.1 Install dependencies

Install the Python packages in `requirements.txt`:



- `pip install -r requirements.txt`



### 0.2 Create `.env` (required for Kaggle download)

Create a `.env` file at the repo root and set:



- `KAGGLE_USERNAME=...`

- `KAGGLE_KEY=...`



Copy the `username` and `key` values from your Kaggle `kaggle.json` into these two variables.



### 0.3 Optional configuration

You can override defaults using:



- `KAGGLE_DATASET` (default: `faresashraf1001/supermarket-sales`)

- `DATA_DIR` (default: `./data`)

- `SQLITE_DB_PATH` (default: `./db/supermarket_sales.sqlite`)

- `LOG_LEVEL` (default: `INFO`)



### 0.4 Notes

- Do not commit `.env`.

- `data/` and `db/` are runtime outputs.


### 0.5 Notebook setup notes (why the repo-root cell exists)

- This notebook imports code from `src/`.

- Depending on how you launch the notebook, Python’s working directory may be `notebooks/` instead of the repo root.

- The `find_repo_root()` cell finds the repo root (by checking for `src/` + `requirements.txt`) and adds it to `sys.path` so imports work reliably.


Neither `README.md` nor `requirement.md` is read at runtime; the presence of `requirements.txt` is used only as a stable “repo marker”.

In [8]:
from pathlib import Path


def find_repo_root(start: Path) -> Path:
    """Locate repo root so imports work no matter where the notebook is launched from."""
    cur = start.resolve()
    for candidate in [cur, *cur.parents]:
        if (candidate / "src").is_dir() and (candidate / "requirements.txt").exists():
            return candidate
    return cur


repo_root = find_repo_root(Path.cwd())
repo_root


WindowsPath('C:/Users/gadde/Desktop/Interview Perparation/66 Degrees Assignment')

In [9]:
# Make `src` importable from this notebook
import sys

if str(repo_root) not in sys.path:
    sys.path.insert(0, str(repo_root))

from src.config import load_settings
from src.logging_utils import configure_logging
from src.schema_sql import DDL_SQLITE
from src.extract import extract_latest_dataset, find_first_csv
from src.transform_load import (
    read_raw_csv,
    load_staging,
    ensure_dim_product_line,
    scd2_upsert_dim_branch,
    load_fact_sales,
)
from src.validate import validate_sqlite_db
from src import db

print("Imports OK")


Imports OK


## Codebase reference (full `src/` in ETL order)

This notebook stays “in sync” by rendering the current contents of every `src/*.py` file directly into the notebook output.


### Module order (matches pipeline execution)

1. `src/config.py` (settings)

2. `src/logging_utils.py` (logging)

3. `src/schema_sql.py` (DDL)

4. `src/db.py` (SQLite helpers)

5. `src/extract.py` (Kaggle extraction + CSV selection)

6. `src/transform_load.py` (normalize, bronze load, dims, fact)

7. `src/validate.py` (post-load data quality checks)

8. `src/runner.py` (orchestration)


Run the helper cell below, then each module's `show_module(...)` cell, to generate the **function index + source** for that module.


In [10]:
from __future__ import annotations

import inspect
from dataclasses import is_dataclass

from IPython.display import Markdown, display

import src.config as config_mod
import src.logging_utils as logging_utils_mod
import src.schema_sql as schema_sql_mod
import src.db as db_mod
import src.extract as extract_mod
import src.transform_load as transform_load_mod
import src.validate as validate_mod
import src.runner as runner_mod


def _md_codeblock(code: str, lang: str = "python") -> Markdown:
    return Markdown(f"```{lang}\n{code}\n```")


def show_source(rel_path: str) -> None:
    """Render a workspace file as a Markdown python code block."""
    path = (repo_root / rel_path).resolve()
    text = path.read_text(encoding="utf-8")

    display(Markdown(f"### Source: `{rel_path}`"))
    display(_md_codeblock(text, "python"))


def _format_signature(obj) -> str:
    try:
        return str(inspect.signature(obj))
    except Exception:
        return "(...)"


def _const_summary(value) -> str:
    if isinstance(value, str):
        return f"str (len={len(value)})"
    if isinstance(value, (list, tuple, set, frozenset, dict)):
        return f"{type(value).__name__} (len={len(value)})"
    return type(value).__name__


def show_module_index(module, *, include_private: bool = True) -> None:
    """List functions/classes/constants defined in a module (top-level only)."""
    items: list[str] = []

    for name, obj in sorted(vars(module).items(), key=lambda kv: kv[0]):
        if name.startswith("__") and name.endswith("__"):
            continue

        if inspect.isfunction(obj) and getattr(obj, "__module__", None) == module.__name__:
            if (not include_private) and name.startswith("_"):
                continue
            items.append(f"- `{name}{_format_signature(obj)}`")
            continue

        if inspect.isclass(obj) and getattr(obj, "__module__", None) == module.__name__:
            if is_dataclass(obj):
                items.append(f"- `{name}` (dataclass)")
            else:
                items.append(f"- `{name}` (class)")
            continue

        # Surface important module-level constants (DDL, expected tables, etc.)
        if name.isupper() and getattr(obj, "__module__", None) in {None, module.__name__}:
            items.append(f"- `{name}` (constant: {_const_summary(obj)})")

    display(Markdown("### Defined symbols"))
    display(Markdown("\n".join(items) if items else "(No symbols found)"))


def show_module(rel_path: str, module, *, include_private: bool = True) -> None:
    display(Markdown(f"## {rel_path}"))
    show_module_index(module, include_private=include_private)
    show_source(rel_path)

### src/config.py (settings)

Defines how the pipeline reads configuration from environment variables (and optionally `.env`) and resolves paths.


Functions/symbols:

- `Settings`: dataclass that holds dataset id, output folders, and log level.

- `load_settings()`: loads `.env`, applies defaults, resolves `DATA_DIR`/`SQLITE_DB_PATH` relative to repo root.

In [11]:
show_module("src/config.py", config_mod)

## src/config.py

### Defined symbols

- `Settings` (dataclass)
- `load_settings() -> src.config.Settings`

### Source: `src/config.py`

```python
import os
from dataclasses import dataclass
from pathlib import Path

from dotenv import load_dotenv


@dataclass(frozen=True)
class Settings:
    kaggle_dataset: str
    data_dir: Path
    sqlite_db_path: Path
    log_level: str


# Load settings from environment (optionally via .env)
def load_settings() -> Settings:
    load_dotenv(override=False)

    repo_root = Path(__file__).resolve().parents[1]

    kaggle_dataset = os.getenv("KAGGLE_DATASET", "faresashraf1001/supermarket-sales")

    data_dir_env = os.getenv("DATA_DIR")
    if data_dir_env:
        data_dir_candidate = Path(data_dir_env).expanduser()
        data_dir = (
            data_dir_candidate.resolve()
            if data_dir_candidate.is_absolute()
            else (repo_root / data_dir_candidate).resolve()
        )
    else:
        data_dir = (repo_root / "data").resolve()

    sqlite_db_path_env = os.getenv("SQLITE_DB_PATH")
    if sqlite_db_path_env:
        sqlite_candidate = Path(sqlite_db_path_env).expanduser()
        sqlite_db_path = (
            sqlite_candidate.resolve()
            if sqlite_candidate.is_absolute()
            else (repo_root / sqlite_candidate).resolve()
        )
    else:
        sqlite_db_path = (repo_root / "db" / "supermarket_sales.sqlite").resolve()

    log_level = os.getenv("LOG_LEVEL", "INFO").upper()

    return Settings(
        kaggle_dataset=kaggle_dataset,
        data_dir=data_dir,
        sqlite_db_path=sqlite_db_path,
        log_level=log_level,
    )

```
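
The path handling in `load_settings()` can be tried in isolation. The sketch below assumes a hypothetical helper, `resolve_setting_path`, that mirrors the same branches (unset, relative, absolute); it is not part of `src/config.py`, and `Path.cwd()` stands in for the real repo root.

```python
import os
from pathlib import Path


def resolve_setting_path(env_name: str, default: Path, repo_root: Path) -> Path:
    """Hypothetical helper mirroring the branch logic in load_settings()."""
    raw = os.getenv(env_name)
    if not raw:
        # Unset or empty: fall back to the default under the repo root.
        return (repo_root / default).resolve()
    candidate = Path(raw).expanduser()
    # Absolute paths are used as given; relative ones hang off the repo root.
    if candidate.is_absolute():
        return candidate.resolve()
    return (repo_root / candidate).resolve()


root = Path.cwd()  # stand-in for the real repo root

os.environ["DATA_DIR"] = "scratch/data"  # made-up relative override
relative_result = resolve_setting_path("DATA_DIR", Path("data"), root)

os.environ.pop("DATA_DIR", None)  # back to "not set"
default_result = resolve_setting_path("DATA_DIR", Path("data"), root)

print(relative_result)
print(default_result)
```

The practical consequence is that a relative `DATA_DIR` or `SQLITE_DB_PATH` resolves against the repo root, not against whatever directory the notebook happens to be launched from.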

### src/logging_utils.py (logging)

Configures Python logging once for the whole pipeline.


Functions/symbols:

- `configure_logging(level)`: sets the global log level + log message format used across modules.

In [12]:
show_module("src/logging_utils.py", logging_utils_mod)

## src/logging_utils.py

### Defined symbols

- `configure_logging(level: str) -> None`

### Source: `src/logging_utils.py`

```python
import logging


def configure_logging(level: str) -> None:
    logging.basicConfig(
        level=getattr(logging, level, logging.INFO),
        format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    )

```
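
`configure_logging` looks the level up with `getattr(logging, level, logging.INFO)`, so an unknown name falls back to `INFO`. One caveat is that `getattr` returns any attribute with that name, not only level constants (for example, `logging.BASIC_FORMAT` is a format string). A stricter lookup might look like the sketch below; `resolve_level` is a hypothetical name and not part of `src/logging_utils.py`.

```python
import logging


def resolve_level(level: str) -> int:
    """Hypothetical stricter lookup: accept only names that map to integer levels."""
    value = getattr(logging, level.upper(), None)
    # getattr can return non-level attributes (e.g. the BASIC_FORMAT string),
    # so anything that is not an int falls back to INFO.
    return value if isinstance(value, int) else logging.INFO


print(resolve_level("debug"))         # numeric value of logging.DEBUG
print(resolve_level("BASIC_FORMAT"))  # falls back to logging.INFO
print(resolve_level("nope"))          # falls back to logging.INFO
```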

### src/schema_sql.py (DDL)

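Defines the SQLite DDL used to create the bronze + silver tables. The script drops and recreates `bronze_sales_raw` every time it runs, while the silver tables are created only if they do not already exist.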

In [13]:
show_module("src/schema_sql.py", schema_sql_mod)

## src/schema_sql.py

### Defined symbols

- `DDL_SQLITE` (constant: str (len=1947))

### Source: `src/schema_sql.py`

```python
DDL_SQLITE = """
-- Staging (bronze) table: raw records as landed

-- Legacy tables (pre-rename). Drop if present to keep the DB clean.
DROP TABLE IF EXISTS fact_sales;
DROP TABLE IF EXISTS dim_branch;
DROP TABLE IF EXISTS dim_product_line;
DROP TABLE IF EXISTS stg_sales_raw;

DROP TABLE IF EXISTS bronze_sales_raw;
CREATE TABLE bronze_sales_raw (
    row_hash TEXT PRIMARY KEY,
    invoice_id TEXT,
    branch TEXT,
    city TEXT,
    customer_type TEXT,
    gender TEXT,
    product_line TEXT,
    unit_price REAL,
    quantity INTEGER,
    tax_5_percent REAL,
    total REAL,
    date TEXT,
    time TEXT,
    payment TEXT,
    cogs REAL,
    gross_margin_percentage REAL,
    gross_income REAL,
    rating REAL,
    extracted_at TEXT NOT NULL
);

-- Dimension: Product Line (Type 1)
CREATE TABLE IF NOT EXISTS silver_dim_product_line (
    product_line_key INTEGER PRIMARY KEY,
    product_line_name TEXT NOT NULL UNIQUE,
    created_at TEXT NOT NULL
);

-- Dimension: Branch (SCD Type 2)
CREATE TABLE IF NOT EXISTS silver_dim_branch (
    branch_key INTEGER PRIMARY KEY,
    branch_code TEXT NOT NULL,
    city TEXT NOT NULL,
    valid_from TEXT NOT NULL,
    valid_to TEXT,
    is_current INTEGER NOT NULL,
    created_at TEXT NOT NULL,
    UNIQUE(branch_code, valid_from)
);

-- Fact: Sales (transaction grain)
CREATE TABLE IF NOT EXISTS silver_fact_sales (
    sales_key INTEGER PRIMARY KEY,
    row_hash TEXT NOT NULL UNIQUE,
    invoice_id TEXT,
    product_line_key INTEGER NOT NULL,
    branch_key INTEGER NOT NULL,
    txn_date TEXT NOT NULL,
    txn_time TEXT,
    unit_price REAL,
    quantity INTEGER,
    tax_5_percent REAL,
    total REAL,
    cogs REAL,
    gross_income REAL,
    rating REAL,
    payment TEXT,
    customer_type TEXT,
    gender TEXT,
    loaded_at TEXT NOT NULL,
    FOREIGN KEY(product_line_key) REFERENCES silver_dim_product_line(product_line_key),
    FOREIGN KEY(branch_key) REFERENCES silver_dim_branch(branch_key)
);

"""

```
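
The DDL mixes two patterns: `DROP TABLE IF EXISTS` + `CREATE TABLE` for bronze, and `CREATE TABLE IF NOT EXISTS` for silver. The sketch below shows what that means when the script is executed a second time, assuming the pipeline runs the DDL at the start of each run. `DDL_SKETCH` is a trimmed-down stand-in with fewer columns, not the real `DDL_SQLITE`.

```python
import sqlite3

# Trimmed-down stand-in for DDL_SQLITE: same DROP/CREATE pattern, fewer columns.
DDL_SKETCH = """
DROP TABLE IF EXISTS bronze_sales_raw;
CREATE TABLE bronze_sales_raw (row_hash TEXT PRIMARY KEY, product_line TEXT);
CREATE TABLE IF NOT EXISTS silver_dim_product_line (
    product_line_key INTEGER PRIMARY KEY,
    product_line_name TEXT NOT NULL UNIQUE
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL_SKETCH)
conn.execute("INSERT INTO bronze_sales_raw VALUES ('h1', 'Food')")
conn.execute("INSERT INTO silver_dim_product_line(product_line_name) VALUES ('Food')")
conn.commit()

# Second execution of the DDL, as a later pipeline run would do.
conn.executescript(DDL_SKETCH)

bronze_rows = conn.execute("SELECT COUNT(*) FROM bronze_sales_raw").fetchone()[0]
dim_rows = conn.execute("SELECT COUNT(*) FROM silver_dim_product_line").fetchone()[0]
print(bronze_rows, dim_rows)
```

Bronze is emptied by the rerun while the dimension row survives, which is what lets the silver layer accumulate history across runs.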

### src/db.py (SQLite helpers)

Thin wrappers for connecting to SQLite (with foreign-key enforcement switched on) and for running DDL scripts, batched statements, and parameterized queries.

In [14]:
show_module("src/db.py", db_mod)

## src/db.py

### Defined symbols

- `connect(db_path: pathlib.Path) -> sqlite3.Connection`
- `execute_script(conn: sqlite3.Connection, sql: str) -> None`
- `executemany(conn: sqlite3.Connection, sql: str, rows: Iterable[tuple[Any, ...]]) -> None`
- `fetch_all(conn: sqlite3.Connection, sql: str, params: Optional[Tuple[Any, ...]] = None) -> List[Tuple[Any, ...]]`

### Source: `src/db.py`

```python
import sqlite3
from pathlib import Path
from typing import Any, Iterable, Optional, Tuple, List


def connect(db_path: Path) -> sqlite3.Connection:
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA foreign_keys = ON;")
    return conn


def execute_script(conn: sqlite3.Connection, sql: str) -> None:
    conn.executescript(sql)


def executemany(conn: sqlite3.Connection, sql: str, rows: Iterable[tuple[Any, ...]]) -> None:
    conn.executemany(sql, rows)


def fetch_all(
    conn: sqlite3.Connection,
    sql: str,
    params: Optional[Tuple[Any, ...]] = None,
) -> List[Tuple[Any, ...]]:
    cur = conn.cursor()
    cur.execute(sql, params or ())
    return cur.fetchall()

```
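
`connect()` issues `PRAGMA foreign_keys = ON;` because SQLite does not enforce foreign keys unless the pragma is set on the connection. The sketch below shows the effect with two toy tables (`dim` and `fact` are illustrative names, not the project's schema).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON;")  # same pragma that connect() issues
conn.executescript(
    """
    CREATE TABLE dim (dim_key INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE fact (
        fact_key INTEGER PRIMARY KEY,
        dim_key INTEGER NOT NULL,
        FOREIGN KEY(dim_key) REFERENCES dim(dim_key)
    );
    INSERT INTO dim(dim_key, name) VALUES (1, 'known');
    """
)

conn.execute("INSERT INTO fact(dim_key) VALUES (?)", (1,))  # parent exists: accepted

try:
    conn.execute("INSERT INTO fact(dim_key) VALUES (?)", (99,))  # no such parent
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

fact_rows = conn.execute("SELECT COUNT(*) FROM fact").fetchone()[0]
print(rejected, fact_rows)
```

None of the helpers in `src/db.py` call `commit()`, so the caller is responsible for committing after a load step, for example with `conn.commit()`.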

### src/extract.py (Kaggle extraction + CSV selection)

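Handles downloading/unzipping from Kaggle and selecting the primary CSV file to load. Despite its name, `find_first_csv` picks the largest `.csv` found anywhere under the extracted directory.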

In [15]:
show_module("src/extract.py", extract_mod)

## src/extract.py

### Defined symbols

- `_ensure_kaggle_env_credentials_present() -> None`
- `_temporary_kaggle_config_dir() -> Iterator[pathlib.Path]`
- `extract_latest_dataset(*, dataset: str, output_dir: pathlib.Path) -> pathlib.Path`
- `find_first_csv(extracted_dir: pathlib.Path) -> pathlib.Path`

### Source: `src/extract.py`

```python
import logging
import json
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator

logger = logging.getLogger(__name__)


# Fail fast if Kaggle credentials are missing
def _ensure_kaggle_env_credentials_present() -> None:
    username = os.getenv("KAGGLE_USERNAME")
    key = os.getenv("KAGGLE_KEY")
    if username and key:
        return

    raise RuntimeError(
        "Kaggle credentials not found. "
        "Set KAGGLE_USERNAME and KAGGLE_KEY (for example in a .env at the repo root)."
    )


# Create a temporary kaggle.json for the Kaggle client
@contextmanager
def _temporary_kaggle_config_dir() -> Iterator[Path]:
    previous_config_dir = os.environ.get("KAGGLE_CONFIG_DIR")

    with tempfile.TemporaryDirectory(prefix="kaggle-") as tmp_dir:
        tmp_path = Path(tmp_dir)

        payload = {
            "username": os.environ["KAGGLE_USERNAME"].strip(),
            "key": os.environ["KAGGLE_KEY"].strip(),
        }
        (tmp_path / "kaggle.json").write_text(json.dumps(payload), encoding="utf-8")

        os.environ["KAGGLE_CONFIG_DIR"] = str(tmp_path)
        try:
            yield tmp_path
        finally:
            if previous_config_dir is None:
                os.environ.pop("KAGGLE_CONFIG_DIR", None)
            else:
                os.environ["KAGGLE_CONFIG_DIR"] = previous_config_dir


# Download and unzip the latest version of a Kaggle dataset
def extract_latest_dataset(*, dataset: str, output_dir: Path) -> Path:
    output_dir.mkdir(parents=True, exist_ok=True)

    _ensure_kaggle_env_credentials_present()

    with _temporary_kaggle_config_dir():
        from kaggle.api.kaggle_api_extended import KaggleApi

        api = KaggleApi()
        try:
            api.authenticate()
        except Exception as e:
            raise RuntimeError(
                "Failed to authenticate to Kaggle API. "
                "Verify KAGGLE_USERNAME/KAGGLE_KEY in .env, and that your Kaggle account API access is enabled."
            ) from e

        logger.info("Downloading Kaggle dataset: %s", dataset)
        api.dataset_download_files(dataset, path=str(output_dir), unzip=True, quiet=False)

    logger.info("Dataset extracted to: %s", output_dir)
    return output_dir


# Find the primary CSV under the extracted dataset directory
def find_first_csv(extracted_dir: Path) -> Path:
    csvs = list(extracted_dir.rglob("*.csv"))
    if not csvs:
        raise FileNotFoundError(f"No CSV files found under {extracted_dir}")

    chosen = max(csvs, key=lambda p: p.stat().st_size)
    logger.info("Using CSV file: %s", chosen)
    return chosen

```
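
The selection rule in `find_first_csv` (recursive search, then largest file by size) can be checked without touching Kaggle. The sketch below builds a throwaway directory with two made-up CSV files and applies the same `rglob` + `max` logic.

```python
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "nested").mkdir()

    # Two hypothetical files of different sizes.
    (root / "notes.csv").write_text("a,b\n1,2\n", encoding="utf-8")
    (root / "nested" / "sales.csv").write_text("a,b\n" + "1,2\n" * 100, encoding="utf-8")

    # Same rule as find_first_csv: search recursively, keep the largest file.
    csvs = list(root.rglob("*.csv"))
    chosen = max(csvs, key=lambda p: p.stat().st_size)
    chosen_name = chosen.name

print(chosen_name)
```

Choosing by size is a heuristic: it works when the dataset ships one main file alongside smaller auxiliary ones, and it would need revisiting if a dataset contained several large CSVs.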

### src/transform_load.py (normalize + bronze/silver loads)

Normalizes the raw CSV into a stable schema, loads `bronze_sales_raw`, and builds silver dimensions + fact.
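
Idempotent loading in this module rests on the `_row_hash` helper, which joins a fixed set of fields with `|` and takes the SHA-256 of the result. The sketch below reproduces that idea on a plain `dict` instead of a pandas row; the `row_hash` function and the sample record are illustrative assumptions.

```python
import hashlib


def row_hash(record: dict) -> str:
    """Sketch of the hashing idea: join selected fields with '|' and SHA-256 the result."""
    fields = ("invoice_id", "branch", "product_line", "date", "time", "total")
    payload = "|".join(str(record.get(f, "")) for f in fields).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


# Hypothetical record; the values are made up for illustration.
sale = {
    "invoice_id": "INV-1",
    "branch": "A",
    "product_line": "Food",
    "date": "2019-01-05",
    "time": "13:08",
    "total": 54.9,
}

original = row_hash(sale)
same_again = row_hash(dict(sale))            # identical content, identical hash
changed = row_hash({**sale, "total": 55.0})  # one hashed field differs

print(original == same_again, original == changed)
```

Only the six listed fields feed the hash, so a change in any other column (such as `rating`) leaves the hash unchanged, and two rows that agree on all six fields collapse to a single hash.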

In [16]:
show_module("src/transform_load.py", transform_load_mod)

## src/transform_load.py

### Defined symbols

- `NormalizedFrames` (dataclass)
- `_lookup_current_branch_keys(conn) -> dict[str, int]`
- `_lookup_product_line_keys(conn) -> dict[str, int]`
- `_normalize_columns(df: pandas.DataFrame) -> pandas.DataFrame`
- `_parse_date_iso(date_series: pandas.Series) -> pandas.Series`
- `_row_hash(row: pandas.Series) -> str`
- `ensure_dim_product_line(conn) -> None`
- `load_fact_sales(conn) -> None`
- `load_staging(conn, frames: src.transform_load.NormalizedFrames) -> None`
- `read_raw_csv(csv_path: pathlib.Path) -> src.transform_load.NormalizedFrames`
- `scd2_upsert_dim_branch(conn) -> None`
- `utc_now_iso() -> str`

### Source: `src/transform_load.py`

```python
import hashlib
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd

from . import db

logger = logging.getLogger(__name__)

@dataclass(frozen=True)
class NormalizedFrames:
    raw: pd.DataFrame


def utc_now_iso() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


# Standardize source columns into a stable, snake_case schema
def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    mapping = {
        "Invoice ID": "invoice_id",
        "Branch": "branch",
        "City": "city",
        "Customer type": "customer_type",
        "Gender": "gender",
        "Product line": "product_line",
        "Unit price": "unit_price",
        "Quantity": "quantity",
        "Tax 5%": "tax_5_percent",
        "Total": "total",
        "Sales": "total",
        "Date": "date",
        "Time": "time",
        "Payment": "payment",
        "cogs": "cogs",
        "gross margin percentage": "gross_margin_percentage",
        "gross income": "gross_income",
        "Rating": "rating",
    }

    df2 = df.rename(columns={k: v for k, v in mapping.items() if k in df.columns}).copy()
    missing = [v for v in mapping.values() if v not in df2.columns]
    if missing:
        logger.warning("Some expected columns are missing: %s", missing)

    return df2


# Parse dates and return ISO strings (YYYY-MM-DD)
def _parse_date_iso(date_series: pd.Series) -> pd.Series:
    dt = pd.to_datetime(date_series, errors="coerce")
    return dt.dt.date.astype("string")


# Deterministic hash used for idempotent fact loads
def _row_hash(row: pd.Series) -> str:
    parts = [
        str(row.get("invoice_id", "")),
        str(row.get("branch", "")),
        str(row.get("product_line", "")),
        str(row.get("date", "")),
        str(row.get("time", "")),
        str(row.get("total", "")),
    ]
    payload = "|".join(parts).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


def read_raw_csv(csv_path: Path) -> NormalizedFrames:
    logger.info("Reading raw CSV: %s", csv_path)
    df = pd.read_csv(csv_path)
    df = _normalize_columns(df)

    if "date" in df.columns:
        df["date"] = _parse_date_iso(df["date"])

    df["row_hash"] = df.apply(_row_hash, axis=1)

    for col in ["unit_price", "tax_5_percent", "total", "cogs", "gross_margin_percentage", "gross_income", "rating"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce")

    for col in ["quantity"]:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors="coerce").astype("Int64")

    return NormalizedFrames(raw=df)


def load_staging(conn, frames: NormalizedFrames) -> None:
    extracted_at = utc_now_iso()
    df = frames.raw.copy()
    df["extracted_at"] = extracted_at

    logger.info("Loading %d rows into staging", len(df))

    cols = [
        "row_hash",
        "invoice_id",
        "branch",
        "city",
        "customer_type",
        "gender",
        "product_line",
        "unit_price",
        "quantity",
        "tax_5_percent",
        "total",
        "date",
        "time",
        "payment",
        "cogs",
        "gross_margin_percentage",
        "gross_income",
        "rating",
        "extracted_at",
    ]

    rows = []
    for _, r in df[cols].iterrows():
        rows.append(tuple(None if pd.isna(v) else v for v in r.to_list()))

    db.executemany(
        conn,
        """
        INSERT INTO bronze_sales_raw (
            row_hash, invoice_id, branch, city, customer_type, gender, product_line,
            unit_price, quantity, tax_5_percent, total, date, time, payment,
            cogs, gross_margin_percentage, gross_income, rating, extracted_at
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        """,
        rows,
    )


# Type 1 dim: insert missing product lines
def ensure_dim_product_line(conn) -> None:
    now = utc_now_iso()

    existing = {r[0] for r in db.fetch_all(conn, "SELECT product_line_name FROM silver_dim_product_line")}
    missing = db.fetch_all(
        conn,
        """
        SELECT DISTINCT product_line
        FROM bronze_sales_raw
        WHERE product_line IS NOT NULL
        """,
    )

    to_insert = [(name, now) for (name,) in missing if name not in existing]
    if not to_insert:
        logger.info("dim_product_line is up to date")
        return

    logger.info("Inserting %d new product lines", len(to_insert))
    db.executemany(
        conn,
        "INSERT INTO silver_dim_product_line(product_line_name, created_at) VALUES (?,?)",
        to_insert,
    )


# SCD Type 2 upsert for branch (natural key: branch_code; tracked: city)
def scd2_upsert_dim_branch(conn) -> None:
    now = utc_now_iso()

    incoming = db.fetch_all(
        conn,
        """
        SELECT DISTINCT branch AS branch_code, city
        FROM bronze_sales_raw
        WHERE branch IS NOT NULL AND city IS NOT NULL
        """,
    )

    for branch_code, city in incoming:
        current = db.fetch_all(
            conn,
            """
            SELECT branch_key, city, valid_from
            FROM silver_dim_branch
            WHERE branch_code = ? AND is_current = 1
            """,
            (branch_code,),
        )

        if not current:
            db.executemany(
                conn,
                """
                INSERT INTO silver_dim_branch(branch_code, city, valid_from, valid_to, is_current, created_at)
                VALUES (?,?,?,?,?,?)
                """,
                [(branch_code, city, now, None, 1, now)],
            )
            continue

        branch_key, current_city, _valid_from = current[0]
        if current_city == city:
            continue

        logger.info("Branch %s changed city %s -> %s (SCD2)", branch_code, current_city, city)

        db.executemany(
            conn,
            "UPDATE silver_dim_branch SET valid_to = ?, is_current = 0 WHERE branch_key = ?",
            [(now, branch_key)],
        )

        db.executemany(
            conn,
            """
            INSERT INTO silver_dim_branch(branch_code, city, valid_from, valid_to, is_current, created_at)
            VALUES (?,?,?,?,?,?)
            """,
            [(branch_code, city, now, None, 1, now)],
        )


def _lookup_product_line_keys(conn) -> dict[str, int]:
    rows = db.fetch_all(conn, "SELECT product_line_key, product_line_name FROM silver_dim_product_line")
    return {name: int(key) for key, name in rows}


def _lookup_current_branch_keys(conn) -> dict[str, int]:
    rows = db.fetch_all(
        conn,
        "SELECT branch_key, branch_code FROM silver_dim_branch WHERE is_current = 1",
    )
    return {code: int(key) for key, code in rows}


# Load facts idempotently using row_hash uniqueness
def load_fact_sales(conn) -> None:
    now = utc_now_iso()

    product_keys = _lookup_product_line_keys(conn)
    branch_keys = _lookup_current_branch_keys(conn)

    stg_rows = db.fetch_all(
        conn,
        """
        SELECT
            row_hash, invoice_id, product_line, branch, date, time,
            unit_price, quantity, tax_5_percent, total, cogs, gross_income, rating,
            payment, customer_type, gender
        FROM bronze_sales_raw
        WHERE date IS NOT NULL
        """,
    )

    rows_to_insert: list[tuple] = []
    skipped_missing_dim = 0

    for (
        row_hash,
        invoice_id,
        product_line,
        branch,
        txn_date,
        txn_time,
        unit_price,
        quantity,
        tax_5_percent,
        total,
        cogs,
        gross_income,
        rating,
        payment,
        customer_type,
        gender,
    ) in stg_rows:
        if product_line not in product_keys or branch not in branch_keys:
            skipped_missing_dim += 1
            continue

        rows_to_insert.append(
            (
                row_hash,
                invoice_id,
                product_keys[product_line],
                branch_keys[branch],
                txn_date,
                txn_time,
                unit_price,
                quantity,
                tax_5_percent,
                total,
                cogs,
                gross_income,
                rating,
                payment,
                customer_type,
                gender,
                now,
            )
        )

    if skipped_missing_dim:
        logger.warning("Skipped %d rows due to missing dimension keys", skipped_missing_dim)

    if not rows_to_insert:
        logger.info("No fact rows to insert")
        return

    logger.info("Inserting %d fact rows (idempotent)", len(rows_to_insert))
    db.executemany(
        conn,
        """
        INSERT OR IGNORE INTO silver_fact_sales(
            row_hash, invoice_id, product_line_key, branch_key, txn_date, txn_time,
            unit_price, quantity, tax_5_percent, total, cogs, gross_income, rating,
            payment, customer_type, gender, loaded_at
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        """,
        rows_to_insert,
    )

```
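
The SCD Type 2 behaviour of `scd2_upsert_dim_branch` is easier to follow on a single branch. The sketch below is a simplified, single-row version of the same three cases (new key, unchanged city, changed city) on an in-memory table; `scd2_upsert`, the `dim_branch` table, the cities, and the timestamps are all illustrative assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE dim_branch (
        branch_key INTEGER PRIMARY KEY,
        branch_code TEXT NOT NULL,
        city TEXT NOT NULL,
        valid_from TEXT NOT NULL,
        valid_to TEXT,
        is_current INTEGER NOT NULL
    )
    """
)


def scd2_upsert(branch_code: str, city: str, now: str) -> None:
    """Hypothetical single-row version of the SCD2 logic shown above."""
    current = conn.execute(
        "SELECT branch_key, city FROM dim_branch WHERE branch_code = ? AND is_current = 1",
        (branch_code,),
    ).fetchone()
    if current and current[1] == city:
        return  # tracked attribute unchanged: nothing to do
    if current:
        # Close the old version instead of overwriting it.
        conn.execute(
            "UPDATE dim_branch SET valid_to = ?, is_current = 0 WHERE branch_key = ?",
            (now, current[0]),
        )
    conn.execute(
        "INSERT INTO dim_branch(branch_code, city, valid_from, valid_to, is_current) "
        "VALUES (?,?,?,?,1)",
        (branch_code, city, now, None),
    )


# Made-up timestamps and cities, purely for illustration.
scd2_upsert("A", "Yangon", "2024-01-01T00:00:00+00:00")
scd2_upsert("A", "Yangon", "2024-02-01T00:00:00+00:00")    # no change, no new row
scd2_upsert("A", "Mandalay", "2024-03-01T00:00:00+00:00")  # city changed: new version

history = conn.execute(
    "SELECT city, valid_to, is_current FROM dim_branch ORDER BY branch_key"
).fetchall()
print(history)
```

The old row is kept with a closed `valid_to`, and exactly one row per branch stays current. In the module above, `load_fact_sales` resolves branches through the current rows only, and `INSERT OR IGNORE` leaves already-loaded facts pointing at the key they were loaded with.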

### src/validate.py (data quality checks)

Runs lightweight checks on the generated SQLite DB (expected tables, row counts, uniqueness, coverage, etc.).
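
The coverage check compares the number of fact rows with the number of distinct eligible bronze rows and fails when the ratio drops below a threshold (`DQ_MIN_FACT_COVERAGE`, default `0.98` in this module). A minimal sketch of that calculation, using toy `bronze` and `fact` tables and made-up rows, could look like this:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE bronze (row_hash TEXT, date TEXT);
    CREATE TABLE fact (row_hash TEXT UNIQUE);
    INSERT INTO bronze VALUES ('h1', '2019-01-01'), ('h2', '2019-01-02'),
                              ('h3', '2019-01-03'), ('h4', NULL);
    INSERT INTO fact VALUES ('h1'), ('h2');
    """
)

# Eligible rows: distinct hashes whose required fields are present.
expected = conn.execute(
    "SELECT COUNT(DISTINCT row_hash) FROM bronze WHERE date IS NOT NULL"
).fetchone()[0]
actual = conn.execute("SELECT COUNT(*) FROM fact").fetchone()[0]

coverage = actual / expected if expected else 0.0
threshold = 0.98  # same default as DQ_MIN_FACT_COVERAGE in the module
passed = coverage >= threshold
print(f"coverage={coverage:.3f} passed={passed}")
```

Here one bronze row is ineligible (no date) and one eligible row never reached the fact table, so coverage is 2 of 3 and the check would fail.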

In [17]:
show_module("src/validate.py", validate_mod)

## src/validate.py

### Defined symbols

- `EXPECTED_TABLES` (constant: set (len=4))
- `env_bool(name: str, default: bool) -> bool`
- `env_float(name: str, default: float) -> float`
- `validate_sqlite_db(db_path: pathlib.Path) -> None`

### Source: `src/validate.py`

```python
import logging
import sqlite3
import os
from pathlib import Path

logger = logging.getLogger(__name__)


EXPECTED_TABLES = {
    "bronze_sales_raw",
    "silver_dim_product_line",
    "silver_dim_branch",
    "silver_fact_sales",
}


def env_bool(name: str, default: bool) -> bool:
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "t", "yes", "y", "on"}


def env_float(name: str, default: float) -> float:
    raw = os.getenv(name)
    if raw is None or not raw.strip():
        return default
    try:
        return float(raw)
    except ValueError:
        logger.warning("Invalid %s=%r; using default %s", name, raw, default)
        return default


# Lightweight validation checks for the generated SQLite DB
def validate_sqlite_db(db_path: Path) -> None:
    if not db_path.exists():
        raise FileNotFoundError(f"SQLite DB not found at: {db_path}")

    fail_on_warnings = env_bool("DQ_FAIL_ON_WARNINGS", False)
    min_fact_coverage = env_float("DQ_MIN_FACT_COVERAGE", 0.98)
    min_fact_coverage = max(0.0, min(1.0, float(min_fact_coverage)))

    conn = sqlite3.connect(db_path)
    try:
        conn.execute("PRAGMA foreign_keys = ON;")

        errors: list[str] = []
        warnings: list[str] = []

        def warn(msg: str) -> None:
            warnings.append(msg)
            logger.warning(msg)

        def err(msg: str) -> None:
            errors.append(msg)
            logger.error(msg)

        def count(sql: str, params: tuple | None = None) -> int:
            row = conn.execute(sql, params or ()).fetchone()
            return int(row[0]) if row and row[0] is not None else 0

        tables = {
            r[0]
            for r in conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
            ).fetchall()
        }
        missing = EXPECTED_TABLES - tables
        if missing:
            raise RuntimeError(f"Missing expected tables: {sorted(missing)}")

        bronze_rows = count("SELECT COUNT(*) FROM bronze_sales_raw")
        fact_rows = count("SELECT COUNT(*) FROM silver_fact_sales")
        dim_pl_rows = count("SELECT COUNT(*) FROM silver_dim_product_line")
        dim_branch_rows = count("SELECT COUNT(*) FROM silver_dim_branch")
        logger.info(
            "Row counts: bronze=%d fact=%d dim_product_line=%d dim_branch=%d",
            bronze_rows,
            fact_rows,
            dim_pl_rows,
            dim_branch_rows,
        )

        if bronze_rows == 0:
            raise RuntimeError("bronze_sales_raw has 0 rows — extraction/load likely failed")
        if fact_rows == 0:
            raise RuntimeError("silver_fact_sales has 0 rows — dim lookups or fact load likely failed")

        if dim_pl_rows == 0:
            raise RuntimeError("silver_dim_product_line has 0 rows — dimension load likely failed")

        fact_dupes = count(
            """
            SELECT COUNT(*)
            FROM (
                SELECT row_hash
                FROM silver_fact_sales
                GROUP BY row_hash
                HAVING COUNT(*) > 1
            )
            """,
        )
        if fact_dupes:
            raise RuntimeError("silver_fact_sales contains duplicate row_hash values (should be UNIQUE)")

        # Coverage: how many distinct eligible bronze rows made it into the fact table.
        expected_fact = count(
            """
            SELECT COUNT(*)
            FROM (
                SELECT DISTINCT row_hash
                FROM bronze_sales_raw
                WHERE date IS NOT NULL
                  AND product_line IS NOT NULL
                  AND branch IS NOT NULL
                  AND city IS NOT NULL
            )
            """,
        )
        actual_fact = fact_rows
        if expected_fact > 0:
            coverage = actual_fact / float(expected_fact)
            logger.info("Fact coverage: %.3f (%d/%d)", coverage, actual_fact, expected_fact)
            if coverage < min_fact_coverage:
                err(
                    "Fact coverage below threshold: "
                    f"{coverage:.3f} ({actual_fact}/{expected_fact}) < {min_fact_coverage:.3f}. "
                    "This can indicate missing dimension keys, bad parsing, or load errors."
                )
        else:
            warn("No eligible bronze rows found for coverage check (date/product_line/branch/city all required)")

        bad_txn_dates = conn.execute(
            """
            SELECT txn_date
            FROM silver_fact_sales
            WHERE txn_date IS NOT NULL
              AND txn_date NOT GLOB '????-??-??'
            LIMIT 5
            """
        ).fetchall()
        if bad_txn_dates:
            err(f"Found non-ISO txn_date values (sample): {[r[0] for r in bad_txn_dates]}")

        current_branch = count("SELECT COUNT(*) FROM silver_dim_branch WHERE is_current = 1")
        if current_branch == 0:
            err("silver_dim_branch has no current records (is_current=1)")

        multi_current = count(
            """
            SELECT COUNT(*)
            FROM (
                SELECT branch_code
                FROM silver_dim_branch
                WHERE is_current = 1
                GROUP BY branch_code
                HAVING COUNT(*) != 1
            )
            """,
        )
        if multi_current:
            err(f"Found {multi_current} branch_code values with != 1 current record (SCD2 integrity issue)")

        # Null checks (should be zero for NOT NULL columns; invoice_id is allowed but useful to know)
        null_fact_critical = count(
            """
            SELECT COUNT(*)
            FROM silver_fact_sales
            WHERE row_hash IS NULL
               OR product_line_key IS NULL
               OR branch_key IS NULL
               OR txn_date IS NULL
               OR loaded_at IS NULL
            """,
        )
        if null_fact_critical:
            err(f"silver_fact_sales has {null_fact_critical} rows with NULLs in critical columns")

        # Referential integrity (should be enforced by FK constraints, but validate anyway)
        unmatched_product = count(
            """
            SELECT COUNT(*)
            FROM silver_fact_sales f
            LEFT JOIN silver_dim_product_line d
              ON f.product_line_key = d.product_line_key
            WHERE d.product_line_key IS NULL
            """,
        )
        if unmatched_product:
            err(f"Found {unmatched_product} fact rows with missing product_line_key in dim")

        unmatched_branch = count(
            """
            SELECT COUNT(*)
            FROM silver_fact_sales f
            LEFT JOIN silver_dim_branch b
              ON f.branch_key = b.branch_key
            WHERE b.branch_key IS NULL
            """,
        )
        if unmatched_branch:
            err(f"Found {unmatched_branch} fact rows with missing branch_key in dim")

        # Basic numeric sanity checks
        negative_money = count(
            """
            SELECT COUNT(*)
            FROM silver_fact_sales
            WHERE (unit_price IS NOT NULL AND unit_price < 0)
               OR (tax_5_percent IS NOT NULL AND tax_5_percent < 0)
               OR (total IS NOT NULL AND total < 0)
               OR (cogs IS NOT NULL AND cogs < 0)
               OR (gross_income IS NOT NULL AND gross_income < 0)
            """,
        )
        if negative_money:
            err(f"Found {negative_money} fact rows with negative monetary values")

        nonpositive_qty = count(
            """
            SELECT COUNT(*)
            FROM silver_fact_sales
            WHERE quantity IS NOT NULL AND quantity <= 0
            """,
        )
        if nonpositive_qty:
            err(f"Found {nonpositive_qty} fact rows with non-positive quantity")

        rating_out_of_range = count(
            """
            SELECT COUNT(*)
            FROM silver_fact_sales
            WHERE rating IS NOT NULL AND (rating < 0 OR rating > 10)
            """,
        )
        if rating_out_of_range:
            warn(f"Found {rating_out_of_range} fact rows with rating outside [0,10]")

        if errors or (fail_on_warnings and warnings):
            parts: list[str] = []
            if errors:
                parts.append("Data quality errors:\n- " + "\n- ".join(errors))
            if fail_on_warnings and warnings:
                parts.append("Warnings treated as errors:\n- " + "\n- ".join(warnings))
            raise RuntimeError("\n\n".join(parts))

        logger.info("Validation passed (%d warnings)", len(warnings))
    finally:
        conn.close()

```
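
The "exactly one current row per branch" rule can be tried in isolation. The following is a minimal sketch, not the repo's validator: it assumes a throwaway in-memory table named `dim_branch_demo` (hypothetical) that carries only `branch_code` and `is_current`. It groups before counting so that branch codes with zero current rows are reported as well, which a `WHERE is_current = 1` pre-filter would hide.

```python
import sqlite3


def branches_without_single_current(conn: sqlite3.Connection) -> list[str]:
    """Return branch codes that do not have exactly one current row."""
    rows = conn.execute(
        """
        SELECT branch_code
        FROM dim_branch_demo
        GROUP BY branch_code
        HAVING SUM(CASE WHEN is_current = 1 THEN 1 ELSE 0 END) != 1
        ORDER BY branch_code
        """
    ).fetchall()
    return [row[0] for row in rows]


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory table with one healthy and two broken branch codes."""
    conn = sqlite3.connect(":memory:")
    # Hypothetical demo table; the real dimension has more columns.
    conn.execute("CREATE TABLE dim_branch_demo (branch_code TEXT, is_current INTEGER)")
    conn.executemany(
        "INSERT INTO dim_branch_demo VALUES (?, ?)",
        [("A", 1), ("A", 0), ("B", 1), ("B", 1), ("C", 0)],
    )
    return conn
```

With the demo data, `A` passes (one current row, one expired), `B` has two current rows, and `C` has none, so the function reports `B` and `C`.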

### src/runner.py (orchestration)

Entrypoint that wires settings, extraction, load, and validation into a single run.

In [18]:
show_module("src/runner.py", runner_mod)

## src/runner.py

### Defined symbols

- `DDL_SQLITE` (constant: str (len=1947))
- `run_pipeline() -> None`

### Source: `src/runner.py`

```python
import logging

from .config import load_settings
from .extract import extract_latest_dataset, find_first_csv
from .logging_utils import configure_logging
from .schema_sql import DDL_SQLITE
from .transform_load import (
    ensure_dim_product_line,
    load_fact_sales,
    load_staging,
    read_raw_csv,
    scd2_upsert_dim_branch,
)
from .validate import validate_sqlite_db
from . import db

logger = logging.getLogger(__name__)


def run_pipeline() -> None:
    settings = load_settings()
    configure_logging(settings.log_level)

    raw_dir = settings.data_dir / "raw"
    extracted_dir = extract_latest_dataset(dataset=settings.kaggle_dataset, output_dir=raw_dir)
    csv_path = find_first_csv(extracted_dir)

    conn = db.connect(settings.sqlite_db_path)
    try:
        logger.info("Creating (or recreating) tables")
        db.execute_script(conn, DDL_SQLITE)

        frames = read_raw_csv(csv_path)
        load_staging(conn, frames)

        ensure_dim_product_line(conn)
        scd2_upsert_dim_branch(conn)
        load_fact_sales(conn)

        conn.commit()
        logger.info("Pipeline complete. SQLite DB at %s", settings.sqlite_db_path)

        validate_sqlite_db(settings.sqlite_db_path)
    finally:
        conn.close()


if __name__ == "__main__":
    run_pipeline()

```

### src/__init__.py

Package marker for `src`.

In [19]:
show_source("src/__init__.py")

### Source: `src/__init__.py`

```python
# Canonical pipeline source package (notebooks/CLI import from src.*)

```

In [20]:
import os

settings = load_settings()
configure_logging(settings.log_level)

# Safety: do not print secrets; only confirm presence
print('KAGGLE_USERNAME present:', bool(os.getenv('KAGGLE_USERNAME')))
print('KAGGLE_KEY present:', bool(os.getenv('KAGGLE_KEY')))
print('DATA_DIR:', settings.data_dir)
print('SQLITE_DB_PATH:', settings.sqlite_db_path)

settings

KAGGLE_USERNAME present: True
KAGGLE_KEY present: True
DATA_DIR: C:\Users\gadde\Desktop\Interview Perparation\66 Degrees Assignment\data
SQLITE_DB_PATH: C:\Users\gadde\Desktop\Interview Perparation\66 Degrees Assignment\db\supermarket_sales.sqlite


Settings(kaggle_dataset='faresashraf1001/supermarket-sales', data_dir=WindowsPath('C:/Users/gadde/Desktop/Interview Perparation/66 Degrees Assignment/data'), sqlite_db_path=WindowsPath('C:/Users/gadde/Desktop/Interview Perparation/66 Degrees Assignment/db/supermarket_sales.sqlite'), log_level='INFO')

## 1) Extraction (Kaggle → `data/raw/`)

### Goal
Download and unzip the **latest** version of the Kaggle dataset into `data/raw/`.

### What happens in code
- Confirms `KAGGLE_USERNAME` and `KAGGLE_KEY` exist in the environment.
- Creates a temporary Kaggle config folder for this run (so credentials are not stored under `data/`).
- Uses Kaggle API to download + unzip the dataset into `data/raw/`.

### Output
- A folder under `data/raw/` containing the extracted files
- We then pick the largest `.csv` file found and treat it as the dataset input for the pipeline.
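
The "pick the largest `.csv`" step can be sketched as follows. This is an illustration under stated assumptions, not the repo's `find_first_csv`: the helper name `pick_largest_csv` is hypothetical, and it assumes a recursive search under the extraction folder.

```python
from pathlib import Path


def pick_largest_csv(root: Path) -> Path:
    """Return the largest *.csv file under root, searching recursively."""
    candidates = [path for path in root.rglob("*.csv") if path.is_file()]
    if not candidates:
        raise FileNotFoundError(f"No .csv files found under {root}")
    # File size is used as a simple proxy for "the main dataset file".
    return max(candidates, key=lambda path: path.stat().st_size)
```

Choosing by size rather than by name keeps the step working when the dataset author renames the file between versions.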

In [21]:
raw_dir = settings.data_dir / 'raw'
try:
    extracted_dir = extract_latest_dataset(dataset=settings.kaggle_dataset, output_dir=raw_dir)
except Exception as e:
    # Smooth notebook experience: if Kaggle creds/auth aren't set up,
    # fall back to using any existing CSV under data/raw/.
    print('Kaggle extraction skipped:', str(e))
    extracted_dir = raw_dir

csv_path = find_first_csv(extracted_dir)
csv_path

2026-02-20 11:28:47,746 | INFO | src.extract | Downloading Kaggle dataset: faresashraf1001/supermarket-sales


Dataset URL: https://www.kaggle.com/datasets/faresashraf1001/supermarket-sales
Downloading supermarket-sales.zip to C:\Users\gadde\Desktop\Interview Perparation\66 Degrees Assignment\data\raw


100%|██████████| 36.5k/36.5k [00:00<00:00, 203kB/s]
2026-02-20 11:28:49,096 | INFO | src.extract | Dataset extracted to: C:\Users\gadde\Desktop\Interview Perparation\66 Degrees Assignment\data\raw
2026-02-20 11:28:49,102 | INFO | src.extract | Using CSV file: C:\Users\gadde\Desktop\Interview Perparation\66 Degrees Assignment\data\raw\SuperMarket Analysis.csv





WindowsPath('C:/Users/gadde/Desktop/Interview Perparation/66 Degrees Assignment/data/raw/SuperMarket Analysis.csv')

## 2) Quick data check (Raw CSV → normalized DataFrame)

### Goal
Load the raw CSV into pandas and normalize it into a stable schema for downstream processing.

### What we do
- Standardize column names to `snake_case` (for example `Product line` → `product_line`).
- Handle dataset variations (some versions use `Sales` instead of `Total`; both map to `total`).
- Normalize `date` to ISO format: `YYYY-MM-DD`.
- Compute a deterministic `row_hash` used later for idempotent loading into the fact table.

### Why this matters
This step makes the pipeline resilient to small source schema changes while keeping the database model stable.
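
The normalization steps above can be sketched in a few small functions. This is a sketch under assumptions, not the code in `src/transform_load.py`: the helper names are hypothetical, the source date format is assumed to be `M/D/YYYY`, and SHA-256 over a sorted `key=value` string is only one possible way to build a deterministic `row_hash`.

```python
import hashlib
import re
from datetime import datetime

# Assumed alias map: dataset versions that use "Sales" are mapped to "total".
COLUMN_ALIASES = {"sales": "total"}


def to_snake_case(name: str) -> str:
    """Convert a source header such as 'Tax 5%' into 'tax_5_percent'."""
    cleaned = name.strip().replace("%", " percent ")
    cleaned = re.sub(r"[^0-9a-zA-Z]+", "_", cleaned)
    return cleaned.strip("_").lower()


def normalize_columns(columns: list[str]) -> list[str]:
    """Snake-case each header, then apply known aliases."""
    snake = [to_snake_case(column) for column in columns]
    return [COLUMN_ALIASES.get(column, column) for column in snake]


def normalize_date(value: str, source_format: str = "%m/%d/%Y") -> str:
    """Parse a source date (format is an assumption) and return YYYY-MM-DD."""
    return datetime.strptime(value.strip(), source_format).date().isoformat()


def compute_row_hash(record: dict) -> str:
    """Hash a record deterministically, independent of key order."""
    canonical = "|".join(f"{key}={record[key]}" for key in sorted(record))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```

Sorting the keys before hashing is what makes the hash stable across runs: the same record always yields the same 64-character hex digest, regardless of column order in the source file.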

In [22]:
frames = read_raw_csv(csv_path)
df = frames.raw
df.head()

2026-02-20 11:30:18,977 | INFO | src.transform_load | Reading raw CSV: C:\Users\gadde\Desktop\Interview Perparation\66 Degrees Assignment\data\raw\SuperMarket Analysis.csv


| | invoice_id | branch | city | customer_type | gender | product_line | unit_price | quantity | tax_5_percent | total | date | time | payment | cogs | gross_margin_percentage | gross_income | rating | row_hash |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 750-67-8428 | Alex | Yangon | Member | Female | Health and beauty | 74.69 | 7 | 26.1415 | 548.9715 | 2019-01-05 | 1:08:00 PM | Ewallet | 522.83 | 4.761905 | 26.1415 | 9.1 | 1adb80b6a7012f93de8d66d2ae071edf2fb13c13aea15d... |
| 1 | 226-31-3081 | Giza | Naypyitaw | Normal | Female | Electronic accessories | 15.28 | 5 | 3.82 | 80.22 | 2019-03-08 | 10:29:00 AM | Cash | 76.4 | 4.761905 | 3.82 | 9.6 | 5052ba62bff42dc538a2af8f73d8ebc47387ef1a2a206e... |
| 2 | 631-41-3108 | Alex | Yangon | Normal | Female | Home and lifestyle | 46.33 | 7 | 16.2155 | 340.5255 | 2019-03-03 | 1:23:00 PM | Credit card | 324.31 | 4.761905 | 16.2155 | 7.4 | 7c5cc5c87dcf99a63f8dca6bc32d94d8007bc75ba350c8... |
| 3 | 123-19-1176 | Alex | Yangon | Member | Female | Health and beauty | 58.22 | 8 | 23.288 | 489.048 | 2019-01-27 | 8:33:00 PM | Ewallet | 465.76 | 4.761905 | 23.288 | 8.4 | 55f155e28fc838d7f0873bc2fd5518a73add9f984cb72b... |
| 4 | 373-73-7910 | Alex | Yangon | Member | Female | Sports and travel | 86.31 | 7 | 30.2085 | 634.3785 | 2019-02-08 | 10:37:00 AM | Ewallet | 604.17 | 4.761905 | 30.2085 | 5.3 | 96ac2edbaf17f685fdbc4a99c689e727d561855b34d6ab... |


In [23]:
import pandas as pd

print('Rows:', len(df))
print('Columns:', len(df.columns))
display(pd.DataFrame({'dtype': df.dtypes.astype(str), 'nulls': df.isna().sum()}).sort_values('nulls', ascending=False))

Rows: 1000
Columns: 18


| | dtype | nulls |
|---|---|---|
| invoice_id | str | 0 |
| branch | str | 0 |
| city | str | 0 |
| customer_type | str | 0 |
| gender | str | 0 |
| product_line | str | 0 |
| unit_price | float64 | 0 |
| quantity | Int64 | 0 |
| tax_5_percent | float64 | 0 |
| total | float64 | 0 |


## 3) Data model (Bronze/Silver in SQLite)

### Bronze: `bronze_sales_raw` (staging)
- Purpose: store landed data with minimal transformation plus an `extracted_at` timestamp.
- Load pattern: full refresh (we drop/recreate the table each run).

### Silver: modeled tables for analytics
Dimensions:
- `silver_dim_product_line` (Type 1): one row per product line (no history).
- `silver_dim_branch` (SCD Type 2): keeps history if a branch’s city changes over time.

Fact:
- `silver_fact_sales`: transaction grain (one row per source record).
- Uses `row_hash` and `INSERT OR IGNORE` to keep the load idempotent.

### SCD Type 2 behavior (branch)
- Natural key: `branch_code`
- Tracked attribute: `city`
- If `city` changes for a branch:
  - expire the current row (`valid_to`, `is_current=0`)
  - insert a new current row (`valid_from`, `is_current=1`)
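
The expire-and-insert behavior can be sketched against an in-memory database. This is a minimal illustration, not the repo's `scd2_upsert_dim_branch`: the table `dim_branch_demo`, the function `scd2_upsert`, and the `as_of` parameter are hypothetical, and only the columns named above are modeled.

```python
import sqlite3

# Hypothetical demo table mirroring the SCD2 columns described above.
DEMO_DDL = """
CREATE TABLE dim_branch_demo (
    branch_key INTEGER PRIMARY KEY,
    branch_code TEXT NOT NULL,
    city TEXT NOT NULL,
    valid_from TEXT NOT NULL,
    valid_to TEXT,
    is_current INTEGER NOT NULL
)
"""


def scd2_upsert(conn: sqlite3.Connection, branch_code: str, city: str, as_of: str) -> None:
    """Insert a new version only when the tracked attribute (city) changed."""
    current = conn.execute(
        "SELECT branch_key, city FROM dim_branch_demo "
        "WHERE branch_code = ? AND is_current = 1",
        (branch_code,),
    ).fetchone()
    if current is not None and current[1] == city:
        return  # No change: keep the existing current row.
    if current is not None:
        # Expire the old version.
        conn.execute(
            "UPDATE dim_branch_demo SET valid_to = ?, is_current = 0 WHERE branch_key = ?",
            (as_of, current[0]),
        )
    # Insert the new current version (also covers a brand-new branch).
    conn.execute(
        "INSERT INTO dim_branch_demo (branch_code, city, valid_from, valid_to, is_current) "
        "VALUES (?, ?, ?, NULL, 1)",
        (branch_code, city, as_of),
    )
```

Re-running the upsert with an unchanged city is a no-op, so history only grows when the tracked attribute actually changes.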

In [24]:
print(DDL_SQLITE)


-- Staging (bronze) table: raw records as landed

-- Legacy tables (pre-rename). Drop if present to keep the DB clean.
DROP TABLE IF EXISTS fact_sales;
DROP TABLE IF EXISTS dim_branch;
DROP TABLE IF EXISTS dim_product_line;
DROP TABLE IF EXISTS stg_sales_raw;

DROP TABLE IF EXISTS bronze_sales_raw;
CREATE TABLE bronze_sales_raw (
    row_hash TEXT PRIMARY KEY,
    invoice_id TEXT,
    branch TEXT,
    city TEXT,
    customer_type TEXT,
    gender TEXT,
    product_line TEXT,
    unit_price REAL,
    quantity INTEGER,
    tax_5_percent REAL,
    total REAL,
    date TEXT,
    time TEXT,
    payment TEXT,
    cogs REAL,
    gross_margin_percentage REAL,
    gross_income REAL,
    rating REAL,
    extracted_at TEXT NOT NULL
);

-- Dimension: Product Line (Type 1)
CREATE TABLE IF NOT EXISTS silver_dim_product_line (
    product_line_key INTEGER PRIMARY KEY,
    product_line_name TEXT NOT NULL UNIQUE,
    created_at TEXT NOT NULL
);

-- Dimension: Branch (SCD Type 2)
CREATE TABLE IF NOT EX

## 4) Load into SQLite (Bronze → Silver)

### Load sequence
1. Create tables (run DDL).
2. Read raw CSV and load `bronze_sales_raw`.
3. Build `silver_dim_product_line` (insert missing product lines).
4. Upsert `silver_dim_branch` using SCD Type 2 logic.
5. Load `silver_fact_sales` idempotently (`row_hash` + `INSERT OR IGNORE`).
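
Step 5 relies on a unique `row_hash` plus `INSERT OR IGNORE`. A minimal sketch of that idea, assuming a hypothetical two-column table `fact_demo` rather than the real `silver_fact_sales` schema:

```python
import sqlite3


def count_rows(conn: sqlite3.Connection) -> int:
    """Return the number of rows currently in the demo fact table."""
    return conn.execute("SELECT COUNT(*) FROM fact_demo").fetchone()[0]


def load_facts(conn: sqlite3.Connection, rows: list[tuple[str, float]]) -> int:
    """Insert rows keyed by row_hash and return how many were actually added."""
    before = count_rows(conn)
    # Rows whose row_hash already exists violate the PRIMARY KEY and are skipped.
    conn.executemany(
        "INSERT OR IGNORE INTO fact_demo (row_hash, total) VALUES (?, ?)",
        rows,
    )
    return count_rows(conn) - before


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory database with the hypothetical demo table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE fact_demo (row_hash TEXT PRIMARY KEY, total REAL)")
    return conn
```

Loading the same batch twice adds rows only the first time, which is why re-running the pipeline on an unchanged dataset does not duplicate facts.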

### Output
A populated SQLite DB at `SQLITE_DB_PATH` (default `./db/supermarket_sales.sqlite`).

In [25]:
conn = db.connect(settings.sqlite_db_path)

try:

    db.execute_script(conn, DDL_SQLITE)

    load_staging(conn, frames)

    ensure_dim_product_line(conn)

    scd2_upsert_dim_branch(conn)

    load_fact_sales(conn)

    conn.commit()

finally:

    conn.close()



settings.sqlite_db_path


2026-02-20 11:32:15,210 | INFO | src.transform_load | Loading 1000 rows into staging
2026-02-20 11:32:15,307 | INFO | src.transform_load | dim_product_line is up to date
2026-02-20 11:32:15,314 | INFO | src.transform_load | Inserting 1000 fact rows (idempotent)


WindowsPath('C:/Users/gadde/Desktop/Interview Perparation/66 Degrees Assignment/db/supermarket_sales.sqlite')

## 5) Validation (quick confidence checks)

### What we validate
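- Expected tables exist.
- Row counts are non-zero (staging + fact).
- `silver_fact_sales.row_hash` is unique (no duplicate fact rows).
- Fact coverage: the fact row count is compared against the bronze rows that have `date`, `product_line`, `branch`, and `city` populated; a ratio below `min_fact_coverage` is an error.
- `txn_date` is ISO formatted (`YYYY-MM-DD`).
- The SCD2 dimension has at least one current row (`is_current=1`), and no `branch_code` has more than one.
- Critical fact columns (`row_hash`, `product_line_key`, `branch_key`, `txn_date`, `loaded_at`) contain no NULLs.
- Every fact row joins to `silver_dim_product_line` and `silver_dim_branch`.
- Monetary values are non-negative and `quantity` is positive; a `rating` outside [0, 10] is reported as a warning.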

### Why this matters
These checks catch the most common pipeline failures (bad download, schema drift, empty loads, or broken joins) without adding heavy testing infrastructure.
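
As an illustration of how lightweight such checks can be, the sketch below covers three of them (table exists, table is non-empty, `row_hash` is unique) and collects messages instead of failing on the first problem. It is not the repo's `validate_sqlite_db`; the table `fact_demo` and all function names are hypothetical.

```python
import sqlite3


def missing_tables(conn: sqlite3.Connection, expected: list[str]) -> list[str]:
    """Return expected table names that are absent from the database."""
    found = {
        row[0]
        for row in conn.execute("SELECT name FROM sqlite_master WHERE type = 'table'")
    }
    return sorted(set(expected) - found)


def duplicate_row_hashes(conn: sqlite3.Connection) -> int:
    """Count row_hash values that occur more than once in fact_demo."""
    return conn.execute(
        """
        SELECT COUNT(*) FROM (
            SELECT row_hash FROM fact_demo GROUP BY row_hash HAVING COUNT(*) > 1
        )
        """
    ).fetchone()[0]


def run_checks(conn: sqlite3.Connection) -> list[str]:
    """Run the checks in order and return a list of error messages."""
    errors: list[str] = []
    missing = missing_tables(conn, ["fact_demo"])
    if missing:
        # Later checks would fail on a missing table, so stop here.
        return [f"Missing tables: {missing}"]
    if conn.execute("SELECT COUNT(*) FROM fact_demo").fetchone()[0] == 0:
        errors.append("fact_demo is empty")
    dupes = duplicate_row_hashes(conn)
    if dupes:
        errors.append(f"{dupes} duplicated row_hash values")
    return errors
```

Returning a list of messages lets the caller decide whether to raise, log, or treat some findings as warnings.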

In [26]:
validate_sqlite_db(settings.sqlite_db_path)

2026-02-20 11:32:54,024 | INFO | src.validate | Row counts: bronze=1000 fact=1000 dim_product_line=6 dim_branch=3
2026-02-20 11:32:54,032 | INFO | src.validate | Fact coverage: 1.000 (1000/1000)


In [27]:
import sqlite3

conn = sqlite3.connect(settings.sqlite_db_path)
try:
    for table in ['bronze_sales_raw', 'silver_dim_product_line', 'silver_dim_branch', 'silver_fact_sales']:
        count = conn.execute(f'SELECT COUNT(*) FROM {table}').fetchone()[0]
        print(table, count)
finally:
    conn.close()

bronze_sales_raw 1000
silver_dim_product_line 6
silver_dim_branch 3
silver_fact_sales 1000


## 6) Next step: reporting (SQL over the Silver tables)

At this point, the modeled layer is ready for analytics queries.

Typical reporting patterns you can demonstrate next:
- Join facts to dimensions for readable attributes (product line name, branch city).
- Use window functions for rankings and time-based comparisons.
- Export query results to CSV for sharing (optional).
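
The first two patterns can be combined in one query. The sketch below is an assumption-laden illustration, not one of the files in `sql/`: it rebuilds a reduced version of the two tables in memory (only the columns the query needs), and it requires SQLite 3.25 or newer for window functions.

```python
import sqlite3

# Join the fact to its dimension, aggregate, then rank with a window function.
RANKED_REVENUE_SQL = """
WITH line_revenue AS (
    SELECT d.product_line_name, SUM(f.total) AS revenue
    FROM silver_fact_sales f
    JOIN silver_dim_product_line d
      ON f.product_line_key = d.product_line_key
    GROUP BY d.product_line_name
)
SELECT product_line_name,
       revenue,
       RANK() OVER (ORDER BY revenue DESC) AS revenue_rank
FROM line_revenue
ORDER BY revenue_rank
"""


def build_demo_db() -> sqlite3.Connection:
    """Create reduced demo versions of the silver tables with sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE silver_dim_product_line (
            product_line_key INTEGER PRIMARY KEY,
            product_line_name TEXT NOT NULL UNIQUE
        );
        CREATE TABLE silver_fact_sales (
            row_hash TEXT PRIMARY KEY,
            product_line_key INTEGER NOT NULL,
            total REAL
        );
        INSERT INTO silver_dim_product_line VALUES
            (1, 'Health and beauty'), (2, 'Food and beverages');
        INSERT INTO silver_fact_sales VALUES
            ('h1', 1, 120.0), ('h2', 2, 100.0), ('h3', 2, 50.0);
        """
    )
    return conn


def ranked_revenue(conn: sqlite3.Connection) -> list[tuple]:
    """Return (product_line_name, revenue, revenue_rank) rows, best first."""
    return conn.execute(RANKED_REVENUE_SQL).fetchall()
```

Aggregating in a CTE first and ranking in the outer query keeps the window function separate from the `GROUP BY`, which tends to be easier to read and debug.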

## 7) Analysis queries (from `sql/` folder)

The cells below execute the numbered analysis queries against the generated SQLite database (`settings.sqlite_db_path`).

In [28]:
from contextlib import closing
from pathlib import Path
import sqlite3
import pandas as pd

def run_sql_file(relative_sql_path: str) -> pd.DataFrame:
    sql_path = (repo_root / relative_sql_path).resolve()
    sql = sql_path.read_text(encoding="utf-8")

    # Using sqlite3 + pandas for convenient display in notebook.
    # sqlite3's own context manager only commits or rolls back; closing() also closes the connection.
    with closing(sqlite3.connect(settings.sqlite_db_path)) as conn:
        df = pd.read_sql_query(sql, conn)

    display(df)
    return df

In [29]:
# 01) Average Rating by Product Line
run_sql_file("sql/01.Average Rating by Product Line.sql")

| | product_line_name | avg_rating | transaction_count |
|---|---|---|---|
| 0 | Food and beverages | 7.113218 | 174 |
| 1 | Fashion accessories | 7.029213 | 178 |
| 2 | Health and beauty | 7.003289 | 152 |
| 3 | Electronic accessories | 6.924706 | 170 |
| 4 | Sports and travel | 6.916265 | 166 |
| 5 | Home and lifestyle | 6.8375 | 160 |




In [30]:
# 02) Customer Type Spend Analysis
run_sql_file("sql/02.Customer Type Spend Analysis.sql")

| | customer_type | transaction_count | total_sales | avg_transaction_value | sales_share |
|---|---|---|---|---|---|
| 0 | Member | 565 | 189694.764 | 335.742945 | 0.5874 |
| 1 | Normal | 435 | 133271.985 | 306.372379 | 0.4126 |




In [31]:
# 03) KPI Dashboard (5 Tiles)
run_sql_file("sql/03.KPI Dashboard (5 Tiles).sql")

| | total_sales | avg_basket | transactions | avg_quantity | avg_rating |
|---|---|---|---|---|---|
| 0 | 322966.75 | 322.97 | 1000 | 5.51 | 6.97 |




In [32]:
# 04) Number of Sales per Branch
run_sql_file("sql/04.Number of Sales per Branch.sql")

| | branch_code | city | transaction_count | invoice_count |
|---|---|---|---|---|
| 0 | Alex | Yangon | 340 | 340 |
| 1 | Cairo | Mandalay | 332 | 332 |
| 2 | Giza | Naypyitaw | 328 | 328 |




In [34]:
# 11) Running Revenue by Branch (Daily)
run_sql_file("sql/11.Running Revenue by Branch (Daily).sql")

| | branch_code | txn_date | day_revenue | running_revenue |
|---|---|---|---|---|
| 0 | Alex | 2019-01-01 | 2371.32 | 2371.32 |
| 1 | Alex | 2019-01-02 | 307.05 | 2678.37 |
| 2 | Alex | 2019-01-03 | 937.41 | 3615.78 |
| 3 | Alex | 2019-01-04 | 483.26 | 4099.04 |
| 4 | Alex | 2019-01-05 | 2024.51 | 6123.55 |
| ... | ... | ... | ... | ... |
| 258 | Giza | 2019-03-26 | 473.97 | 106716.92 |
| 259 | Giza | 2019-03-27 | 943.30 | 107660.22 |
| 260 | Giza | 2019-03-28 | 480.81 | 108141.02 |
| 261 | Giza | 2019-03-29 | 985.70 | 109126.72 |




In [35]:
# 12) Monthly Revenue by Branch & Product Line
run_sql_file("sql/12.Monthly Revenue by Branch & Product Line.sql")

| | year_month | branch_code | city | product_line_name | revenue | product_rank_in_branch_month | running_revenue_in_branch |
|---|---|---|---|---|---|---|---|
| 0 | 2019-01 | Alex | Yangon | Home and lifestyle | 10313.5935 | 1 | 10313.5935 |
| 1 | 2019-01 | Alex | Yangon | Fashion accessories | 6847.491 | 2 | 17161.0845 |
| 2 | 2019-01 | Alex | Yangon | Sports and travel | 6509.9475 | 3 | 23671.032 |
| 3 | 2019-01 | Alex | Yangon | Electronic accessories | 6401.2725 | 4 | 30072.3045 |
| 4 | 2019-01 | Alex | Yangon | Food and beverages | 4646.229 | 5 | 34718.5335 |
| 5 | 2019-01 | Alex | Yangon | Health and beauty | 3962.595 | 6 | 38681.1285 |
| 6 | 2019-01 | Cairo | Mandalay | Sports and travel | 6768.0795 | 1 | 6768.0795 |
| 7 | 2019-01 | Cairo | Mandalay | Electronic accessories | 6699.777 | 2 | 13467.8565 |
| 8 | 2019-01 | Cairo | Mandalay | Food and beverages | 6609.2775 | 3 | 20077.134 |
| 9 | 2019-01 | Cairo | Mandalay | Health and beauty | 6399.8865 | 4 | 26477.0205 |




In [36]:
# 13) Month-over-month revenue by branch
run_sql_file("sql/13.Month-over-month revenue by branch.sql")

| | branch_code | year_month | revenue | prev_month_revenue |
|---|---|---|---|---|
| 0 | Alex | 2019-01 | 38681.13 | |
| 1 | Alex | 2019-02 | 29860.12 | 38681.13 |
| 2 | Alex | 2019-03 | 37659.12 | 29860.12 |
| 3 | Cairo | 2019-01 | 37176.06 | |
| 4 | Cairo | 2019-02 | 34424.27 | 37176.06 |
| 5 | Cairo | 2019-03 | 34597.34 | 34424.27 |
| 6 | Giza | 2019-01 | 40434.68 | |
| 7 | Giza | 2019-02 | 32934.98 | 40434.68 |
| 8 | Giza | 2019-03 | 37199.04 | 32934.98 |




In [37]:
# 14) Top 3 Product Lines per Branch (Revenue Rank)
run_sql_file("sql/14.Top 3 Product Lines per Branch (Revenue Rank).sql")

| | branch_code | product_line_name | revenue | rev_rank | branch_revenue_pct |
|---|---|---|---|---|---|
| 0 | Alex | Home and lifestyle | 22417.2 | 1 | 21.11 |
| 1 | Alex | Sports and travel | 19372.7 | 2 | 18.24 |
| 2 | Alex | Electronic accessories | 18317.11 | 3 | 17.25 |
| 3 | Cairo | Sports and travel | 19988.2 | 1 | 18.82 |
| 4 | Cairo | Health and beauty | 19980.66 | 2 | 18.81 |
| 5 | Cairo | Home and lifestyle | 17549.16 | 3 | 16.52 |
| 6 | Giza | Food and beverages | 23766.85 | 1 | 21.5 |
| 7 | Giza | Fashion accessories | 21560.07 | 2 | 19.5 |
| 8 | Giza | Electronic accessories | 18968.97 | 3 | 17.16 |


