### **Notebook 01 – Build merged Velo/Fuss + MeteoSwiss daily dataset**

This notebook builds a clean, daily-level dataset that combines:

- Velo/Fuss counts from the Basel open data portal.
- MeteoSwiss hourly weather observations for the Basel station.

The goal is to:
1. Load both raw sources.
2. Aggregate them to a common daily granularity.
3. Merge them on the calendar date.
4. Run basic data quality checks and export `merged_dataset.csv` for later analysis.

#### **Imports and configuration**

Here we import the required Python libraries and configure the project paths and parameters. 

The data directory is addressed via a relative path so that the notebook remains reproducible on different machines as long as the folder structure is the same.

In [31]:
# ---------------- Imports and Configuration ----------------
from pathlib import Path
from IPython.display import display
import numpy as np
import pandas as pd


# ---------------- Paths & configuration ----------------
DATA_DIR = Path("data")  # relative to the project root

VELO_CSV = DATA_DIR / "converted_Velo_Fuss_Count.csv"
METEO_CSV = DATA_DIR / "ogd-smn_bas_h_historical_2020-2029.csv"
METEO_META = DATA_DIR / "ogd-smn_meta_parameters.csv"
MERGED_CSV = DATA_DIR / "merged_dataset.csv"

YEARS = [2024]       # choose year(s) to analyze
CHUNKSIZE = 500_000  # rows per chunk when streaming Velo data

print("Current working directory:", Path(".").resolve())
print("Data directory           :", DATA_DIR.resolve())
print("Velo CSV                 :", VELO_CSV)
print("Meteo CSV                :", METEO_CSV)
print("Meteo meta               :", METEO_META)
print("Merged output            :", MERGED_CSV)
print("\nAnalyzing year(s)", YEARS, "with chunk size:", CHUNKSIZE, "\n")

# Sanity checks for reproducibility
assert VELO_CSV.exists(), f"File not found: {VELO_CSV}"
assert METEO_CSV.exists(), f"File not found: {METEO_CSV}"
assert METEO_META.exists(), f"File not found: {METEO_META}"

Current working directory: C:\Users\FHNW\Documents\daw\project
Data directory           : C:\Users\FHNW\Documents\daw\project\data
Velo CSV                 : data\converted_Velo_Fuss_Count.csv
Meteo CSV                : data\ogd-smn_bas_h_historical_2020-2029.csv
Meteo meta               : data\ogd-smn_meta_parameters.csv
Merged output            : data\merged_dataset.csv

Analyzing year(s) [2024] with chunk size: 500000 



#### **Data loading and aggregation**

The next cell defines the functions for loading and transforming the raw data:

- Load MeteoSwiss data and rename parameters using the metadata and parses timestamps.
- Aggregate MeteoSwiss hourly observations to daily temperature, precipitation and wind speed.
- Streams the large Velo/Fuss CSV in chunks, cleans invalid records, and aggregates to daily totals per site.
- Check completeness of calendar days per year.

In [32]:
# ---------------- MeteoSwiss: load + aggregate ----------------
def load_meteo_data(meteo_csv, meta_csv):
    """Load MeteoSwiss data and rename columns using metadata."""
    print("\nLoading MeteoSwiss data and metadata...\n")
    data = pd.read_csv(meteo_csv, sep=";", encoding="latin1")
    meta = pd.read_csv(meta_csv, sep=";", encoding="latin1")

    print("MeteoSwiss data head:")
    display(data.head(3))
    print("\nMeteoSwiss metadata head:")
    display(meta.head(3))

    rename_dict = dict(
        zip(meta["parameter_shortname"], meta["parameter_description_en"])
    )
    data = data.rename(columns=rename_dict)

    data["reference_timestamp"] = pd.to_datetime(
        data["reference_timestamp"],
        format="%d.%m.%Y %H:%M",
        dayfirst=True,
        errors="coerce",
    )

    print("\nMeteoSwiss data with fixed column names (from metadata) and parsed timestamps:")
    display(data.head(3))
    return data


