In [19]:
import os
import json
import random
import requests
import pandas as pd
from datetime import datetime, timedelta
from typing import Dict, Any

from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig, SourceFormat, WriteDisposition
from google.api_core.exceptions import NotFound

In [20]:
# Config
PROJECT_ID = os.getenv("BQ_PROJECT", "analytics-pipeline-assessment")
DATASET_ID = os.getenv("BQ_DATASET", "analytics_dw")
bq = bigquery.Client(project=PROJECT_ID)

def log(step: str, **kwargs):
    print(json.dumps({"ts": datetime.utcnow().isoformat()+"Z", "step": step, **kwargs}))

def run_sql(sql: str):
    return bq.query(sql, job_id_prefix="nbq_").result()

def ensure_dataset():
    ds = f"{PROJECT_ID}.{DATASET_ID}"
    try:
        bq.get_dataset(ds)
        log("dataset.exists", dataset=ds)
    except Exception:
        bq.create_dataset(bigquery.Dataset(ds), exists_ok=True)
        log("dataset.created", dataset=ds)

Attendance generator function

In [21]:
def gen_attendance(num_records: int = 3_000_000,
                   output_file: str = "attendance_dataset_3m.csv") -> Dict[str, Any]:

    regions = ["North America", "Europe", "Asia", "South America", "Africa", "Oceania"]
    countries = {
        "North America": ["USA", "Canada", "Mexico"],
        "Europe": ["Germany", "France", "UK", "Italy"],
        "Asia": ["China", "India", "Japan", "Singapore"],
        "South America": ["Brazil", "Argentina", "Chile"],
        "Africa": ["South Africa", "Nigeria", "Egypt"],
        "Oceania": ["Australia", "New Zealand"]
    }
    departments = ["IT", "Sales", "Marketing", "HR", "Finance", "Operations"]
    first_names = ["Alice", "Bob", "Chen", "Daniela", "Ethan", "Fatima", "George", "Hiro", "Isabella", "Juan"]
    last_names = ["Johnson", "Smith", "Wei", "Lopez", "Brown", "Hassan", "Wilson", "Tanaka", "Rossi", "Martinez"]
    statuses = ["Present", "Absent", "Remote"]

    def generate_attendance_data(n):
        for i in range(1, n+1):
            staff_id = f"ST{i:07d}"
            name = f"{random.choice(first_names)} {random.choice(last_names)}"
            region = random.choice(regions)
            country = random.choice(countries[region])
            department = random.choice(departments)
            date = datetime(2020, 1, 1) + timedelta(days=random.randint(0, 1825))  # 5 years
            status = random.choices(statuses, weights=[0.7, 0.1, 0.2])[0]
            if status in ["Present", "Remote"]:
                check_in_hour = random.randint(8, 10)
                check_in_minute = random.randint(0, 59)
                check_out_hour = random.randint(16, 18)
                check_out_minute = random.randint(0, 59)
                check_in = f"{check_in_hour:02d}:{check_in_minute:02d}"
                check_out = f"{check_out_hour:02d}:{check_out_minute:02d}"
            else:
                check_in, check_out = "-", "-"

            yield [
                staff_id, name, region, country, department, date.strftime("%Y-%m-%d"),
                status, check_in, check_out
            ]

    # Write CSV in chunks
    columns = ["StaffID", "Name", "Region", "Country", "Department", "Date",
               "Status", "CheckInTime", "CheckOutTime"]
    chunk_size = 100_000

    log("gen.attendance.start", records=num_records, output=output_file)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(",".join(columns) + "\n")
        for start in range(0, num_records, chunk_size):
            chunk = list(generate_attendance_data(min(chunk_size, num_records - start)))
            df = pd.DataFrame(chunk, columns=columns)
            df.to_csv(f, header=False, index=False)
    print(f"✅ Attendance dataset generated: {output_file}")

    log("gen.attendance.done", file=output_file)

    return {"type": "path", "data": output_file}


Sales data generator function

