# 📌 Bitcoin-Datenvorverarbeitung für Machine Learning

Dieses Notebook bereitet Bitcoin-Stundendaten für das Training eines Machine-Learning-Modells vor. 
Es umfasst:
- Bereinigung der Rohdaten
- Berechnung technischer Indikatoren
- Erzeugung relativer Features
- Standardisierung der Daten für das Modell

## 🔧 1. Bibliotheken importieren
Wir laden die notwendigen Bibliotheken für Datenverarbeitung und technische Analyse.


In [1]:
import pandas as pd
import numpy as np
import joblib
from sklearn.preprocessing import StandardScaler
from ta.trend import SMAIndicator, EMAIndicator, MACD
from ta.momentum import RSIIndicator
from ta.volatility import BollingerBands, AverageTrueRange

## 📥 2. Daten laden
Wir importieren die Bitcoin-Stundendaten aus einer CSV-Datei. 
Die Spalten enthalten Open-High-Low-Close-Werte (OHLC), Volumendaten und Zeitstempel.

In [2]:
df = pd.read_csv("raw_data/2023-2018_BTC-USD_Data_1h.csv", sep=",")

## 🧼 3. Datenbereinigung
In diesem Schritt entfernen wir nicht benötigte Spalten, wandeln das Datum um und interpolieren fehlende Werte.

