In [3]:
# ingestion.py
import requests
from typing import Dict, Any, Optional

def get_oauth2_token(token_url: str, client_id: str, client_secret: str, scope: Optional[str]=None) -> str:
    data = {"grant_type": "client_credentials"}
    if scope:
        data["scope"] = scope
    resp = requests.post(token_url, data=data, auth=(client_id, client_secret), timeout=10)
    resp.raise_for_status()
    return resp.json()["access_token"]

def fetch_api_oauth(api_url: str, token_url: str, client_id: str, client_secret: str, params: Dict = None) -> Dict[str,Any]:
    token = get_oauth2_token(token_url, client_id, client_secret)
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    r = requests.get(api_url, headers=headers, params=params, timeout=20)
    r.raise_for_status()
    return r.json()

def fetch_api_mtls(api_url: str, cert_tuple: tuple, ca_bundle: Optional[str] = None, params: Dict=None) -> Dict[str,Any]:
    # cert_tuple = ("/path/client.crt", "/path/client.key")
    verify = ca_bundle if ca_bundle else True
    r = requests.get(api_url, cert=cert_tuple, verify=verify, params=params, timeout=20)
    r.raise_for_status()
    return r.json()


In [4]:
# normalize.py
import pandas as pd
from typing import List, Dict, Any

CANONICAL_COLUMNS = [
    "train_number","station_code","timestamp",
    "scheduled_arrival","scheduled_departure","actual_arrival",
    "speed_kmph","signal_aspect","rolling_stock_health",
    "weather_condition","source"
]

def parse_ts(x):
    return pd.to_datetime(x, errors="coerce")

def normalize_timetable(timetable_json: List[Dict[str,Any]]) -> pd.DataFrame:
    rows = []
    for r in timetable_json:
        rows.append({
            "train_number": str(r.get("train_number") or r.get("train_no") or ""),
            "station_code": r.get("station_code") or r.get("station") or "",
            "timestamp": parse_ts(r.get("last_update") or r.get("timestamp")),
            "scheduled_arrival": parse_ts(r.get("scheduled_arrival")),
            "scheduled_departure": parse_ts(r.get("scheduled_departure")),
            "actual_arrival": parse_ts(r.get("actual_arrival")),
            "speed_kmph": r.get("speed_kmph"),
            "signal_aspect": None,
            "rolling_stock_health": None,
            "weather_condition": r.get("weather"),
            "source": "timetable"
        })
    return pd.DataFrame(rows)[CANONICAL_COLUMNS]

def normalize_signalling(signalling_json: List[Dict[str,Any]]) -> pd.DataFrame:
    rows=[]
    for r in signalling_json:
        rows.append({
            "train_number": str(r.get("train_number","")),
            "station_code": r.get("station_code") or r.get("block_id") or "",
            "timestamp": parse_ts(r.get("timestamp")),
            "scheduled_arrival": None,
            "scheduled_departure": None,
            "actual_arrival": None,
            "speed_kmph": r.get("approach_speed"),
            "signal_aspect": r.get("aspect"),
            "rolling_stock_health": None,
            "weather_condition": r.get("weather"),
            "source": "signalling"
        })
    return pd.DataFrame(rows)[CANONICAL_COLUMNS]

def normalize_rolling(rolling_json: List[Dict[str,Any]]) -> pd.DataFrame:
    rows=[]
    for r in rolling_json:
        rows.append({
            "train_number": str(r.get("train_number","")),
            "station_code": r.get("nearest_station") or "",
            "timestamp": parse_ts(r.get("timestamp")),
            "scheduled_arrival": None,
            "scheduled_departure": None,
            "actual_arrival": parse_ts(r.get("actual_arrival")),
            "speed_kmph": r.get("speed"),
            "signal_aspect": None,
            "rolling_stock_health": r.get("health_score"),
            "weather_condition": r.get("weather"),
            "source": "rolling_stock"
        })
    return pd.DataFrame(rows)[CANONICAL_COLUMNS]

def merge_all(dfs: List[pd.DataFrame]) -> pd.DataFrame:
    df = pd.concat(dfs, ignore_index=True, sort=False)
    for col in ["timestamp","scheduled_arrival","scheduled_departure","actual_arrival"]:
        df[col] = pd.to_datetime(df[col], errors="coerce")
    return df


In [7]:
# features.py
import pandas as pd
import redis
import json
from typing import Optional

