# Strommarktdaten Deutschland - SMARD.de Analyse

Dieses Notebook:


# Trading-Bot Backtest

In dieser Zelle wird ein einfacher regelbasierter Trading-Bot implementiert und auf der Datei `strompreise_deutschland_20251023_121022.csv` backgetestet.

- Startkapital: 10.000 EUR
- Strategie (Beispiel): Mean-Reversion mit Z-Score Ein-/Ausstiegen
- Einheitengröße: 1 MWh pro Trade (fest)

Die folgende Codezelle lädt die CSV, führt den Backtest aus und gibt Gewinn/Verlust sowie eine Equity-Visualisierung aus.

In [5]:
# Backtest: Lade CSV, definiere einfachen Mean-Reversion-Bot und führe Simulation durch
import pandas as pd
import numpy as np
from pathlib import Path
import plotly.express as px
import plotly.graph_objects as go
import math

# Datei suchen (relativ zum Workspace/Notebook)
filename = 'strompreise_deutschland_20251023_121022.csv'
files = list(Path('.').rglob(filename))
if not files:
    files = list(Path('..').rglob(filename))
if not files:
    raise FileNotFoundError(f"Datei {filename} nicht gefunden. Bitte stelle sicher, dass sie im Workspace vorhanden ist.")

file_path = files[0]
print(f"Lade Datei: {file_path}")

# CSV lesen
df = pd.read_csv(file_path)

# Versuche Datetime-Spalte zu finden / erzeugen
if 'datetime' in df.columns:
    df['datetime'] = pd.to_datetime(df['datetime'])
elif 'timestamp' in df.columns:
    # timestamp kann ms sein
    try:
        df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
    except Exception:
        df['datetime'] = pd.to_datetime(df['timestamp'])
else:
    # Heuristik: erste Spalte als datetime
    first_col = df.columns[0]
    try:
        df['datetime'] = pd.to_datetime(df[first_col])
    except Exception:
        raise ValueError('Konnte keine Datumsspalte in der CSV finden. Erstelle bitte eine Spalte `datetime` oder `timestamp`.')

# Werte-Spalte finden
value_col = None
for candidate in ['value', 'price', 'preis', 'wert']:
    if candidate in df.columns:
        value_col = candidate
        break
# Fallback: wenn nur 2 Spalten und eine ist datetime
if value_col is None:
    other_cols = [c for c in df.columns if c != 'datetime']
    if len(other_cols) >= 1:
        value_col = other_cols[0]
    else:
        raise ValueError('Konnte keine Werte-Spalte in der CSV finden.')

# Bereinigen
df = df[['datetime', value_col]].rename(columns={value_col: 'price'})
df['price'] = pd.to_numeric(df['price'], errors='coerce')
df = df.dropna(subset=['price']).sort_values('datetime').reset_index(drop=True)

print(f"Eingelesene Daten: {len(df)} Zeilen von {df['datetime'].min()} bis {df['datetime'].max()}")

# Backtest-Parameter
START_CAPITAL = 10000.0
UNIT_SIZE = 1.0  # MWh per trade
ROLL_WINDOW = 96  # 1 Tag (24h * 4 Viertelstunden)
Z_ENTRY = -1.0
Z_EXIT = 1.0

# Berechne Rolling-Statistiken
df['rolling_mean'] = df['price'].rolling(window=ROLL_WINDOW, min_periods=20).mean()
df['rolling_std'] = df['price'].rolling(window=ROLL_WINDOW, min_periods=20).std()
df['zscore'] = (df['price'] - df['rolling_mean']) / df['rolling_std']

# Simulation
cash = START_CAPITAL
position = 0.0
entry_price = None
trades = []
equity_curve = []

for idx, row in df.iterrows():
    price = row['price']
    z = row['zscore']
    time = row['datetime']

    # equity mark-to-market
    equity = cash + position * price
    equity_curve.append({'datetime': time, 'equity': equity})

    # Signals (nur handeln wenn Rolling-Stats vorhanden)
    if not np.isfinite(z):
        continue

    # Entry: wenn stark unter dem Mittel (z <= Z_ENTRY) und keine Position
    if z <= Z_ENTRY and position == 0:
        # buy UNIT_SIZE
        cost = UNIT_SIZE * price
        if cash >= cost:
            cash -= cost
            position += UNIT_SIZE
            entry_price = price
            trades.append({'entry_time': time, 'entry_price': price, 'exit_time': None, 'exit_price': None, 'profit': None})
    # Exit: wenn stark über dem Mittel (z >= Z_EXIT) und Position offen
    elif z >= Z_EXIT and position > 0:
        proceeds = UNIT_SIZE * price
        cash += proceeds
        # finalize last trade
        if trades and trades[-1]['exit_time'] is None:
            trades[-1]['exit_time'] = time
            trades[-1]['exit_price'] = price
            trades[-1]['profit'] = (price - trades[-1]['entry_price']) * UNIT_SIZE
        position = 0.0
        entry_price = None

