# Data Pipeline Template  
A repeatable pipeline for building analysis-ready datasets with validation.

## Dataset Overview  

**Describe the dataset here:**
* What does a row represent?
* What are the key variables?
* What makes this data messy/realistic?
* What is the time range?

## What this Pipeline Produces

**Artifacts**
* `data/raw/` - raw data snapshots + metadata
* `data/staged/` - parsed/normalized table (typed, missingness normalized)
* `data/warehouse/` - curated table (Parquet; optionally partitioned)
* `data/reference/validation_report.json` - contracts + anomaly rates + canaries
* `data/reference/pipeline_runs/` - run logs for reproducibility

# 0. Setup

## 0.1 Directory Structure

Create project directories for the pipeline layers

In [3]:
# Project structure setup

# Import Libraries
from __future__ import annotations

from pathlib import Path
from datetime import datetime, timedelta, timezone
import json
import hashlib
import math

import numpy as np
import pandas as pd

from IPython.display import display

pd.set_option("display.max_columns", 180)
pd.set_option("display.width", 180)

# Define Paths
WORK_DIR = Path("work")
PROJECT_DIR = WORK_DIR / "hai_21_03"
DATA_DIR = PROJECT_DIR / "data"
RAW_DIR = DATA_DIR / "raw"
STAGED_DIR = DATA_DIR / "staged"
WH_DIR = DATA_DIR / "warehouse"
REF_DIR = DATA_DIR / "reference"
RUN_DIR = REF_DIR / "pipeline_runs"

# Create Directories
for p in [RAW_DIR, STAGED_DIR, WH_DIR, REF_DIR, RUN_DIR]:
    p.mkdir(parents=True, exist_ok=True)

# Output
print("Project:", PROJECT_DIR)
print("Raw:", RAW_DIR)
print("Staged:", STAGED_DIR)
print("Warehouse:", WH_DIR)
print("Reference:", REF_DIR)
print("Runs:", RUN_DIR)


Project: work/hai_21_03
Raw: work/hai_21_03/data/raw
Staged: work/hai_21_03/data/staged
Warehouse: work/hai_21_03/data/warehouse
Reference: work/hai_21_03/data/reference
Runs: work/hai_21_03/data/reference/pipeline_runs


## 0.2 Helper Utilities

Define reusable helper functions for the pipeline

In [4]:
# Helper Functions
class PipelineError(RuntimeError):
    pass

def utc_now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()

def sha16(x: str) -> str:
    return hashlib.sha256(x.encode('utf-8')).hexdigest()[:16]

def write_json(path: Path, obj: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(obj, indent=2, default=str))

def read_json(path: Path) -> dict:
    return json.loads(path.read_text())

def require_columns(df: pd.DataFrame, cols: list[str], context: str) -> None:
    missing = [c for c in cols if c not in df.columns]
    if missing:
        raise PipelineError(f'[{context}] Missing required columns: {missing}')

def require_unique(df: pd.DataFrame, key: str, context: str) -> None:
    if key not in df.columns:
        raise PipelineError(f'[{context}] Missing key column "{key}"')
    dupes = int(df[key].duplicated().sum())
    if dupes:
        raise PipelineError(f'[{context}] Key "{key}" has {dupes} duplicates')

print("Helpers ready.")

Helpers ready.


## 0.3 Configuration

Set pipeline configuration constants

In [5]:
# Pipeline Configuration

DATASET_NAME = "HAI-21.03"
DATASET_VERSION = "21.03"
RUN_ID = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_utc")

TRAIN_FILES = ["train1.csv.gz", "train2.csv.gz", "train3.csv.gz"]
TEST_FILES = ["test1.csv.gz", "test2.csv.gz", "test3.csv.gz", "test4.csv.gz", "test5.csv.gz"]
ALL_FILES = TRAIN_FILES + TEST_FILES

TIMESTAMP_COL = "timestamp"
ATTACK_LABEL_COLS = ["attack", "attack_P1", "attack_P2", "attack_P3"]

---

# 1. Ingest: Acquire Raw Data

Download or load raw data and save with metadata

## 1.1 Fetch Raw Data

Download or load the raw dataset

In [6]:
# Fetch Dataset

import requests
import shutil

BASE_URL = "https://raw.githubusercontent.com/icsdataset/hai/master/hai-21.03/"

raw_file_meta = {}

for file_name in ALL_FILES:
    url = BASE_URL + file_name
    dest = RAW_DIR / f"hai_{file_name.replace('.csv.gz', '')}_run_{RUN_ID}.csv.gz"

    if dest.exists():
        print(f"  [SKIP] {file_name} already exists")
    else:
        print(f"  [DOWNLOAD] {file_name} ...")
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(dest, 'wb') as f:
                shutil.copyfileobj(r.raw, f)
        print(f"  [DONE] saved to {dest}")

    raw_file_meta[file_name] = {
        "source_url": url,
        "local_path": str(dest),
        "size_bytes": dest.stat().st_size,
        "format": "csv.gz",
        "decompression": "handled automatically by pandas at read time",
        "split": "train" if file_name in TRAIN_FILES else "test"
    }

  [DOWNLOAD] train1.csv.gz ...
  [DONE] saved to work/hai_21_03/data/raw/hai_train1_run_20260221_135249_utc.csv.gz
  [DOWNLOAD] train2.csv.gz ...
  [DONE] saved to work/hai_21_03/data/raw/hai_train2_run_20260221_135249_utc.csv.gz
  [DOWNLOAD] train3.csv.gz ...
  [DONE] saved to work/hai_21_03/data/raw/hai_train3_run_20260221_135249_utc.csv.gz
  [DOWNLOAD] test1.csv.gz ...
  [DONE] saved to work/hai_21_03/data/raw/hai_test1_run_20260221_135249_utc.csv.gz
  [DOWNLOAD] test2.csv.gz ...
  [DONE] saved to work/hai_21_03/data/raw/hai_test2_run_20260221_135249_utc.csv.gz
  [DOWNLOAD] test3.csv.gz ...
  [DONE] saved to work/hai_21_03/data/raw/hai_test3_run_20260221_135249_utc.csv.gz
  [DOWNLOAD] test4.csv.gz ...
  [DONE] saved to work/hai_21_03/data/raw/hai_test4_run_20260221_135249_utc.csv.gz
  [DOWNLOAD] test5.csv.gz ...
  [DONE] saved to work/hai_21_03/data/raw/hai_test5_run_20260221_135249_utc.csv.gz


## 1.2 Write Raw Metadata

Document the raw data source and retrieval details

In [7]:
# Create Raw Metadata

ingest_meta = {
    "run_id": RUN_ID,
    "dataset": DATASET_NAME,
    "version": DATASET_VERSION,
    "fetched_at_utc": datetime.now(timezone.utc).isoformat(),
    "source": "https://github.com/icsdataset/hai",
    "base_url": BASE_URL,
    "files": raw_file_meta,   
    "summary": {
        "total_files": len(ALL_FILES),
        "train_files": len(TRAIN_FILES),
        "test_files": len(TEST_FILES),
        "total_size_bytes": sum(v["size_bytes"] for v in raw_file_meta.values())
    }
}

meta_path = RAW_DIR / f"hai_ingest_meta_run_{RUN_ID}.json"
write_json(meta_path, ingest_meta)

print(f"Saved: {meta_path}")
print(f"Total files: {ingest_meta['summary']['total_files']}")
print(f"Total size: {ingest_meta['summary']['total_size_bytes'] / 1e6:.1f} MB")