In [3]:
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Bereinigt die Bitcoin-Stundendaten für das Machine-Learning-Modell und loggt jede Änderung.

    Schritte:
    1. Entfernt nicht benötigte Spalten (`symbol`, `unix`).
    2. Konvertiert `date` in ein datetime-Format und setzt es als Index.
    3. Überprüft und meldet fehlende Werte, bevor sie interpoliert werden.
    4. Entfernt Duplikate und gibt an, welche Zeilen entfernt wurden.
    
    :param df: Pandas DataFrame mit Bitcoin-Daten.
    :return: Bereinigter Pandas DataFrame.
    """

    # Sortiere die Daten aufsteigend
    df = df.sort_values(by='unix', ascending=True)  # Falls `unix` noch vorhanden ist

    # Entferne unnötige Spalten
    drop_cols = ['symbol', 'unix']
    df = df.drop(columns=[col for col in drop_cols if col in df.columns], errors='ignore')

    # Konvertiere `date` in datetime-Format
    df['datetime'] = pd.to_datetime(df['date'], format='%d.%m.%Y %H:%M')
    df = df.drop(columns=['date'])

    # Überprüfe auf fehlende Werte und logge sie
    missing_values = df[df.isna().any(axis=1)]
    if not missing_values.empty:
        print(f"Fehlende Werte vor Interpolation:\n{missing_values}\n")
    else:
        print("Es existieren keine fehlenden Werte.")
    
    # Fehlende Werte mit linearer Interpolation füllen
    df = df.interpolate(method='linear', limit_direction='both')

    # Entferne Duplikate basierend auf dem Index (datetime)
    duplicates = df[df.index.duplicated(keep='first')]
    if not duplicates.empty:
        print(f"Entfernte Duplikate:\n{duplicates}\n")
    else:
        print("Es existieren keine doppelten Datensätze.")

    df = df[~df.index.duplicated(keep='first')]

    return df


df_cleaned = clean_data(df)


Es existieren keine fehlenden Werte.
Es existieren keine doppelten Datensätze.


## 🔄 4. Nullwerte interpolieren
Wir ersetzen `0`-Werte in Volumendaten durch `NaN`, um eine lineare Interpolation durchführen zu können.

In [4]:
def interpolate_columns_with_zeros(df: pd.DataFrame, columns: list) -> pd.DataFrame:
    """
    Ersetzt `0`-Werte in den angegebenen Spalten durch Interpolationen.

    - Konvertiert `0`-Werte in `NaN`, um sie interpolieren zu können.
    - Nutzt lineare Interpolation, um die Lücken zu schließen.

    :param df: Pandas DataFrame mit den zu bearbeitenden Spalten.
    :param columns: Liste der Spaltennamen, die interpoliert werden sollen.
    :return: DataFrame mit interpolierten Werten in den angegebenen Spalten.
    """
    for column in columns:
        # Ersetze 0-Werte durch NaN, um sie interpolieren zu können
        df[column] = df[column].replace(0, np.nan)
        # Interpoliere die NaN-Werte (linear)
        df[column] = df[column].interpolate(method='linear', limit_direction='both')

    return df

df_interpolated = interpolate_columns_with_zeros(df_cleaned, columns=['Volume BTC', 'Volume USD'])

## 📊 5. Technische Indikatoren berechnen
Wir berechnen verschiedene Indikatoren, die für das Machine Learning nützlich sind:
- Gleitende Durchschnitte (SMA, EMA)
- MACD-Indikator
- RSI (Relative Strength Index)
- Bollinger-Bänder
- ATR (Average True Range)


In [5]:
def add_technical_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """
    Fügt technische Indikatoren mithilfe der `ta`-Bibliothek zu den Bitcoin-Stundendaten hinzu.

    Berechnet:
    - 10er & 50er Simple Moving Average (SMA)
    - 10er Exponential Moving Average (EMA)
    - MACD (12, 26, 9)
    - Relative Strength Index (RSI) (14)
    - Bollinger Bands (20)
    - Average True Range (ATR) (14)
    
    :param df: Bereinigter Pandas DataFrame mit Bitcoin-Daten.
    :return: Pandas DataFrame mit technischen Indikatoren.
    """

    # Gleitende Durchschnitte
    df['SMA_10'] = SMAIndicator(close=df['close'], window=10).sma_indicator()
    df['SMA_50'] = SMAIndicator(close=df['close'], window=50).sma_indicator()
    df['EMA_10'] = EMAIndicator(close=df['close'], window=10).ema_indicator()

    # MACD
    macd = MACD(close=df['close'], window_slow=26, window_fast=12, window_sign=9)
    df['MACD'] = macd.macd()
    df['MACD_Signal'] = macd.macd_signal()

    # RSI
    df['RSI_14'] = RSIIndicator(close=df['close'], window=14).rsi()

    # Bollinger Bands
    bollinger = BollingerBands(close=df['close'], window=20, window_dev=2)
    df['Bollinger_High'] = bollinger.bollinger_hband()
    df['Bollinger_Low'] = bollinger.bollinger_lband()

    # Average True Range (ATR)
    df['ATR_14'] = AverageTrueRange(high=df['high'], low=df['low'], close=df['close'], window=14).average_true_range()

    return df

df_with_indicators = add_technical_indicators(df_interpolated)

## 📈 6. Relative Features berechnen
Wir berechnen relative Preisbewegungen, Trend- und Volatilitätsmerkmale, um aussagekräftigere Eingaben für unser Modell zu generieren.


In [6]:
def add_relative_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Berechnet relative Werte für bessere Modell-Performance.

    - Open, High, Low relativ zum Close (`open_rel`, `high_rel`, `low_rel`)
    - Preisbewegungen: Returns (`return_1h`, `return_24h`)
    - Gleitende Durchschnitte relativ zum Close (`SMA_10_rel`, `SMA_50_rel`, `EMA_10_rel`)
    - Close relativ zu SMA-Trends (`close_vs_SMA10`, `close_vs_SMA50`)
    - MACD & Signal-Linie relativ zum Close (`MACD_rel`, `MACD_Signal_rel`)
    - Bollinger-Band-Position als normierte Werte (`Bollinger_pct`)
    - Candle-Shape-Indikatoren (`body_size`, `upper_shadow`, `lower_shadow`)
    - Relative Volumenveränderung (`vol_change_1h`, `vol_usd_change_1h`)

    :param df: Pandas DataFrame mit OHLC & Indikatoren
    :return: Pandas DataFrame mit zusätzlichen relativen Features
    """

    epsilon = 1e-10  # Sicherheitspuffer gegen Division durch Null

    # Preisbewegungen (Returns)
    df['return_1h'] = df['close'].pct_change(periods=1)
    df['return_24h'] = df['close'].pct_change(periods=24)

    # Relative Open, High, Low zum Close
    df['open_rel'] = df['open'] / df['close'] - 1
    df['high_rel'] = df['high'] / df['close'] - 1
    df['low_rel'] = df['low'] / df['close'] - 1

    # Relative Moving Averages
    df['SMA_10_rel'] = df['SMA_10'] / df['close'] - 1
    df['SMA_50_rel'] = df['SMA_50'] / df['close'] - 1
    df['EMA_10_rel'] = df['EMA_10'] / df['close'] - 1

    # Close relativ zu SMA-Trends (Trendrichtung)
    df['close_vs_SMA10'] = (df['close'] - df['SMA_10']) / df['SMA_10']
    df['close_vs_SMA50'] = (df['close'] - df['SMA_50']) / df['SMA_50']

    # MACD & Signal relativ zum Close
    df['MACD_rel'] = df['MACD'] / df['close']
    df['MACD_Signal_rel'] = df['MACD_Signal'] / df['close']

    # Bollinger Bands normiert (zwischen 0 und 1)
    df['Bollinger_pct'] = (df['close'] - df['Bollinger_Low']) / (df['Bollinger_High'] - df['Bollinger_Low'] + epsilon)

    # Candle-Shape-Indikatoren
    df['body_size'] = (df['close'] - df['open']) / (df['high'] - df['low'] + epsilon)
    df['upper_shadow'] = (df['high'] - np.maximum(df['open'], df['close'])) / (df['high'] - df['low'] + epsilon)
    df['lower_shadow'] = (np.minimum(df['open'], df['close']) - df['low']) / (df['high'] - df['low'] + epsilon)

    # Relative Volumenveränderung
    df['vol_change_1h'] = df['Volume BTC'].pct_change(periods=1)
    df['vol_usd_change_1h'] = df['Volume USD'].pct_change(periods=1)

    return df