In [22]:
def gen_sales(num_records: int = 3_000_000,
              output_file: str = "sales_dataset_3m.csv") -> Dict[str, Any]:

    regions = ["North America", "Europe", "Asia", "South America", "Africa", "Oceania"]
    countries = {
        "North America": ["USA", "Canada", "Mexico"],
        "Europe": ["Germany", "France", "UK", "Italy"],
        "Asia": ["China", "India", "Japan", "Singapore"],
        "South America": ["Brazil", "Argentina", "Chile"],
        "Africa": ["South Africa", "Nigeria", "Egypt"],
        "Oceania": ["Australia", "New Zealand"]
    }
    currencies = {
        "USA": "USD", "Canada": "CAD", "Mexico": "MXN",
        "Germany": "EUR", "France": "EUR", "UK": "GBP", "Italy": "EUR",
        "China": "CNY", "India": "INR", "Japan": "JPY", "Singapore": "SGD",
        "Brazil": "BRL", "Argentina": "ARS", "Chile": "CLP",
        "South Africa": "ZAR", "Nigeria": "NGN", "Egypt": "EGP",
        "Australia": "AUD", "New Zealand": "NZD"
    }
    products = ["Software", "Hardware", "Consulting", "Cloud Services", "Licenses"]

    def generate_sales_data(n):
        for i in range(1, n+1):
            region = random.choice(regions)
            country = random.choice(countries[region])
            product = random.choice(products)
            currency = currencies[country]
            date = datetime(2020, 1, 1) + timedelta(days=random.randint(0, 1825))  # 5 years
            quantity = random.randint(1, 50)
            unit_price = round(random.uniform(100, 5000), 2)
            total_sales = round(quantity * unit_price, 2)

            yield [
                f"S{i:07d}", region, country, product, date.strftime("%Y-%m-%d"),
                currency, quantity, unit_price, total_sales
            ]

    columns = ["SaleID", "Region", "Country", "Product", "Date",
               "Currency", "Quantity", "UnitPrice", "TotalSales"]
    chunk_size = 100_000

    log("gen.sales.start", records=num_records, output=output_file)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(",".join(columns) + "\n")
        for start in range(0, num_records, chunk_size):
            chunk = list(generate_sales_data(min(chunk_size, num_records - start)))
            df = pd.DataFrame(chunk, columns=columns)
            df.to_csv(f, header=False, index=False)
    print(f"✅ Sales dataset generated: {output_file}")

    log("gen.sales.done", file=output_file)

    return {"type": "path", "data": output_file}

Finance Data Generator function

In [23]:
def gen_finance(num_records: int = 3_000_000,
                output_file: str = "financial_dataset_3m.csv") -> Dict[str, Any]:

    regions = ["North America", "Europe", "Asia", "South America", "Africa", "Oceania"]
    countries = {
        "North America": ["USA", "Canada", "Mexico"],
        "Europe": ["Germany", "France", "UK", "Italy"],
        "Asia": ["China", "India", "Japan", "Singapore"],
        "South America": ["Brazil", "Argentina", "Chile"],
        "Africa": ["South Africa", "Nigeria", "Egypt"],
        "Oceania": ["Australia", "New Zealand"]
    }
    currencies = {
        "USA": "USD", "Canada": "CAD", "Mexico": "MXN",
        "Germany": "EUR", "France": "EUR", "UK": "GBP", "Italy": "EUR",
        "China": "CNY", "India": "INR", "Japan": "JPY", "Singapore": "SGD",
        "Brazil": "BRL", "Argentina": "ARS", "Chile": "CLP",
        "South Africa": "ZAR", "Nigeria": "NGN", "Egypt": "EGP",
        "Australia": "AUD", "New Zealand": "NZD"
    }
    products = ["Software", "Hardware", "Consulting", "Cloud Services", "Licenses"]

    def generate_finance_data(n):
        for i in range(1, n+1):
            region = random.choice(regions)
            country = random.choice(countries[region])
            currency = currencies[country]
            product = random.choice(products)
            date = datetime(2020, 1, 1) + timedelta(days=random.randint(0, 1825))  # 5 years
            revenue = round(random.uniform(1000, 100000), 2)
            expense = round(revenue * random.uniform(0.4, 0.9), 2)
            profit = revenue - expense

            yield [
                f"T{i:07d}", region, country, product, date.strftime("%Y-%m-%d"),
                currency, revenue, expense, profit
            ]

    columns = ["TransactionID", "Region", "Country", "Product", "Date",
               "Currency", "Revenue", "Expense", "Profit"]
    chunk_size = 100_000

    log("gen.finance.start", records=num_records, output=output_file)
    with open(output_file, "w", encoding="utf-8") as f:
        f.write(",".join(columns) + "\n")
        for start in range(0, num_records, chunk_size):
            chunk = list(generate_finance_data(min(chunk_size, num_records - start)))
            df = pd.DataFrame(chunk, columns=columns)
            df.to_csv(f, header=False, index=False)
    print(f"✅ Finance dataset generated: {output_file}")

    log("gen.finance.done", file=output_file)

    return {"type": "path", "data": output_file}

