# ETL Pipeline

This notebook cleans the raw data and creates an hourly aggregated `masterdata_2024.csv`.

In [7]:
import pandas as pd
import glob, os
from dotenv import load_dotenv
load_dotenv()

PATH_DATA = os.getenv('path_to_data')
PATH_MASTER = os.getenv('path_to_master')

## Weather data

In [8]:
def fix_dates(df):
    df['DATUM'] = df['MESS_DATUM'].astype(str)
    df['DATUM'] = pd.to_datetime(df['DATUM'].str.slice(0, 10), format='%Y%m%d%H')
    df.set_index('DATUM', inplace=True)
    return df

def rename_columns(df):
    renames = {}  # add column mappings if necessary
    return df.rename(columns=renames)

def remove_duplicate_columns(df):
    return df.drop(columns=['eor', 'STATIONS_ID', 'MESS_DATUM'], errors='ignore')

def clean_weather_files():
    for file in glob.glob(os.path.join(PATH_DATA, 'schnarrenberg_dwd*.csv')):
        df = pd.read_csv(file, delimiter=';')
        df = fix_dates(df)
        df = rename_columns(df)
        out = os.path.join(PATH_DATA, f'clean_{os.path.basename(file)}')
        df.to_csv(out)

def aggregate_weather():
    files = glob.glob(os.path.join(PATH_DATA, 'clean_schnarrenberg_dwd*.csv'))
    date_range = pd.date_range('2023-01-01', '2025-12-31 23:00:00', freq='h')
    df = pd.DataFrame(index=date_range)
    for file in files:
        df_new = pd.read_csv(file)
        df_new = remove_duplicate_columns(df_new)
        df_new['DATUM'] = pd.to_datetime(df_new['DATUM'])
        df_new.set_index('DATUM', inplace=True)
        df = df.merge(df_new, left_index=True, right_index=True, how='left', suffixes=('', '_y'))
    df.dropna(how='all', inplace=True)
    df.to_csv(os.path.join(PATH_DATA, 'clean_wetter_komplett.csv'))
    return df

## Measurement data

In [9]:
def clean_measurements():
    df = pd.read_csv(os.path.join(PATH_MASTER, 'messungen_2024.csv'), delimiter=';').T
    df.columns = df.iloc[0]
    df = df.iloc[1:].copy()
    df.columns = ['zeit', 'ecoli', 'entro', 'pos_neg']
    df.index.rename('DATUM', inplace=True)
    for col in ['ecoli', 'entro']:
        df[col] = df[col].str.replace('>800', '8000').str.replace('>80', '8000').str.replace('>', '').astype(float)
    df.reset_index(inplace=True)
    month_map = {'Jan': '01.', 'Febr': '02.', 'Mär': '03.', 'Apr': '04.', 'Mai': '05.', 'Jun': '06.', 'Jul': '07.', 'Aug': '08.', 'Sep': '09.', 'Okt': '10.', 'Nov': '11.', 'Dez': '12.'}
    for k,v in month_map.items():
        df['DATUM'] = df['DATUM'].str.replace(k, v)
    def fill_year(d):
        return d if d.endswith('24') else d + ' 24'
    def fix_date(d):
        parts = d.split(' ')
        return '20'+parts[-1]+'-'+parts[1][:-1]+'-'+parts[0][:-1]
    def add_time(date, time):
        hour = time.str.split(':').str[0]
        return date + ' ' + hour + ':00:00'
    df['DATUM'] = df['DATUM'].apply(fill_year).apply(fix_date)
    df['DATUM'] = add_time(df['DATUM'], df['zeit'])
    df['DATUM'] = pd.to_datetime(df['DATUM']).dt.round('h')
    df.set_index('DATUM', inplace=True)
    df.to_csv(os.path.join(PATH_MASTER, 'messungen_clean_2024.csv'))
    return df

## Additional water temperature

## Combine all data

