In [28]:
# Imports
import math
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader


df = pd.read_csv('GOOGL_historical_data.csv')

In [29]:
# -----------------------------
# 1) Daten vorbereiten
# -----------------------------
# Erwartet: df mit Spalten 'Date' und 'Close'
assert 'Close' in df.columns, "DataFrame df muss die Spalte 'Close' enthalten."

close = df['Close'].values.astype(np.float32).reshape(-1, 1)

# Parameter
look_back = 10         # Fensterlänge
split_percent = 0.8    # 80/20 Split
split_idx = int(len(close) * split_percent)

# Skalierung: NUR auf Train fitten
scaler = MinMaxScaler()
close_train = scaler.fit_transform(close[:split_idx])
close_test  = scaler.transform(close[split_idx:])

def make_windows(arr, L):
    X, y = [], []
    for i in range(len(arr) - L):
        X.append(arr[i:i+L, 0])   # Sequenz der Länge L
        y.append(arr[i+L, 0])     # nächster Wert
    X = np.array(X, dtype=np.float32)  # [N, L]
    y = np.array(y, dtype=np.float32).reshape(-1, 1)  # [N, 1]
    return X, y

X_train, y_train = make_windows(close_train, look_back)
X_test,  y_test  = make_windows(close_test,  look_back)

In [30]:
# -----------------------------
# 2) Torch Datasets / Loader
# -----------------------------
class WindowDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X)
        self.y = torch.from_numpy(y)
    def __len__(self):
        return len(self.X)
    def __getitem__(self, i):
        return self.X[i], self.y[i]

train_ds = WindowDataset(X_train, y_train)
test_ds  = WindowDataset(X_test, y_test)

batch_size = 64
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=False)  # Zeitreihen: kein Shuffle
test_dl  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [31]:
# -----------------------------
# 3) BayesianLinear-Schicht
#    (Blundell et al., 2015)
# -----------------------------
class BayesianLinear(nn.Module):
    def __init__(self, in_features, out_features, prior_sigma=1.0):
        super().__init__()
        # Variational Posterior-Parameter (Gewichte)
        self.w_mu   = nn.Parameter(torch.zeros(out_features, in_features))
        self.w_rho  = nn.Parameter(torch.full((out_features, in_features), -3.0))  # kleine Std am Start
        # Variational Posterior-Parameter (Bias)
        self.b_mu   = nn.Parameter(torch.zeros(out_features))
        self.b_rho  = nn.Parameter(torch.full((out_features,), -3.0))
        # Prior (fix)
        self.prior_sigma = prior_sigma
        self.prior = torch.distributions.Normal(loc=0.0, scale=prior_sigma)
        # Standard-Normal für Sampling
        self.normal = torch.distributions.Normal(0, 1)

    def _softplus(self, x):
        return torch.log1p(torch.exp(x))  # numerisch stabiler als exp direkt

    def sample_weights(self):
        w_sigma = self._softplus(self.w_rho)
        b_sigma = self._softplus(self.b_rho)
        eps_w = self.normal.sample(self.w_mu.shape).to(self.w_mu.device)
        eps_b = self.normal.sample(self.b_mu.shape).to(self.b_mu.device)
        w = self.w_mu + w_sigma * eps_w
        b = self.b_mu + b_sigma * eps_b
        return w, b, w_sigma, b_sigma

    def forward(self, x):
        # Weight-Sampling via Reparametrisierung
        w, b, w_sigma, b_sigma = self.sample_weights()
        out = x @ w.t() + b  # [B, out_features]
        # KL-Divergenz q||p für Gewichte + Bias (Gaussian-Gaussian, closed form)
        # KL(N(μ,σ)||N(0,σ0)) = log(σ0/σ) + (σ^2 + μ^2)/(2σ0^2) - 0.5
        prior_var = self.prior_sigma ** 2
        w_var = w_sigma**2
        b_var = b_sigma**2
        kl_w = torch.log(self.prior_sigma / w_sigma).sum() + 0.5 * ((w_var + self.w_mu**2) / prior_var - 1).sum()
        kl_b = torch.log(self.prior_sigma / b_sigma).sum() + 0.5 * ((b_var + self.b_mu**2) / prior_var - 1).sum()
        kl = kl_w + kl_b
        return out, kl