Saved: work/hai_21_03/data/raw/hai_ingest_meta_run_20260221_135249_utc.json
Total files: 8
Total size: 187.2 MB


---

# 2. Stage: Parse and Normalize

Convert raw data into a clean, typed DataFrame

## 2.1 Read Raw Data

**How do we read it?**

In [8]:
# Read raw data

def read_raw_data(path: Path, split: str) -> pd.DataFrame:
    df = pd.read_csv(path, compression='gzip')
    df["file_source"] = path.stem.replace(".csv", "")
    df["split"] = split
    return df

In [9]:
# Load an concatenate all 8 files

dfs = []
for file_name in TRAIN_FILES:
    path = RAW_DIR / f"hai_{file_name.replace('.csv.gz', '')}_run_{RUN_ID}.csv.gz"
    dfs.append(read_raw_data(path, split="train"))

for file_name in TEST_FILES:
    path = RAW_DIR / f"hai_{file_name.replace('.csv.gz', '')}_run_{RUN_ID}.csv.gz"
    dfs.append(read_raw_data(path, split="test"))

df_raw = pd.concat(dfs, ignore_index=True)
print(f"Combined shape: {df_raw.shape}")
print(df_raw.dtypes.value_counts())
display(df_raw.head(3))

Combined shape: (1323608, 86)
float64    57
int64      26
object      3
Name: count, dtype: int64


Unnamed: 0,time,P1_B2004,P1_B2016,P1_B3004,P1_B3005,P1_B4002,P1_B4005,P1_B400B,P1_B4022,P1_FCV01D,P1_FCV01Z,P1_FCV02D,P1_FCV02Z,P1_FCV03D,P1_FCV03Z,P1_FT01,P1_FT01Z,P1_FT02,P1_FT02Z,P1_FT03,P1_FT03Z,P1_LCV01D,P1_LCV01Z,P1_LIT01,P1_PCV01D,P1_PCV01Z,P1_PCV02D,P1_PCV02Z,P1_PIT01,P1_PIT02,P1_PP01AD,P1_PP01AR,P1_PP01BD,P1_PP01BR,P1_PP02D,P1_PP02R,P1_STSP,P1_TIT01,P1_TIT02,P2_24Vdc,P2_ASD,P2_AutoGO,P2_CO_rpm,P2_Emerg,P2_HILout,P2_MSD,P2_ManualGO,P2_OnOff,P2_RTR,P2_SIT01,P2_SIT02,P2_TripEx,P2_VT01,P2_VTR01,P2_VTR02,P2_VTR03,P2_VTR04,P2_VXT02,P2_VXT03,P2_VYT02,P2_VYT03,P3_FIT01,P3_LCP01D,P3_LCV01D,P3_LH,P3_LIT01,P3_LL,P3_PIT01,P4_HT_FD,P4_HT_LD,P4_HT_PO,P4_HT_PS,P4_LD,P4_ST_FD,P4_ST_GOV,P4_ST_LD,P4_ST_PO,P4_ST_PS,P4_ST_PT01,P4_ST_TT01,attack,attack_P1,attack_P2,attack_P3,file_source,split
0,2020-07-11 00:00:00,0.10121,1.29784,397.63785,1001.99799,33.6555,100.0,2847.02539,37.14706,100.0,100.0,0.0,-1.87531,51.58201,52.80456,166.74039,808.2962,1973.19031,2847.02539,246.43968,1000.44769,8.79882,8.46252,395.19528,39.09198,40.49072,12.0,12.01782,1.3681,0.27786,540833,540833,0,0,1,1,1,35.437,35.74219,28.02645,0,1,54074.0,0,712.07275,763.19324,0,1,2880,780.0,779.59595,1,11.89504,10,10,10,10,-3.066,-1.2648,4.1758,6.0951,4795.0,10832.0,608.0,70,15454.0,20,815.0,-0.00072,0.06511,4.01474,0,301.01636,-0.00297,16495.0,301.35992,305.03113,0,10052.0,27610.0,0,0,0,0,hai_train1_run_20260221_135249_utc,train
1,2020-07-11 00:00:01,0.10121,1.29692,397.63785,1001.99799,33.6555,100.0,2839.5852,37.14477,100.0,100.0,0.0,-1.88294,51.60648,52.78931,168.64778,819.16809,1975.479,2839.5852,246.43968,1000.0127,8.78811,8.47015,395.1442,39.0568,40.49072,12.0,12.01782,1.3681,0.27634,540833,540833,0,0,1,1,1,35.45227,35.74219,28.02473,0,1,54089.0,0,708.52661,763.19324,0,1,2880,781.0,780.67328,1,11.93421,10,10,10,10,-2.9721,-1.3147,3.9259,5.9262,4835.0,10984.0,528.0,70,15461.0,20,883.0,-0.00051,0.0434,3.74347,0,297.43567,0.00072,16402.0,297.43567,304.27161,0,10052.0,27610.0,0,0,0,0,hai_train1_run_20260221_135249_utc,train
2,2020-07-11 00:00:02,0.10121,1.29631,397.63785,1001.99799,33.6555,100.0,2833.26807,37.14325,100.0,100.0,0.0,-1.88294,51.5779,52.79694,168.83849,823.51697,1972.42725,2833.26807,246.05821,1000.88245,8.81787,8.47015,395.1442,38.97124,40.49835,12.0,12.01782,1.36734,0.27634,540833,540833,0,0,1,1,1,35.45227,35.74219,28.02817,0,1,54124.0,0,709.15527,763.19324,0,1,2880,780.0,780.06574,1,11.9703,10,10,10,10,-2.9857,-1.4032,3.6489,5.8101,4961.0,11120.0,464.0,70,15462.0,20,956.0,-0.00043,0.0434,3.43603,0,298.84619,-0.00145,16379.0,298.66534,303.89179,0,10050.0,27617.0,0,0,0,0,hai_train1_run_20260221_135249_utc,train


## 2.2 Create Staged DataFrame

**How do we transform it?**

In [10]:
# Create staged DataFrame

def staged_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    # 1. Parse timestamp
    df["timestamp"] = pd.to_datetime(df["time"], format="%Y-%m-%d %H:%M:%S", utc=True)
    df = df.drop(columns=["time"])

    # 2. Sort by file_source then timestamp (preserve time order within each file)
    df = df.sort_values(["file_source", "timestamp"]).reset_index(drop=True)

    # 3. Derive observation_day (for partitioning and canary checks later)
    df["observation_day"] = df["timestamp"].dt.date

    # 4. Normalize attack label cols to int8
    for col in ATTACK_LABEL_COLS:
        df[col] = df[col].astype("int8")

    # 5. Create unified attack label (1 if ANY subsystem under attack)
    df["is_attack"] = (df[ATTACK_LABEL_COLS].any(axis=1)).astype("int8")

    # 6. Cast sensor columnst to float 32
    sensor_cols = [c for c in df.columns
                    if c not in ATTACK_LABEL_COLS + 
                    ["timestamp", "observation_day", "file_source", "split", "is_attack"]]
    df[sensor_cols] = df[sensor_cols].astype("float32")

    return df

## 2.3 Test Staging Function

**Does it work?**

In [11]:
# Test staged function

df_staged = staged_dataframe(df_raw)
print(f"Staged_shape: {df_staged.shape}")
print(df_staged.dtypes.value_counts())
display(df_staged.head(3))