df_with_relative_features = add_relative_features(df_with_indicators)

## 🎯 7. Feature-Selektion
Nicht relevante Spalten werden entfernt und die verbleibenden Features in eine logische Reihenfolge gebracht.

In [7]:
def filter_and_order_features(df: pd.DataFrame) -> pd.DataFrame:
    """
    Entfernt überflüssige Spalten und bringt die relevanten Features in eine logische Reihenfolge.

    :param df: Pandas DataFrame mit allen Features.
    :return: Pandas DataFrame mit den wichtigsten Features in optimierter Reihenfolge.
    """
    
    # Liste der Features, die behalten werden sollen (in sinnvoller Reihenfolge)
    feature_order = [
        "datetime",
        
        # Momentum & Volatilität
        "return_1h", "return_24h", "RSI_14", "ATR_14",
        
        # Relative OHLC-Werte
        "open_rel", "high_rel", "low_rel",
        
        # Trend-Indikatoren
        "SMA_10_rel", "SMA_50_rel", "EMA_10_rel", 
        "close_vs_SMA10", "close_vs_SMA50",
        
        # Momentum-Indikatoren
        "MACD_rel", "MACD_Signal_rel",
        
        # Volatilitäts-Indikatoren
        "Bollinger_pct",
        
        # Candlestick-Formationen
        "body_size", "upper_shadow", "lower_shadow",
        
        # Volumen-Daten
        "vol_change_1h", "vol_usd_change_1h"
    ]
    
    # Entferne alle Spalten, die nicht in feature_order sind
    df_filtered = df[feature_order]

    return df_filtered

df_final = filter_and_order_features(df_with_relative_features)


## 🕵️‍♂️ 8. Überprüfung auf NaN- oder Inf-Werte
Wir speichern Zeilen mit fehlenden oder unendlichen Werten in einer Datei zur Analyse.

In [8]:
def check_nan_and_inf_rows(df: pd.DataFrame, filename="nan_inf_report.txt"):
    """
    Speichert alle Zeilen mit NaN- oder inf-Werten in eine Textdatei, um zu überprüfen,
    ob diese Werte nur am Anfang oder auch mitten in den Daten auftreten.

    - Speichert die Anzahl der betroffenen Zeilen.
    - Speichert ALLE betroffenen Zeilen (mit NaN oder inf) in eine Datei.
    
    :param df: Pandas DataFrame.
    :param filename: Name der Textdatei zum Speichern der Ergebnisse.
    """
    # Finde alle Zeilen mit NaN oder inf-Werten
    affected_rows = df[(df.isna().any(axis=1)) | (df.isin([np.inf, -np.inf]).any(axis=1))]

    with open(filename, "w", encoding="utf-8") as file:
        if affected_rows.empty:
            file.write("✅ Keine NaN- oder inf-Werte im DataFrame.\n")
        else:
            file.write(f"⚠️ {len(affected_rows)} Zeilen enthalten NaN- oder inf-Werte.\n\n")
            file.write(affected_rows.to_string())  # Speichert ALLE betroffenen Zeilen

    print(f"✅ Vollständiger NaN/inf-Report gespeichert in: {filename}")

check_nan_and_inf_rows(df_final, filename="train_nan_inf_report.txt")