In [24]:
def gen_data():

    log("gen_data.start", order="attendance>sales>finance")

    a_src = gen_attendance()
    s_src = gen_sales()
    f_src = gen_finance()

    log("gen_data.done", outputs=[a_src["data"], s_src["data"], f_src["data"]])
    return a_src, s_src, f_src

DDL

In [25]:
DDL_SQL = f"""
CREATE SCHEMA IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}`;

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.dim_date` (
  date_key DATE,
  year INT64,
  quarter INT64,
  month INT64,
  month_name STRING,
  day_of_month INT64,
  day_of_week INT64,
  day_name STRING
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.dim_location` (
  location_key STRING,
  region STRING,
  country STRING
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.dim_product` (
  product_key STRING,
  product_name STRING
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.dim_employee` (
  employee_key STRING,
  staffid STRING,
  name STRING,
  department STRING,
  home_country STRING,
  home_region STRING
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.dim_currency` (
  currency_key STRING,
  currency_code STRING,
  date_to_usd NUMERIC
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.stg_attendance` (
  StaffID STRING,
  Name STRING,
  Region STRING,
  Country STRING,
  Department STRING,
  Date DATE,
  Status STRING,
  CheckInTime STRING,
  CheckOutTime STRING
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.stg_sales` (
  SaleID STRING,
  Region STRING,
  Country STRING,
  Product STRING,
  Date DATE,
  Currency STRING,
  Quantity INT64,
  UnitPrice NUMERIC,
  TotalSales NUMERIC
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.stg_finance` (
  TransactionID STRING,
  Region STRING,
  Country STRING,
  Product STRING,
  Date DATE,
  Currency STRING,
  Revenue NUMERIC,
  Expense NUMERIC,
  Profit NUMERIC
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.fact_attendance` (
  attendance_key STRING,
  employee_key STRING,
  location_key STRING,
  date_key DATE,
  status STRING,
  checkin_time TIMESTAMP,
  checkout_time TIMESTAMP
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.fact_sales` (
  saleid STRING,
  product_key STRING,
  location_key STRING,
  date_key DATE,
  currency_key STRING,
  quantity INT64,
  conversion_rate_to_usd NUMERIC,
  unit_price_usd NUMERIC,
  total_sales_usd NUMERIC
);

CREATE TABLE IF NOT EXISTS `{PROJECT_ID}.{DATASET_ID}.fact_finance` (
  transaction_id STRING,
  product_key STRING,
  location_key STRING,
  date_key DATE,
  currency_key STRING,
  conversion_rate_to_usd NUMERIC,
  revenue_usd NUMERIC,
  expense_usd NUMERIC,
  profit_usd NUMERIC
);
"""
run_sql(DDL_SQL)
log("ddl.applied")

{"ts": "2025-08-24T11:50:48.848446Z", "step": "ddl.applied"}


Load data into staging tables

In [None]:
# Type coercion map per staging table
COERCE_COLUMNS: Dict[str, Dict[str, str]] = {
    "stg_attendance": {"Date": "date", "CheckInTime": "time", "CheckOutTime": "time", "StaffID": "int"},
    "stg_sales":      {"Date": "date", "Quantity": "int", "UnitPrice": "float"},
    "stg_finance":    {"Date": "date", "Revenue": "float", "Expense": "float", "Profit": "float"},
}