Staged_shape: (1323608, 88)
float32                79
int8                    5
object                  3
datetime64[ns, UTC]     1
Name: count, dtype: int64


Unnamed: 0,P1_B2004,P1_B2016,P1_B3004,P1_B3005,P1_B4002,P1_B4005,P1_B400B,P1_B4022,P1_FCV01D,P1_FCV01Z,P1_FCV02D,P1_FCV02Z,P1_FCV03D,P1_FCV03Z,P1_FT01,P1_FT01Z,P1_FT02,P1_FT02Z,P1_FT03,P1_FT03Z,P1_LCV01D,P1_LCV01Z,P1_LIT01,P1_PCV01D,P1_PCV01Z,P1_PCV02D,P1_PCV02Z,P1_PIT01,P1_PIT02,P1_PP01AD,P1_PP01AR,P1_PP01BD,P1_PP01BR,P1_PP02D,P1_PP02R,P1_STSP,P1_TIT01,P1_TIT02,P2_24Vdc,P2_ASD,P2_AutoGO,P2_CO_rpm,P2_Emerg,P2_HILout,P2_MSD,P2_ManualGO,P2_OnOff,P2_RTR,P2_SIT01,P2_SIT02,P2_TripEx,P2_VT01,P2_VTR01,P2_VTR02,P2_VTR03,P2_VTR04,P2_VXT02,P2_VXT03,P2_VYT02,P2_VYT03,P3_FIT01,P3_LCP01D,P3_LCV01D,P3_LH,P3_LIT01,P3_LL,P3_PIT01,P4_HT_FD,P4_HT_LD,P4_HT_PO,P4_HT_PS,P4_LD,P4_ST_FD,P4_ST_GOV,P4_ST_LD,P4_ST_PO,P4_ST_PS,P4_ST_PT01,P4_ST_TT01,attack,attack_P1,attack_P2,attack_P3,file_source,split,timestamp,observation_day,is_attack
0,0.10178,1.58771,403.788544,985.373535,32.595268,100.0,2839.585205,36.810101,100.0,99.916077,0.0,-1.86768,50.907261,51.950069,176.086426,845.695496,1978.721558,2843.375488,243.388016,989.141174,10.8929,10.8429,402.709473,40.741249,41.32233,12.0,12.26196,1.34293,0.27557,540833.0,540833.0,0.0,0.0,1.0,1.0,1.0,34.887699,35.147099,28.03162,0.0,1.0,54116.0,0.0,725.213623,763.193237,0.0,1.0,2880.0,790.0,789.765076,1.0,11.9104,10.0,10.0,10.0,10.0,-2.8687,-1.0189,3.7751,5.633,-25.0,688.0,15888.0,70.0,18082.0,20.0,-23.0,0.00029,76.801208,73.585808,0.0,464.066101,0.0047,20469.0,386.266663,380.316833,0.0,10044.0,27567.0,0,0,0,0,hai_test1_run_20260221_135249_utc,test,2020-07-07 15:00:00+00:00,2020-07-07,0
1,0.10178,1.58725,403.788544,985.373535,32.595268,100.0,2843.375488,36.808949,100.0,99.916077,0.0,-1.86768,50.746071,51.965328,173.797562,840.477051,1986.923218,2845.060059,243.006561,992.620178,10.80512,10.8429,402.811737,40.86124,41.32233,12.0,12.26196,1.34216,0.2771,540833.0,540833.0,0.0,0.0,1.0,1.0,1.0,34.887699,35.147099,28.02301,0.0,1.0,54114.0,0.0,721.740723,763.193237,0.0,1.0,2880.0,789.0,789.13147,1.0,11.98856,10.0,10.0,10.0,10.0,-2.9842,-1.2637,3.1689,5.4158,-25.0,648.0,15952.0,70.0,18043.0,20.0,-23.0,0.00051,76.924187,73.89325,0.0,464.228882,0.0021,20489.0,386.302856,380.027466,0.0,10040.0,27564.0,0,0,0,0,hai_test1_run_20260221_135249_utc,test,2020-07-07 15:00:01+00:00,2020-07-07,0
2,0.10178,1.59519,403.788544,985.373535,32.595268,100.0,2845.060059,36.828789,100.0,99.916077,0.0,-1.86768,50.662289,51.965328,174.560516,835.258423,1978.721558,2837.339111,242.815857,993.924683,10.80029,10.8429,402.76062,41.02906,41.32233,12.0,12.26196,1.34369,0.2771,540833.0,540833.0,0.0,0.0,1.0,1.0,1.0,34.887699,35.147099,28.02993,0.0,1.0,54082.0,0.0,718.157959,763.193237,0.0,1.0,2880.0,786.0,785.816528,1.0,11.974,10.0,10.0,10.0,10.0,-3.4939,-1.5398,2.9615,5.5532,-25.0,616.0,16000.0,70.0,18024.0,20.0,-23.0,0.00022,77.04715,74.200684,0.0,466.905334,0.0013,20604.0,389.738831,381.528503,0.0,10037.0,27565.0,0,0,0,0,hai_test1_run_20260221_135249_utc,test,2020-07-07 15:00:02+00:00,2020-07-07,0


## 2.4 Write Staged Outputs

**How do we save it?**

In [12]:
# Write staged outputs


# Save staged Parquet
staged_path = STAGED_DIR / f"hai_staged_run_{RUN_ID}.parquet"
df_staged.to_parquet(staged_path, index=False)
print(f"Saved staged: {staged_path}")
print(f"File size: {staged_path.stat().st_size / 1e6:.1f} MB")

# Write staged metadata
staged_meta = {
    "run_id": RUN_ID,
    "dataset": DATASET_NAME,
    "staged_at_utc": datetime.now(timezone.utc).isoformat(),
    "source_files": len(ALL_FILES),
    "staged_path": str(staged_path),
    "shape": {
        "rows": len(df_staged),
        "cols": len(df_staged.columns)
    },
    "splits": df_staged["split"].value_counts().to_dict(),
    "is_attack_counts": df_staged["is_attack"].value_counts().to_dict(),
    "dtypes": df_staged.dtypes.astype(str).to_dict(),
    "columns": df_staged.columns.tolist()
}

staged_meta_path = STAGED_DIR / f"hai_staged_meta_run_{RUN_ID}.json"
write_json(staged_meta_path, staged_meta)
print(f"Saved metadata: {staged_meta_path}")

Saved staged: work/hai_21_03/data/staged/hai_staged_run_20260221_135249_utc.parquet
File size: 119.9 MB
Saved metadata: work/hai_21_03/data/staged/hai_staged_meta_run_20260221_135249_utc.json


---

# 3. Curate: Analysis-Ready Features

Add engineered features and data quality flags.  

We will be using the dataset after injecting sensor failures.  
It now contains normal data, malicious attacks, and sensor failures.

In [19]:
# Get Injected Files
INJECTED_DIR = DATA_DIR / "injected"
injected_files = sorted(INJECTED_DIR.glob("hai_21_03_*.parquet"))

if not injected_files:
    raise FileNotFoundError(
        f"No injected parquet found in {INJECTED_DIR}. "
        "Run the data failure injection notebook through completion"
    )

# Use the most recent injected file (last alphabetically = latest RUN_ID)
injected_path = injected_files[-1]
print(f"Loading: {injected_path}")

# Load into dataframe
df_injected = pd.read_parquet(injected_path)
print(f"Loaded injected data: {df_injected.shape}")