def aggregate_daily_meteo(df, years):
    """Aggregate hourly MeteoSwiss data to daily temp/precip/wind."""
    print("\nAggregating MeteoSwiss data to daily level...")
    df = df[df["reference_timestamp"].dt.year.isin(years)].copy()

    daily = (
        df.groupby(df["reference_timestamp"].dt.floor("D"))
        .agg(
            temp_mean_C=("Air temperature 2 m above ground; hourly mean", "mean"),
            precip_mm=("Precipitation; hourly total", "sum"),
            wind_mean_ms=("Wind speed scalar; hourly mean in m/s", "mean"),
        )
        .reset_index()
        .rename(columns={"reference_timestamp": "date"})
    )

    print("Daily MeteoSwiss data head:")
    display(daily.head(3))
    return daily


# ---------------- Velo/Fuss: streaming + aggregate ----------------
def aggregate_velo_streaming(velo_csv, years, chunksize):
    """
    Stream Velo/Fuss CSV and aggregate to daily totals per site.

    Returns columns: ['SiteCode', 'SiteName', 'date', 'daily_total'].
    """
    print("\nLoading Velo/Fuss data in streaming mode...")
    print("Aggregating daily totals while streaming...\n")

    usecols = ["Date", "SiteCode", "SiteName", "Total"]
    daily_agg = []
    chunk_count = 0

    for chunk in pd.read_csv(
        velo_csv,
        sep=";",
        encoding="utf-8",  # Basel portal is UTF-8; fixes 'Dreirosenbrücke'
        usecols=usecols,
        chunksize=chunksize,
        low_memory=False,
    ):
        chunk_count += 1

        # Parse dates and keep only valid ones in the selected years
        chunk["Date"] = pd.to_datetime(
            chunk["Date"], errors="coerce", dayfirst=True
        )
        chunk = chunk.dropna(subset=["Date"])
        chunk = chunk[chunk["Date"].dt.year.isin(years)]
        if chunk.empty:
            continue

        # Ensure numeric counts and drop negatives
        chunk["Total"] = pd.to_numeric(chunk["Total"], errors="coerce")
        chunk = chunk.dropna(subset=["Total"])
        chunk = chunk[chunk["Total"] >= 0]

        # Create daily date column and aggregate per site + date
        chunk["date"] = chunk["Date"].dt.floor("D")
        daily = (
            chunk.groupby(["SiteCode", "SiteName", "date"], as_index=False)["Total"]
            .sum()
        )
        daily_agg.append(daily)

    if not daily_agg:
        raise ValueError("No Velo/Fuss data found for requested years.")

    result = pd.concat(daily_agg, ignore_index=True)
    result = result.rename(columns={"Total": "daily_total"})

    # Second aggregation to ensure one row per (SiteCode, SiteName, date)
    result = (
        result.groupby(["SiteCode", "SiteName", "date"], as_index=False)["daily_total"]
        .sum()
    )

    print(f"Finished processing {chunk_count} chunks.")
    print("Daily Velo/Fuss data (head):")
    display(result.head(3))
    return result


# ---------------- Helper: missing days per year ----------------
def check_missing_days(df, col, years):
    """Count missing calendar days per year based on a datetime column."""
    out = {}
    for y in years:
        expected = pd.date_range(f"{y}-01-01", f"{y}-12-31", freq="D")
        present = df.loc[df[col].dt.year == y, col].drop_duplicates()
        missing = expected.difference(present)
        out[y] = len(missing)
    return out

#### **Merging daily Velo/Fuss and MeteoSwiss data**

This cell defines the merge step. We join the two daily datasets on the `date` column and add simple calendar features (`weekday`, `is_weekend`). 

The function prints the merged shape and displays the first rows as a quick preview of the final structure that will be used for all downstream analysis.