def compute_labels_and_features(df: pd.DataFrame) -> pd.DataFrame:
    # Ensure scheduled & actual are present where available
    df["delay_minutes"] = (df["actual_arrival"] - df["scheduled_arrival"]).dt.total_seconds() / 60.0
    df["delay_minutes"] = df["delay_minutes"].fillna(0.0)

    # Sort per train to compute previous delays
    df = df.sort_values(["train_number","scheduled_arrival"]).reset_index(drop=True)

    # prev_delay: last station delay for this train
    df["prev_delay"] = df.groupby("train_number")["delay_minutes"].shift(1).fillna(0.0)

    # rolling average of last 3 delays (excluding current)
    df["avg_prev3_delay"] = df.groupby("train_number")["delay_minutes"].apply(
        lambda s: s.shift(1).rolling(window=3, min_periods=1).mean()).fillna(0.0)

    # temporal
    df["minute_of_day"] = df["scheduled_arrival"].dt.hour*60 + df["scheduled_arrival"].dt.minute
    df["day_of_week"] = df["scheduled_arrival"].dt.dayofweek
    df["is_weekend"] = df["day_of_week"].isin([5,6]).astype(int)

    # fill numeric fields
    df["speed_kmph"] = pd.to_numeric(df["speed_kmph"], errors="coerce").fillna(df["speed_kmph"].median())
    df["rolling_stock_health"] = pd.to_numeric(df["rolling_stock_health"], errors="coerce").fillna(df["rolling_stock_health"].median())

    # Target classification: delayed if > 5 minutes
    df["delayed"] = (df["delay_minutes"] > 5.0).astype(int)

    return df

def push_online_features_to_redis(df: pd.DataFrame, redis_url: str="redis://localhost:6379/0"):
    r = redis.from_url(redis_url)
    # For each train, push latest prev_delay and avg_prev3_delay
    latest = df.sort_values("scheduled_arrival").groupby("train_number").tail(1)
    for _, row in latest.iterrows():
        key = f"train:{row['train_number']}:online"
        payload = {
            "prev_delay": float(row["prev_delay"]),
            "avg_prev3_delay": float(row["avg_prev3_delay"]),
            "last_update": str(row["scheduled_arrival"])
        }
        r.set(key, json.dumps(payload))


In [6]:
!pip install redis

Collecting redis
  Downloading redis-6.4.0-py3-none-any.whl.metadata (10 kB)