# Identify sensor columns: all float32 columns that are not metadata
SENSOR_COLS = [c for c in df_injected.columns
              if c not in [
                  'attack', 'attack_P1', 'attack_P2', 'attack_P3',
                  'file_source', 'split', 'timestamp', 'observation_day',
                  'is_attack', 'label',
                  'fault_type', 'fault_sensor', 'fault_start', 'fault_end', 'fault_severity'
              ]]

print(f"Sensor columns identified: {len(SENSOR_COLS)}")

Loading: work/hai_21_03/data/injected/hai_21_03_injected_20260221_135449_utc.parquet
Loaded injected data: (1323608, 94)
Sensor columns identified: 79


## 3.1 Create Curated DataFrame   
Transforms the injected staged DataFrame into a warehouse-ready curated DataFrame by adding time-derived columns and enforcing schema

In [20]:
def curate_data(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()

    # 1. Confirm required columns are present
    required = (
        ['timestamp', 'label', 'is_attack',
         'fault_type', 'fault_sensor', 'fault_start', 'fault_end', 'fault_severity']
        + SENSOR_COLS
    )
    require_columns(df, required, context="curate_data")

    # 2. Confirm label is correctly typed and has no NaNs
    if df['label'].isna().any():
        raise PipelineError("curate_data: label column contains NaN values")
    df['label'] = df['label'].astype('int8')

    # 3. Add seconds_since_start - Captures system startup vs steady-state behavior
    if not pd.api.types.is_datetime64_any_dtype(df['timestamp']):
        df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)

    t_min = df['timestamp'].min()
    df['seconds_since_start'] = (
        (df['timestamp'] - t_min).dt.total_seconds().astype('float32')
    )

    # 4. Drop raw HAI attack flags (replaced by unified label column)
    df = df.drop(columns=['attack', 'attack_P1', 'attack_P2', 'attack_P3'])

    return df

print("curate data function ready")

curate data function ready


## 3.2 Test Curation Function

In [21]:
# Test Curation Function

df_curated = curate_data(df_injected)

print(f"Input shape:  {df_injected.shape}")
print(f"Output shape: {df_curated.shape}")
print(f"\nNew columns added:")
new_cols = [c for c in df_curated.columns if c not in df_injected.columns]
print(f"  {new_cols}")
print(f"\nDropped columns:")
dropped_cols = [c for c in df_injected.columns if c not in df_curated.columns]
print(f"  {dropped_cols}")
print(f"\nLabel distribution:")
print(df_curated['label'].value_counts().sort_index().rename({0: 'normal', 1: 'attack', 2: 'fault'}))
print(f"\nDtype check:")
print(df_curated[['label', 'seconds_since_start']].dtypes)
print(f"\nSeconds since start range: {df_curated['seconds_since_start'].min():.0f} – {df_curated['seconds_since_start'].max():.0f}")
display(df_curated.head(3))

Input shape:  (1323608, 94)
Output shape: (1323608, 91)

New columns added:
  ['seconds_since_start']

Dropped columns:
  ['attack', 'attack_P1', 'attack_P2', 'attack_P3']

Label distribution:
label
normal    1178988
attack       8947
fault      135673
Name: count, dtype: int64

Dtype check:
label                     int8
seconds_since_start    float32
dtype: object

Seconds since start range: 0 – 2923200


Unnamed: 0,P1_B2004,P1_B2016,P1_B3004,P1_B3005,P1_B4002,P1_B4005,P1_B400B,P1_B4022,P1_FCV01D,P1_FCV01Z,P1_FCV02D,P1_FCV02Z,P1_FCV03D,P1_FCV03Z,P1_FT01,P1_FT01Z,P1_FT02,P1_FT02Z,P1_FT03,P1_FT03Z,P1_LCV01D,P1_LCV01Z,P1_LIT01,P1_PCV01D,P1_PCV01Z,P1_PCV02D,P1_PCV02Z,P1_PIT01,P1_PIT02,P1_PP01AD,P1_PP01AR,P1_PP01BD,P1_PP01BR,P1_PP02D,P1_PP02R,P1_STSP,P1_TIT01,P1_TIT02,P2_24Vdc,P2_ASD,P2_AutoGO,P2_CO_rpm,P2_Emerg,P2_HILout,P2_MSD,P2_ManualGO,P2_OnOff,P2_RTR,P2_SIT01,P2_SIT02,P2_TripEx,P2_VT01,P2_VTR01,P2_VTR02,P2_VTR03,P2_VTR04,P2_VXT02,P2_VXT03,P2_VYT02,P2_VYT03,P3_FIT01,P3_LCP01D,P3_LCV01D,P3_LH,P3_LIT01,P3_LL,P3_PIT01,P4_HT_FD,P4_HT_LD,P4_HT_PO,P4_HT_PS,P4_LD,P4_ST_FD,P4_ST_GOV,P4_ST_LD,P4_ST_PO,P4_ST_PS,P4_ST_PT01,P4_ST_TT01,file_source,split,timestamp,observation_day,is_attack,label,fault_type,fault_sensor,fault_start,fault_end,fault_severity,seconds_since_start
0,0.10178,1.58771,403.788544,985.373535,32.595268,100.0,2839.585205,36.810101,100.0,99.916077,0.0,-1.86768,50.907261,51.950069,176.086426,845.695496,1978.721558,2843.375488,243.388016,989.141174,10.8929,10.8429,402.709473,40.741249,41.32233,12.0,12.26196,1.34293,0.27557,540833.0,540833.0,0.0,0.0,1.0,1.0,1.0,34.887699,35.147099,28.03162,0.0,1.0,54116.0,0.0,725.213623,763.193237,0.0,1.0,2880.0,790.0,789.765076,1.0,11.9104,10.0,10.0,10.0,10.0,-2.8687,-1.0189,3.7751,5.633,-25.0,688.0,15888.0,70.0,18082.0,20.0,-23.0,0.00029,76.801208,73.585808,0.0,464.066101,0.0047,20469.0,386.266663,380.316833,0.0,10044.0,27567.0,hai_test1_run_20260221_135249_utc,test,2020-07-07 15:00:00+00:00,2020-07-07,0,0,,,NaT,NaT,,0.0
1,0.10178,1.58725,403.788544,985.373535,32.595268,100.0,2843.375488,36.808949,100.0,99.916077,0.0,-1.86768,50.746071,51.965328,173.797562,840.477051,1986.923218,2845.060059,243.006561,992.620178,10.80512,10.8429,402.811737,40.86124,41.32233,12.0,12.26196,1.34216,0.2771,540833.0,540833.0,0.0,0.0,1.0,1.0,1.0,34.887699,35.147099,28.02301,0.0,1.0,54114.0,0.0,721.740723,763.193237,0.0,1.0,2880.0,789.0,789.13147,1.0,11.98856,10.0,10.0,10.0,10.0,-2.9842,-1.2637,3.1689,5.4158,-25.0,648.0,15952.0,70.0,18043.0,20.0,-23.0,0.00051,76.924187,73.89325,0.0,464.228882,0.0021,20489.0,386.302856,380.027466,0.0,10040.0,27564.0,hai_test1_run_20260221_135249_utc,test,2020-07-07 15:00:01+00:00,2020-07-07,0,0,,,NaT,NaT,,1.0
2,0.10178,1.59519,403.788544,985.373535,32.595268,100.0,2845.060059,36.828789,100.0,99.916077,0.0,-1.86768,50.662289,51.965328,174.560516,835.258423,1978.721558,2837.339111,242.815857,993.924683,10.80029,10.8429,402.76062,41.02906,41.32233,12.0,12.26196,1.34369,0.2771,540833.0,540833.0,0.0,0.0,1.0,1.0,1.0,34.887699,35.147099,28.02993,0.0,1.0,54082.0,0.0,718.157959,763.193237,0.0,1.0,2880.0,786.0,785.816528,1.0,11.974,10.0,10.0,10.0,10.0,-3.4939,-1.5398,2.9615,5.5532,-25.0,616.0,16000.0,70.0,18024.0,20.0,-23.0,0.00022,77.04715,74.200684,0.0,466.905334,0.0013,20604.0,389.738831,381.528503,0.0,10037.0,27565.0,hai_test1_run_20260221_135249_utc,test,2020-07-07 15:00:02+00:00,2020-07-07,0,0,,,NaT,NaT,,2.0