In [32]:
# -----------------------------
# 4) BNN-Modell: MLP auf Fenstern
#    (einfach, robust, unabhängig von LSTM)
# -----------------------------
class BNN(nn.Module):
    def __init__(self, L, hidden=64, prior_sigma=1.0):
        super().__init__()
        self.fc1 = BayesianLinear(L, hidden, prior_sigma)
        self.act = nn.ReLU()
        self.fc2 = BayesianLinear(hidden, 1, prior_sigma)
        # homoskedastische Beobachtungs-Std (lernbares, positives Skalar)
        self.log_sigma_y = nn.Parameter(torch.tensor(-2.0))

    def forward(self, x):
        # x: [B, L]
        h, kl1 = self.fc1(x)
        h = self.act(h)
        mu, kl2 = self.fc2(h)   # prädiktiver Mittelwert
        kl = kl1 + kl2
        sigma_y = torch.nn.functional.softplus(self.log_sigma_y) + 1e-6
        return mu, sigma_y, kl

In [33]:
# -----------------------------
# 5) Training (ELBO)
# -----------------------------
model = BNN(L=look_back, hidden=64, prior_sigma=1.0).to(device)
optim = torch.optim.Adam(model.parameters(), lr=1e-3)

def elbo_step(x, y, N):
    # Vorwärts: stochastisch wegen Weight-Sampling
    mu, sigma_y, kl = model(x)
    # Negative Log-Likelihood unter Normal(mu, sigma_y)
    nll = 0.5*torch.log(2*math.pi*sigma_y**2) + 0.5*((y - mu)**2)/(sigma_y**2)
    nll = nll.mean()                         # Mittel über Batch
    elbo = nll + kl / N                      # KL/N (Blundell et al.)
    return elbo, nll.item(), kl.item(), sigma_y.item()

N = len(train_ds)
epochs = 80
for epoch in range(1, epochs+1):
    model.train()
    total_elbo, total_nll, total_kl = 0.0, 0.0, 0.0
    for xb, yb in train_dl:
        xb = xb.to(device)
        yb = yb.to(device)
        optim.zero_grad()
        elbo, nll_val, kl_val, _ = elbo_step(xb, yb, N)
        elbo.backward()
        optim.step()
        total_elbo += elbo.item() * len(xb)
        total_nll  += nll_val     * len(xb)
        total_kl   += kl_val      * len(xb)
    if epoch % 10 == 0 or epoch == 1:
        print(f"Epoch {epoch:3d} | ELBO: {total_elbo/N:.4f} | NLL: {total_nll/N:.4f} | KL: {total_kl/N:.4f}")

Epoch   1 | ELBO: 0.8240 | NLL: 0.3514 | KL: 1932.9593
Epoch  10 | ELBO: -0.5722 | NLL: -1.0426 | KL: 1923.7362
Epoch  20 | ELBO: -0.6189 | NLL: -1.0873 | KL: 1915.5762
Epoch  30 | ELBO: -0.7923 | NLL: -1.2590 | KL: 1908.4499
Epoch  40 | ELBO: -0.8421 | NLL: -1.3073 | KL: 1902.3488
Epoch  50 | ELBO: -1.1261 | NLL: -1.5891 | KL: 1893.8408
Epoch  60 | ELBO: -1.0625 | NLL: -1.5228 | KL: 1882.5554
Epoch  70 | ELBO: -1.1869 | NLL: -1.6434 | KL: 1867.4292
Epoch  80 | ELBO: -1.3872 | NLL: -1.8400 | KL: 1851.8745


In [36]:
# -----------------------------
# 6) MC-Inferenz (T Vorwärtsläufe)
# -----------------------------
model.eval()
with torch.no_grad():
    Xte = torch.from_numpy(X_test).to(device)
    yte = torch.from_numpy(y_test).to(device)

    T = 100  # Anzahl MC-Samples
    mu_samples = []
    sigma_obs = None

    for _ in range(T):
        mu, sigma_y, _ = model(Xte)
        mu_samples.append(mu.cpu().numpy())
        sigma_obs = sigma_y.item()  # homoskedastisch: identisch in allen Läufen

    mu_samples = np.stack(mu_samples, axis=0).squeeze(-1)   # [T, N]
    mu_mean_scaled = mu_samples.mean(axis=0, keepdims=True).T  # [N,1]
    epistemic_var = mu_samples.var(axis=0, ddof=1, keepdims=True).T  # [N,1]
    aleatoric_var = (sigma_obs**2) * np.ones_like(mu_mean_scaled)     # [N,1]
    pred_std_scaled = np.sqrt(epistemic_var + aleatoric_var)

    # Intervalle (gaussian approx)
    q025 = mu_mean_scaled - 1.96 * pred_std_scaled
    q975 = mu_mean_scaled + 1.96 * pred_std_scaled