# Helpers
def _coerce_types_light(df: pd.DataFrame, coerce: Dict[str, str]) -> pd.DataFrame:
    out = df.copy()

    def _clean_time_series(s: pd.Series) -> pd.Series:
        s = (
            s.astype(str)
             .str.strip()
             .replace({"-": pd.NA, "": pd.NA, "NULL": pd.NA, "NaN": pd.NA, "nan": pd.NA, "None": pd.NA})
        )
        parsed = pd.to_datetime(s, errors="coerce").dt.time 
        return parsed.where(pd.notna(parsed), None) 

    for col, kind in (coerce or {}).items():
        if col not in out.columns:
            continue
        if kind == "date":
            out[col] = pd.to_datetime(out[col], errors="coerce").dt.date
        elif kind == "time":
            out[col] = _clean_time_series(out[col])
        elif kind == "int":
            out[col] = pd.to_numeric(out[col], errors="coerce").astype("Int64")
        elif kind == "float":
            out[col] = pd.to_numeric(out[col], errors="coerce")
    return out

def _read_src_to_df(src: Dict[str, Any]) -> pd.DataFrame:
    if src["type"] == "df":
        return src["data"].copy()
    elif src["type"] == "path":
        path = src["data"]
        if path.lower().endswith(".csv"):
            return pd.read_csv(path)
        else:
            return pd.read_json(path, lines=True)
    else:
        raise ValueError("Unknown src type; expected {'type':'df'|'path','data':...}")

def _load_df(table_id: str, df: pd.DataFrame):
    job_config = LoadJobConfig(write_disposition=WriteDisposition.WRITE_TRUNCATE)
    bq.load_table_from_dataframe(df, table_id, job_config=job_config).result()

def _dedupe_and_load_to_bq(table_id: str, src: Dict[str, Any]) -> Dict[str, Any]:
    short = table_id.split(".")[-1]
    rejects_table = table_id + "_rejects"

    df_in = _read_src_to_df(src)
    total_rows = int(len(df_in) if df_in is not None else 0)

    # Normalize types before duplicate detection to match DW expectations
    df_norm = _coerce_types_light(df_in, COERCE_COLUMNS.get(short, {}))

    if total_rows == 0:
        # ensure targets exist and empty
        try:
            bq.query(f"TRUNCATE TABLE `{table_id}`").result()
        except Exception:
            pass
        try:
            bq.get_table(rejects_table)
            bq.query(f"TRUNCATE TABLE `{rejects_table}`").result()
        except Exception:
            pass
        log(f"{short}.validated", total=0, loaded=0, rejected=0)
        return {"table": short, "total_rows": 0, "loaded_rows": 0, "rejected_rows": 0, "rejects_table": rejects_table}

    # Exact-row duplicate detection
    dup_mask = df_norm.duplicated(keep="first")
    rejects_df = df_norm.loc[dup_mask].copy()
    valid_df   = df_norm.loc[~dup_mask].copy()

    if not rejects_df.empty:
        rejects_df["error_reason"] = "duplicate_row"

    # Load valid
    if not valid_df.empty:
        _load_df(table_id, valid_df)
        loaded_rows = int(len(valid_df))
    else:
        try:
            bq.query(f"TRUNCATE TABLE `{table_id}`").result()
        except Exception:
            pass
        loaded_rows = 0

    # Load and clear rejects
    if not rejects_df.empty:
        _load_df(rejects_table, rejects_df)
        rejected_rows = int(len(rejects_df))
    else:
        try:
            bq.get_table(rejects_table)
            bq.query(f"TRUNCATE TABLE `{rejects_table}`").result()
        except Exception:
            pass
        rejected_rows = 0

    log(f"{short}.validated", total=total_rows, loaded=loaded_rows, rejected=rejected_rows)
    return {
        "table": short,
        "total_rows": total_rows,
        "loaded_rows": loaded_rows,
        "rejected_rows": rejected_rows,
        "rejects_table": rejects_table,
    }