In [33]:
# ---------------- Merge + simple feature engineering ----------------
def merge_datasets(daily_velo, daily_meteo):
    """
    Merge Velo/Fuss and Meteo data on 'date' and add weekday/weekend features.
    """
    print("\nMerging Velo/Fuss and Meteo datasets...")
    daily_velo["date"] = pd.to_datetime(daily_velo["date"])
    daily_meteo["date"] = pd.to_datetime(daily_meteo["date"])

    merged = pd.merge(daily_velo, daily_meteo, on="date", how="left")

    merged["weekday"] = merged["date"].dt.day_name()
    merged["is_weekend"] = merged["date"].dt.dayofweek >= 5

    print(f"Merged dataset shape: {merged.shape}")
    print("\nMerged dataset (head):")
    display(merged.head(3))
    return merged

#### **Build pipeline with basic quality checks**

The final cell wires all components together in a single `run_pipeline` function:

- Load MeteoSwiss data and metadata,
- Aggregate MeteoSwiss to daily level,
- Stream and aggregate Velo/Fuss counts to daily totals per site,
- Check completeness of calendar days for the selected year(s),
- Assert that there are no duplicated `(SiteCode, SiteName, date)` rows after aggregation,
- Merge the datasets and report missing values per column,
- Save the final merged dataset as `merged_dataset.csv`.

In [34]:
# ---------------- Run full pipeline ----------------
def run_pipeline():
    """Execute the full DAW pipeline and save merged dataset."""
    meteo_raw = load_meteo_data(METEO_CSV, METEO_META)
    daily_meteo = aggregate_daily_meteo(meteo_raw, YEARS)

    daily_velo = aggregate_velo_streaming(VELO_CSV, YEARS, CHUNKSIZE)

    # --- Simple data quality checks ---
    print("\nChecking missing days in both datasets:")
    print("MeteoSwiss:", check_missing_days(daily_meteo, "date", YEARS))
    print("Velo/Fuss:", check_missing_days(daily_velo, "date", YEARS))
   
    # Check for duplicates in daily Velo/Fuss
    dup_count = daily_velo.duplicated(["SiteCode", "SiteName", "date"]).sum()
    print("\nPossible duplicates in daily Velo/Fuss (SiteCode, SiteName, date):", dup_count)
    assert dup_count == 0, "Found duplicated (SiteCode, SiteName, date) rows in Velo/Fuss daily data."

    merged = merge_datasets(daily_velo, daily_meteo)

    # Check missing values in merged dataset
    print("\nMissing values per column in merged dataset:")
    print(merged.isna().sum())

    merged.to_csv(MERGED_CSV, index=False)
    print("\nMerged dataset saved as:", MERGED_CSV)
    return merged

# Run pipeline
merged_df = run_pipeline()


Loading MeteoSwiss data and metadata...

MeteoSwiss data head:


Unnamed: 0,station_abbr,reference_timestamp,tre200h0,tre200hn,tre200hx,tre005h0,tre005hn,ure200h0,pva200h0,tde200h0,...,fve010h0,rre150h0,htoauths,gre000h0,oli000h0,olo000h0,osr000h0,ods000h0,sre000h0,erefaoh0
0,BAS,01.01.2020 00:00,-0.8,-1.1,-0.6,-1.3,-1.9,93.9,5.4,-1.6,...,,0.0,,0,286.0,,,1.0,0,0.001
1,BAS,01.01.2020 01:00,-1.4,-2.3,-0.9,-3.4,-4.8,92.9,5.2,-2.4,...,,0.0,,0,229.0,,,1.0,0,-0.005
2,BAS,01.01.2020 02:00,-1.6,-2.1,-1.0,-5.1,-5.8,91.8,5.0,-2.7,...,,0.0,,0,225.0,,,1.0,0,-0.015



MeteoSwiss metadata head:


