# Merge Wiener Linien Historical Data with Station HW 25

This notebook merges Wiener Linien historical incidents with station HW 25 weather data
using an as-of merge on `timestamp` (last known weather at or before the incident).

Filters applied:
- Ignore records where `PartitionKey == "stoerunglang"`
- Ignore records where `category == "stoerunglang"`


In [1]:
import json
import os

import pandas as pd
from pymongo import MongoClient


In [2]:
MONGO_URI = os.getenv("MONGO_URI", "mongodb://mongodb:27017")
DB_NAME = os.getenv("MONGO_DB", "big_data_austria")
WL_COLLECTION = os.getenv("WL_HIST_COLLECTION", "wienerlinien_historical")
HW_COLLECTION = os.getenv("HW_COLLECTION", "Station_HW_25")


In [3]:
TIMESTAMP_CANDIDATES = [
    "timestamp",
    "time_start",
    "timestart",
    "time",
    "eventTime",
    "event_time",
    "startTime",
    "start_time",
    "fromTime",
    "from_time",
    "from",
    "firstSeen",
    "lastSeen",
    "endDate",
    "resumeDate",
    "createdAt",
    "created_at",
    "lastUpdate",
    "last_update",
]


def _parse_data(value):
    if isinstance(value, dict):
        return value
    if isinstance(value, str):
        try:
            return json.loads(value)
        except json.JSONDecodeError:
            return {}
    return {}


def _normalize_wl(df):
    if "data" not in df.columns:
        return df
    data_series = df["data"].apply(_parse_data)
    data_df = pd.json_normalize(data_series, sep="_")
    return pd.concat([df.drop(columns=["data"]), data_df], axis=1)


def _filter_stoerunglang(df):
    if "PartitionKey" in df.columns:
        df = df[~df["PartitionKey"].fillna("").str.lower().eq("stoerunglang")]
    if "category" in df.columns:
        df = df[~df["category"].fillna("").str.lower().eq("stoerunglang")]
    return df


def _pick_timestamp_column(df, candidates):
    for candidate in candidates:
        for col in df.columns:
            if col.lower() == candidate.lower():
                return col
    return None


def _to_timestamp(series):
    return pd.to_datetime(series, errors="coerce", utc=True)


In [4]:
client = MongoClient(MONGO_URI)
db = client[DB_NAME]

wl_docs = list(db[WL_COLLECTION].find({}, {"_id": 0}))
hw_docs = list(db[HW_COLLECTION].find({}, {"_id": 0}))

wl_df = pd.DataFrame(wl_docs)
hw_df = pd.DataFrame(hw_docs)

print(f"Wiener Linien records: {len(wl_df)}")
print(f"HW 25 records: {len(hw_df)}")


Wiener Linien records: 14329
HW 25 records: 8641


In [5]:
wl_df = _normalize_wl(wl_df)
wl_df = _filter_stoerunglang(wl_df)

if "time_start" not in wl_df.columns:
    raise ValueError("Expected 'time_start' in Wiener Linien data after normalization. Columns: " + ", ".join(wl_df.columns))

wl_df["timestamp"] = _to_timestamp(wl_df["time_start"])
wl_df = wl_df.dropna(subset=["timestamp"]).sort_values("timestamp")

wl_df.head()


Unnamed: 0,RowKey,PartitionKey,category,dataHash,endDate,exportDate,firstSeen,imported_at,lastSeen,migration,...,attributes_relatedLineTypes_30,attributes_relatedLineTypes_27A,attributes_relatedLineTypes_,attributes_relatedLineTypes_88A,attributes_relatedLineTypes_27B,attributes_relatedLineTypes_60A,attributes_relatedLineTypes_34A,attributes_relatedLineTypes_76B,attributes_relatedLineTypes_76A,timestamp
14165,ivu_1351_143_dc3ab21e,stoerungkurz,stoerungkurz,dc3ab21e,2025-12-02T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-02T00:00:30.820320+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-03T00:00:28.485384+01:00,True,...,,,,,,,,,,2025-11-29 23:00:17+00:00
13468,ivu_2343_433_cae892a0,stoerungkurz,stoerungkurz,cae892a0,2025-12-03T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-03T00:00:23.473419+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-04T00:00:30.937108+01:00,True,...,,,,,,,,,,2025-11-30 23:00:17+00:00
13469,ivu_2343_531_c00fbff2,stoerungkurz,stoerungkurz,c00fbff2,2025-12-03T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-03T00:00:23.331308+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-04T00:00:30.825207+01:00,True,...,,,,,,,,,,2025-11-30 23:00:17+00:00
14168,ivu_2343_531_341bf39a,stoerungkurz,stoerungkurz,341bf39a,2025-12-02T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-02T00:00:31.083103+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-03T00:00:23.331308+01:00,True,...,,,,,,,,,,2025-11-30 23:00:17+00:00
14167,ivu_2343_433_d0bfb1d2,stoerungkurz,stoerungkurz,d0bfb1d2,2025-12-02T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-02T00:00:30.950265+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-03T00:00:23.473419+01:00,True,...,,,,,,,,,,2025-11-30 23:00:17+00:00