def stage_from_generated(a_src, s_src, f_src):
    log("staging.start")

    att = _dedupe_and_load_to_bq(f"{PROJECT_ID}.{DATASET_ID}.stg_attendance", a_src)
    log("staging.attendance.done", rows=att["loaded_rows"], rejected=att["rejected_rows"])

    sal = _dedupe_and_load_to_bq(f"{PROJECT_ID}.{DATASET_ID}.stg_sales", s_src)
    log("staging.sales.done", rows=sal["loaded_rows"], rejected=sal["rejected_rows"])

    fin = _dedupe_and_load_to_bq(f"{PROJECT_ID}.{DATASET_ID}.stg_finance", f_src)
    log("staging.finance.done", rows=fin["loaded_rows"], rejected=fin["rejected_rows"])

    log("staging.all.done", totals={"attendance": att, "sales": sal, "finance": fin})

Load data into dim tables

In [27]:
def build_dimensions():
    log("dims.start")

    # dim_date
    date_range = pd.date_range(start="2020-01-01", end="2025-12-31", freq="D")
    df_date = pd.DataFrame({
        "date_key": date_range.date,
        "year": date_range.year,
        "quarter": date_range.quarter,
        "month": date_range.month,
        "month_name": date_range.strftime("%B"),
        "day_of_month": date_range.day,
        "day_of_week": date_range.dayofweek + 1,
        "day_name": date_range.strftime("%A"),
    })
    table_id = f"{PROJECT_ID}.{DATASET_ID}.dim_date"
    bq.load_table_from_dataframe(
        df_date,
        table_id,
        job_config=bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE"),
    ).result()
    log("dim_date.loaded", rows=len(df_date), table=table_id)

    # dim_location
    sql_location = f"""
    CREATE OR REPLACE TABLE `{PROJECT_ID}.{DATASET_ID}.dim_location` AS
    SELECT DISTINCT
      CONCAT(Country, "_", Region) AS location_key,
      Region,
      Country
    FROM `{PROJECT_ID}.{DATASET_ID}.stg_sales`
    UNION DISTINCT
    SELECT DISTINCT
      CONCAT(Country, "_", Region), Region, Country
    FROM `{PROJECT_ID}.{DATASET_ID}.stg_finance`
    UNION DISTINCT
    SELECT DISTINCT
      CONCAT(Country, "_", Region), Region, Country
    FROM `{PROJECT_ID}.{DATASET_ID}.stg_attendance`
    """
    bq.query(sql_location, location="US").result()
    log("dim_location.created", table=f"{PROJECT_ID}.{DATASET_ID}.dim_location")

    # dim_product
    sql_product = f"""
    CREATE OR REPLACE TABLE `{PROJECT_ID}.{DATASET_ID}.dim_product` AS
    SELECT DISTINCT
      CONCAT('PROD_', Product) AS product_key,
      Product AS product_name
    FROM `{PROJECT_ID}.{DATASET_ID}.stg_sales`
    UNION DISTINCT
    SELECT DISTINCT
      CONCAT('PROD_', Product), Product
    FROM `{PROJECT_ID}.{DATASET_ID}.stg_finance`
    """
    bq.query(sql_product, location="US").result()
    log("dim_product.created", table=f"{PROJECT_ID}.{DATASET_ID}.dim_product")

    # dim_employee
    sql_employee = f"""
    CREATE OR REPLACE TABLE `{PROJECT_ID}.{DATASET_ID}.dim_employee` AS
    SELECT DISTINCT
      CONCAT('EMP_', StaffID) AS employee_key,
      StaffID,
      Name,
      Department,
      Country AS home_country,
      Region  AS home_region
    FROM `{PROJECT_ID}.{DATASET_ID}.stg_attendance`
    """
    bq.query(sql_employee, location="US").result()
    log("dim_employee.created", table=f"{PROJECT_ID}.{DATASET_ID}.dim_employee")

    # dim_currency
    url = "https://api.exchangerate-api.com/v4/latest/USD"
    resp = requests.get(url)
    resp.raise_for_status()
    rates = resp.json()["rates"]
    today = datetime.utcnow().date()

    df_currency = pd.DataFrame([
        {"currency_key": code, "currency_code": code, "date_key": today, "date_to_usd": rate}
        for code, rate in rates.items()
    ])

    currency_table = f"{PROJECT_ID}.{DATASET_ID}.dim_currency"
    bq.load_table_from_dataframe(
        df_currency,
        currency_table,
        job_config=bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE"),
    ).result()
    log("dim_currency.loaded", rows=len(df_currency), table=currency_table)

    log("dims.done")

