# 02 - Data Cleaning & Preprocessing

**Objective**: Load the merged timetable + disruption dataset, inspect data quality, handle missing values, remove duplicates, fix data types, and produce a clean dataset for feature engineering.

**Input**: `data/processed/ensign_timetable_with_disruptions.csv`  
**Output**: `data/processed/cleaned_dataset.csv`

In [None]:
# ============================================================
# CELL 1: Imports & Spark Session
# ============================================================
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import pandas as pd
import numpy as np

# Create (or reuse, via getOrCreate) the notebook's Spark session.
#   master "local[*]"               -> run Spark locally on this machine
#   spark.sql.shuffle.partitions=8  -> number of partitions used for shuffles
#   spark.driver.memory=4g          -> memory requested for the driver
# NOTE(review): the cleaning cells visible in this notebook work on pandas only;
# confirm whether the Spark session is actually needed here.
spark = (
    SparkSession.builder
    .appName("DataCleaning")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

print("Spark session ready!")
# Bare expression: the notebook renders the session summary as the cell output.
spark

In [None]:
# ============================================================
# CELL 2: Load Raw Merged Dataset
# ============================================================
# Absolute location of the merged timetable + disruption CSV.
# Raw string so the Windows backslashes are not treated as escapes.
DATA_PATH = r'F:\SOFTWARICA\big-data-transport-analytics\data\processed\ensign_timetable_with_disruptions.csv'

# Read the whole file into a pandas DataFrame; later cells mutate `pdf` in place.
pdf = pd.read_csv(DATA_PATH)

# Report the overall shape, then one line per column with its inferred dtype.
n_rows, n_columns = pdf.shape
print(f"Dataset loaded: {n_rows:,} rows x {n_columns} columns")
print(f"\nAll columns ({n_columns}):")
for position, (name, dtype) in enumerate(pdf.dtypes.items(), start=1):
    print(f"  {position:2d}. {name:35s} dtype={dtype}")

# Bare expression: the notebook renders the first three rows as the cell output.
pdf.head(3)

In [None]:
# ============================================================
# CELL 3: Data Quality Inspection
# ============================================================

# Read-only data-quality report for `pdf`: missing values, full-row duplicates,
# dtype counts, numeric ranges and categorical cardinality. Nothing is modified.
banner = "=" * 65
print(banner)
print("DATA QUALITY REPORT")
print(banner)

# 1. Missing values: per-column null count and percentage, listing only the
#    columns that actually contain nulls.
print("\n1. MISSING VALUES:")
null_counts = pdf.isna().sum()
null_pct = (null_counts / len(pdf) * 100).round(2)
null_report = (
    pd.DataFrame({'nulls': null_counts, 'pct': null_pct})
    .loc[lambda frame: frame['nulls'] > 0]
)
if null_report.empty:
    print("   No missing values found!")
else:
    print(null_report.to_string())

# 2. Duplicates: rows identical to an earlier row across every column.
print("\n2. DUPLICATE ROWS:")
n_dupes = pdf.duplicated().sum()
dupe_pct = n_dupes / len(pdf) * 100
print(f"   Full row duplicates: {n_dupes} ({dupe_pct:.2f}%)")

# 3. Dtype overview: how many columns pandas read as numeric vs. object.
print("\n3. DATA TYPES:")
numeric_part = pdf.select_dtypes(include=[np.number])
object_part = pdf.select_dtypes(include=['object'])
print(f"   Numeric columns:  {numeric_part.shape[1]}")
print(f"   Object columns:   {object_part.shape[1]}")

# 4. min / max / mean for the key numeric columns that exist in the frame.
print("\n4. VALUE RANGES (numeric):")
num_cols = ['departure_hour', 'stop_sequence', 'latitude', 'longitude',
            'active_disruptions', 'avg_severity_score', 'run_time_min',
            'day_of_week_num', 'is_peak_hour']
for name in num_cols:
    if name not in pdf.columns:
        continue  # tolerate schema differences instead of raising KeyError
    values = pdf[name]
    lowest, highest, average = values.min(), values.max(), values.mean()
    print(f"   {name:25s} min={lowest:>10.2f}  max={highest:>10.2f}  mean={average:>10.2f}")

# 5. Cardinality plus a sample of up to 8 distinct values per categorical column.
print("\n5. CATEGORICAL COLUMNS:")
cat_cols = ['line_name', 'direction', 'operator_code', 'days_of_week']
for name in cat_cols:
    if name not in pdf.columns:
        continue
    values = pdf[name]
    sample = list(values.unique()[:8])
    print(f"   {name:25s} unique={values.nunique():>5}  values={sample}")

In [None]:
# ============================================================
# CELL 4: Handle Missing Values
# ============================================================

# Impute missing values in `pdf` in three passes, printing one line per column
# that was actually changed:
#   1. disruption columns -> 0
#   2. any other numeric column -> that column's median
#   3. object (text) columns -> the literal 'unknown'
print("HANDLING MISSING VALUES")
print("=" * 50)

before_rows = len(pdf)

# Pass 1: disruption columns are filled with 0.
disruption_cols = ['active_disruptions', 'unique_situations', 'avg_severity_score',
                   'max_severity_score', 'planned_count', 'unplanned_count',
                   'roadworks_count', 'roadclosed_count', 'other_reason_count']
for name in disruption_cols:
    if name not in pdf.columns:
        continue  # column absent from this extract; nothing to fill
    n_missing = pdf[name].isnull().sum()
    if n_missing > 0:
        pdf[name] = pdf[name].fillna(0)
        print(f"  {name}: filled {n_missing} nulls with 0")

# Pass 2: whatever numeric nulls remain get the column median.
# NOTE(review): this applies to every numeric column, including ones that are
# later cast to int (e.g. is_peak_hour) - confirm a fractional median is acceptable.
numeric_names = pdf.select_dtypes(include=[np.number]).columns
for name in numeric_names:
    n_missing = pdf[name].isnull().sum()
    if not n_missing:
        continue
    fill_value = pdf[name].median()
    pdf[name] = pdf[name].fillna(fill_value)
    print(f"  {name}: filled {n_missing} nulls with median ({fill_value:.2f})")

# Pass 3: text columns get an explicit 'unknown' category.
text_names = pdf.select_dtypes(include=['object']).columns
for name in text_names:
    n_missing = pdf[name].isnull().sum()
    if not n_missing:
        continue
    pdf[name] = pdf[name].fillna('unknown')
    print(f"  {name}: filled {n_missing} nulls with 'unknown'")

# Total nulls left across the whole frame after the three passes.
remaining_nulls = pdf.isnull().sum().sum()
print(f"\nRemaining nulls after filling: {remaining_nulls}")

In [None]:
# ============================================================
# CELL 5: Remove Duplicates
# ============================================================

# Drop exact full-row duplicates from `pdf` and report how many rows were removed.
# `before` / `after` are kept as notebook globals; the summary in the save cell
# reads them.
print("REMOVING DUPLICATES")
print("=" * 50)

before = len(pdf)
pdf = pdf.drop_duplicates()
after = len(pdf)
dropped = before - after

# Guard the percentage: an empty dataset (before == 0) would otherwise raise
# ZeroDivisionError and abort the cell.
dropped_pct = dropped / before * 100 if before else 0.0

print(f"  Before: {before:,} rows")
print(f"  After:  {after:,} rows")
print(f"  Dropped: {dropped:,} duplicate rows ({dropped_pct:.2f}%)")

In [None]:
# ============================================================
# CELL 6: Fix Data Types & Validate Ranges
# ============================================================

# Enforce numeric dtypes on key columns of `pdf`, then validate value ranges:
# departure hour, coordinates, disruption counts and the peak-hour flag.
# Out-of-range hours and negative counts are clipped; bad coordinates are only
# reported; a non-binary peak flag raises ValueError.
print("DATA TYPE CORRECTIONS & RANGE VALIDATION")
print("=" * 50)

# Ensure correct numeric types.
# NOTE(review): astype(int) truncates fractional values (e.g. a 0.5 produced by
# the median fill in the missing-values cell) - confirm that cannot happen upstream.
pdf['departure_hour'] = pdf['departure_hour'].astype(float)
pdf['is_peak_hour'] = pdf['is_peak_hour'].astype(int)
pdf['day_of_week_num'] = pdf['day_of_week_num'].astype(int)
pdf['stop_sequence'] = pdf['stop_sequence'].astype(int)
print("  Numeric types enforced.")

# Validate departure_hour is within [0, 23]; out-of-range values are clipped.
# NOTE(review): if departure_hour can carry a fractional part (e.g. 23.5), the
# upper bound of 23 will flag and clip it - confirm the column holds whole hours.
invalid_hours = pdf[(pdf['departure_hour'] < 0) | (pdf['departure_hour'] > 23)]
if len(invalid_hours) > 0:
    print(f"  WARNING: {len(invalid_hours)} rows with invalid departure_hour -> clipping to [0,23]")
    pdf['departure_hour'] = pdf['departure_hour'].clip(0, 23)
else:
    print("  departure_hour range valid (0-23).")

# Flag coordinates outside the bounding box actually enforced below:
# latitude 49..61, longitude -8..3. Offending rows are reported, not modified.
invalid_coords = pdf[(pdf['latitude'] < 49) | (pdf['latitude'] > 61) |
                     (pdf['longitude'] < -8) | (pdf['longitude'] > 3)]
if len(invalid_coords) > 0:
    print(f"  WARNING: {len(invalid_coords)} rows with coordinates outside UK bounds")
else:
    print("  Coordinates valid (UK bounds).")

# Validate disruption counts are non-negative. Every count column is covered so
# the summary line below is true, and columns missing from the frame are skipped
# (same existence guard the other cells use) instead of raising KeyError.
count_cols = ['active_disruptions', 'unique_situations', 'planned_count',
              'unplanned_count', 'roadworks_count', 'roadclosed_count',
              'other_reason_count']
for col in count_cols:
    if col not in pdf.columns:
        continue
    neg = int((pdf[col] < 0).sum())
    if neg > 0:
        print(f"  WARNING: {neg} negative values in {col} -> setting to 0")
        pdf[col] = pdf[col].clip(lower=0)

print("  All disruption counts non-negative.")

# Validate is_peak_hour is binary. Raised explicitly rather than asserted so the
# check still runs when Python is started with -O (which strips assert).
if not pdf['is_peak_hour'].isin([0, 1]).all():
    raise ValueError("is_peak_hour should be 0 or 1")
print("  is_peak_hour confirmed binary (0/1).")

print(f"\nFinal cleaned dataset: {pdf.shape[0]:,} rows x {pdf.shape[1]} columns")

In [None]:
# ============================================================
# CELL 7: Drop Unnecessary Columns for ML
# ============================================================

# Columns removed before modelling, each with the author's stated reason:
# identifiers, redundant or high-cardinality text, and leakage risks.
DROP_COLS = [
    'vehicle_journey_code',  # ID only
    'journey_code',          # ID only
    'service_code',          # redundant with line_name
    'operator_code',         # single value (ENSB)
    'operator_name',         # single value (Ensignbus)
    'stop_ref',              # ID only
    'stop_name',             # high cardinality text
    'origin',                # high cardinality text
    'destination',           # high cardinality text
    'departure_time',        # already encoded as departure_hour
    'scheduled_arrival',     # redundant with departure + sequence
    'departure_decimal',     # duplicate of departure_hour
    'start_date',            # date info captured in mid_date
    'end_date',              # date info captured in mid_date
    'days_of_week',          # encoded as day_of_week_num
    'mid_date',              # temporal leakage risk
    'regions_affected',      # string - not useful for ML directly
    'max_severity_score',    # highly correlated with avg_severity_score
    'other_reason_count',    # low signal, noisy
]

# Restrict the drop list to columns present in `pdf` (DROP_COLS order is kept),
# so a missing column never raises. `pdf` itself is left untouched; the reduced
# frame is bound to `pdf_clean`.
available = set(pdf.columns)
cols_to_drop = [name for name in DROP_COLS if name in available]
pdf_clean = pdf.drop(cols_to_drop, axis="columns")

print("COLUMNS DROPPED (not useful for ML):")
for name in cols_to_drop:
    print(f"  - {name}")

n_rows_kept, n_cols_kept = pdf_clean.shape
print(f"\nCOLUMNS KEPT ({n_cols_kept}):")
for position, (name, dtype) in enumerate(pdf_clean.dtypes.items(), start=1):
    print(f"  {position:2d}. {name:30s} dtype={dtype}")

print(f"\nCleaned dataset: {n_rows_kept:,} rows x {n_cols_kept} columns")

In [None]:
# ============================================================
# CELL 8: Save Cleaned Dataset
# ============================================================
import os
from pathlib import PureWindowsPath

# Write `pdf_clean` to cleaned_dataset.csv in the processed-data directory and
# print a summary of the whole cleaning run. Relies on notebook globals set by
# earlier cells: DATA_PATH, pdf, pdf_clean, before, after, cols_to_drop.
output_dir = r'F:\SOFTWARICA\big-data-transport-analytics\data\processed'
os.makedirs(output_dir, exist_ok=True)

output_path = os.path.join(output_dir, 'cleaned_dataset.csv')
# index=False: the pandas row index is not written as a column.
pdf_clean.to_csv(output_path, index=False)

file_size = os.path.getsize(output_path)
print(f"Saved: cleaned_dataset.csv")
print(f"  Path: {output_path}")
print(f"  Size: {file_size/1024:.1f} KB ({file_size/1024/1024:.2f} MB)")
print(f"  Rows: {pdf_clean.shape[0]:,}")
print(f"  Cols: {pdf_clean.shape[1]}")

# File name of the input CSV. PureWindowsPath accepts both '\' and '/' as
# separators, so this also works if DATA_PATH is rewritten with forward slashes
# (splitting on the backslash alone would then return the full path).
input_name = PureWindowsPath(DATA_PATH).name

print(f"\n--- Cleaning Summary ---")
# `pdf` has already been de-duplicated at this point, so its row count equals the
# output's. `before` (row count captured just before drop_duplicates, when no
# rows had been removed yet) is the true input size.
print(f"  Input:   {input_name} ({before:,} x {pdf.shape[1]})")
print(f"  Output:  cleaned_dataset.csv ({pdf_clean.shape[0]:,} x {pdf_clean.shape[1]})")
print(f"  Nulls filled: disruption cols -> 0, numeric -> median, categorical -> 'unknown'")
print(f"  Duplicates removed: {before - after:,}")
print(f"  Columns dropped: {len(cols_to_drop)} (IDs, text, redundant, leakage)")
print(f"\n  Ready for 03_feature_engineering.ipynb")