In [6]:
if "timestamp" not in hw_df.columns:
    raise ValueError("HW 25 data has no 'timestamp' column. Columns: " + ", ".join(hw_df.columns))

hw_df["timestamp"] = _to_timestamp(hw_df["timestamp"])
hw_df = hw_df.dropna(subset=["timestamp"]).sort_values("timestamp")

hw_df.head()


Unnamed: 0,timestamp,RR,TL,P,FF,SO,RF,station_id
0,2025-11-01 00:00:00+00:00,0.0,8.7,993.5,0.3,0.0,78.0,HW_25
1,2025-11-01 00:10:00+00:00,0.0,8.5,993.5,0.5,0.0,79.0,HW_25
2,2025-11-01 00:20:00+00:00,0.0,8.5,993.7,1.2,0.0,80.0,HW_25
3,2025-11-01 00:30:00+00:00,0.0,8.3,993.6,0.6,0.0,82.0,HW_25
4,2025-11-01 00:40:00+00:00,0.0,8.3,993.6,0.4,0.0,82.0,HW_25


In [7]:
merged = pd.merge_asof(
    wl_df,
    hw_df,
    on="timestamp",
    direction="backward",
)

merged.head()


Unnamed: 0,RowKey,PartitionKey,category,dataHash,endDate,exportDate,firstSeen,imported_at,lastSeen,migration,...,attributes_relatedLineTypes_76B,attributes_relatedLineTypes_76A,timestamp,RR,TL,P,FF,SO,RF,station_id
0,ivu_1351_143_dc3ab21e,stoerungkurz,stoerungkurz,dc3ab21e,2025-12-02T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-02T00:00:30.820320+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-03T00:00:28.485384+01:00,True,...,,,2025-11-29 23:00:17+00:00,0.0,3.0,990.0,0.9,0.0,80.0,HW_25
1,ivu_2343_433_cae892a0,stoerungkurz,stoerungkurz,cae892a0,2025-12-03T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-03T00:00:23.473419+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-04T00:00:30.937108+01:00,True,...,,,2025-11-30 23:00:17+00:00,0.0,2.6,992.2,1.0,0.0,98.0,HW_25
2,ivu_2343_531_c00fbff2,stoerungkurz,stoerungkurz,c00fbff2,2025-12-03T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-03T00:00:23.331308+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-04T00:00:30.825207+01:00,True,...,,,2025-11-30 23:00:17+00:00,0.0,2.6,992.2,1.0,0.0,98.0,HW_25
3,ivu_2343_531_341bf39a,stoerungkurz,stoerungkurz,341bf39a,2025-12-02T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-02T00:00:31.083103+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-03T00:00:23.331308+01:00,True,...,,,2025-11-30 23:00:17+00:00,0.0,2.6,992.2,1.0,0.0,98.0,HW_25
4,ivu_2343_433_d0bfb1d2,stoerungkurz,stoerungkurz,d0bfb1d2,2025-12-02T23:59:59.000+0100,2025-12-16T13:39:15.722843+01:00,2025-12-02T00:00:30.950265+01:00,2026-01-15T20:22:21.417852+00:00,2025-12-03T00:00:23.473419+01:00,True,...,,,2025-11-30 23:00:17+00:00,0.0,2.6,992.2,1.0,0.0,98.0,HW_25


