# Energy Data Analysis

This notebook reads and caches data from multiple CSV files, organizes them into separate text files, and normalizes the data year by year based on user-defined variables. It also calculates the average annual surplus (as a percentage of annual consumption) and the average annual loss.

In [1]:
import re
from pathlib import Path
import pandas as pd
import sys

try:
    BASE = Path(__file__).resolve().parents[1]
except NameError:
    # In notebooks ist __file__ nicht definiert — benutze das aktuelle Arbeitsverzeichnis (VSCode öffnet Projekt-Root)
    BASE = Path.cwd()
RAW_DIR_DEFAULT = BASE / "data" / "raw"

print(f"Projekt Basisverzeichnis: {RAW_DIR_DEFAULT}")

CATEGORY_PATTERNS = {
    "production": ["Realisierte_Erzeugung", "Realisierte_Erzeugung_"],
    "consumption": ["Realisierter_Stromverbrauch", "Realisierter_Stromverbrauch_"],
    "prices": ["Gro_handelspreise", "Gro_handelspreise_"],
    "capacity": ["Installierte_Erzeugungsleistung"],
}

def parse_years_from_filename(name: str):
    """Versucht, Jahresbereich aus Dateinamen zu extrahieren.
    Erkanntes Format: ..._YYYYmmddHHMM_YYYYmmddHHMM... oder ..._YYYYmmdd_YYYYmmdd...
    Wenn das End-Datum genau auf 01.01 00:00 steht, wird das Endjahr exklusiv behandelt
    (z.B. 201501010000_201701010000 -> Jahre 2015,2016)."""
    m = re.search(r'(\d{8,12})[_-](\d{8,12})', name)
    if not m:
        return None
    start, end = m.group(1), m.group(2)
    sy = int(start[:4])
    ey = int(end[:4])
    # parse end components if available
    emonth = int(end[4:6]) if len(end) >= 6 else 1
    eday = int(end[6:8]) if len(end) >= 8 else 1
    ehour = int(end[8:10]) if len(end) >= 10 else 0
    emin = int(end[10:12]) if len(end) >= 12 else 0
    # if end is exactly 01-01 00:00 -> treat end year as exclusive
    if emonth == 1 and eday == 1 and ehour == 0 and emin == 0:
        years = list(range(sy, ey))
    else:
        years = list(range(sy, ey + 1))
    return sorted(set(y for y in years if y >= 0))

def parse_years_from_file(path: Path):
    """Fallback: CSV einlesen und Jahre aus 'Datum von'/'Datum bis' ermitteln."""
    try:
        df = pd.read_csv(path, sep=';', decimal=',', thousands='.', usecols=lambda c: 'Datum' in c or 'Datum von' in c or 'Datum bis' in c, low_memory=False)
    except Exception:
        # letzter Ausweg: kein Datum gefunden
        return None
    # suche nach Spalten
    date_cols = [c for c in df.columns if 'Datum' in c]
    years = set()
    for c in date_cols:
        try:
            dates = pd.to_datetime(df[c], dayfirst=True, errors='coerce')
            years.update(d.dropna().year.astype(int).tolist())
        except Exception:
            continue
    return sorted(y for y in years if y >= 0) if years else None

def detect_category(name: str):
    for cat, patterns in CATEGORY_PATTERNS.items():
        for p in patterns:
            if p in name:
                return cat
    return "other"

def main(raw_dir: Path = None):
    if not raw_dir.exists():
        print(f"Raw directory not found: {raw_dir}")
        return 1

    results = {}
    files = sorted(raw_dir.glob("*.csv"))
    if not files:
        print(f"Keine .csv Dateien in {raw_dir} gefunden.")
        return 0

    for f in files:
        cat = detect_category(f.name)
        years = parse_years_from_filename(f.name)
        if years is None:
            years = parse_years_from_file(f)  # fallback
        if years is None:
            years = []
        results.setdefault(cat, {"files": [], "years": set()})
        results[cat]["files"].append(str(f.relative_to(raw_dir.parent)))  # Pfad relativ zum Projekt root
        results[cat]["years"].update(years)

    # Drucke Übersicht
    total_files = 0
    print("Gefundene Kategorien und Jahre (Projekt-Relative Pfade):\n")
    for cat, info in sorted(results.items()):
        files_count = len(info["files"])
        total_files += files_count
        years_sorted = sorted(info["years"])
        print(f"- Kategorie: {cat}")
        print(f"  Dateien: {files_count}")
        print(f"  Jahre (anzahl {len(years_sorted)}): {years_sorted}")
        # optional: zeige ein paar Pfade
        for p in info["files"][:10]:
            print(f"    - {p}")
        if files_count > 10:
            print(f"    ... +{files_count-10} weitere Dateien")
        print("")
    print(f"Gesamtanzahl .csv Dateien untersucht: {total_files}")
    return 0

if __name__ == "__main__":
    raw_dir = RAW_DIR_DEFAULT
    (main(Path(raw_dir)))