Unnamed: 0,parameter_shortname,parameter_description_de,parameter_description_fr,parameter_description_it,parameter_description_en,parameter_group_de,parameter_group_fr,parameter_group_it,parameter_group_en,parameter_granularity,parameter_decimals,parameter_datatype,parameter_unit
0,dkl010d0,Windrichtung; Tagesmittel,Direction du vent; moyenne journalière,Direzione del vento; media giornaliera,Wind direction; daily mean,Wind,Vent,Vento,Wind,D,0,Integer,°
1,dkl010h0,Windrichtung; Stundenmittel,Direction du vent; moyenne horaire,Direzione del vento; media oraria,Wind direction; hourly mean,Wind,Vent,Vento,Wind,H,0,Integer,°
2,dkl010z0,Windrichtung; Zehnminutenmittel,Direction du vent; moyenne sur 10 minutes,Direzione del vento; media su 10',Wind direction; ten minutes mean,Wind,Vent,Vento,Wind,T,0,Integer,°



MeteoSwiss data with fixed column names (from metadata) and parsed timestamps:


Unnamed: 0,station_abbr,reference_timestamp,Air temperature 2 m above ground; hourly mean,Air temperature 2 m above ground; hourly minimum,Air temperature 2 m above ground; hourly maximum,Air temperature at 5 cm above grass; hourly mean,Air temperature at 5 cm above grass; hourly minimum,Relative air humidity 2 m above ground; hourly mean,Vapour pressure 2 m above ground; hourly mean,Dew point 2 m above ground; hourly mean,...,Wind speed vectorial; hourly mean in m/s,Precipitation; hourly total,Snow depth (automatic measurement); hourly current value,Global radiation; hourly mean,Longwave incoming radiation; hourly mean,Longwave outgoing radiation; hourly mean,Shortwave reflected radiation; hourly mean,Diffuse radiation; hourly mean,Sunshine duration; hourly total,Reference evaporation from FAO; hourly total
0,BAS,2020-01-01 00:00:00,-0.8,-1.1,-0.6,-1.3,-1.9,93.9,5.4,-1.6,...,,0.0,,0,286.0,,,1.0,0,0.001
1,BAS,2020-01-01 01:00:00,-1.4,-2.3,-0.9,-3.4,-4.8,92.9,5.2,-2.4,...,,0.0,,0,229.0,,,1.0,0,-0.005
2,BAS,2020-01-01 02:00:00,-1.6,-2.1,-1.0,-5.1,-5.8,91.8,5.0,-2.7,...,,0.0,,0,225.0,,,1.0,0,-0.015



Aggregating MeteoSwiss data to daily level...
Daily MeteoSwiss data head:


Unnamed: 0,date,temp_mean_C,precip_mm,wind_mean_ms
0,2024-01-01,6.3,1.7,2.820833
1,2024-01-02,8.6625,8.4,5.141667
2,2024-01-03,11.2375,3.9,6.758333



Loading Velo/Fuss data in streaming mode...
Aggregating daily totals while streaming...

Finished processing 16 chunks.
Daily Velo/Fuss data (head):


Unnamed: 0,SiteCode,SiteName,date,daily_total
0,350,350 Dreirosenbrücke,2024-01-01,1825
1,350,350 Dreirosenbrücke,2024-01-02,2166
2,350,350 Dreirosenbrücke,2024-01-03,3379



Checking missing days in both datasets:
MeteoSwiss: {2024: 0}
Velo/Fuss: {2024: 0}

Possible duplicates in daily Velo/Fuss (SiteCode, SiteName, date): 0

Merging Velo/Fuss and Meteo datasets...
Merged dataset shape: (14142, 9)

Merged dataset (head):


Unnamed: 0,SiteCode,SiteName,date,daily_total,temp_mean_C,precip_mm,wind_mean_ms,weekday,is_weekend
0,350,350 Dreirosenbrücke,2024-01-01,1825,6.3,1.7,2.820833,Monday,False
1,350,350 Dreirosenbrücke,2024-01-02,2166,8.6625,8.4,5.141667,Tuesday,False
2,350,350 Dreirosenbrücke,2024-01-03,3379,11.2375,3.9,6.758333,Wednesday,False



Missing values per column in merged dataset:
SiteCode        0
SiteName        0
date            0
daily_total     0
temp_mean_C     0
precip_mm       0
wind_mean_ms    0
weekday         0
is_weekend      0
dtype: int64

Merged dataset saved as: data\merged_dataset.csv