In [8]:
# Clean merged data to keep only relevant fields
START_COL_CANDIDATES = ["time_start", "timestart", "start_time", "startTime"]
END_COL_CANDIDATES = ["time_end", "timeend", "endDate", "end_date", "endTime", "resumeDate"]
TITLE_CANDIDATES = ["title", "data_title"]

start_col = _pick_timestamp_column(merged, START_COL_CANDIDATES)
end_col = _pick_timestamp_column(merged, END_COL_CANDIDATES)
title_col = _pick_timestamp_column(merged, TITLE_CANDIDATES)

if start_col is None:
    raise ValueError("No start time column found in merged data. Columns: " + ", ".join(merged.columns))
if end_col is None:
    raise ValueError("No end time column found in merged data. Columns: " + ", ".join(merged.columns))

WEATHER_COLS = [col for col in hw_df.columns if col != "timestamp"]
if not WEATHER_COLS:
    raise ValueError("No weather columns detected from HW 25 data.")

missing_weather = [c for c in WEATHER_COLS if c not in merged.columns]
if missing_weather:
    raise ValueError("Missing weather columns in merged data: " + ", ".join(missing_weather))

cleaned = merged.copy()
cleaned["start_time"] = _to_timestamp(cleaned[start_col])
cleaned["end_time"] = _to_timestamp(cleaned[end_col])
if title_col is not None:
    cleaned["title"] = cleaned[title_col]
else:
    print("Warning: no title column found; proceeding without title.")

same_time = (
    cleaned["start_time"].notna()
    & cleaned["end_time"].notna()
    & (cleaned["start_time"] == cleaned["end_time"])
)
cleaned = cleaned[~same_time]
cleaned = cleaned.sort_values(["start_time", "end_time"])
cleaned = cleaned.drop_duplicates(subset=["start_time", "end_time"], keep="first")

for col in WEATHER_COLS:
    if col != "station_id":
        cleaned[col] = pd.to_numeric(cleaned[col], errors="coerce")

base_cols = ["start_time", "end_time"]
if title_col is not None:
    base_cols.append("title")
cleaned = cleaned[base_cols + WEATHER_COLS]
cleaned = cleaned.dropna(subset=["start_time"])
cleaned = cleaned.dropna(subset=WEATHER_COLS, how="all")

cleaned.head()


Unnamed: 0,start_time,end_time,title,title.1,RR,TL,P,FF,SO,RF,station_id
0,2025-11-29 23:00:17+00:00,2025-12-02 22:59:59+00:00,Bauarbeiten Züge halten Alszeile 126,Bauarbeiten Züge halten Alszeile 126,0.0,3.0,990.0,0.9,0.0,80.0,HW_25
3,2025-11-30 23:00:17+00:00,2025-12-02 22:59:59+00:00,Gleisbauarbeiten Busse halten bei Franz-Jonas-...,Gleisbauarbeiten Busse halten bei Franz-Jonas-...,0.0,2.6,992.2,1.0,0.0,98.0,HW_25
1,2025-11-30 23:00:17+00:00,2025-12-03 22:59:59+00:00,Gleisbauarbeiten Haltestelle aufgelassen,Gleisbauarbeiten Haltestelle aufgelassen,0.0,2.6,992.2,1.0,0.0,98.0,HW_25
5,2025-11-30 23:00:18+00:00,2025-12-02 22:59:59+00:00,Gleisbauarbeiten Busse halten Wallensteinstraß...,Gleisbauarbeiten Busse halten Wallensteinstraß...,0.0,2.6,992.2,1.0,0.0,98.0,HW_25
6,2025-11-30 23:00:18+00:00,2025-12-03 22:59:59+00:00,Züge halten Gegenüber,Züge halten Gegenüber,0.0,2.6,992.2,1.0,0.0,98.0,HW_25


In [9]:
# Daily delay summary with weather at delay times
TARGET_WEATHER_COLS = ["RR", "TL", "P", "FF", "SO", "S0", "RF"]
weather_cols_for_avg = [c for c in TARGET_WEATHER_COLS if c in cleaned.columns]
if not weather_cols_for_avg:
    raise ValueError("No target weather columns found for daily averages.")

agg_map = {"delay_count": ("start_time", "size")}
for col in weather_cols_for_avg:
    agg_map[col] = (col, "mean")
if "station_id" in cleaned.columns:
    agg_map["station_id"] = ("station_id", "first")