Load data into fact_attendance

In [28]:
def merge_fact_attendance():
    sql = f"""
    MERGE `{PROJECT_ID}.{DATASET_ID}.fact_attendance` T
    USING (
      SELECT
        e.employee_key,
        l.location_key,
        d.date_key,
        s.Status AS status,
        -- Already TIME from staging normalization
        s.CheckInTime  AS in_time,
        s.CheckOutTime AS out_time
      FROM `{PROJECT_ID}.{DATASET_ID}.stg_attendance` s
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_employee`  e
        ON CAST(s.StaffID AS STRING) = CAST(e.StaffID AS STRING)
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_location`  l
        ON s.Country = l.country AND s.Region = l.region
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_date`      d
        ON DATE(s.Date) = d.date_key   -- if s.Date is DATE already, you can just do: s.Date = d.date_key
    ) S
    ON T.employee_key = S.employee_key
   AND T.date_key     = S.date_key

    WHEN MATCHED THEN UPDATE SET
      T.location_key  = S.location_key,
      T.status        = S.status,
      T.checkin_time  = IF(S.in_time  IS NULL, NULL, TIMESTAMP(DATETIME(S.date_key, S.in_time))),
      T.checkout_time = IF(S.out_time IS NULL, NULL, TIMESTAMP(DATETIME(S.date_key, S.out_time)))

    WHEN NOT MATCHED THEN INSERT (
      attendance_key, employee_key, location_key, date_key, status, checkin_time, checkout_time
    ) VALUES (
      GENERATE_UUID(), S.employee_key, S.location_key, S.date_key, S.status,
      IF(S.in_time  IS NULL, NULL, TIMESTAMP(DATETIME(S.date_key, S.in_time))),
      IF(S.out_time IS NULL, NULL, TIMESTAMP(DATETIME(S.date_key, S.out_time)))
    );
    """
    run_sql(sql)
    log("fact_attendance.merged")


Load data into fact_sales

In [29]:
def merge_fact_sales():
    sql = f"""
    MERGE `{PROJECT_ID}.{DATASET_ID}.fact_sales` T
    USING (
      SELECT
        CONCAT(s.SaleID) AS nk,
        p.product_key, l.location_key, d.date_key,
        c.currency_key,
        CAST(s.Quantity AS INT64) AS quantity,
        CAST(r.date_to_usd AS NUMERIC) AS conversion_rate_to_usd,
        CAST(FORMAT('%.2f', ROUND(s.UnitPrice * r.date_to_usd, 2)) AS NUMERIC) AS unit_price_usd,
        CAST(FORMAT('%.2f', ROUND(s.TotalSales * r.date_to_usd, 2)) AS NUMERIC) AS total_sales_usd
      FROM `{PROJECT_ID}.{DATASET_ID}.stg_sales` s
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_product`  p ON s.Product = p.product_name
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_location` l ON s.Country = l.country AND s.Region = l.region
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_date`     d ON s.Date = d.date_key
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_currency` r ON s.Currency = r.currency_code
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_currency` c ON s.Currency = c.currency_code
    ) S
    ON T.saleid = S.nk
    WHEN MATCHED THEN UPDATE SET
      T.product_key = S.product_key,
      T.location_key= S.location_key,
      T.date_key    = S.date_key,
      T.currency_key= S.currency_key,
      T.quantity    = S.quantity,
      T.conversion_rate_to_usd = S.conversion_rate_to_usd,
      T.unit_price_usd = S.unit_price_usd,
      T.total_sales_usd= S.total_sales_usd
    WHEN NOT MATCHED THEN INSERT (
      saleid, product_key, location_key, date_key, currency_key, quantity,
      conversion_rate_to_usd, unit_price_usd, total_sales_usd
    ) VALUES (
      S.nk, S.product_key, S.location_key, S.date_key, S.currency_key, S.quantity,
      S.conversion_rate_to_usd, S.unit_price_usd, S.total_sales_usd
    );
    """
    run_sql(sql)
    log("fact_sales.merged")

Load data into fact_finance