## 3.3 Choose Curated Columns and Write  
Define the curated column list  
Injection metadata columns are retained for validation and error analysis  


In [23]:
CURATED_COLS = (
    # Timestamp and time features
    ['timestamp', 'observation_day', 'seconds_since_start']
    # Provenance
    + ['file_source', 'split']
    # All 79 sensor readings
    + SENSOR_COLS
    # Unified label (0=normal, 1=attack, 2=fault)
    + ['label', 'is_attack']
    # Fault injection metadata — retained for validation, excluded from features later
    + ['fault_type', 'fault_sensor', 'fault_start', 'fault_end', 'fault_severity']
)

# Verify all columns exist
missing = [c for c in CURATED_COLS if c not in df_curated.columns]
if missing:
    raise PipelineError(f"Curated column selection: missing columns {missing}")

df_final = df_curated[CURATED_COLS].copy()

# Write to warehouse
curated_path = WH_DIR / f"hai_curated_{RUN_ID}.parquet"
df_final.to_parquet(curated_path, index=False)

print(f"Curated shape: {df_final.shape}")
print(f"Saved: {curated_path}")
print(f"File size: {curated_path.stat().st_size / 1e6:.1f} MB")
print(f"\nColumn groups:")
print(f"  Time features:      3")
print(f"  Provenance:         2")
print(f"  Sensor readings:    {len(SENSOR_COLS)}")
print(f"  Label columns:      2")
print(f"  Fault metadata:     5")
print(f"  Total:              {len(CURATED_COLS)}")

Curated shape: (1323608, 91)
Saved: work/hai_21_03/data/warehouse/hai_curated_20260221_135249_utc.parquet
File size: 127.1 MB

Column groups:
  Time features:      3
  Provenance:         2
  Sensor readings:    79
  Label columns:      2
  Fault metadata:     5
  Total:              91


---

# 4. Validate: Contracts + Anomalies + Canaries

Define validation rules and check data quality

## 4.1 Define Contracts  
Define validation contracts for the curated HAI dataset  
* `required_cols`: must exist and have zero NaN values
* `range_checks`: physical plausibility of sensor values

In [33]:
REQUIRED_COLS = [
    'timestamp', 'observation_day', 'seconds_since_start',
    'file_source', 'split', 'label', 'is_attack'
]
SENSOR_REQUIRED_NONFAULT = SENSOR_COLS 


# Compute physical plausibility bounds from normal operation rows only.
# Using observed min/max on label==0 rows with a 10% margin to allow for
# legitimate excursions near the boundary of normal operation.
# Injected faults and attacks may legitimately exceed these bounds.

df_normal = df_final[df_final['label'] == 0]

RANGE_CHECKS = {}
for col in SENSOR_COLS:
    col_min = float(df_normal[col].min())
    col_max = float(df_normal[col].max())
    margin = (col_max - col_min) * 0.10
    RANGE_CHECKS[col] = (col_min - margin, col_max + margin)

print(f"Range checks computed for {len(RANGE_CHECKS)} sensors")
print(f"  (derived from {len(df_normal):,} normal operation rows, ±10% margin)")
print(f"\nSample bounds:")
for col in ['P1_FT01', 'P1_LIT01', 'P1_PIT01', 'P1_TIT01', 'P2_CO_rpm']:
    lo, hi = RANGE_CHECKS[col]
    print(f"  {col:20s}: [{lo:10.2f}, {hi:10.2f}]")

Range checks computed for 79 sensors
  (derived from 1,178,988 normal operation rows, ±10% margin)

Sample bounds:
  P1_FT01             : [    -96.23,     873.55]
  P1_LIT01            : [    302.08,     674.84]
  P1_PIT01            : [     -0.13,       2.65]
  P1_TIT01            : [     34.38,      37.18]
  P2_CO_rpm           : [  51337.40,   55064.60]


## 4.2 Implement Validation Checks  
Run contract checks on the curated DataFrame  

Checks:  
1. Required columns exist
2. Required columns have no NaN values
3. Range violations < 5% per sensor (warning) or > 20% (failure)
4. Label column contains only valid values {0, 1, 2}
5. No rows carry both attack and fault labels simultaneously
6. Timestamps are monotonically non-decreasing within each split

In [36]:
failures = []
warnings = []

def validate_data(df: pd.DataFrame, required_cols: list, range_checks: dict) -> dict:

    # 1. Required columns exist
    missing_cols = [c for c in required_cols if c not in df.columns]
    if missing_cols:
        failures.append(f"Missing required columns: {missing_cols}")
    
    # 2. Required columns have no NaNs, except for sensors in fault rows. Dropout faults cause NaN.
    for col in required_cols:
        if col not in df.columns:
            continue
        n_null = int(df[col].isna().sum())
        if n_null > 0:
            failures.append(f"Required column '{col}' has {n_null:,} NaN values")

    non_fault_mask = df['label'] != 2
    for col in SENSOR_COLS:
        if col not in df.columns:
            continue
        n_null = int(df.loc[non_fault_mask, col].isna().sum())
        if n_null > 0:
            failures.append(
                f"Sensor '{col}' has {n_null:,} NaN values in non-fault rows "
                f"(label != 2) — unexpected data quality issue"
            )
    
    # 3. Range violations
    for col, (lo, hi) in range_checks.items():
        if col not in df.columns:
            warnings.append(f"Range check skipped — '{col}' not in DataFrame")
            continue
        n_total = len(df)
        n_violation = int(((df[col] < lo) | (df[col] > hi)).sum())
        pct = n_violation / n_total * 100
        if pct > 20.0:
            failures.append(
                f"Range violation FAILURE '{col}': {n_violation:,} rows ({pct:.1f}%) "
                f"outside [{lo:.3f}, {hi:.3f}]"
            )
        elif pct > 5.0:
            warnings.append(
                f"Range violation WARNING '{col}': {n_violation:,} rows ({pct:.1f}%) "
                f"outside [{lo:.3f}, {hi:.3f}]"
            )
    
    # 4. Label values are valid
    valid_labels = {0, 1, 2}
    actual_labels = set(df['label'].unique())
    invalid_labels = actual_labels - valid_labels
    if invalid_labels:
        failures.append(f"Label column contains invalid values: {invalid_labels}")
    
    # 5. No row carries bot attack and fault labels
    overlap = int(((df['label'] == 1) & (df['is_attack'] == 0)).sum())
    if overlap > 0:
        failures.append(
            f"Label/is_attack mismatch: {overlap:,} rows have label=1 but is_attack=0"
        )
    attack_fault_overlap = int(((df['is_attack'] == 1) & (df['label'] == 2)).sum())
    if attack_fault_overlap > 0:
        failures.append(
            f"Attack/fault overlap: {attack_fault_overlap:,} rows carry both "
            f"is_attack=1 and label=2"
        )
    
    # 6. Time stamps non-decreasing within each split
    for split_name, split_df in df.groupby('split'):
        n_decreasing = int((split_df['timestamp'].diff().dt.total_seconds() < 0).sum())
        if n_decreasing > 0:
            failures.append(
                f"Timestamp ordering violation in split='{split_name}': "
                f"{n_decreasing:,} out-of-order rows"
            )
    
    passed = len(failures) == 0
    return {'passed': passed, 'failures': failures, 'warnings': warnings}