Downloading redis-6.4.0-py3-none-any.whl (279 kB)
[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/279.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━[0m [32m225.3/279.8 kB[0m [31m7.0 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m279.8/279.8 kB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: redis
Successfully installed redis-6.4.0


In [9]:
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
import joblib
import os

# Load dataset
df = pd.read_csv("/content/railway_records_large.csv", parse_dates=["scheduled_arrival","actual_arrival"])

# Target: delay > 5 min = delayed (1), else on-time (0)
df["delay_minutes"] = (df["actual_arrival"] - df["scheduled_arrival"]).dt.total_seconds()/60
df["delayed"] = (df["delay_minutes"] > 5).astype(int)

# Feature engineering
df["minute_of_day"] = df["scheduled_arrival"].dt.hour*60 + df["scheduled_arrival"].dt.minute
df["day_of_week"] = df["scheduled_arrival"].dt.dayofweek
df["is_weekend"] = df["day_of_week"].isin([5,6]).astype(int)

features = ["minute_of_day","day_of_week","is_weekend","speed_kmph","rolling_stock_health"]
X = df[features]
y = df["delayed"]

# Train/test split
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, shuffle=False)

# Train LightGBM
train_data = lgb.Dataset(X_train, label=y_train)
valid_data = lgb.Dataset(X_valid, label=y_valid)

params = {
    "objective": "binary",
    "metric": "binary_error",
    "learning_rate": 0.05,
    "num_leaves": 64,
    "verbose": -1
}
model = lgb.train(params, train_data, valid_sets=[train_data, valid_data],
                  num_boost_round=1000, callbacks=[lgb.early_stopping(stopping_rounds=50)])

# Predictions
y_pred = (model.predict(X_valid, num_iteration=model.best_iteration) > 0.5).astype(int)

acc = accuracy_score(y_valid, y_pred)
f1 = f1_score(y_valid, y_pred)

print("Accuracy:", acc)
print("F1 Score:", f1)

# Save the model and other artifacts
model_artifacts = {
    "model_tuple": ("lgb", model),
    "features": features,
    "metrics": {"accuracy": acc, "f1_score": f1, "threshold": 0.5} # Adding a threshold for the prediction
}

# Create directory if it doesn't exist
if not os.path.exists("./models"):
    os.makedirs("./models")

joblib.dump(model_artifacts, "./models/railway_classifier.joblib")
print("Model saved successfully!")

Training until validation scores don't improve for 50 rounds
Early stopping, best iteration is:
[1]	training's binary_error: 0.24575	valid_1's binary_error: 0.25
Accuracy: 0.75
F1 Score: 0.8571428571428571
Model saved successfully!


In [11]:
# serve.py
import joblib
import numpy as np
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from datetime import datetime
import redis, json
import os

MODEL_PATH = "./models/railway_classifier.joblib"

app = FastAPI(title="Railway OnTime Classifier")

# Load the model and other artifacts
try:
    artifacts = joblib.load(MODEL_PATH)
    model_tuple = artifacts["model_tuple"]
    # encoders = artifacts["encoders"] # Encoders are not used in this version of serve.py
    features = artifacts["features"]
    metrics = artifacts["metrics"]
except FileNotFoundError:
    raise RuntimeError(f"Model file not found at {MODEL_PATH}. Please train the model first.")
except KeyError as e:
    raise RuntimeError(f"Missing key in model artifacts: {e}. Ensure all required artifacts are saved.")
except Exception as e:
    raise RuntimeError(f"Error loading model artifacts: {e}")


# Redis client for online features
r = redis.from_url("redis://localhost:6379/0")

# Simple token check (replace with OAuth2/JWT or mTLS in production)
def verify_token(token: str):
    # Replace with real verification: introspection, JWKS, or API gateway
    if token != "super-secret-token":
        raise HTTPException(status_code=401, detail="Invalid token")

class PredictRequest(BaseModel):
    train_number: str
    station_code: str
    scheduled_arrival: datetime
    speed_kmph: float = None
    rolling_stock_health: float = None
    # optional: client should pass token in header; for demo we accept token param
    token: str = None

class PredictResponse(BaseModel):
    delayed_probability: float
    delayed: bool
    threshold: float
    explanation: dict

def encode_with_le(val, le):
    try:
        return int(le.transform([str(val)])[0])
    except Exception:
        # unseen -> return median or 0
        return 0

def get_online_features(train_number: str):
    key = f"train:{train_number}:online"
    v = r.get(key)
    if not v:
        return {"prev_delay": 0.0, "avg_prev3_delay": 0.0}
    try:
        d = json.loads(v)
        return {"prev_delay": float(d.get("prev_delay",0.0)), "avg_prev3_delay": float(d.get("avg_prev3_delay",0.0))}
    except:
        return {"prev_delay": 0.0, "avg_prev3_delay": 0.0}

@app.post("/predict", response_model=PredictResponse)
def predict(req: PredictRequest):
    # token check (in header or body). Replace this placeholder.
    if req.token is None:
        raise HTTPException(status_code=401, detail="Missing token")
    verify_token(req.token)

    # Build features
    minute_of_day = req.scheduled_arrival.hour*60 + req.scheduled_arrival.minute
    day_of_week = req.scheduled_arrival.weekday()
    is_weekend = int(day_of_week in (5,6))
    speed = req.speed_kmph if req.speed_kmph is not None else 40.0
    health = req.rolling_stock_health if req.rolling_stock_health is not None else 0.9

    online = get_online_features(req.train_number)
    prev_delay = online["prev_delay"]
    avg_prev3_delay = online["avg_prev3_delay"]

    # Removed encoding for train_number and station_code as they are not used in this model version
    # train_le = encode_with_le(req.train_number, encoders["train_number"])
    # station_le = encode_with_le(req.station_code, encoders["station_code"])

    # Updated feature vector to match the trained model's features
    vec = np.array([[minute_of_day, day_of_week, is_weekend, speed, health]])
    kind, mdl = model_tuple
    if kind == "lgb":
        proba = mdl.predict(vec, num_iteration=mdl.best_iteration)[0]
    else:
        try:
            proba = mdl.predict_proba(vec)[0][1]
        except:
            proba = float(mdl.predict(vec)[0])

    threshold = metrics.get("threshold", 0.5)
    delayed = bool(proba > threshold)

    # minimal explanation: return feature values; for full explanations integrate SHAP server-side
    explanation = {
        "feature_values": {
            "minute_of_day": minute_of_day,
            "day_of_week": day_of_week,
            "speed_kmph": speed,
            "rolling_stock_health": health,
            "prev_delay": prev_delay, # Included online features in explanation
            "avg_prev3_delay": avg_prev3_delay # Included online features in explanation
        }
    }
    return PredictResponse(delayed_probability=float(proba), delayed=delayed, threshold=threshold, explanation=explanation)