daily_summary = (
    cleaned
    .dropna(subset=["start_time"])
    .assign(date=lambda df: df["start_time"].dt.date)
    .groupby("date", as_index=False)
    .agg(**agg_map)
)

daily_summary["date"] = pd.to_datetime(daily_summary["date"])
daily_summary.head()


Unnamed: 0,date,delay_count,RR,TL,P,FF,SO,RF,station_id
0,2025-11-29,1,0.0,3.0,990.0,0.9,0.0,80.0,HW_25
1,2025-11-30,6,0.0,2.6,992.2,1.0,0.0,98.0,HW_25
2,2025-12-02,9,0.0,2.455556,993.488889,3.211111,0.0,97.333333,HW_25
3,2025-12-03,59,0.0,3.618644,991.891525,3.761017,0.0,93.033898,HW_25
4,2025-12-04,118,0.045763,4.588983,986.762712,3.014407,0.0,92.635593,HW_25


In [10]:
# Clear output collections in MongoDB if you want to rerun inserts
import os
from pymongo import MongoClient

MONGO_URI = os.getenv("MONGO_URI", "mongodb://mongodb:27017")
DB_NAME = os.getenv("MONGO_DB", "big_data_austria")
CLEANED_COLLECTION = os.getenv("CLEANED_COLLECTION", "wienerlinien_hw25_merged")
DAILY_COLLECTION = os.getenv("DAILY_COLLECTION", "wienerlinien_hw25_daily")

client = MongoClient(MONGO_URI)
db = client[DB_NAME]

delete_cleaned = db[CLEANED_COLLECTION].delete_many({})
delete_daily = db[DAILY_COLLECTION].delete_many({})

print(f"Cleared {delete_cleaned.deleted_count} documents from {CLEANED_COLLECTION}")
print(f"Cleared {delete_daily.deleted_count} documents from {DAILY_COLLECTION}")


Cleared 0 documents from wienerlinien_hw25_merged
Cleared 0 documents from wienerlinien_hw25_daily


In [11]:
# Persist cleaned and daily summary data to CSV and MongoDB
CLEANED_CSV_PATH = os.getenv("CLEANED_CSV_PATH", "merged_wienerlinien_hw25.csv")
cleaned.to_csv(CLEANED_CSV_PATH, index=False)
print(f"CSV written: {CLEANED_CSV_PATH}")

CLEANED_COLLECTION = os.getenv("CLEANED_COLLECTION", "wienerlinien_hw25_merged")
cleaned_out = cleaned.copy()
for col in ["start_time", "end_time"]:
    if col in cleaned_out.columns and pd.api.types.is_datetime64_any_dtype(cleaned_out[col]):
        if cleaned_out[col].dt.tz is not None:
            cleaned_out[col] = cleaned_out[col].dt.tz_convert("UTC").dt.tz_localize(None)
if not cleaned_out.empty:
    db[CLEANED_COLLECTION].insert_many(cleaned_out.to_dict(orient="records"))
print(f"MongoDB written: {CLEANED_COLLECTION} ({len(cleaned_out)} records)")

DAILY_CSV_PATH = os.getenv("DAILY_CSV_PATH", "wienerlinien_hw25_daily.csv")
daily_summary.to_csv(DAILY_CSV_PATH, index=False)
print(f"CSV written: {DAILY_CSV_PATH}")

DAILY_COLLECTION = os.getenv("DAILY_COLLECTION", "wienerlinien_hw25_daily")
daily_out = daily_summary.copy()
if "date" in daily_out.columns and pd.api.types.is_datetime64_any_dtype(daily_out["date"]):
    if daily_out["date"].dt.tz is not None:
        daily_out["date"] = daily_out["date"].dt.tz_convert("UTC").dt.tz_localize(None)
if not daily_out.empty:
    db[DAILY_COLLECTION].insert_many(daily_out.to_dict(orient="records"))
print(f"MongoDB written: {DAILY_COLLECTION} ({len(daily_out)} records)")


CSV written: merged_wienerlinien_hw25.csv


  db[CLEANED_COLLECTION].insert_many(cleaned_out.to_dict(orient="records"))


MongoDB written: wienerlinien_hw25_merged (1347 records)
CSV written: wienerlinien_hw25_daily.csv
MongoDB written: wienerlinien_hw25_daily (24 records)