In [30]:
def merge_fact_finance():
    sql = f"""
    MERGE `{PROJECT_ID}.{DATASET_ID}.fact_finance` T
    USING (
      SELECT
        CONCAT(f.TransactionID) AS nk,
        p.product_key, l.location_key, d.date_key,
        c.currency_key,
        CAST(r.date_to_usd AS NUMERIC) AS conversion_rate_to_usd,
        CAST(FORMAT('%.2f', ROUND(f.Revenue * r.date_to_usd, 2)) AS NUMERIC) AS revenue_usd,
        CAST(FORMAT('%.2f', ROUND(f.Expense * r.date_to_usd, 2)) AS NUMERIC) AS expense_usd,
        CAST(FORMAT('%.2f', ROUND(f.Profit  * r.date_to_usd, 2)) AS NUMERIC) AS profit_usd
      FROM `{PROJECT_ID}.{DATASET_ID}.stg_finance` f
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_product`  p ON f.Product = p.product_name
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_location` l ON f.Country = l.country AND f.Region = l.region
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_date`     d ON f.Date = d.date_key
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_currency` r ON f.Currency = r.currency_code
      JOIN `{PROJECT_ID}.{DATASET_ID}.dim_currency` c ON f.Currency = c.currency_code
    ) S
    ON T.transaction_id = S.nk
    WHEN MATCHED THEN UPDATE SET
      T.product_key = S.product_key,
      T.location_key= S.location_key,
      T.date_key    = S.date_key,
      T.currency_key= S.currency_key,
      T.conversion_rate_to_usd = S.conversion_rate_to_usd,
      T.revenue_usd = S.revenue_usd,
      T.expense_usd = S.expense_usd,
      T.profit_usd  = S.profit_usd
    WHEN NOT MATCHED THEN INSERT (
      transaction_id, product_key, location_key, date_key, currency_key,
      conversion_rate_to_usd,
      revenue_usd, expense_usd, profit_usd
    ) VALUES (
      S.nk, S.product_key, S.location_key, S.date_key, S.currency_key,
      S.conversion_rate_to_usd,
      S.revenue_usd, S.expense_usd, S.profit_usd
    );
    """
    run_sql(sql)
    log("fact_finance.merged")

Validation and summary report generation

In [31]:
def summarize_and_validate():
    checks = {
        "stg_attendance":  f"SELECT COUNT(*) c FROM `{PROJECT_ID}.{DATASET_ID}.stg_attendance`",
        "stg_sales":       f"SELECT COUNT(*) c FROM `{PROJECT_ID}.{DATASET_ID}.stg_sales`",
        "stg_finance":     f"SELECT COUNT(*) c FROM `{PROJECT_ID}.{DATASET_ID}.stg_finance`",
        "fact_attendance": f"SELECT COUNT(*) c FROM `{PROJECT_ID}.{DATASET_ID}.fact_attendance`",
        "fact_sales":      f"SELECT COUNT(*) c, MIN(unit_price_usd) min_usd, MAX(unit_price_usd) max_usd FROM `{PROJECT_ID}.{DATASET_ID}.fact_sales`",
        "fact_finance":    f"SELECT COUNT(*) c, MIN(revenue_usd) min_rev, MAX(revenue_usd) max_rev FROM `{PROJECT_ID}.{DATASET_ID}.fact_finance`",
    }
    report = {}
    for name, sql in checks.items():
        rows = list(run_sql(sql))
        report[name] = dict(rows[0].items())

    print("\n=== SUMMARY REPORT ===")
    print(json.dumps(report, indent=2, default=str))

    # USD-only sanity checks
    if report["fact_sales"]["min_usd"] is None or report["fact_finance"]["min_rev"] is None:
        raise RuntimeError("Validation failed: USD fields contain NULLs.")
    if report["fact_sales"]["min_usd"] < 0 or report["fact_finance"]["min_rev"] < 0:
        raise RuntimeError("Validation failed: negative USD encountered.")
    log("validation.ok")

Table truncation