print("validation data function ready")

validation data function ready


## 4.3 Run Validation

In [37]:
# Investigate: are these NaNs present in the original injected data
# or were they introduced during curation?
print("NaN counts in df_injected vs df_final:\n")
nan_cols = [
    'P1_B2016', 'P1_B4002', 'P1_B4005', 'P1_FCV01D', 'P1_FCV01Z',
    'P1_FT01Z', 'P1_FT02Z', 'P1_LCV01D', 'P1_LCV01Z', 'P1_PIT01',
    'P1_TIT02', 'P2_VXT03', 'P2_VYT02', 'P2_VYT03', 'P3_LIT01', 'P3_PIT01'
]

print(f"{'Column':<20} {'injected':>10} {'final':>10} {'split':>10} {'label':>10}")
print("-" * 62)
for col in nan_cols:
    n_injected = int(df_injected[col].isna().sum())
    n_final    = int(df_final[col].isna().sum())
    # Which split do the NaNs fall in?
    null_mask = df_final[col].isna()
    split_counts = df_final.loc[null_mask, 'split'].value_counts().to_dict()
    label_counts = df_final.loc[null_mask, 'label'].value_counts().to_dict()
    print(f"{col:<20} {n_injected:>10} {n_final:>10} {str(split_counts):>10} {str(label_counts):>10}")

NaN counts in df_injected vs df_final:

Column                 injected      final      split      label
--------------------------------------------------------------
P1_B2016                     16         16 {'train': 16}    {2: 16}
P1_B4002                     48         48 {'train': 48}    {2: 48}
P1_B4005                     71         71 {'train': 71}    {2: 71}
P1_FCV01D                    90         90 {'train': 90}    {2: 90}
P1_FCV01Z                    70         70 {'train': 70}    {2: 70}
P1_FT01Z                     60         60 {'train': 60}    {2: 60}
P1_FT02Z                     84         84 {'train': 84}    {2: 84}
P1_LCV01D                   144        144 {'test': 72, 'train': 72}   {2: 144}
P1_LCV01Z                    10         10 {'train': 10}    {2: 10}
P1_PIT01                     25         25 {'train': 25}    {2: 25}
P1_TIT02                     45         45 {'train': 45}    {2: 45}
P2_VXT03                    200        200 {'train': 200}   {2: 200}
P2_

In [38]:
# Run validation against the curated dataset
validation_report = validate_data(df_final, REQUIRED_COLS, RANGE_CHECKS)

# Print results
status = "PASSED" if validation_report['passed'] else "FAILED"
print(f"Validation status: {status}")
print(f"  Failures: {len(validation_report['failures'])}")
print(f"  Warnings: {len(validation_report['warnings'])}")

if validation_report['failures']:
    print(f"\nFailures:")
    for f in validation_report['failures']:
        print(f"  {f}")

if validation_report['warnings']:
    print(f"\nWarnings:")
    for w in validation_report['warnings']:
        print(f"  {w}")

if validation_report['passed'] and not validation_report['warnings']:
    print("\nAll contracts passed with no warnings.")

Validation status: PASSED
  Failures: 0



## 4.4 Anomaly Flags and Investigation  
Computes anomaly statistics across the curated dataset.  

Calculates, but does not modify the DataFrame.  
Results are used in final validation report

In [42]:
def create_anomaly_summary(df: pd.DataFrame, range_checks: dict) -> dict:
    n_total = len(df)

    # 1. Missingness by column 
    missingness = {}
    for col in SENSOR_COLS:
        n_null = int(df[col].isna().sum())
        if n_null > 0:
            missingness[col] = {
                'n_null': n_null,
                'pct_null': round(n_null / n_total * 100, 4),
                'in_fault_rows': int(df.loc[df['label'] == 2, col].isna().sum()),
                'in_normal_rows': int(df.loc[df['label'] == 0, col].isna().sum()),
                'in_attack_rows': int(df.loc[df['label'] == 1, col].isna().sum()),
            }

    # 2. Out-of-range counts by column
    range_violations = {}
    for col, (lo, hi) in range_checks.items():
        if col not in df.columns:
            continue
        mask = (df[col] < lo) | (df[col] > hi)
        n_violation = int(mask.sum())
        if n_violation > 0:
            label_breakdown = df.loc[mask, 'label'].value_counts().to_dict()
            range_violations[col] = {
                'n_violations': n_violation,
                'pct_violations': round(n_violation / n_total * 100, 4),
                'by_label': label_breakdown
            }

    # 3. Suspicious rows: NaN in non-fault sensor rows 
    non_fault_nan_rows = int(
        df.loc[df['label'] != 2, SENSOR_COLS].isna().any(axis=1).sum()
    )

    return {
        'total_rows': n_total,
        'missing_sensor_cols': len(missingness),
        'missingness': missingness,
        'range_violation_cols': len(range_violations),
        'range_violations': range_violations,
        'non_fault_nan_rows': non_fault_nan_rows,
    }

print("anomaly summary function ready")

anomaly summary function ready


## 4.5 Run Anomaly Analysis

In [43]:
anomaly_summary = create_anomaly_summary(df_final, RANGE_CHECKS)

print(f"Total rows:                    {anomaly_summary['total_rows']:,}")
print(f"Sensors with missingness:      {anomaly_summary['missing_sensor_cols']}")
print(f"Sensors with range violations: {anomaly_summary['range_violation_cols']}")
print(f"Non-fault rows with NaN:       {anomaly_summary['non_fault_nan_rows']}")

if anomaly_summary['missingness']:
    print(f"\nMissingness detail (fault rows only, as expected):")
    for col, stats in anomaly_summary['missingness'].items():
        print(f"  {col:<20s} {stats['n_null']:>4} NaN  "
              f"(normal={stats['in_normal_rows']}, "
              f"attack={stats['in_attack_rows']}, "
              f"fault={stats['in_fault_rows']})")

if anomaly_summary['range_violations']:
    print(f"\nRange violations:")
    for col, stats in anomaly_summary['range_violations'].items():
        print(f"  {col:<20s} {stats['n_violations']:>6} rows "
              f"({stats['pct_violations']:.3f}%)  by label: {stats['by_label']}")
else:
    print(f"\nNo range violations detected.")

Total rows:                    1,323,608
Sensors with missingness:      16
Sensors with range violations: 29
Non-fault rows with NaN:       0