# Am Ende offene Positionen liquidieren
if position > 0:
    last_price = df['price'].iloc[-1]
    cash += position * last_price
    if trades and trades[-1]['exit_time'] is None:
        trades[-1]['exit_time'] = df['datetime'].iloc[-1]
        trades[-1]['exit_price'] = last_price
        trades[-1]['profit'] = (last_price - trades[-1]['entry_price']) * UNIT_SIZE
    position = 0.0

final_equity = cash
pnl = final_equity - START_CAPITAL
return_pct = (pnl / START_CAPITAL) * 100

trades_df = pd.DataFrame(trades)
equity_df = pd.DataFrame(equity_curve)

# Ergebnisse ausgeben
print('\nBacktest Ergebnis:')
print(f'Startkapital: {START_CAPITAL:.2f} EUR')
print(f'Endkapital: {final_equity:.2f} EUR')
print(f'Gewinn / Verlust: {pnl:.2f} EUR ({return_pct:.2f} % )')
print(f'Anzahl Trades: {len(trades_df)}')
if not trades_df.empty:
    wins = (trades_df['profit'] > 0).sum()
    losses = (trades_df['profit'] <= 0).sum()
    print(f'Gewonnene Trades: {wins}, Verlorene Trades: {losses}, Winrate: {wins / len(trades_df) * 100:.2f}%')

# Equity-Plot
fig = px.line(equity_df, x='datetime', y='equity', title='Equity Curve des Backtests')
fig.update_yaxes(title='Equity (EUR)')
fig.show()

# Trade-Details anzeigen und speichern
if not trades_df.empty:
    display(trades_df.head(20))
    trades_out = 'trades_backtest_strommarkt.csv'
    trades_df.to_csv(trades_out, index=False)
    print(f'Trades gespeichert in: {trades_out}')

# Equity CSV speichern
equity_out = 'equity_curve_backtest.csv'
equity_df.to_csv(equity_out, index=False)
print(f'Equity-Kurve gespeichert in: {equity_out}')


Lade Datei: strompreise_deutschland_20251023_121022.csv


  df['datetime'] = pd.to_datetime(df[first_col])


ValueError: Konnte keine Datumsspalte in der CSV finden. Erstelle bitte eine Spalte `datetime` oder `timestamp`.

In [6]:
# Trading-Bot (erweitert) — mehrere Strategien, Positionsgrößen und Performance-Kennzahlen
# Lädt die angegebene CSV, führt den Backtest durch und speichert Ergebnisse.

import pandas as pd
import numpy as np
from pathlib import Path
import math
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime

# --- Konfiguration ---
CSV_FILENAME = 'strompreise_deutschland_20251023_121022.csv'
START_CAPITAL = 10000.0
RISK_PER_TRADE = 0.05        # Anteil des Kapitals, der pro Trade riskiert werden soll (z.B. 0.05 = 5%)
MIN_UNITS = 1                # minimale Anzahl Einheiten zu handeln
TRANSACTION_COST_PER_UNIT = 0.0  # fixe Transaktionskosten pro Einheit (EUR)
STRATEGY = 'zscore'          # 'zscore' oder 'sma'

# Z-Score (Mean Reversion) Parameter
ROLL_WINDOW = 96    # Rolling window in periods (96 ~ 1 Tag bei 15min-Auflösung)
Z_ENTRY = -1.0
Z_EXIT = 0.0

# SMA Momentum Parameter (alternativ)
SMA_SHORT = 12
SMA_LONG = 48

# --- Lade CSV ---
files = list(Path('.').rglob(CSV_FILENAME))
if not files:
    files = list(Path('..').rglob(CSV_FILENAME))
if not files:
    raise FileNotFoundError(f"Datei {CSV_FILENAME} nicht gefunden. Bitte stelle sicher, dass sie im Workspace vorhanden ist.")
file_path = files[0]
print(f"Lade Datei: {file_path}")