In [32]:
def truncate_all_tables(project_id: str,
                        dataset_id: str,
                        prefixes: list[str] | None = None,
                        exclude: list[str] | None = None,
                        dry_run: bool = False):

    client = bigquery.Client(project=project_id)
    ds_ref = bigquery.DatasetReference(project_id, dataset_id)
    exclude = set(exclude or [])
    prefixes = tuple(prefixes or ())

    for tbl in client.list_tables(ds_ref):
        if tbl.table_type != "TABLE":
            continue

        name = tbl.table_id
        if prefixes and not name.startswith(prefixes):
            continue
        if name in exclude:
            continue

        sql = f"TRUNCATE TABLE `{project_id}.{dataset_id}.{name}`"
        if dry_run:
            print(sql)
        else:
            client.query(sql).result()
            print(f"Truncated {name}")

Pipeline driver

In [33]:
def run_pipeline():
    try:
        ensure_dataset()

        truncate_all_tables(PROJECT_ID, DATASET_ID, prefixes=["dim_", "fact_", "stg_"], exclude=["dim_date"])

        # 1) Generate data (attendance → sales → finance)
        a_src, s_src, f_src = gen_data()

        # 2) Stage
        stage_from_generated(a_src, s_src, f_src)

        # 3) Dimensions
        build_dimensions()

        # 4) Facts
        merge_fact_attendance()
        merge_fact_sales()
        merge_fact_finance()

        # 5) Validate and report
        summarize_and_validate()

        log("pipeline.done", status="success")
    except Exception as e:
        log("pipeline.failed", error=str(e))
        raise

# Execute
run_pipeline()

{"ts": "2025-08-24T11:50:49.233814Z", "step": "dataset.exists", "dataset": "analytics-pipeline-assessment.analytics_dw"}
Truncated dim_currency
Truncated dim_employee
Truncated dim_location
Truncated dim_product
Truncated fact_attendance
Truncated fact_finance
Truncated fact_sales
Truncated stg_attendance
Truncated stg_attendance_rejects
Truncated stg_finance
Truncated stg_finance_rejects
Truncated stg_sales
Truncated stg_sales_rejects
{"ts": "2025-08-24T11:51:26.529219Z", "step": "gen_data.start", "order": "attendance>sales>finance"}
{"ts": "2025-08-24T11:51:26.529276Z", "step": "gen.attendance.start", "records": 3000000, "output": "attendance_dataset_3m.csv"}
✅ Attendance dataset generated: attendance_dataset_3m.csv
{"ts": "2025-08-24T11:52:19.691135Z", "step": "gen.attendance.done", "file": "attendance_dataset_3m.csv"}
{"ts": "2025-08-24T11:52:19.721625Z", "step": "gen.sales.start", "records": 3000000, "output": "sales_dataset_3m.csv"}
✅ Sales dataset generated: sales_dataset_3m.csv

  parsed = pd.to_datetime(s, errors="coerce").dt.time  # handles HH:MM and HH:MM:SS
  parsed = pd.to_datetime(s, errors="coerce").dt.time  # handles HH:MM and HH:MM:SS


{"ts": "2025-08-24T11:54:27.663619Z", "step": "stg_attendance.validated", "total": 3000000, "loaded": 2997754, "rejected": 2246}
{"ts": "2025-08-24T11:54:27.663813Z", "step": "staging.attendance.done", "rows": 2997754, "rejected": 2246}
{"ts": "2025-08-24T11:54:52.596006Z", "step": "stg_sales.validated", "total": 3000000, "loaded": 3000000, "rejected": 0}
{"ts": "2025-08-24T11:54:52.596152Z", "step": "staging.sales.done", "rows": 3000000, "rejected": 0}
{"ts": "2025-08-24T11:55:22.385801Z", "step": "stg_finance.validated", "total": 3000000, "loaded": 3000000, "rejected": 0}
{"ts": "2025-08-24T11:55:22.385945Z", "step": "staging.finance.done", "rows": 3000000, "rejected": 0}
{"ts": "2025-08-24T11:55:22.385979Z", "step": "staging.all.done", "totals": {"attendance": {"table": "stg_attendance", "total_rows": 3000000, "loaded_rows": 2997754, "rejected_rows": 2246, "rejects_table": "analytics-pipeline-assessment.analytics_dw.stg_attendance_rejects"}, "sales": {"table": "stg_sales", "total_ro

Download generated input files

In [34]:
# from google.colab import files

# files.download('attendance_dataset_3m.csv')
# files.download('sales_dataset_3m.csv')
# files.download('financial_dataset_3m.csv')