
## 1. Project structure

The code is organized as follows:

```text
batch_ml_platform/
  config/
    config.yaml
  data/
    raw_sources/
      operational/
      logs/
      external/
    bronze/
      operational/
      logs/
      external/
    silver/
      operational_cleaned/
      integrated/
    gold/
      features/
      labels/
      training_sets/
    lineage/
      lineage_log.csv
  src/
    __init__.py
    config_loader.py
    utils.py
    bronze_ingestion.py
    silver_processing.py
    gold_feature_store.py
    run_quarterly_pipeline.py
  requirements.txt
  README.md
```

---

## 2. Example `requirements.txt`
The pipeline pins the following packages:

```text
pyspark==3.5.0
pyyaml==6.0.2
pandas==2.2.2
pyarrow==17.0.0
```

Install with:

```bash
pip install -r requirements.txt
```

---

## 3. `config/config.yaml`

This centralizes paths and basic settings for the quarterly batch:

```yaml
storage:
  base_path: "./data"
  bronze_path: "./data/bronze"
  silver_path: "./data/silver"
  gold_path: "./data/gold"

sources:
  operational:
    path: "./data/raw_sources/operational"
    pattern: "*.csv"
  logs:
    path: "./data/raw_sources/logs"
    pattern: "*.csv"
  external:
    path: "./data/raw_sources/external"
    pattern: "*.csv"

batch:
  training_quarter: "2025-Q4"
  cutoff_date: "2025-09-30"

spark:
  app_name: "Quarterly_Batch_Pipeline"
  master: "local[*]"

governance:
  enable_lineage_logging: true
  lineage_log_path: "./data/lineage/lineage_log.csv"
```
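
Because every script reads the same keys from this file, it can help to check the loaded dictionary before starting Spark. The following is a minimal sketch, not part of the project files: `validate_config` and `REQUIRED_KEYS` are hypothetical names, and the checks simply mirror the keys shown in the YAML above.

```python
import re
from datetime import date

# Sections and keys that the pipeline scripts read from config.yaml.
# (Hypothetical helper; the names here are illustrative assumptions.)
REQUIRED_KEYS = {
    "storage": ["bronze_path", "silver_path", "gold_path"],
    "sources": ["operational", "logs", "external"],
    "batch": ["training_quarter", "cutoff_date"],
    "spark": ["app_name", "master"],
}

def validate_config(config):
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    for section, keys in REQUIRED_KEYS.items():
        if section not in config:
            problems.append("missing section: " + section)
            continue
        for key in keys:
            if key not in config[section]:
                problems.append("missing key: " + section + "." + key)

    batch = config.get("batch", {})
    # The batch id is expected to look like "2025-Q4".
    quarter = str(batch.get("training_quarter", ""))
    if not re.fullmatch(r"\d{4}-Q[1-4]", quarter):
        problems.append("training_quarter should look like 2025-Q4")
    # The cutoff must be a real calendar date in ISO format.
    try:
        date.fromisoformat(str(batch.get("cutoff_date", "")))
    except ValueError:
        problems.append("cutoff_date should be an ISO date (YYYY-MM-DD)")
    return problems
```

Calling `validate_config(load_config())` at the top of each `run_*` function and raising if the list is non-empty would surface a typo in the YAML before any data is read.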

---

## 4. `src/config_loader.py`

Loads config from YAML.

```python
# src/config_loader.py
# Load YAML configuration for the batch data platform.

import os
import yaml

def load_config(config_path="config/config.yaml"):
    if not os.path.exists(config_path):
        raise FileNotFoundError("Config file not found at path " + config_path)
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)
    return config
```

---

## 5. `src/utils.py`

Spark session, lineage logging, quality checks.

```python
# src/utils.py
# Shared utilities: Spark session factory, lineage logging, simple DQ helpers.

import os
import datetime
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit

def create_spark_session(app_name, master="local[*]"):
    spark = (
        SparkSession.builder
        .appName(app_name)
        .master(master)
        .config("spark.sql.shuffle.partitions", "4")
        .getOrCreate()
    )
    return spark

def log_lineage(config, step_name, input_paths, output_path):
    if not config.get("governance", {}).get("enable_lineage_logging", False):
        return

    lineage_path = config["governance"]["lineage_log_path"]
    os.makedirs(os.path.dirname(lineage_path), exist_ok=True)

    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp.
    timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()

    record = {
        "timestamp_utc": timestamp,
        "step_name": step_name,
        "input_paths": ";".join(input_paths),
        "output_path": output_path,
    }

    if os.path.exists(lineage_path):
        df = pd.read_csv(lineage_path)
        df = pd.concat([df, pd.DataFrame([record])], ignore_index=True)
    else:
        df = pd.DataFrame([record])

    df.to_csv(lineage_path, index=False)

def dq_null_check(df, key_columns):
    checks = []
    for c in key_columns:
        null_cnt = df.filter(col(c).isNull()).count()
        checks.append((c, null_cnt))
    return checks

def dq_duplicate_check(df, key_columns):
    dup_count = (
        df.groupBy([col(c) for c in key_columns])
        .agg(count("*").alias("cnt"))
        .filter(col("cnt") > 1)
        .count()
    )
    return dup_count

def add_batch_metadata(df, batch_id, source_name):
    df_with_meta = df.withColumn("batch_id", lit(batch_id)).withColumn("source_system", lit(source_name))
    return df_with_meta
```
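
`log_lineage` above reads the whole CSV into pandas and rewrites it on every call. For a log that only ever grows, an append-only write may be simpler. The sketch below uses only the standard library; `append_lineage_record` and `LINEAGE_FIELDS` are hypothetical names, and the column set is assumed to match the record built in `log_lineage`.

```python
import csv
import os
from datetime import datetime, timezone

# Same columns as the record dictionary in log_lineage (assumed).
LINEAGE_FIELDS = ["timestamp_utc", "step_name", "input_paths", "output_path"]

def append_lineage_record(lineage_path, step_name, input_paths, output_path):
    """Append one lineage row, writing the header only for a new file."""
    parent = os.path.dirname(lineage_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    is_new_file = not os.path.exists(lineage_path)

    record = {
        "timestamp_utc": datetime.now(timezone.utc).isoformat(),
        "step_name": step_name,
        "input_paths": ";".join(input_paths),
        "output_path": output_path,
    }
    # Mode "a" leaves earlier rows untouched and adds the new one at the end.
    with open(lineage_path, "a", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=LINEAGE_FIELDS)
        if is_new_file:
            writer.writeheader()
        writer.writerow(record)
    return record
```

This keeps the cost of each call constant regardless of how long the log is, at the price of not validating existing rows.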

---

## 6. Bronze layer ingestion – `src/bronze_ingestion.py`

```python
# src/bronze_ingestion.py
# Batch ingestion into bronze: append-only raw storage for each source.

import os
import glob
from pyspark.sql.functions import input_file_name

from config_loader import load_config
from utils import create_spark_session, log_lineage, add_batch_metadata

def ingest_source_to_bronze(spark, config, source_name, batch_id):
    source_cfg = config["sources"][source_name]
    src_path = source_cfg["path"]
    src_pattern = source_cfg["pattern"]

    files = glob.glob(os.path.join(src_path, src_pattern))
    if len(files) == 0:
        print("No files found for source " + source_name + " in path " + src_path)
        return None

    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(files)
        .withColumn("ingest_file_path", input_file_name())
    )

    df = add_batch_metadata(df, batch_id, source_name)

    bronze_base = config["storage"]["bronze_path"]
    target_path = os.path.join(bronze_base, source_name)

    (
        df.write
        .mode("append")
        .option("compression", "snappy")
        .parquet(target_path)
    )

    log_lineage(config, "bronze_ingestion_" + source_name, files, target_path)

    print("Ingested source " + source_name + " into bronze at " + target_path)
    return target_path

def run_bronze_ingestion():
    config = load_config()
    spark = create_spark_session(
        config["spark"]["app_name"] + "_bronze",
        master=config["spark"]["master"],
    )

    batch_id = config["batch"]["training_quarter"]

    for src in ["operational", "logs", "external"]:
        ingest_source_to_bronze(spark, config, src, batch_id)

    spark.stop()

if __name__ == "__main__":
    run_bronze_ingestion()
```
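
Because bronze is written in `append` mode, running the ingestion twice over the same source files would store the same rows twice. One possible guard is a small manifest of files already ingested. This is a sketch under assumptions: the manifest is a JSON list of file paths, and `select_new_files` and `record_ingested_files` are hypothetical helpers that do not exist in the project.

```python
import json
import os

def _load_manifest(manifest_path):
    """Return the set of file paths recorded so far (empty if no manifest)."""
    if not os.path.exists(manifest_path):
        return set()
    with open(manifest_path, "r") as f:
        return set(json.load(f))

def select_new_files(candidate_files, manifest_path):
    """Return the candidate files that are not yet in the manifest, sorted."""
    return sorted(set(candidate_files) - _load_manifest(manifest_path))

def record_ingested_files(ingested_files, manifest_path):
    """Add files to the manifest; call this after a successful bronze write."""
    recorded = _load_manifest(manifest_path)
    recorded.update(ingested_files)
    parent = os.path.dirname(manifest_path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(manifest_path, "w") as f:
        json.dump(sorted(recorded), f, indent=2)
```

In `ingest_source_to_bronze`, the list returned by `glob.glob` could be passed through `select_new_files` before reading, and `record_ingested_files` called once the parquet write has finished.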

---

## 7. Silver layer – cleaning and integration – `src/silver_processing.py`

Applies schema enforcement, cleaning, deduplication, and joins.

Assumptions:
- Operational data: e.g. `customer_id`, `event_time`, `amount`, etc.
- Logs: e.g. interaction logs keyed by `customer_id`.
- External: e.g. credit scores or partner data keyed by `customer_id`.

```python
# src/silver_processing.py
# Batch processing from bronze to silver: schema enforcement, cleaning, deduplication, integration.

import os
from pyspark.sql.functions import col, to_timestamp, trim, lower
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

from config_loader import load_config
from utils import create_spark_session, log_lineage, dq_null_check, dq_duplicate_check

def get_operational_schema():
    schema = StructType(
        [
            StructField("customer_id", StringType(), True),
            StructField("event_time", StringType(), True),
            StructField("amount", DoubleType(), True),
            StructField("currency", StringType(), True),
            StructField("channel", StringType(), True),
        ]
    )
    return schema

def clean_operational(spark, config, batch_id):
    bronze_path = os.path.join(config["storage"]["bronze_path"], "operational")
    df = spark.read.parquet(bronze_path)

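    schema = get_operational_schema()
    # Enforce the schema by selecting and casting the expected columns. Bronze
    # also carries inferred types and metadata columns, so rebuilding the
    # DataFrame from df.rdd with a five-field schema would fail.
    df_cast = df.select(
        [col(f.name).cast(f.dataType).alias(f.name) for f in schema.fields]
    )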

    df_clean = (
        df_cast
        .withColumn("event_time_ts", to_timestamp(col("event_time")))
        .withColumn("currency", trim(lower(col("currency"))))
        .withColumn("channel", trim(lower(col("channel"))))
    )

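    # Compare on calendar date so events during the cutoff day are kept; a
    # timestamp compared with "YYYY-MM-DD" would stop at midnight.
    cutoff_date = config["batch"]["cutoff_date"]
    df_clean = df_clean.filter(col("event_time_ts").cast("date") <= cutoff_date)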

    null_checks = dq_null_check(df_clean, ["customer_id", "event_time_ts"])
    print("DQ null checks for operational: " + str(null_checks))

    dup_count = dq_duplicate_check(df_clean, ["customer_id", "event_time_ts", "amount"])
    print("DQ duplicate count for operational: " + str(dup_count))

    silver_path = os.path.join(config["storage"]["silver_path"], "operational_cleaned")

    (
        df_clean.write
        .mode("overwrite")
        .parquet(silver_path)
    )

    log_lineage(config, "silver_operational_cleaning", [bronze_path], silver_path)
    return silver_path

def integrate_sources_to_silver(spark, config, batch_id):
    op_path = os.path.join(config["storage"]["silver_path"], "operational_cleaned")
    logs_bronze = os.path.join(config["storage"]["bronze_path"], "logs")
    external_bronze = os.path.join(config["storage"]["bronze_path"], "external")

    df_op = spark.read.parquet(op_path)
    df_logs = spark.read.parquet(logs_bronze)
    df_ext = spark.read.parquet(external_bronze)

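    # Assumed column names for the logs and external sources.
    df_logs_sel = df_logs.select("customer_id", "log_event_type", "log_timestamp")
    df_ext_sel = df_ext.select("customer_id", "partner_score", "partner_segment")

    # Left joins keep every operational row. If logs or external hold several
    # rows per customer, each operational row repeats once per match.
    df_join_1 = df_op.join(df_logs_sel, on="customer_id", how="left")
    df_join_all = df_join_1.join(df_ext_sel, on="customer_id", how="left")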

    silver_int_path = os.path.join(config["storage"]["silver_path"], "integrated")

    (
        df_join_all.write
        .mode("overwrite")
        .parquet(silver_int_path)
    )

    log_lineage(config, "silver_integration", [op_path, logs_bronze, external_bronze], silver_int_path)
    return silver_int_path

def run_silver_processing():
    config = load_config()
    spark = create_spark_session(
        config["spark"]["app_name"] + "_silver",
        master=config["spark"]["master"],
    )

    batch_id = config["batch"]["training_quarter"]

    clean_operational(spark, config, batch_id)
    integrate_sources_to_silver(spark, config, batch_id)

    spark.stop()

if __name__ == "__main__":
    run_silver_processing()
```
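
The integration step joins on `customer_id` alone, so the shape of the result depends on how many log rows each customer has. The plain-Python sketch below (no Spark; `left_join` is a hypothetical helper and the rows are made-up sample data) illustrates the effect on a sum of `amount`.

```python
def left_join(left_rows, right_rows, key):
    """Plain-Python left join: one output row per matching pair."""
    right_by_key = {}
    for row in right_rows:
        right_by_key.setdefault(row[key], []).append(row)
    joined = []
    for left in left_rows:
        # Unmatched left rows are kept once, with no right-hand columns.
        matches = right_by_key.get(left[key], [None])
        for right in matches:
            merged = dict(left)
            if right is not None:
                merged.update(right)
            joined.append(merged)
    return joined

# Made-up sample rows: one purchase per customer, three log events for c1.
operational = [
    {"customer_id": "c1", "amount": 100.0},
    {"customer_id": "c2", "amount": 40.0},
]
logs = [
    {"customer_id": "c1", "log_event_type": "login"},
    {"customer_id": "c1", "log_event_type": "view"},
    {"customer_id": "c1", "log_event_type": "logout"},
]

joined = left_join(operational, logs, "customer_id")
total_before = sum(r["amount"] for r in operational if r["customer_id"] == "c1")
total_after = sum(r["amount"] for r in joined if r["customer_id"] == "c1")
print(total_before, total_after)  # 100.0 300.0
```

If downstream features sum `amount` over the integrated table, one way to avoid this inflation is to reduce logs and external data to one row per customer before joining.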

---

## 8. Gold layer – features, labels, training sets – `src/gold_feature_store.py`

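Implements quarterly feature snapshots, label generation, and the joined training set.

Assumptions:
- A churn-like task: label = 1 if the customer's last event falls more than N days before the cutoff date (default N = 90), else 0. This is a recency-based proxy, because silver discards events after the cutoff.
- Features: aggregated spend, activity counts, and `partner_score`.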

```python
# src/gold_feature_store.py
# Gold layer: ML-ready features, labels, and training sets for quarterly retraining.

import os
from pyspark.sql.functions import (
    col, sum as _sum, countDistinct, max as _max, when, datediff, lit, to_date,
)
from pyspark.sql.window import Window

from config_loader import load_config
from utils import create_spark_session, log_lineage

def build_feature_table(spark, config, batch_id):
    silver_int_path = os.path.join(config["storage"]["silver_path"], "integrated")
    df = spark.read.parquet(silver_int_path)

    agg = (
        df.groupBy("customer_id")
        .agg(
            _sum("amount").alias("total_amount"),
            # Count distinct calendar days rather than distinct timestamps.
            countDistinct(to_date(col("event_time_ts"))).alias("active_days"),
            _max("event_time_ts").alias("last_activity_ts"),
            _max("partner_score").alias("partner_score_max"),
        )
    )

    gold_feat_path = os.path.join(config["storage"]["gold_path"], "features", batch_id)

    (
        agg.write
        .mode("overwrite")
        .parquet(gold_feat_path)
    )

    log_lineage(config, "gold_feature_table", [silver_int_path], gold_feat_path)
    return gold_feat_path

def build_labels(spark, config, batch_id, prediction_window_days=90):
    silver_int_path = os.path.join(config["storage"]["silver_path"], "integrated")
    df = spark.read.parquet(silver_int_path)

    # cutoff_date is a plain string in the config; wrap it in a literal so
    # Spark does not try to resolve it as a column name.
    ref_date = to_date(lit(config["batch"]["cutoff_date"]))

    window_spec = Window.partitionBy("customer_id")

    df_ref = (
        df.withColumn("last_activity_ts", _max("event_time_ts").over(window_spec))
        .select("customer_id", "last_activity_ts")
        .dropDuplicates(["customer_id"])
    )

    # label = 1 when the last activity is more than prediction_window_days
    # before the reference date.
    df_label = df_ref.withColumn(
        "label",
        when(
            datediff(ref_date, col("last_activity_ts")) > prediction_window_days,
            1,
        ).otherwise(0),
    )

    gold_label_path = os.path.join(config["storage"]["gold_path"], "labels", batch_id)

    (
        df_label.write
        .mode("overwrite")
        .parquet(gold_label_path)
    )

    log_lineage(config, "gold_labels", [silver_int_path], gold_label_path)
    return gold_label_path

def build_training_set(spark, config, batch_id):
    feat_path = os.path.join(config["storage"]["gold_path"], "features", batch_id)
    label_path = os.path.join(config["storage"]["gold_path"], "labels", batch_id)

    df_feat = spark.read.parquet(feat_path)
    df_label = spark.read.parquet(label_path)

    df_train = df_feat.join(df_label, on="customer_id", how="inner")

    train_path = os.path.join(config["storage"]["gold_path"], "training_sets", batch_id)

    (
        df_train.write
        .mode("overwrite")
        .parquet(train_path)
    )

    log_lineage(config, "gold_training_set", [feat_path, label_path], train_path)
    return train_path

def run_gold_pipeline():
    config = load_config()
    spark = create_spark_session(
        config["spark"]["app_name"] + "_gold",
        master=config["spark"]["master"],
    )

    batch_id = config["batch"]["training_quarter"]

    feat_path = build_feature_table(spark, config, batch_id)
    label_path = build_labels(spark, config, batch_id)
    train_path = build_training_set(spark, config, batch_id)

    print("Feature table at " + feat_path)
    print("Labels at " + label_path)
    print("Training set at " + train_path)

    spark.stop()

if __name__ == "__main__":
    run_gold_pipeline()
```
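
`build_training_set` writes a single joined table and does not split it into train and eval portions. One common way to add a split is to hash a stable key, so that a given customer always lands in the same portion across reruns. The sketch below is plain Python; `assign_split` and the 20% eval fraction are illustrative assumptions, not part of the project.

```python
import hashlib

def assign_split(customer_id, eval_fraction=0.2):
    """Map a customer id to 'train' or 'eval' deterministically."""
    digest = hashlib.sha256(customer_id.encode("utf-8")).hexdigest()
    # Interpret the first 8 hex digits as a number in [0, 1).
    bucket = int(digest[:8], 16) / 16**8
    return "eval" if bucket < eval_fraction else "train"

# The same id always receives the same assignment.
for cid in ["c1", "c2", "c3"]:
    print(cid, assign_split(cid))
```

Unlike a random split with a seed, the assignment here does not depend on row order or on which other customers are present in the quarter.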

---

## 9. Orchestration driver – `src/run_quarterly_pipeline.py`

This script chains bronze → silver → gold into a single quarterly run. In the report, this corresponds to the “orchestration” piece, which can later be mapped to an Airflow DAG or a similar scheduler.

```python
# src/run_quarterly_pipeline.py
# Simple quarterly orchestration: run bronze, silver, and gold phases in sequence.

from bronze_ingestion import run_bronze_ingestion
from silver_processing import run_silver_processing
from gold_feature_store import run_gold_pipeline

def run_full_quarterly_pipeline():
    print("Starting quarterly batch pipeline...")
    run_bronze_ingestion()
    run_silver_processing()
    run_gold_pipeline()
    print("Quarterly batch pipeline completed.")

if __name__ == "__main__":
    run_full_quarterly_pipeline()
```
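
The driver above stops with a traceback if any phase raises, but it does not record which phase failed or how long each one took. A minimal sketch of a step runner follows, as a stand-in for what a scheduler would provide. `run_steps` is a hypothetical helper and uses no Airflow API.

```python
import time

def run_steps(steps):
    """Run (name, callable) pairs in order and stop at the first failure.

    Returns one result dictionary per step that was attempted.
    """
    results = []
    for name, step in steps:
        started = time.perf_counter()
        try:
            step()
            status = "ok"
        except Exception as exc:
            status = "failed: " + repr(exc)
        results.append(
            {"step": name, "status": status, "seconds": time.perf_counter() - started}
        )
        if status != "ok":
            # Later phases depend on earlier output, so do not continue.
            break
    return results
```

With the functions imported in the driver, the call would look like `run_steps([("bronze", run_bronze_ingestion), ("silver", run_silver_processing), ("gold", run_gold_pipeline)])`.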

---

## 10. Minimal `README.md` outline

```markdown
# Batch-Processing Data Architecture for Quarterly ML Retraining

This project implements the conceptual bronze–silver–gold batch-processing architecture described in the report:

- **Bronze**: Immutable, append-only raw data lake zone for operational databases, logs, and external feeds.
- **Silver**: Cleaned, deduplicated, and integrated data with enforced schemas and basic data quality checks.
- **Gold**: ML-ready feature tables, labels, and quarterly training datasets.

## Running

1. Create `data/raw_sources/operational`, `data/raw_sources/logs`, and `data/raw_sources/external`, and populate them with CSV files.
2. Adjust `config/config.yaml` (paths, cutoff date, batch id).
3. Install dependencies:

   ```bash
   pip install -r requirements.txt
   ```

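4. Run the full quarterly pipeline from the project root (the modules use top-level imports, so run the script directly rather than with `-m`):

   ```bash
   python src/run_quarterly_pipeline.py
   ```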