In [10]:
def create_masterdata():
    weather = aggregate_weather()
    messungen = clean_measurements()
    df = weather.merge(messungen, left_index=True, right_index=True, how="left")
    # LUBW-Daten laden und verarbeiten
    df_lubw_raw = pd.read_csv(
        os.path.join(PATH_DATA, "messwerte_lubw.csv"),
        delimiter=";",
        parse_dates=["Datum"],
        dayfirst=True,
    )
    # Relevante Spalten extrahieren
    df_lubw = df_lubw_raw[["Messstation", "Gewässer", "Parameter", "Datum", "Tagesmittelwert"]]
    # Zahlen konvertieren (Komma → Punkt, "-" → NaN)
    df_lubw["Tagesmittelwert"] = df_lubw["Tagesmittelwert"].str.replace(",", ".")
    df_lubw["Tagesmittelwert"] = pd.to_numeric(df_lubw["Tagesmittelwert"], errors="coerce")
    # Kürzel erstellen z.B. We_Ne_Temperatur
    df_lubw["Kürzel"] = (
        df_lubw["Messstation"].str[:2].str.capitalize()
        + "_"
        + df_lubw["Gewässer"].str[:2].str.capitalize()
        + "_"
        + df_lubw["Parameter"]
            .str.replace("bei .*", "", regex=True)
            .str.replace(" ", "")
            .str.replace("ä", "ae")
            .str.replace("ö", "oe")
            .str.replace("ü", "ue")
            .str.replace("ß", "ss")
    )
    # Pivotieren: Datum als Index, Parameter als Spalten
    df_lubw_pivot = df_lubw.pivot(index="Datum", columns="Kürzel", values="Tagesmittelwert")
    # Resample auf 1h und auffüllen
    df_lubw_hourly = df_lubw_pivot.resample("1h").ffill()
    # In Masterdaten integrieren
    df = df.merge(df_lubw_hourly, left_index=True, right_index=True, how="left")
    df.index.name = "zeit"
    df.to_csv(os.path.join(PATH_MASTER, "masterdata_2024.csv"))
    return df

In [12]:
# Run the ETL pipeline
df = create_masterdata()

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_lubw["Tagesmittelwert"] = df_lubw["Tagesmittelwert"].str.replace(",", ".")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_lubw["Tagesmittelwert"] = pd.to_numeric(df_lubw["Tagesmittelwert"], errors="coerce")
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_lubw["Kürzel"] = (


In [15]:
df[pd.notna(df["Ho_Ne_Sauerstoff"])]

Unnamed: 0_level_0,QN_2,V_TE002,V_TE005,V_TE010,V_TE020,V_TE050,V_TE100,QN_8,P,P0,...,Ho_Ne_Sauerstoff,Ho_Ne_Temperatur,"Ho_Ne_Truebung,quantitativ",Ho_Ne_pH-Wert,Pl_Fi_Temperatur,We_Ne_ElektrischeLeitfaehigkeit,We_Ne_Sauerstoff,We_Ne_Temperatur,"We_Ne_Truebung,quantitativ",We_Ne_pH-Wert
zeit,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2024-01-01 00:00:00,3.0,-999.0,3.2,3.8,4.5,5.5,7.2,3.0,1009.9,971.6,...,12.0,7.8,4.7,,8.0,820.0,11.8,9.1,2.3,
2024-01-01 01:00:00,3.0,-999.0,2.9,3.6,4.4,5.5,7.2,3.0,1009.9,971.6,...,12.0,7.8,4.7,,8.0,820.0,11.8,9.1,2.3,
2024-01-01 02:00:00,3.0,-999.0,2.8,3.4,4.3,5.5,7.2,3.0,1010.2,971.9,...,12.0,7.8,4.7,,8.0,820.0,11.8,9.1,2.3,
2024-01-01 03:00:00,3.0,-999.0,2.7,3.4,4.2,5.5,7.1,3.0,1010.2,971.9,...,12.0,7.8,4.7,,8.0,820.0,11.8,9.1,2.3,
2024-01-01 04:00:00,3.0,-999.0,2.4,3.2,4.1,5.5,7.1,3.0,1010.7,972.3,...,12.0,7.8,4.7,,8.0,820.0,11.8,9.1,2.3,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2025-02-02 19:00:00,,,,,,,,1.0,1026.3,986.7,...,12.1,6.4,10.0,7.9,4.9,828.1,12.3,5.6,4.4,8.0
2025-02-02 20:00:00,,,,,,,,1.0,1026.8,987.1,...,12.1,6.4,10.0,7.9,4.9,828.1,12.3,5.6,4.4,8.0
2025-02-02 21:00:00,,,,,,,,1.0,1027.2,987.5,...,12.1,6.4,10.0,7.9,4.9,828.1,12.3,5.6,4.4,8.0
2025-02-02 22:00:00,,,,,,,,1.0,1027.2,987.4,...,12.1,6.4,10.0,7.9,4.9,828.1,12.3,5.6,4.4,8.0