# CSV mit Semikolon-Trennzeichen lesen
df = pd.read_csv(file_path, sep=';')

# Datetime-Spalte ist bereits vorhanden und korrekt benannt
if 'datetime' not in df.columns:
    raise ValueError('Die CSV muss eine Spalte namens "datetime" enthalten.')

# Konvertiere datetime zu Pandas datetime
df['datetime'] = pd.to_datetime(df['datetime'])

# Preis-Spalte (value) ist bereits vorhanden
if 'value' not in df.columns:
    raise ValueError('Die CSV muss eine Spalte namens "value" enthalten.')

df = df[['datetime', 'value']].rename(columns={'value': 'price'})
df['price'] = pd.to_numeric(df['price'], errors='coerce')
df = df.dropna(subset=['price']).sort_values('datetime').reset_index(drop=True)

print(f"Eingelesene Daten: {len(df)} Zeilen von {df['datetime'].min()} bis {df['datetime'].max()}")

# Indikatoren berechnen
if STRATEGY == 'zscore':
    df['rolling_mean'] = df['price'].rolling(window=ROLL_WINDOW, min_periods=20).mean()
    df['rolling_std'] = df['price'].rolling(window=ROLL_WINDOW, min_periods=20).std()
    df['zscore'] = (df['price'] - df['rolling_mean']) / df['rolling_std']
elif STRATEGY == 'sma':
    df['sma_short'] = df['price'].rolling(SMA_SHORT, min_periods=1).mean()
    df['sma_long'] = df['price'].rolling(SMA_LONG, min_periods=1).mean()
else:
    raise ValueError('Unbekannte Strategie')

# Backtest-Simulation
cash = START_CAPITAL
position = 0.0
entry_price = None
trades = []
equity_curve = []

# Hilfsfunktion zur Bestimmung der Einheiten basierend auf Risiko
def units_from_risk(equity, price, risk_per_trade, transaction_cost_per_unit=0.0):
    # Vereinfacht: wir erlauben, dass die gesamte 'risk_per_trade' des Equity in Position investiert wird
    notional = equity * risk_per_trade
    units = math.floor(notional / (price + transaction_cost_per_unit))
    return max(int(units), 0)

for idx, row in df.iterrows():
    price = row['price']
    t = row['datetime']

    # mark-to-market equity
    equity = cash + position * price
    equity_curve.append({'datetime': t, 'equity': equity})

    # Signale
    if STRATEGY == 'zscore':
        z = row.get('zscore', np.nan)
        if not np.isfinite(z):
            continue
        # Entry long when z <= Z_ENTRY and no position
        if z <= Z_ENTRY and position == 0:
            units = units_from_risk(equity, price, RISK_PER_TRADE, TRANSACTION_COST_PER_UNIT)
            if units < MIN_UNITS:
                units = MIN_UNITS
            cost = units * price + units * TRANSACTION_COST_PER_UNIT
            if cash >= cost and units > 0:
                cash -= cost
                position += units
                trades.append({'entry_time': t, 'entry_price': price, 'units': units, 'exit_time': None, 'exit_price': None, 'profit': None})
        # Exit when z >= Z_EXIT
        elif z >= Z_EXIT and position > 0:
            units = position
            proceeds = units * price - units * TRANSACTION_COST_PER_UNIT
            cash += proceeds
            if trades and trades[-1]['exit_time'] is None:
                trades[-1]['exit_time'] = t
                trades[-1]['exit_price'] = price
                trades[-1]['profit'] = (price - trades[-1]['entry_price']) * trades[-1]['units'] - trades[-1]['units'] * TRANSACTION_COST_PER_UNIT * 2
            position = 0
    elif STRATEGY == 'sma':
        sma_s = row.get('sma_short')
        sma_l = row.get('sma_long')
        if np.isnan(sma_s) or np.isnan(sma_l):
            continue
        # Crossover logic
        prev = df.iloc[idx-1] if idx>0 else None
        prev_s = prev['sma_short'] if prev is not None else np.nan
        prev_l = prev['sma_long'] if prev is not None else np.nan
        # Golden cross -> buy
        if prev_s <= prev_l and sma_s > sma_l and position == 0:
            units = units_from_risk(equity, price, RISK_PER_TRADE, TRANSACTION_COST_PER_UNIT)
            if units < MIN_UNITS:
                units = MIN_UNITS
            cost = units * price + units * TRANSACTION_COST_PER_UNIT
            if cash >= cost and units > 0:
                cash -= cost
                position += units
                trades.append({'entry_time': t, 'entry_price': price, 'units': units, 'exit_time': None, 'exit_price': None, 'profit': None})
        # Death cross -> sell
        if prev_s >= prev_l and sma_s < sma_l and position > 0:
            units = position
            proceeds = units * price - units * TRANSACTION_COST_PER_UNIT
            cash += proceeds
            if trades and trades[-1]['exit_time'] is None:
                trades[-1]['exit_time'] = t
                trades[-1]['exit_price'] = price
                trades[-1]['profit'] = (price - trades[-1]['entry_price']) * trades[-1]['units'] - trades[-1]['units'] * TRANSACTION_COST_PER_UNIT * 2
            position = 0