✅ Vollständiger NaN/inf-Report gespeichert in: train_nan_inf_report.txt


## ✂️ 9. Entfernen der ersten 49 Zeilen
Wir löschen die ersten 49 Zeilen, da sie fehleranfällig sein könnten.

In [9]:
def drop_first_rows(df: pd.DataFrame, num_rows: int = 49) -> pd.DataFrame:
    """
    Löscht die ersten `num_rows` Zeilen eines DataFrames.

    :param df: Pandas DataFrame.
    :param num_rows: Anzahl der zu löschenden Zeilen (Standard: 49).
    :return: DataFrame ohne die ersten `num_rows` Zeilen.
    """
    return df.iloc[num_rows:].reset_index(drop=True)

df_final = drop_first_rows(df_final)

## ⚖️ 10. Standardisierung der Features
Die Daten werden skaliert (Mittelwert = 0, Standardabweichung = 1), und der Skaler wird gespeichert.

In [10]:
def standardize_features_with_datetime(df: pd.DataFrame, scaler_path: str = "scaler.pkl") -> pd.DataFrame:
    """
    Standardisiert numerische Features mit Z-Standardisierung (Mittelwert = 0, Std = 1),
    behält aber die 'datetime'-Spalte unverändert.
    Speichert den Skaler für zukünftige Verwendung auf Test- oder Live-Daten.

    :param df: Pandas DataFrame mit den zu standardisierenden Spalten.
    :param scaler_path: Dateipfad zum Speichern des Scalers (Standard: "scaler.pkl").
    :return: DataFrame mit standardisierten Features und der originalen 'datetime'-Spalte.
    """
    
    # 1️⃣ Sicherstellen, dass 'datetime' als Spalte erhalten bleibt
    datetime_col = df[['datetime']] if 'datetime' in df.columns else None

    # 2️⃣ Versuche, alle anderen Spalten numerisch zu konvertieren
    df_numeric = df.drop(columns=['datetime'], errors='ignore').apply(pd.to_numeric, errors='coerce')

    # 3️⃣ Sicherstellen, dass nur numerische Spalten standardisiert werden
    feature_cols = df_numeric.columns.tolist()
    
    # 4️⃣ StandardScaler initialisieren und Standardisierung durchführen
    scaler = StandardScaler()
    df_numeric[feature_cols] = scaler.fit_transform(df_numeric[feature_cols])

    # 5️⃣ Skaler speichern, damit er auf Test- und Live-Daten angewendet werden kann
    joblib.dump(scaler, scaler_path)
    print(f"✅ Skalierungsparameter gespeichert unter: {scaler_path}")

    # 6️⃣ Falls 'datetime' vorhanden war, wieder hinzufügen
    if datetime_col is not None:
        df_numeric.insert(0, 'datetime', datetime_col)

    return df_numeric

df_train = standardize_features_with_datetime(df_final)
df_train.to_csv("stand_data/2023-2018_stand_data.csv", index=False)

✅ Skalierungsparameter gespeichert unter: scaler.pkl


## 🔄 10.1 Gespeicherten Skaler auf Testdaten anwenden
Damit die Testdaten mit den gleichen Skalierungsparametern transformiert werden, 
laden wir den gespeicherten Skaler und wenden ihn auf die Testdaten an.

In [11]:
def apply_saved_scaler_with_datetime(df: pd.DataFrame, scaler_path: str = "scaler.pkl") -> pd.DataFrame:
    """
    Wendet einen gespeicherten Skaler auf neue Daten an (z. B. für Test- oder Live-Daten),
    behält aber die 'datetime'-Spalte unverändert.

    :param df: Pandas DataFrame mit den zu transformierenden Spalten.
    :param scaler_path: Dateipfad des gespeicherten Scalers.
    :return: DataFrame mit transformierten Features und der originalen 'datetime'-Spalte.
    """

    # 1️⃣ Sicherstellen, dass 'datetime' als Spalte bleibt
    datetime_col = df[['datetime']] if 'datetime' in df.columns else None

    # 2️⃣ Skaler laden
    scaler = joblib.load(scaler_path)

    # 3️⃣ Nur numerische Spalten auswählen und standardisieren
    df_numeric = df.drop(columns=['datetime'], errors='ignore').apply(pd.to_numeric, errors='coerce')
    df_numeric[df_numeric.columns] = scaler.transform(df_numeric[df_numeric.columns])

    # 4️⃣ Falls 'datetime' vorhanden war, wieder hinzufügen
    if datetime_col is not None:
        df_numeric.insert(0, 'datetime', datetime_col)

    return df_numeric