Missingness detail (fault rows only, as expected):
  P1_B2016               16 NaN  (normal=0, attack=0, fault=16)
  P1_B4002               48 NaN  (normal=0, attack=0, fault=48)
  P1_B4005               71 NaN  (normal=0, attack=0, fault=71)
  P1_FCV01D              90 NaN  (normal=0, attack=0, fault=90)
  P1_FCV01Z              70 NaN  (normal=0, attack=0, fault=70)
  P1_FT01Z               60 NaN  (normal=0, attack=0, fault=60)
  P1_FT02Z               84 NaN  (normal=0, attack=0, fault=84)
  P1_LCV01D             144 NaN  (normal=0, attack=0, fault=144)
  P1_LCV01Z              10 NaN  (normal=0, attack=0, fault=10)
  P1_PIT01               25 NaN  (normal=0, attack=0, fault=25)
  P1_TIT02               45 NaN  (normal=0, attack=0, fault=45)
  P2_VXT03              200 NaN  (normal=0, attack=0, fault=200)
  P2_VYT02               72 NaN  (no

**Interpretation:**  
* **All missingness is fault only** - dropout faults working correctly
* **Non-fault NaN rows = 0** - no data quality issues in normal or attack rows
* **Range violations are small** - worst case is `P3_LCP01D` at 0.181%
* **Range violations split cleanly by label** - mostly label 2, faults pushing sensors out of bounds or label 1, attacks manipulating process values. Zero label 0 violations.

## 4.6 Canary Checks  

Monitor dataset-level health metrics that should remain stable.  
Flag days with anomalous row counts or high sensor missingness that could silently degrade model training.

In [44]:
def create_canary_summary(df: pd.DataFrame) -> dict:
    
    # 1. Observations per day 
    daily_counts = df.groupby('observation_day').size()
    median_daily = float(daily_counts.median())
    low_threshold = median_daily * 0.50  # days with < 50% of median

    low_days = daily_counts[daily_counts < low_threshold]

    # 2. Class distribution 
    label_counts = df['label'].value_counts().sort_index().to_dict()
    total = len(df)
    label_pcts = {int(k): round(v / total * 100, 3) for k, v in label_counts.items()}

    # 3. Per-column missingness across all rows
    overall_missingness = {}
    for col in SENSOR_COLS:
        n_null = int(df[col].isna().sum())
        if n_null > 0:
            overall_missingness[col] = round(n_null / total * 100, 4)

    # 4. Days with high missingness (any sensor > 30% null that day) 
    high_miss_days = []
    for day, day_df in df.groupby('observation_day'):
        for col in SENSOR_COLS:
            pct = day_df[col].isna().mean() * 100
            if pct > 30.0:
                high_miss_days.append({
                    'day': str(day),
                    'col': col,
                    'pct_null': round(pct, 2)
                })

    return {
        'daily_counts': {
            'min':    int(daily_counts.min()),
            'median': int(median_daily),
            'max':    int(daily_counts.max()),
            'n_days': int(len(daily_counts)),
            'low_count_days': {str(k): int(v) for k, v in low_days.items()},
        },
        'class_distribution': {
            'counts': {int(k): int(v) for k, v in label_counts.items()},
            'pct':    label_pcts,
        },
        'overall_missingness_pct': overall_missingness,
        'high_missingness_days':   high_miss_days,
    }

print("canary summary function ready")

canary summary function ready


## 4.7 Run Canary Analysis

In [45]:
# Run canary analysis
canary_summary = create_canary_summary(df_final)

dc = canary_summary['daily_counts']
print(f"Daily row counts:")
print(f"  Days in dataset:  {dc['n_days']}")
print(f"  Min rows/day:     {dc['min']:,}")
print(f"  Median rows/day:  {dc['median']:,}")
print(f"  Max rows/day:     {dc['max']:,}")

if dc['low_count_days']:
    print(f"  Low count days (< 50% of median):")
    for day, count in dc['low_count_days'].items():
        print(f"    {day}: {count:,} rows")
else:
    print(f"  No low count days detected.")

cd = canary_summary['class_distribution']
print(f"\nClass distribution:")
print(f"  Normal  (0): {cd['counts'][0]:>9,}  ({cd['pct'][0]:.2f}%)")
print(f"  Attack  (1): {cd['counts'][1]:>9,}  ({cd['pct'][1]:.2f}%)")
print(f"  Fault   (2): {cd['counts'][2]:>9,}  ({cd['pct'][2]:.2f}%)")

print(f"\nSensors with any missingness: {len(canary_summary['overall_missingness_pct'])}")

if canary_summary['high_missingness_days']:
    print(f"\nHigh missingness days (>30% null):")
    for entry in canary_summary['high_missingness_days']:
        print(f"  {entry['day']}  {entry['col']:<20s}  {entry['pct_null']:.1f}%")
else:
    print(f"No high missingness days detected.")

Daily row counts:
  Days in dataset:  21
  Min rows/day:     7,200
  Median rows/day:  86,400
  Max rows/day:     129,601
  Low count days (< 50% of median):
    2020-07-07: 32,400 rows
    2020-07-08: 10,801 rows
    2020-07-09: 32,400 rows
    2020-07-14: 21,601 rows
    2020-07-28: 39,601 rows
    2020-08-04: 7,200 rows
    2020-08-10: 39,601 rows

Class distribution:
  Normal  (0): 1,178,988  (89.07%)
  Attack  (1):     8,947  (0.68%)
  Fault   (2):   135,673  (10.25%)

Sensors with any missingness: 16
No high missingness days detected.


---

# 5. Leakage Audit

Document potential temporal leakage risks.

## 5.1 Write Leakage Checklist  
Confirms that no future-looking information contaminates the feature matrix.
Evaluated at curation time

In [46]:
leakage_checklist = [
    {
        "check": "Normalization stats computed on train only",
        "status": "DEFERRED",
        "notes": "StandardScaler will be fit on train split only and applied to "
                 "val/test in the Feature Engineering notebook."
    },
    {
        "check": "Rolling window features use only past observations",
        "status": "DEFERRED",
        "notes": "Rolling windows will use closed='left' or shift(1) to ensure "
                 "each row sees only its own history. Computed in Feature Engineering."
    },
    {
        "check": "fault_start and fault_end excluded from feature matrix",
        "status": "CONFIRMED",
        "notes": "These columns are injection metadata. They are retained in the "
                 "warehouse for validation and error analysis but will be explicitly "
                 "excluded from the feature matrix during Feature Engineering."
    },
    {
        "check": "fault_type and fault_sensor excluded from feature matrix",
        "status": "CONFIRMED",
        "notes": "These are injection metadata columns, not observable signals. "
                 "A real ICS deployment would not have access to these values."
    },
    {
        "check": "is_attack excluded from feature matrix",
        "status": "CONFIRMED",
        "notes": "is_attack is derived from the HAI attack flags and is part of "
                 "the label definition. Including it as a feature would be direct "
                 "label leakage."
    },
    {
        "check": "Train/val/test split is time-based, not random",
        "status": "CONFIRMED",
        "notes": "Split will be performed by timestamp order (60/20/20) in the "
                 "Feature Engineering notebook. Random splitting is not used — it "
                 "would allow future readings to inform past predictions."
    },
    {
        "check": "seconds_since_start computed on full dataset before split",
        "status": "ACCEPTABLE",
        "notes": "seconds_since_start uses the global dataset minimum timestamp "
                 "as its reference point. This is a fixed constant (dataset start), "
                 "not a statistic derived from labels or future rows. No leakage risk."
    },
]

print("Leakage Audit Checklist")
print("=" * 60)
for item in leakage_checklist:
    print(f"\n[{item['status']}] {item['check']}")
    print(f"  {item['notes']}")

Leakage Audit Checklist

[DEFERRED] Normalization stats computed on train only
  StandardScaler will be fit on train split only and applied to val/test in the Feature Engineering notebook.

[DEFERRED] Rolling window features use only past observations
  Rolling windows will use closed='left' or shift(1) to ensure each row sees only its own history. Computed in Feature Engineering.

[CONFIRMED] fault_start and fault_end excluded from feature matrix
  These columns are injection metadata. They are retained in the warehouse for validation and error analysis but will be explicitly excluded from the feature matrix during Feature Engineering.

[CONFIRMED] fault_type and fault_sensor excluded from feature matrix
  These are injection metadata columns, not observable signals. A real ICS deployment would not have access to these values.

[CONFIRMED] is_attack excluded from feature matrix
  is_attack is derived from the HAI attack flags and is part of the label definition. Including it as a feat

---

# 6. Write Final Artifacts

Consolidate validation results and create run log

## 6.1 Write Validation Report

In [47]:
def write_validation_report(
    validation_report: dict,
    anomaly_summary: dict,
    canary_summary: dict,
    leakage_checklist: list,
    path: Path
) -> None:

    report = {
        "run_id":            RUN_ID,
        "generated_at_utc":  datetime.now(timezone.utc).isoformat(),
        "dataset":           DATASET_NAME,
        "curated_path":      str(WH_DIR / f"hai_curated_{RUN_ID}.parquet"),
        "contracts":         validation_report,
        "anomalies":         anomaly_summary,
        "canaries":          canary_summary,
        "leakage_checklist": leakage_checklist,
    }
    write_json(path, report)
    print(f"Saved: {path}")
    print(f"Size:  {path.stat().st_size / 1e3:.1f} KB")

report_path = REF_DIR / f"validation_report_{RUN_ID}.json"
write_validation_report(
    validation_report,
    anomaly_summary,
    canary_summary,
    leakage_checklist,
    report_path
)


Saved: work/hai_21_03/data/reference/validation_report_20260221_135249_utc.json
Size:  10.5 KB


## 6.2 Create Run Log

In [48]:
def create_run_log(injected_path: Path, curated_path: Path, report_path: Path) -> dict:
    return {
        "run_id":           RUN_ID,
        "generated_at_utc": datetime.now(timezone.utc).isoformat(),
        "stage":            "Stage 3-6: Curation and Validation",
        "row_definition":   "One row = one second of ICS sensor readings. "
                            "Label 0=normal, 1=cyber attack, 2=injected sensor fault.",
        "inputs": {
            "injected_parquet": {
                "path":       str(injected_path),
                "size_bytes": injected_path.stat().st_size,
            }
        },
        "outputs": {
            "curated_parquet": {
                "path":       str(curated_path),
                "size_bytes": curated_path.stat().st_size,
                "rows":       len(df_final),
                "cols":       len(df_final.columns),
            },
            "validation_report": {
                "path": str(report_path),
            }
        },
        "validation_passed": validation_report['passed'],
        "notes": [
            "Sensor NaNs in fault rows (label=2) are expected — intermittent dropout faults.",
            "Range violations are confined to label=1 and label=2 rows — correct behavior.",
            "Low row-count days are dataset boundary artifacts, not data quality issues.",
            "Leakage audit has two DEFERRED items to be confirmed in Feature Engineering.",
        ]
    }

run_log = create_run_log(
    injected_path  = INJECTED_DIR / sorted(INJECTED_DIR.glob("hai_21_03_*.parquet"))[-1].name,
    curated_path   = WH_DIR / f"hai_curated_{RUN_ID}.parquet",
    report_path    = report_path
)

run_log_path = RUN_DIR / f"run_{RUN_ID}.json"
write_json(run_log_path, run_log)
print(f"Saved: {run_log_path}")
print(f"Size:  {run_log_path.stat().st_size / 1e3:.1f} KB")
print(f"\nValidation passed: {run_log['validation_passed']}")

Saved: work/hai_21_03/data/reference/pipeline_runs/run_20260221_135249_utc.json
Size:  1.2 KB

Validation passed: True


---

# 7. Self-Check and Reflection

Document insights and potential issues

## 7.1 Pipeline Reflection

In [49]:
reflection = [
    "Row definition: One row = one second of ICS sensor readings from the HAI-21.03 "
    "dataset. Label 0=normal operation, 1=cyber attack (original HAI labels), "
    "2=injected sensor fault (synthetic).",

    "Required columns: timestamp, label, is_attack, and all 79 sensor columns are "
    "required. Fault metadata columns (fault_type, fault_sensor, fault_start, "
    "fault_end, fault_severity) are retained for validation but excluded from the "
    "feature matrix.",

    "NaN policy: Sensor NaNs are only permitted in fault rows (label=2) where they "
    "represent intermittent dropout fault signatures. 16 sensors carry dropout NaNs, "
    "all confined to fault rows. Zero NaNs in normal or attack rows.",

    "Range checks: Physical plausibility bounds derived empirically from 1,178,988 "
    "normal operation rows with a 10% margin. 29 sensors show out-of-range violations, "
    "all confined to label=1 or label=2 rows — confirming bounds correctly characterize "
    "normal operation and that faults/attacks produce distinguishable sensor excursions.",

    "Biggest anomaly found: P3_LCP01D has the highest range violation count at 2,390 "
    "rows (0.181%), split across label=2 (2,350) and label=1 (40). This sensor appears "
    "sensitive to both fault injection and cyber attack manipulation.",

    "Likely breakage scenario: A future pipeline run with a different injection seed "
    "could produce fault rows in the test split, violating the train-only injection "
    "contract. The injected parquet path is pinned to a specific RUN_ID to prevent this.",

    "Temporal leakage risk: seconds_since_start is computed from the global dataset "
    "minimum before splitting — acceptable since it references a fixed constant, not "
    "a label-derived statistic. All other leakage risks are deferred to and documented "
    "in the Feature Engineering notebook.",

    "Data quality insight: Low row-count days (7 days below 50% of median) are "
    "dataset boundary artifacts where HAI source files start or end mid-day. "
    "2020-08-04 is the thinnest at 7,200 rows (2 hours). Split boundaries in "
    "Feature Engineering should avoid these partial days.",

    "Pipeline reproducibility: All artifacts are timestamped with RUN_ID "
    f"({RUN_ID}). The run log links the injected parquet input to the curated "
    "parquet and validation report outputs. Re-running this notebook with the "
    "same injected parquet will produce identical outputs.",
]

print("Pipeline Reflection")
print("=" * 60)
for i, note in enumerate(reflection, 1):
    print(f"\n{i}. {note}")

Pipeline Reflection

1. Row definition: One row = one second of ICS sensor readings from the HAI-21.03 dataset. Label 0=normal operation, 1=cyber attack (original HAI labels), 2=injected sensor fault (synthetic).

2. Required columns: timestamp, label, is_attack, and all 79 sensor columns are required. Fault metadata columns (fault_type, fault_sensor, fault_start, fault_end, fault_severity) are retained for validation but excluded from the feature matrix.

3. NaN policy: Sensor NaNs are only permitted in fault rows (label=2) where they represent intermittent dropout fault signatures. 16 sensors carry dropout NaNs, all confined to fault rows. Zero NaNs in normal or attack rows.

4. Range checks: Physical plausibility bounds derived empirically from 1,178,988 normal operation rows with a 10% margin. 29 sensors show out-of-range violations, all confined to label=1 or label=2 rows — confirming bounds correctly characterize normal operation and that faults/attacks produce distinguishable se

# From here, move to the Feature Engineering Notebook