# Liquidation am Ende
if position > 0:
    last_price = df['price'].iloc[-1]
    cash += position * last_price - position * TRANSACTION_COST_PER_UNIT
    if trades and trades[-1]['exit_time'] is None:
        trades[-1]['exit_time'] = df['datetime'].iloc[-1]
        trades[-1]['exit_price'] = last_price
        trades[-1]['profit'] = (last_price - trades[-1]['entry_price']) * trades[-1]['units'] - trades[-1]['units'] * TRANSACTION_COST_PER_UNIT * 2
    position = 0

final_equity = cash
pnl = final_equity - START_CAPITAL

# Equity DataFrame
equity_df = pd.DataFrame(equity_curve).drop_duplicates(subset='datetime').reset_index(drop=True)

# Performance Kennzahlen
# Jahreslänge approximieren anhand Zeitspanne
days = (equity_df['datetime'].iloc[-1] - equity_df['datetime'].iloc[0]).total_seconds() / (3600*24)
years = max(days / 365.25, 1/365.25)

cagr = (final_equity / START_CAPITAL) ** (1/years) - 1

# Resample equity to daily for Sharpe
equity_daily = equity_df.set_index('datetime').resample('1D').last().ffill()
returns = equity_daily['equity'].pct_change().dropna()

if len(returns) > 1:
    mean_ret = returns.mean()
    std_ret = returns.std()
    # annualize daily returns
    sharpe = (mean_ret / std_ret) * math.sqrt(252) if std_ret > 0 else np.nan
else:
    sharpe = np.nan

# Max Drawdown
running_max = equity_df['equity'].cummax()
drawdown = (equity_df['equity'] - running_max) / running_max
max_dd = drawdown.min()

# Trades DataFrame
trades_df = pd.DataFrame(trades)

# Ausgabe
print('\nBacktest Ergebnis:')
print(f'Strategie: {STRATEGY}')
print(f'Startkapital: {START_CAPITAL:,.2f} EUR')
print(f'Endkapital: {final_equity:,.2f} EUR')
print(f'Gewinn/Verlust: {pnl:,.2f} EUR ({pnl/START_CAPITAL*100:,.2f} %)')
print(f'CAGR: {cagr*100:.2f} %')
print(f'Sharpe (daily returns): {sharpe:.2f}')
print(f'Max Drawdown: {max_dd*100:.2f} %')
print(f'Anzahl Trades: {len(trades_df)}')

# Plots: Equity und Drawdown
fig = go.Figure()
fig.add_trace(go.Scatter(x=equity_df['datetime'], y=equity_df['equity'], mode='lines', name='Equity'))
fig.update_layout(title='Equity Curve', yaxis_title='Equity (EUR)', xaxis_title='Datum')
fig.show()

fig2 = go.Figure()
fig2.add_trace(go.Scatter(x=equity_df['datetime'], y=drawdown, mode='lines', name='Drawdown'))
fig2.update_layout(title='Drawdown', yaxis_title='Drawdown', xaxis_title='Datum')
fig2.show()

# Speichern
trades_out = 'trades_backtest_strommarkt.csv'
equity_out = 'equity_curve_backtest.csv'
trades_df.to_csv(trades_out, index=False)
equity_df.to_csv(equity_out, index=False)
print(f'Trades gespeichert in: {trades_out}')
print(f'Equity-Kurve gespeichert in: {equity_out}')

# Kurze Vorschau der Trades
if not trades_df.empty:
    display(trades_df.head(20))
else:
    print('Keine Trades ausgeführt.')

Lade Datei: strompreise_deutschland_20251023_121022.csv
Eingelesene Daten: 247584 Zeilen von 2018-09-30 22:00:00 bis 2025-10-23 21:45:00


ZeroDivisionError: division by zero