## 🧪 11. Vorbereitung der Testdaten
Die letzten 49 Zeilen der Trainingsdaten werden vor die Testdaten gesetzt, um Indikatoren korrekt zu berechnen. Anschließend erfolgt die Standardisierung.

In [12]:
def prepare_test_data_with_train_file(test_df: pd.DataFrame, train_df: pd.DataFrame, scaler_path: str) -> pd.DataFrame:
    """
    Bereitet die Testdaten vor, indem die letzten 49 Zeilen des Trainingsdatensatzes vor den Testdaten hinzugefügt werden.
    Führt alle Bereinigungen und Transformationen nach dem Zusammenfügen durch.

    Schritte:
    1. Sortiert die Trainings- und Testdaten zeitlich aufsteigend.
    2. Extrahiert automatisch die letzten 49 Zeilen aus den Trainingsdaten.
    3. Fügt die Trainingszeilen vor den Testdaten hinzu.
    4. Bereinigt und transformiert die zusammengefügten Daten.
    5. Entfernt die zusätzlichen 49 Zeilen wieder.
    6. Standardisiert die Testdaten mit der gespeicherten Skala.

    :param test_df: Pandas DataFrame mit Testdaten (Rohdaten).
    :param train_df: Pandas DataFrame mit Trainingsdaten (Rohdaten).
    :param scaler_path: Pfad zum gespeicherten Skaler.
    :return: Transformierter und bereinigter Testdatensatz.
    """
    # 1️⃣ Sortiere die Trainings- und Testdaten aufsteigend nach 'unix'
    train_df = train_df.sort_values(by="unix").reset_index(drop=True)
    test_df = test_df.sort_values(by="unix").reset_index(drop=True)

    # 2️⃣ Extrahiere die letzten 49 Zeilen aus den Trainingsdaten
    train_tail = train_df.tail(49)

    # 3️⃣ Füge die Trainingszeilen vor den Testdaten hinzu
    combined_df = pd.concat([train_tail, test_df], ignore_index=True)


    # 4️⃣ Bereinige und transformiere die zusammengefügten Daten
    combined_df = clean_data(combined_df)  # Konvertiert auch `date` zu `datetime`
    combined_df = interpolate_columns_with_zeros(combined_df, columns=['Volume BTC', 'Volume USD'])
    combined_df = add_technical_indicators(combined_df)
    combined_df = add_relative_features(combined_df)
    combined_df = filter_and_order_features(combined_df)
    check_nan_and_inf_rows(combined_df, filename="test_nan_inf_report.txt")

    # 5️⃣ Entferne die ersten 49 Zeilen (ursprüngliche Trainingsdaten)
    combined_df = drop_first_rows(combined_df, num_rows=len(train_tail))

    # 6️⃣ Wende den gespeicherten Skaler auf die Testdaten an
    combined_df = apply_saved_scaler_with_datetime(combined_df, scaler_path)

    return combined_df

test_df_raw = pd.read_csv("raw_data/2025-2024_BTC-USD_Data_1h.csv")
df_test_prepared = prepare_test_data_with_train_file(test_df_raw, df, scaler_path="scaler.pkl")
df_test_prepared.to_csv("stand_data/2025-2024_stand_data.csv", index=False)

Es existieren keine fehlenden Werte.
Es existieren keine doppelten Datensätze.
✅ Vollständiger NaN/inf-Report gespeichert in: test_nan_inf_report.txt


In [13]:
def convert_us_csv_to_de_csv(input_csv, output_csv):
    # Lese die US-CSV ein (Standard-Trennzeichen: Komma)
    df = pd.read_csv(input_csv, delimiter=',')
    
    # Schreibe die CSV im deutschen Format: 
    # - Separator: Semikolon
    # - Dezimaltrennzeichen: Komma
    df.to_csv(output_csv, sep=';', index=False, decimal=',')

# Beispielaufruf:
convert_us_csv_to_de_csv('stand_data/2023-2018_stand_data.csv', 'stand_data/de-Format_2023-2018_stand_data.csv')
convert_us_csv_to_de_csv('stand_data/2025-2024_stand_data.csv', 'stand_data/de-Format_2025-2024_stand_data.csv')