In [37]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
# -----------------------------
# 7) Rückskalierung & Kennzahlen
# -----------------------------
y_test_inv = scaler.inverse_transform(y_test)
y_pred_inv = scaler.inverse_transform(mu_mean_scaled)
q025_inv   = scaler.inverse_transform(q025)
q975_inv   = scaler.inverse_transform(q975)

# Fehlermaße
rmse = math.sqrt(mean_squared_error(y_test_inv, y_pred_inv))
mae  = mean_absolute_error(y_test_inv, y_pred_inv)
r2   = r2_score(y_test_inv, y_pred_inv)

# Relative Fehler
mean_price = np.mean(y_test_inv)
rel_rmse = rmse / mean_price * 100
rel_mae = mae / mean_price * 100
mape = np.mean(np.abs((y_test_inv - y_pred_inv) / y_test_inv)) * 100

# Coverage 95%
coverage95 = np.mean((y_test_inv[:,0] >= q025_inv[:,0]) & (y_test_inv[:,0] <= q975_inv[:,0]))

# Ausgabe
print("\n📊 BNN (Bayes-by-Backprop) Performance:")
print(f"RMSE: {rmse:.4f}   |  Rel. RMSE: {rel_rmse:.2f}%")
print(f"MAE:  {mae:.4f}   |  Rel. MAE:  {rel_mae:.2f}%")
print(f"MAPE: {mape:.2f}%")
print(f"R²:   {r2:.4f}")
print(f"95%-Coverage: {coverage95:.3f}")



📊 BNN (Bayes-by-Backprop) Performance:
RMSE: 5.0141   |  Rel. RMSE: 3.88%
MAE:  4.0707   |  Rel. MAE:  3.15%
MAPE: 3.20%
R²:   0.9622
95%-Coverage: 1.000


In [39]:
from docx import Document
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# -----------------------------
# 7) Ergebnisse in Word-Dokument speichern
# -----------------------------

# Rücktransformation der Vorhersagen & Targets
y_test_inv = scaler.inverse_transform(y_test)
mu_mean_inv = scaler.inverse_transform(mu_mean_scaled)

# Hilfsfunktion für Metriken
def metrics(y_true, y_pred):
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    r2 = r2_score(y_true, y_pred)
    rel_rmse = rmse / np.mean(y_true) * 100
    rel_mae  = mae / np.mean(y_true) * 100
    return mse, rmse, rel_rmse, mae, rel_mae, mape, r2

# Datumsspalte auf Testsplit ausrichten
date_test = pd.to_datetime(df['Date'].values[split_idx+look_back:])

# === Zeiträume definieren ===
periods = {
    "Gesamter Testsplit": ("2020-01-01", date_test.max()),
    "Finanzkrise 2008": ("2008-09-01", "2009-06-30"),
    "Corona-Krise": ("2020-02-01", "2020-12-31"),
    "Russland-Ukraine Krieg": ("2022-02-24", "2022-06-30"),
    "Stabiler Aufwärtstrend": ("2017-01-01", "2017-12-31")
}

# Neues Dokument
doc = Document()
doc.add_heading("Bayesian Neural Network Performance Ergebnisse", level=1)

# === Ergebnisse berechnen & ins Dokument schreiben ===
for name, (start, end) in periods.items():
    mask = (date_test >= pd.to_datetime(start)) & (date_test <= pd.to_datetime(end))
    if mask.sum() == 0:
        continue
    y_true = y_test_inv[mask]
    y_pred = mu_mean_inv[mask]
    mse, rmse, rel_rmse, mae, rel_mae, mape, r2 = metrics(y_true, y_pred)

    doc.add_heading(f"{name}", level=2)
    doc.add_paragraph(f"MSE: {mse:.4f}")
    doc.add_paragraph(f"RMSE: {rmse:.4f}  |  Rel. RMSE: {rel_rmse:.2f}%")
    doc.add_paragraph(f"MAE: {mae:.4f}   |  Rel. MAE: {rel_mae:.2f}%")
    doc.add_paragraph(f"MAPE: {mape:.2f}%")
    doc.add_paragraph(f"R²: {r2:.4f}")