Projekt Basisverzeichnis: g:\Energiespeicher\energy-storage-analysis\data\raw
Gefundene Kategorien und Jahre (Projekt-Relative Pfade):

- Kategorie: capacity
  Dateien: 1
  Jahre (anzahl 10): [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
    - raw\Installierte_Erzeugungsleistung_201501010000_202501010000_Jahr.csv

- Kategorie: consumption
  Dateien: 6
  Jahre (anzahl 10): [2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024]
    - raw\Realisierter_Stromverbrauch_201501010000_201701010000_Viertelstunde.csv
    - raw\Realisierter_Stromverbrauch_201701010000_201901010000_Viertelstunde.csv
    - raw\Realisierter_Stromverbrauch_201901010000_202101010000_Viertelstunde.csv
    - raw\Realisierter_Stromverbrauch_202101010000_202301010000_Viertelstunde.csv
    - raw\Realisierter_Stromverbrauch_202301010000_202501010000_Viertelstunde.csv
    - raw\Realisierter_Stromverbrauch_202401010000_202501010000_Viertelstunde.csv

- Kategorie: prices
  Dateien: 1
  Jahre (anzahl 1): [20

## Normalize Data Year by Year

In this section, we will normalize the data year by year based on user-defined variables.

In [9]:
import pandas as pd
import numpy as np
from pathlib import Path

# --- Projektstruktur ---
try:
    BASE = Path(__file__).resolve().parents[1]
except NameError:
    BASE = Path.cwd()

RAW_DIR = BASE / "data" / "raw"
PROCESSED_DIR = BASE / "data" / "processed"
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

# --- Normierungs-Konfiguration ---
NORMALIZE_MODE = "fixed_to_one"  # Verbrauch wird auf 1 normiert
NORM_TARGET_MWH = 1.0  # Zielwert für Verbrauchsnormalisierung

CONSUMPTION_COL = "Netzlast [MWh] Originalauflösungen"
DATE_COL = "Datum von"
PRODUCTION_COLS = [
    "Biomasse [MWh] Originalauflösungen",
    "Wasserkraft [MWh] Originalauflösungen",
    "Wind Offshore [MWh] Originalauflösungen", 
    "Wind Onshore [MWh] Originalauflösungen",
    "Photovoltaik [MWh] Originalauflösungen",
    "Sonstige Erneuerbare [MWh] Originalauflösungen",
    "Kernenergie [MWh] Originalauflösungen",
    "Braunkohle [MWh] Originalauflösungen", 
    "Steinkohle [MWh] Originalauflösungen",
    "Erdgas [MWh] Originalauflösungen",
    "Pumpspeicher [MWh] Originalauflösungen",
    "Sonstige Konventionelle [MWh] Originalauflösungen"
]

def parse_german_number_series(s: pd.Series) -> pd.Series:
    s = s.fillna("").astype(str).str.strip()
    s = s.str.replace(r'\.(?=\d{3}(?:[.\s]\d{3})*(?:,|$))', '', regex=True)
    s = s.str.replace(',', '.', regex=False)
    s = s.replace('', np.nan)
    return pd.to_numeric(s, errors='coerce')

def load_data(file_pattern):
    files = sorted(RAW_DIR.glob(file_pattern))
    if not files:
        print(f"Keine Dateien gefunden für Muster: {file_pattern}")
    else:
        print(f"{len(files)} Gefundene Dateien für Muster '{file_pattern}': {[f.name for f in files]}")
    
    dfs = []
    for f in files:
        df = pd.read_csv(f, sep=";", dtype=str, low_memory=False, encoding='utf-8')
        
        # Convert dates - both columns
        for date_col in ["Datum von", "Datum bis"]:
            if date_col in df.columns:
                df[date_col] = pd.to_datetime(df[date_col], dayfirst=True, errors='coerce')
        
        if DATE_COL in df.columns:
            df["Jahr"] = df[DATE_COL].dt.year
        
        df["source_file"] = f.name
        dfs.append(df)
    
    return pd.concat(dfs, ignore_index=True)

def main():

    print("Lade Verbrauchsdaten...")
    cons_df = load_data("*Realisierter_Stromverbrauch*.csv")

    print("Lade Produktionsdaten...")
    prod_df = load_data("*Realisierte_Erzeugung*.csv")

    print("Lade Kapazitätsdaten...")
    cap_df = load_data("*Installierte_Erzeugungsleistung*.csv")

    if cons_df.empty:
        print("Keine Verbrauchsdaten gefunden.")
        return

    # Use "Datum von" instead of "Datum"
    date_col_cons = "Datum von"
    date_col_prod = "Datum von"
    date_col_cap = "Datum von"

    # Get available years from each dataset
    years = sorted(cons_df["Jahr"].dropna().unique())
    
    print(f"Verfügbare Jahre im Verbrauchsdatensatz: {years}")
    print(f"Verfügbare Jahre im Produktionsdatensatz: {sorted(prod_df['Jahr'].dropna().unique()) if not prod_df.empty else 'Keine Produktionsdaten'}")
    print(f"Verfügbare Jahre im Kapazitätsdatensatz: {sorted(cap_df['Jahr'].dropna().unique()) if not cap_df.empty else 'Keine Kapazitätsdaten'}")

    for year in years:
        cons_y = cons_df[cons_df["Jahr"] == year].copy()
        if cons_y.empty:
            continue
        
        # Only use the Netzlast column for consumption
        cons_y[CONSUMPTION_COL] = parse_german_number_series(cons_y[CONSUMPTION_COL])
        
        # Calculate annual consumption (divide by 4 for quarter hours)
        cons_annual_mwh = cons_y[CONSUMPTION_COL].sum() / 4.0
        print(f"\n--- Jahr {year} ---")
        print(f"Jahresverbrauch = {cons_annual_mwh:.2f} MWh")

        # Normalize consumption to fraction of annual
        cons_out = cons_y.copy()
        cons_out[CONSUMPTION_COL + " [norm]"] = cons_y[CONSUMPTION_COL] / (cons_annual_mwh * 4.0)  
        
        # Save normalized consumption
        cons_out.to_csv(PROCESSED_DIR / f"consumption_normalized_{year}.csv", 
                       sep=";", decimal=",", index=False)

        if not prod_df.empty:
            # Filter production data for year
            prod_y = prod_df[prod_df[DATE_COL].dt.year == year].copy()
            
            # Convert all production columns
            for col in PRODUCTION_COLS:
                if col in prod_y.columns:
                    prod_y[col] = parse_german_number_series(prod_y[col])
                    # Normalize to fraction of annual consumption
                    if cons_annual_mwh > 0:
                        prod_y[f"{col} [norm]"] = prod_y[col] / (cons_annual_mwh * 4.0)

            # Calculate production shares
            prod_annual = {col: prod_y[col].sum() / 4.0 for col in PRODUCTION_COLS if col in prod_y.columns}
            total_prod = sum(prod_annual.values())
            
            print(f"Jahresproduktion gesamt: {total_prod:.2f} MWh")
            print("\nProduktionsanteile (% des Jahresverbrauchs):")
            for src, val in prod_annual.items():
                share = (val / cons_annual_mwh * 100.0) if cons_annual_mwh > 0 else 0
                print(f"  {src:50s}: {share:6.2f}%")

            prod_y.to_csv(PROCESSED_DIR / f"production_normalized_{year}.csv", 
                         sep=";", decimal=",", index=False)

        print(f"\nJahr {year} abgeschlossen: Verbrauch {cons_annual_mwh:.2f} MWh → normiert")

    print("\nNormierung abgeschlossen.")

if __name__ == "__main__":
    main()

Lade Verbrauchsdaten...
6 Gefundene Dateien für Muster '*Realisierter_Stromverbrauch*.csv': ['Realisierter_Stromverbrauch_201501010000_201701010000_Viertelstunde.csv', 'Realisierter_Stromverbrauch_201701010000_201901010000_Viertelstunde.csv', 'Realisierter_Stromverbrauch_201901010000_202101010000_Viertelstunde.csv', 'Realisierter_Stromverbrauch_202101010000_202301010000_Viertelstunde.csv', 'Realisierter_Stromverbrauch_202301010000_202501010000_Viertelstunde.csv', 'Realisierter_Stromverbrauch_202401010000_202501010000_Viertelstunde.csv']
Lade Produktionsdaten...
6 Gefundene Dateien für Muster '*Realisierte_Erzeugung*.csv': ['Realisierte_Erzeugung_201501010000_201701010000_Viertelstunde.csv', 'Realisierte_Erzeugung_201701010000_201901010000_Viertelstunde.csv', 'Realisierte_Erzeugung_201901010000_202101010000_Viertelstunde.csv', 'Realisierte_Erzeugung_202101010000_202301010000_Viertelstunde.csv', 'Realisierte_Erzeugung_202301010000_202501010000_Viertelstunde.csv', 'Realisierte_Erzeugung_2

## Calculate Average Annual Surplus and Loss

This section calculates the average annual surplus (as a percentage of annual consumption) and the average annual loss.

In [None]:
# Calculate average annual surplus and loss
def calculate_surplus_loss(normalized_consumption, normalized_production):
    surplus = normalized_production - normalized_consumption
    average_surplus = surplus.mean(axis=0)
    average_loss = (normalized_consumption - normalized_production).mean(axis=0)
    return average_surplus, average_loss

average_surplus, average_loss = calculate_surplus_loss(normalized_consumption, normalized_production)

# Display results
print(f"Average Annual Surplus (MWh): {average_surplus}")
print(f"Average Annual Loss (MWh): {average_loss}")
print(f"Average Annual Surplus (% of Consumption): {(average_surplus / normalized_consumption.sum(axis=1)).mean() * 100}")