# 1. Title + Aim

## Data Veracity and Integrity in Distributed Systems: Ensuring Trust in Information Ingestion

**Topic 17 case study:** veracity validation for high-velocity web advertising clickstream events with attribution-surrogate modeling.

### Aim
This notebook implements an end-to-end, reproducible pipeline that:

1. Validates data veracity at ingestion using explicit data quality checks.
2. Enforces integrity with tamper-evident audit logging (hashing + chain verification).
3. Compares Spark batch processing and streaming ingestion (Kafka-first, fallback file stream).
4. Trains and evaluates attribution-surrogate classifiers under class imbalance using robust metrics.


# 2. Problem Statement & Why Veracity/Integrity Matter

Ad-tech systems ingest massive, high-velocity event streams where fraudulent or low-quality records can quickly degrade model quality, budget efficiency, and operational trust. In this environment, accuracy alone is not enough: organizations must verify that each ingested record is structurally valid, complete, plausible, and non-duplicative. Without ingestion-time veracity checks, downstream analytics and ML systems can optimize on corrupted signals.

Integrity is the second trust pillar. Even if records are validated at ingestion, stakeholders still need confidence that logs and batch outputs were not altered post hoc. For this reason, this notebook implements a tamper-evident audit trail using per-record cryptographic hashing, Merkle roots, and a chained batch hash. Together, veracity + integrity provide practical trust guarantees for distributed information ingestion.


# 3. Dataset Source + Description + Schema

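For final validated reporting in this repository, the run uses Kaggle TalkingData `data/train.csv` with a deterministic cap of 300,000 rows. The code cells in this notebook load the bundled local sample `data/sample_click_fraud.csv` and record it in the provenance metadata with `dataset_mode="sample"`.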

Supervised target for this implementation is Kaggle `is_attributed` (attribution/conversion proxy), used as a surrogate target for demonstrating veracity/integrity-aware ingestion and classification. It is not a native fraud ground-truth label.

### Event schema used by the pipeline

- `click_time` (event timestamp)
- `ip` (user surrogate)
- `app`, `device`, `os`, `channel` (context/device/app identifiers)
- `is_attributed` (supervised target for this implementation)
- optional `attributed_time`


In [None]:
from pathlib import Path
import json
import os
import platform
import subprocess
import sys
import time

import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

# Resolve repository root robustly from notebook or repo execution context.
CWD = Path.cwd().resolve()
if (CWD / "src").exists():
    PROJECT_ROOT = CWD
elif (CWD.parent / "src").exists():
    PROJECT_ROOT = CWD.parent
else:
    raise RuntimeError("Could not resolve PROJECT_ROOT containing src/")

SRC_PATH = PROJECT_ROOT / "src"
if str(SRC_PATH) not in sys.path:
    sys.path.insert(0, str(SRC_PATH))

from ingestion_trust.core import (
    append_audit_batch,
    build_run_metadata,
    measure_integrity_overhead,
    plot_data_quality_summary,
    plot_fraud_prevalence,
    save_json,
    tamper_log_copy,
    train_and_evaluate_models,
    verify_audit_log,
)
from ingestion_trust.spark_utils import (
    apply_schema_casting,
    build_baseline_stats,
    compute_veracity_score,
    create_spark_session,
    feature_engineering,
    get_event_schema,
    run_veracity_checks,
)

DATA_DIR = PROJECT_ROOT / "data"
ARTIFACTS_DIR = PROJECT_ROOT / "artifacts"
FIGURES_DIR = PROJECT_ROOT / "reports" / "figures"
METRICS_DIR = ARTIFACTS_DIR / "metrics"
MODELS_DIR = ARTIFACTS_DIR / "models"
AUDIT_DIR = ARTIFACTS_DIR / "audit"
PROVENANCE_DIR = ARTIFACTS_DIR / "provenance"
CURATED_DIR = ARTIFACTS_DIR / "curated"

for p in [METRICS_DIR, MODELS_DIR, AUDIT_DIR, PROVENANCE_DIR, CURATED_DIR, FIGURES_DIR]:
    p.mkdir(parents=True, exist_ok=True)

DATASET_PATH = DATA_DIR / "sample_click_fraud.csv"
if not DATASET_PATH.exists():
    raise FileNotFoundError(f"Sample dataset missing at {DATASET_PATH}")

print("PROJECT_ROOT:", PROJECT_ROOT)
print("DATASET_PATH:", DATASET_PATH)


# 4. Environment & Experimental Setup (print versions + hardware summary)

This section records runtime details for reproducibility and provenance:

- Python and package versions
- OS and hardware summary
- Spark session configuration (local mode)
- Deterministic seed settings


In [None]:
import numpy as np
import random

SEED = 42
random.seed(SEED)
np.random.seed(SEED)

def _pkg_version(name: str):
    try:
        mod = __import__(name)
        return getattr(mod, "__version__", "unknown")
    except Exception:
        return None

env_summary = {
    "python_version": sys.version,
    "platform": platform.platform(),
    "machine": platform.machine(),
    "processor": platform.processor(),
    "numpy": _pkg_version("numpy"),
    "pandas": _pkg_version("pandas"),
    "sklearn": _pkg_version("sklearn"),
    "xgboost": _pkg_version("xgboost"),
    "pyspark": _pkg_version("pyspark"),
}

print(json.dumps(env_summary, indent=2))

spark = create_spark_session("topic17-notebook-batch", master="local[*]")
print("Spark version:", spark.version)


# 5. Batch Pipeline (Spark): ETL + Validation + Feature Engineering

In this section we run the Spark batch ETL path:

1. Read CSV with explicit schema.
2. Apply deterministic type casting.
3. Perform feature engineering used by both batch and streaming.
4. Persist curated output as parquet after validation (next section).


In [None]:
start_etl = time.perf_counter()

raw_df = (
    spark.read.option("header", True)
    .schema(get_event_schema())
    .csv(str(DATASET_PATH))
)

print("Raw rows:", raw_df.count())
raw_df.show(5, truncate=False)

cast_df = apply_schema_casting(raw_df)
feature_df = feature_engineering(cast_df)

print("Feature columns:", len(feature_df.columns))
feature_df.select(
    "click_time", "click_time_ts", "ip", "app", "device", "os", "channel", "is_attributed", "clicks_per_ip_per_hour"
).show(5, truncate=False)


# 6. Veracity Checks (rules, scoring, summary tables, plots)

The veracity layer applies six rule families:

- Schema validity
- Completeness
- Range checks
- Uniqueness (duplicate detection)
- Drift checks versus baseline profile
- Outlier checks on `clicks_per_ip_per_hour`

A weighted per-record **veracity_score** (0 to 1) is then computed.
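
As a rough illustration of how a weighted per-record score can be formed from rule outcomes, here is a minimal sketch. The weights and rule keys below are hypothetical; the actual weighting used by `compute_veracity_score` lives in `ingestion_trust.spark_utils` and is not shown in this notebook.

```python
# Hypothetical integer weights per rule family (assumed values, summing to 100).
RULE_WEIGHTS = {
    "schema": 25,
    "completeness": 20,
    "range": 20,
    "uniqueness": 15,
    "drift": 10,
    "outlier": 10,
}


def veracity_score(rule_passed: dict) -> float:
    """Weighted share of passed rules, in [0, 1]. Missing rules count as failed."""
    total = sum(RULE_WEIGHTS.values())
    earned = sum(w for rule, w in RULE_WEIGHTS.items() if rule_passed.get(rule, False))
    return earned / total


clean = {rule: True for rule in RULE_WEIGHTS}
duplicate_outlier = {**clean, "uniqueness": False, "outlier": False}

print(veracity_score(clean))              # 1.0
print(veracity_score(duplicate_outlier))  # 0.75
```

A record that fails only low-weight rules keeps a high score, so a downstream threshold can separate hard failures (schema, completeness) from softer signals (drift, outliers).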


In [None]:
baseline_stats = build_baseline_stats(feature_df)
save_json(baseline_stats, METRICS_DIR / "baseline_stats.json")

validated_df, validation_summary = run_veracity_checks(feature_df, baseline_stats=baseline_stats)
scored_df = compute_veracity_score(validated_df)

save_json(validation_summary, METRICS_DIR / "validation_report.json")

summary_rows = []
for rule, invalid_count in validation_summary["invalid_counts_by_rule"].items():
    summary_rows.append(
        {
            "rule": rule,
            "invalid_count": invalid_count,
            "invalid_percent": validation_summary["invalid_percent_by_rule"][rule],
        }
    )
validation_df = pd.DataFrame(summary_rows)
validation_df.to_csv(METRICS_DIR / "validation_report.csv", index=False)

plot_data_quality_summary(validation_df, FIGURES_DIR / "data_quality_violations.png")

# Keep valid records for downstream model training.
curated_df = scored_df.filter("is_valid_record = true")
curated_path = CURATED_DIR / "batch_curated.parquet"
curated_df.write.mode("overwrite").parquet(str(curated_path))

etl_time_sec = time.perf_counter() - start_etl

print("Validation summary:")
print(json.dumps(validation_summary, indent=2))
print("ETL + validation time (sec):", round(etl_time_sec, 3))
print("Curated rows:", curated_df.count())
print("Curated parquet:", curated_path)

display(validation_df)


In [None]:
# Generate prevalence figure from curated dataset.
curated_pd_preview = curated_df.select("is_attributed").toPandas()
plot_fraud_prevalence(curated_pd_preview, FIGURES_DIR / "attribution_prevalence.png")

# Build and save provenance metadata.
column_schema = [{"name": name, "type": dtype} for name, dtype in curated_df.dtypes]
run_metadata = build_run_metadata(
    dataset_path=DATASET_PATH,
    dataset_source="sample_click_fraud.csv (local sample)",
    dataset_mode="sample",
    column_schema=column_schema,
    repo_dir=PROJECT_ROOT,
    extra={
        "etl_time_sec": etl_time_sec,
        "curated_output_parquet": str(curated_path),
        "validation_report": str(METRICS_DIR / "validation_report.json"),
    },
)
save_json(run_metadata, PROVENANCE_DIR / "run_metadata.json")
print(json.dumps(run_metadata, indent=2))


# 7. Model Training & Evaluation (all required metrics + plots)

Models trained in this section:

- Logistic Regression
- Random Forest
- XGBoost (if available)

Imbalance handling is applied via class weighting / `scale_pos_weight`.

Metrics computed from actual run outputs:

- Accuracy, Precision, Recall, F1
- ROC-AUC, PR-AUC
- MCC
- Execution times

A sanity check for the all-negative classifier is also included to show why raw accuracy can be misleading under class imbalance.
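
The all-negative sanity check can be sketched in plain Python as follows. The label counts are synthetic and chosen only to illustrate a heavily imbalanced target; they are not results from this run. The `scale_pos_weight` line uses the common negatives-to-positives ratio heuristic, which may differ from what `train_and_evaluate_models` actually computes.

```python
import math


def confusion(y_true, y_pred):
    """Return (tp, tn, fp, fn) for binary labels in {0, 1}."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    return tp, tn, fp, fn


def accuracy(tp, tn, fp, fn):
    return (tp + tn) / (tp + tn + fp + fn)


def recall(tp, tn, fp, fn):
    return tp / (tp + fn) if (tp + fn) else 0.0


def mcc(tp, tn, fp, fn):
    # Matthews correlation coefficient; defined as 0 when any marginal is empty.
    denom = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    return (tp * tn - fp * fn) / denom if denom else 0.0


# Synthetic labels: 2 positives in 1,000 rows (illustrative only).
y_true = [1] * 2 + [0] * 998
y_pred = [0] * len(y_true)  # the all-negative "classifier"

cm = confusion(y_true, y_pred)
print("accuracy:", accuracy(*cm))  # 0.998
print("recall:", recall(*cm))      # 0.0
print("mcc:", mcc(*cm))            # 0.0

# Common heuristic for class weighting in boosted trees: negatives / positives.
scale_pos_weight = y_true.count(0) / y_true.count(1)
print("scale_pos_weight:", scale_pos_weight)  # 499.0
```

Accuracy looks excellent while recall and MCC are both zero, which is why the report leans on PR-AUC and MCC rather than raw accuracy.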


In [None]:
start_train = time.perf_counter()
curated_pd = curated_df.toPandas()

model_summary = train_and_evaluate_models(
    curated_df=curated_pd,
    artifacts_metrics_dir=METRICS_DIR,
    figures_dir=FIGURES_DIR,
    model_dir=MODELS_DIR,
    seed=SEED,
)
training_time_sec = time.perf_counter() - start_train

metrics_df = pd.read_csv(METRICS_DIR / "model_metrics.csv")

display(metrics_df)
print("Model run summary:")
print(json.dumps(model_summary, indent=2))
print("Model training block time (sec):", round(training_time_sec, 3))

# Save split prevalence figure for report (skipped when the summary has no prevalence data).
split_prev = pd.DataFrame(
    [
        {"split": k, "attributed_prevalence": v}
        for k, v in model_summary.get("class_prevalence", {}).items()
    ]
)
if split_prev.empty:
    # An empty frame has no "split" column, so barplot would raise.
    print("No class_prevalence in model summary; skipping split prevalence figure.")
else:
    plt.figure(figsize=(6, 4))
    sns.barplot(data=split_prev, x="split", y="attributed_prevalence", palette="crest")
    plt.ylim(0, 1)
    plt.title("Attributed Prevalence by Split")
    plt.xlabel("Split")
    plt.ylabel("Attributed Class Rate")
    plt.tight_layout()
    plt.savefig(FIGURES_DIR / "attribution_split_distribution.png", dpi=150)
    plt.close()


# 8. Streaming Pipeline (Kafka + Spark Structured Streaming)

The streaming implementation is provided in `scripts/run_stream_pipeline.py` and reuses the same shared functions as batch:

- `apply_schema_casting`
- `feature_engineering`
- `run_veracity_checks`
- `compute_veracity_score`

## Default path: Kafka

1. Start Kafka via Docker (`docker/docker-compose.kafka.yml`).
2. Publish events with `scripts/kafka_producer.py`.
3. Run the stream pipeline with Kafka source.

## Fallback path: File stream

If Kafka is not available, run Structured Streaming from file source while keeping the same validation/inference logic.


In [None]:
import shlex
import subprocess

RUN_STREAMING_DEMO = False  # Set True to execute fallback streaming demo in-notebook.

kafka_instructions = [
    "cd docker && docker compose -f docker-compose.kafka.yml up -d",
    "python scripts/kafka_producer.py --input-csv data/sample_click_fraud.csv --topic clickstream_events",
    "python scripts/run_stream_pipeline.py --kafka-bootstrap localhost:9092 --topic clickstream_events --model-path artifacts/models/best_model_pipeline.joblib",
]

print("Kafka streaming commands:")
for cmd in kafka_instructions:
    print(" -", cmd)

if RUN_STREAMING_DEMO:
    cmd = [
        sys.executable,
        str(PROJECT_ROOT / "scripts" / "run_stream_pipeline.py"),
        "--fallback-file-source",
        str(PROJECT_ROOT / "data"),
        "--max-batches",
        "2",
        "--trigger-seconds",
        "4",
        "--model-path",
        str(PROJECT_ROOT / "artifacts" / "models" / "best_model_pipeline.joblib"),
    ]
    print("Running fallback stream demo:", " ".join(shlex.quote(c) for c in cmd))
    proc = subprocess.run(cmd, cwd=str(PROJECT_ROOT), text=True, capture_output=True)
    print(proc.stdout)
    if proc.returncode != 0:
        print(proc.stderr)
        raise RuntimeError("Streaming demo failed")
else:
    print("RUN_STREAMING_DEMO is False; streaming implementation is available via scripts and optional notebook execution.")


### Comparative Discussion: Batch (Spark) vs Streaming (Kafka + Spark) vs Storm

| Dimension | Spark Batch | Kafka + Spark Structured Streaming | Apache Storm (conceptual comparison) |
|---|---|---|---|
| Processing model | Finite datasets, scheduled ETL/training jobs | Continuous micro-batches with unified DataFrame API | Record-by-record tuple processing topology |
| Latency profile | Higher latency, high throughput for large jobs | Lower latency than batch; tunable trigger intervals | Potentially very low latency with custom bolt design |
| Developer model | Strong SQL/DataFrame support, easier reproducibility | Reuses same batch logic and schemas in many cases | More custom topology logic; higher operational complexity |
| Fault tolerance | Checkpoint/restart by job | Exactly-once-like behavior with checkpoint + Kafka offsets | Acknowledgement trees and replay mechanisms |
| Fit for this use case | Baseline quality profiling and offline model training | Near-real-time ingestion validation + scoring | Useful when ultra-low latency and custom operators dominate |

In this project, Kafka + Spark Structured Streaming is implemented as the default streaming path because it maximizes code reuse with batch Spark validation and reduces implementation divergence.


# 9. Integrity & Tamper-Evident Audit Logging (hash chain/Merkle + verification + tamper demo)

Each batch writes an append-only JSONL audit entry containing:

- `record_hashes` (SHA-256 of canonicalized records)
- `merkle_root`
- `prev_batch_hash`
- `batch_hash = sha256(prev_batch_hash + merkle_root + metadata + batch_id + count)`

We demonstrate both normal verification and tamper detection below.
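
To make the entry layout concrete, the following is a minimal standalone sketch of the hash chain and Merkle root, assuming a simple pairwise Merkle tree that duplicates the last node on odd levels and an all-zero genesis hash. The helper names (`make_entry`, `verify_chain`, `GENESIS_HASH`) are hypothetical; the repository's `append_audit_batch` and `verify_audit_log` may canonicalize and combine fields differently.

```python
import copy
import hashlib
import json

GENESIS_HASH = "0" * 64  # assumed placeholder for the first batch's prev_batch_hash


def sha256_hex(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def record_hash(record: dict) -> str:
    # Canonicalize with sorted keys so logically equal records hash identically.
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"), default=str)
    return sha256_hex(canonical)


def merkle_root(leaf_hashes: list) -> str:
    if not leaf_hashes:
        return sha256_hex("")
    level = list(leaf_hashes)
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])  # duplicate the last node on odd-sized levels
        level = [sha256_hex(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
    return level[0]


def chain_hash(prev_hash, root, metadata, batch_id, count) -> str:
    meta = json.dumps(metadata, sort_keys=True)
    return sha256_hex(prev_hash + root + meta + batch_id + str(count))


def make_entry(prev_hash, batch_id, records, metadata) -> dict:
    hashes = [record_hash(r) for r in records]
    root = merkle_root(hashes)
    return {
        "batch_id": batch_id,
        "count": len(records),
        "metadata": metadata,
        "record_hashes": hashes,
        "merkle_root": root,
        "prev_batch_hash": prev_hash,
        "batch_hash": chain_hash(prev_hash, root, metadata, batch_id, len(records)),
    }


def verify_chain(entries) -> bool:
    prev = GENESIS_HASH
    for e in entries:
        # Recompute the root and batch hash from stored fields and compare.
        root = merkle_root(e["record_hashes"])
        expected = chain_hash(prev, root, e["metadata"], e["batch_id"], e["count"])
        if (e["prev_batch_hash"], e["merkle_root"], e["batch_hash"]) != (prev, root, expected):
            return False
        prev = e["batch_hash"]
    return True


# Two tiny synthetic batches chained together.
batches = [[{"ip": 1, "app": 3}], [{"ip": 2, "app": 5}, {"ip": 9, "app": 1}]]
log, prev = [], GENESIS_HASH
for idx, batch in enumerate(batches):
    entry = make_entry(prev, f"demo_batch_{idx}", batch, {"stage": "sketch"})
    log.append(entry)
    prev = entry["batch_hash"]

chain_ok = verify_chain(log)

# Alter one stored record hash in a copy; the recomputed Merkle root no longer matches.
tampered = copy.deepcopy(log)
tampered[0]["record_hashes"][0] = record_hash({"ip": 1, "app": 4})
chain_ok_after_tamper = verify_chain(tampered)

print(chain_ok, chain_ok_after_tamper)  # True False
```

Because each `batch_hash` folds in the previous one, rewriting an early entry consistently would also require rewriting every later entry, which is what makes the log tamper-evident rather than tamper-proof.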


In [None]:
audit_log_path = AUDIT_DIR / "audit_log.jsonl"
if audit_log_path.exists():
    audit_log_path.unlink()

records_for_audit = curated_pd.head(6000).to_dict(orient="records")
chunks = [records_for_audit[i:i+2000] for i in range(0, len(records_for_audit), 2000)]

for idx, chunk in enumerate(chunks):
    append_audit_batch(
        log_path=audit_log_path,
        batch_id=f"nb_batch_{idx}",
        records=chunk,
        batch_metadata={
            "stage": "notebook_demo",
            "chunk_index": idx,
            "records": len(chunk),
        },
    )

verification_ok = verify_audit_log(audit_log_path)
save_json(verification_ok, AUDIT_DIR / "verification_report.json")

# Tamper demonstration
tampered_log_path = tamper_log_copy(audit_log_path, AUDIT_DIR / "audit_log_tampered_demo.jsonl")
verification_bad = verify_audit_log(tampered_log_path)
save_json(verification_bad, AUDIT_DIR / "verification_report_tampered.json")

print("Normal verification:")
print(json.dumps(verification_ok, indent=2))
print("\nTampered verification:")
print(json.dumps(verification_bad, indent=2))


# 10. Integrity Overhead Measurement (throughput/latency with/without hash chain)

To quantify integrity overhead, we compare:

- baseline processing without hashing,
- processing with record hashing + Merkle root generation.

This gives a practical throughput and latency tradeoff estimate.
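
The comparison can be sketched with the standard library alone. This is an assumption-laden stand-in for `measure_integrity_overhead`: it times per-record SHA-256 hashing only (no Merkle root), uses synthetic records, and reports the median over repeated loops after one warmup pass. The `overhead_ratio` key is a hypothetical addition for illustration.

```python
import hashlib
import json
import statistics
import time


def process_plain(records):
    # Baseline work: canonical serialization without hashing.
    return [json.dumps(r, sort_keys=True) for r in records]


def process_hashed(records):
    # Same serialization plus a SHA-256 digest per record.
    return [
        hashlib.sha256(json.dumps(r, sort_keys=True).encode("utf-8")).hexdigest()
        for r in records
    ]


def median_seconds(fn, records, loops=5, warmup=1):
    for _ in range(warmup):
        fn(records)  # warm caches before timing
    timings = []
    for _ in range(loops):
        start = time.perf_counter()
        fn(records)
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)


# Synthetic records (illustrative only, not drawn from the curated dataset).
records = [{"ip": i % 97, "app": i % 13, "channel": i % 7} for i in range(5000)]

plain_sec = median_seconds(process_plain, records)
hash_sec = median_seconds(process_hashed, records)

stats = {
    "throughput_plain_rps": len(records) / plain_sec,
    "throughput_hash_rps": len(records) / hash_sec,
    "overhead_ratio": hash_sec / plain_sec,
}
print(json.dumps(stats, indent=2))
```

Absolute numbers depend heavily on hardware and record size, so the ratio between the two modes is usually the more portable figure to report.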


In [None]:
overhead_stats = measure_integrity_overhead(records_for_audit[:5000], loops=5)
save_json(overhead_stats, METRICS_DIR / "integrity_overhead.json")

overhead_df = pd.DataFrame(
    [
        {
            "mode": "without_integrity",
            "throughput_rps": overhead_stats["throughput_plain_rps"],
            "latency_sec": overhead_stats["avg_plain_sec"],
        },
        {
            "mode": "with_integrity",
            "throughput_rps": overhead_stats["throughput_hash_rps"],
            "latency_sec": overhead_stats["avg_hash_sec"],
        },
    ]
)

display(overhead_df)

fig, axes = plt.subplots(1, 2, figsize=(11, 4))
sns.barplot(data=overhead_df, x="mode", y="throughput_rps", ax=axes[0], palette="mako")
axes[0].set_title("Throughput Comparison")
axes[0].set_xlabel("Mode")
axes[0].set_ylabel("Records / second")
axes[0].tick_params(axis="x", rotation=20)

sns.barplot(data=overhead_df, x="mode", y="latency_sec", ax=axes[1], palette="rocket")
axes[1].set_title("Latency Comparison")
axes[1].set_xlabel("Mode")
axes[1].set_ylabel("Seconds")
axes[1].tick_params(axis="x", rotation=20)

plt.tight_layout()
plt.savefig(FIGURES_DIR / "audit_overhead_throughput_latency.png", dpi=150)
plt.close(fig)

print(json.dumps(overhead_stats, indent=2))


# 11. Scalability Note (what scales, what doesn’t; optionally replay larger chunks)

## What scales well

- Spark ETL and validation rules are DataFrame-based and distribute across partitions.
- Structured Streaming applies the same transformations per micro-batch.
- Audit logging can be attached per micro-batch with bounded sampling for record-hash payload size.

## What needs care at larger scale

- `toPandas()` conversions are practical for coursework-scale runs but should be replaced with Spark-native model serving or vectorized UDF patterns for very large streams.
- Full `record_hashes` storage in JSONL may grow quickly; in production, store hash manifests/object storage references.
- Drift calculations based on full cardinality should use bounded sketches or sampled histograms for high-cardinality columns.
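
One way to bound drift-check cost is to compare binned histograms of a baseline against a sample of incoming data. The sketch below uses the Population Stability Index (PSI) as the distance measure, which is an assumption here; the drift rule in `run_veracity_checks` may use a different statistic. The feature values, bin edges, and sample size are synthetic.

```python
import bisect
import math
import random


def bin_shares(values, edges, eps=1e-6):
    """Share of values per bin; out-of-range values are clipped into the end bins.

    Assumes a non-empty `values` list.
    """
    counts = [0] * (len(edges) - 1)
    for v in values:
        idx = bisect.bisect_right(edges, v) - 1
        counts[min(max(idx, 0), len(counts) - 1)] += 1
    # Floor at eps so empty bins do not cause log(0) or division by zero.
    return [max(c / len(values), eps) for c in counts]


def psi(expected, actual, edges):
    """Population Stability Index between two samples over shared bin edges."""
    e = bin_shares(expected, edges)
    a = bin_shares(actual, edges)
    return sum((ai - ei) * math.log(ai / ei) for ei, ai in zip(e, a))


EDGES = [0, 6, 12, 18, 24]  # four hour-of-day buckets (hypothetical feature)

rng = random.Random(42)
baseline = [rng.randrange(24) for _ in range(5000)]
sampled = rng.sample(baseline, 500)            # bounded sample of the same data
shifted = [min(h + 6, 23) for h in baseline]   # simulated distribution shift

psi_same = psi(baseline, baseline, EDGES)
psi_sampled = psi(baseline, sampled, EDGES)
psi_shifted = psi(baseline, shifted, EDGES)

print(psi_same, psi_sampled, psi_shifted)
```

The sampled histogram stays close to the baseline while the shifted one scores far higher, so a fixed-size sample can be enough to flag large shifts without scanning full cardinality.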

## Optional scale experiment

Replay larger chunks by increasing `--max-events` in the Kafka producer and comparing:

- micro-batch latency (`artifacts/metrics/streaming_metrics.csv`)
- integrity overhead (`artifacts/metrics/integrity_overhead.json`)


# 12. Social Impact Analysis (attribution-target monitoring + broader ingestion trust)

Reliable detection of invalid traffic improves fairness and efficiency for advertisers, publishers, and users by reducing budget waste and suppressing abusive behavior. Veracity-aware ingestion also reduces the chance that downstream models amplify noisy or manipulated data.

Beyond ad-tech, the same design pattern applies to healthcare, finance, and public-sector analytics where ingestion trust is critical. Explicit validation rules, provenance metadata, and tamper-evident logs improve accountability and auditability, which are key for responsible data-driven decision systems.


# 13. Research Alignment (map implemented components to the research papers list)

This repository keeps reference extraction source-grounded:

- `docs/references.md` is generated only from provided XLSX/PDF sources.
- `docs/research_alignment.md` maps implemented modules to literature themes and is updated with numbered citations after extraction.

Current status in this environment:

- Reference extraction source files may need to be copied into the repository root before regeneration.
- Run `python scripts/generate_references.py --xlsx "Reference List BDA IA2.xlsx" --pdf "BDA_GRP12_IA2_LABCA (1).pdf"` to refresh citations.


### Diagnostics Summary
- Leakage checks: no forbidden target/post-event features (`is_attributed`, legacy `is_fraud`, `attributed_time`) were present in the training matrix.
- Split-overlap checks: event-key overlap across train/val/test was zero.
- Metric parity checks: independently recomputed metrics matched `artifacts/metrics/model_metrics.csv`.
- Audit overhead benchmark uses repeated runs (warmup + loops) and reports median/dispersion.
- Best-model selection criterion documented: PR-AUC -> MCC -> F1.
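
The PR-AUC -> MCC -> F1 ordering can be read as a lexicographic tie-break. A minimal sketch, using made-up metric values that are purely illustrative and not taken from `model_metrics.csv`:

```python
# Hypothetical metric rows (illustrative values, not results from this run).
candidates = [
    {"model": "logistic_regression", "pr_auc": 0.40, "mcc": 0.30, "f1": 0.33},
    {"model": "random_forest", "pr_auc": 0.45, "mcc": 0.31, "f1": 0.36},
    {"model": "xgboost", "pr_auc": 0.45, "mcc": 0.34, "f1": 0.35},
]


def selection_key(row: dict) -> tuple:
    # Tuples compare element by element: PR-AUC first, then MCC, then F1.
    return (row["pr_auc"], row["mcc"], row["f1"])


best = max(candidates, key=selection_key)
print(best["model"])  # xgboost
```

Here the two tree models tie on PR-AUC, so MCC decides; F1 would only matter if both earlier metrics tied. Exact floating-point ties are rare in practice, so rounding the metrics before comparison may be appropriate if near-ties should count as ties.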


# 14. Conclusion + Next Steps

This notebook implemented a complete ingestion-trust pipeline for ad-click event streams, combining veracity checks, tamper-evident integrity controls, and batch-vs-stream processing pathways in a single reproducible workflow.

## Next steps

1. Replace fallback/sample data with full production-scale stream replay.
2. Move model inference fully into Spark-native serving for large streams.
3. Add online drift alarms and adaptive rule thresholds.
4. Extend audit-chain verification into external immutable storage or ledger-backed systems.


### Results Summary
- All metrics shown below are generated from this run and saved under `artifacts/metrics`.
- Final summary values: `artifacts/metrics/final_results_summary.json` and `artifacts/metrics/final_results_summary.csv`.
- Model table: `artifacts/metrics/model_metrics.csv`.
- Threshold analysis table: `artifacts/metrics/threshold_analysis_best_model.csv`.
- Validation summary: `artifacts/metrics/validation_report.csv` and `artifacts/metrics/validation_report.json`.
- Streaming metrics: `artifacts/metrics/stream_metrics.csv` and `artifacts/metrics/stream_pipeline_summary.json`.
- Figures used in report/presentation: `reports/figures/`.
- Supervised target caveat: Kaggle `is_attributed` is used as a surrogate attribution/conversion target, not native fraud ground-truth.


In [None]:
# Final run summary artifact for quick grading/reference.
run_summary = {
    "dataset": str(DATASET_PATH),
    "etl_time_sec": etl_time_sec,
    "training_time_sec": training_time_sec,
    "curated_parquet": str(CURATED_DIR / "batch_curated.parquet"),
    "validation_report": str(METRICS_DIR / "validation_report.json"),
    "model_metrics": str(METRICS_DIR / "model_metrics.csv"),
    "audit_verification": verification_ok,
    "tamper_verification": verification_bad,
}
save_json(run_summary, METRICS_DIR / "notebook_run_summary.json")
print(json.dumps(run_summary, indent=2))