# Dokument speichern
output_path = "BNN_Performance3.docx"
doc.save(output_path)
print(f"\n✅ Ergebnisse wurden gespeichert in: {output_path}")



✅ Ergebnisse wurden gespeichert in: BNN_Performance3.docx


In [26]:
from docx import Document
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# -----------------------------
# 7) Ergebnisse in Word-Dokument speichern (GANZER Datensatz)
# -----------------------------

# Rücktransformation des gesamten Datensatzes
close_inv = scaler.inverse_transform(close)

# Alle Fenster + Targets für den GANZEN Datensatz erzeugen
X_all, y_all = make_windows(scaler.transform(close), look_back)

# In Torch-Format
X_all_torch = torch.from_numpy(X_all).to(device)

# MC-Inferenz auf gesamten Datensatz
model.eval()
with torch.no_grad():
    T = 100  # Anzahl MC-Samples
    mu_samples = []
    sigma_obs = None

    for _ in range(T):
        mu, sigma_y, _ = model(X_all_torch)
        mu_samples.append(mu.cpu().numpy())
        sigma_obs = sigma_y.item()

    mu_samples = np.stack(mu_samples, axis=0).squeeze(-1)  # [T, N]
    mu_mean_scaled = mu_samples.mean(axis=0, keepdims=True).T  # [N,1]
    epistemic_var = mu_samples.var(axis=0, ddof=1, keepdims=True).T
    aleatoric_var = (sigma_obs**2) * np.ones_like(mu_mean_scaled)
    pred_std_scaled = np.sqrt(epistemic_var + aleatoric_var)

# Rücktransformation in echte Skala
y_all_inv = scaler.inverse_transform(y_all)
mu_mean_inv = scaler.inverse_transform(mu_mean_scaled)

# Datumsspalte für GANZEN Datensatz
date_all = pd.to_datetime(df['Date'].values[look_back:])

# Hilfsfunktion für Metriken
def metrics(y_true, y_pred):
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_true, y_pred)
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    r2 = r2_score(y_true, y_pred)
    rel_rmse = rmse / np.mean(y_true) * 100
    rel_mae  = mae / np.mean(y_true) * 100
    return mse, rmse, rel_rmse, mae, rel_mae, mape, r2

# === Zeiträume definieren ===
periods = {
    "Gesamter Zeitraum": ("2020-01-01", date_all.max()),
    "Finanzkrise 2008": ("2008-09-01", "2009-06-30"),
    "Corona-Krise": ("2020-02-01", "2020-12-31"),
    "Russland-Ukraine Krieg": ("2022-02-24", "2022-06-30"),
    "Stabiler Aufwärtstrend": ("2017-01-01", "2017-12-31")
}

# Neues Dokument
doc = Document()
doc.add_heading("Bayesian Neural Network Performance Ergebnisse", level=1)

# === Ergebnisse berechnen & ins Dokument schreiben ===
for name, (start, end) in periods.items():
    mask = (date_all >= pd.to_datetime(start)) & (date_all <= pd.to_datetime(end))
    if mask.sum() == 0:
        continue
    y_true = y_all_inv[mask]
    y_pred = mu_mean_inv[mask]
    mse, rmse, rel_rmse, mae, rel_mae, mape, r2 = metrics(y_true, y_pred)

    doc.add_heading(f"{name}", level=2)
    doc.add_paragraph(f"MSE: {mse:.4f}")
    doc.add_paragraph(f"RMSE: {rmse:.4f}  |  Rel. RMSE: {rel_rmse:.2f}%")
    doc.add_paragraph(f"MAE: {mae:.4f}   |  Rel. MAE: {rel_mae:.2f}%")
    doc.add_paragraph(f"MAPE: {mape:.2f}%")
    doc.add_paragraph(f"R²: {r2:.4f}")

# Dokument speichern
output_path = "BNN_Performance_AllPeriods2.docx"
doc.save(output_path)
print(f"\n✅ Ergebnisse wurden gespeichert in: {output_path}")



✅ Ergebnisse wurden gespeichert in: BNN_Performance_AllPeriods2.docx
