In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import explained_variance_score


In [None]:
# Load the combined dataset; the code below relies on the columns
# 'date', 'ticker', 'average_sentiment', 'comment_volume', 'Target'
# and 'RealizedVol_3d' being present.
df = pd.read_csv('Combined_3year_data.csv')

df['date'] = pd.to_datetime(df['date'])
print(df.groupby('date').head())

# groupby(...).shift() and rolling() work on row order, so the rows must be
# chronological within each ticker for the lag / smoothing columns to mean
# "previous observation" rather than "previous row in the CSV".
df = df.sort_values(['ticker', 'date']).reset_index(drop=True)

# One-step-lagged sentiment and comment volume, computed per ticker.
df['combined_sentiment_lag1'] = df.groupby('ticker')['average_sentiment'].shift(1)
df['combined_volume_lag1'] = df.groupby('ticker')['comment_volume'].shift(1)
# NOTE(review): the dropna below removes every row whose
# combined_sentiment_lag1 is NaN, so this flag is 0 on all surviving rows and
# carries no information for the model. Confirm whether those rows were meant
# to be kept (with the sentiment imputed) instead.
df['sentiment_missing'] = df['combined_sentiment_lag1'].isna().astype(int)

# Autoregressive inputs and the 3-row rolling mean used as the target.
df['Target_lag1'] = df.groupby('ticker')['Target'].shift(1)
df['Target_lag2'] = df.groupby('ticker')['Target'].shift(2)
df['Target_smooth'] = df.groupby('ticker')['Target'].transform(lambda x: x.rolling(3).mean())

# Drop the warm-up rows that have no lag / rolling value yet.
df = df.dropna(subset=['Target_lag1', 'Target_lag2', 'Target_smooth', 'combined_sentiment_lag1'])

features = [
    'RealizedVol_3d',
    'combined_sentiment_lag1',
    'combined_volume_lag1',
    'sentiment_missing',
    'Target_lag1',
    'Target_lag2',
]

target = 'Target_smooth'

# Forward-fill remaining gaps *within* each ticker. A frame-wide ffill would
# copy the last value of one ticker into the first rows of the next, and
# fillna(method='ffill') is deprecated in recent pandas.
fill_cols = [col for col in df.columns if col != 'ticker']
df[fill_cols] = df.groupby('ticker')[fill_cols].ffill()
# Rows that still lack a model input (leading gaps of a ticker, which have
# nothing earlier to fill from) cannot be used and would put NaNs into training.
df = df.dropna(subset=features + [target]).reset_index(drop=True)

# NOTE(review): both scalers are fit on every row, i.e. before the train/test
# split performed later in the notebook, so test-set statistics influence the
# scaling of the training data. Consider fitting on the training rows only.
scaler_X = MinMaxScaler()
df[features] = scaler_X.fit_transform(df[features])

scaler_y = StandardScaler()
# fit_transform returns an (n, 1) array; flatten it for the single-column assignment.
df[target] = scaler_y.fit_transform(df[[target]]).ravel()

In [None]:
def create_sequences(data, seq_len, features, target):
    """Build sliding-window samples separately for every ticker.

    Each ticker's rows are sorted by 'date'. Every run of ``seq_len``
    consecutive rows of the ``features`` columns becomes one input window,
    and the ``target`` value of the row immediately after the window becomes
    its label. Windows never span two tickers.

    Args:
        data: DataFrame containing 'ticker', 'date', the feature columns and
            the target column.
        seq_len: Number of consecutive rows per input window.
        features: List of feature column names.
        target: Name of the target column.

    Returns:
        Tuple ``(X, y)`` of NumPy arrays with shapes
        ``(n_samples, seq_len, len(features))`` and ``(n_samples,)``.
        Samples are ordered ticker by ticker, chronologically within each
        ticker. When no ticker has more than ``seq_len`` rows the arrays are
        empty but keep those shapes.
    """
    windows, labels = [], []
    for _, group in data.groupby('ticker'):
        group = group.sort_values('date')
        # Extract the columns once per ticker instead of re-selecting them
        # from the DataFrame for every window.
        feature_values = group[features].values
        target_values = group[target].values
        for start in range(len(group) - seq_len):
            stop = start + seq_len
            windows.append(feature_values[start:stop])
            labels.append(target_values[stop])

    if not windows:
        # Keep the 3-D layout so downstream shape-dependent code sees a
        # consistent rank even when there is nothing to return.
        return np.empty((0, seq_len, len(features))), np.empty((0,))
    return np.array(windows), np.array(labels)

# Number of consecutive rows fed to the model per sample.
sequence_length = 10
X, y = create_sequences(
    data=df,
    seq_len=sequence_length,
    features=features,
    target=target,
)


In [None]:
# Unshuffled 80/20 split: the last 20% of the samples form the test set.
# NOTE(review): create_sequences emits samples ticker by ticker, so "last 20%"
# means the final ticker group(s) rather than the most recent dates -- confirm
# this is the intended hold-out.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, shuffle=False
)

# Inputs become float32 tensors as-is; targets are additionally reshaped into
# column vectors of shape (n, 1).
X_train, X_test = (
    torch.tensor(split, dtype=torch.float32) for split in (X_train, X_test)
)
y_train, y_test = (
    torch.tensor(split, dtype=torch.float32).view(-1, 1) for split in (y_train, y_test)
)

In [None]:
class VolatilityLSTM(nn.Module):
    """Stacked LSTM followed by a linear head applied to the last time step.

    Args:
        input_dim: Number of features per time step.
        hidden_dim: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_dim: Size of the prediction produced per sequence.
        dropout: Dropout probability between stacked LSTM layers. Ignored
            (treated as 0) when ``num_layers`` is 1, because nn.LSTM only
            applies dropout between layers and warns otherwise.
    """

    def __init__(self, input_dim, hidden_dim=128, num_layers=3, output_dim=1, dropout=0.3):
        super().__init__()
        inter_layer_dropout = dropout if num_layers > 1 else 0.0
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=inter_layer_dropout,
        )
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Map a batch-first input of shape (batch, seq, input_dim) to (batch, output_dim)."""
        out, _ = self.lstm(x)
        # Only the hidden output of the final time step feeds the linear head.
        return self.fc(out[:, -1, :])

# Seed PyTorch's global RNG before building the model so that weight
# initialisation and every later random draw (dropout during training,
# torch.randperm in the permutation-importance cells) repeat on Restart & Run All.
SEED = 42
torch.manual_seed(SEED)
model = VolatilityLSTM(input_dim=len(features))


In [None]:
# Full-batch training: each epoch is one forward/backward pass over the whole
# training tensor, minimising mean squared error with Adam.
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

epochs = 300
# Nothing inside the loop switches the model out of training mode, so it is
# enough to enable it once up front.
model.train()
for epoch in range(epochs):
    optimizer.zero_grad()

    output = model(X_train)
    loss = criterion(output, y_train)

    loss.backward()
    optimizer.step()

    print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

In [None]:
# Predict on the held-out samples with dropout disabled and no autograd tracking.
model.eval()
with torch.no_grad():
    y_pred_scaled = model(X_test).numpy()

# Undo the target scaling so every metric below is in the original target units.
y_pred = scaler_y.inverse_transform(y_pred_scaled)
y_true = scaler_y.inverse_transform(y_test.numpy())

mse = mean_squared_error(y_true, y_pred)
r2 = r2_score(y_true, y_pred)

# MAPE divides by the true value, so it is undefined where y_true == 0.
# Those samples are left out instead of turning the whole metric into inf/NaN;
# when no true value is zero this equals the plain MAPE over all samples.
nonzero_true = y_true != 0
if nonzero_true.any():
    mape = np.mean(
        np.abs((y_true[nonzero_true] - y_pred[nonzero_true]) / y_true[nonzero_true])
    ) * 100
else:
    mape = float('nan')

# Share of consecutive test samples whose predicted change has the same sign
# as the true change.
# NOTE(review): the test samples are ordered ticker by ticker, so a few of
# these "consecutive" pairs straddle two different tickers.
direction_true = np.sign(y_true[1:] - y_true[:-1])
direction_pred = np.sign(y_pred[1:] - y_pred[:-1])
directional_acc = np.mean(direction_true == direction_pred)

print(f"\n--- EVALUATION METRICS ---")
print(f"R² Score         : {r2:.4f}")
print(f"MAPE             : {mape:.2f}%")
print(f"Directional Acc. : {directional_acc:.4f}")

# True vs. predicted series over the test samples.
fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(y_true, label='True Volatility', linewidth=2)
ax.plot(y_pred, label='Predicted Volatility', linewidth=2)
ax.set_title("LSTM Volatility Prediction Over Time (Combined Sentiment)", fontsize=14)
ax.set_xlabel("Time")
ax.set_ylabel("Volatility")
ax.legend()
ax.grid(True)
fig.tight_layout()
plt.show()

# Scatter of predictions against truth; the dashed red diagonal marks a perfect fit.
fig, ax = plt.subplots(figsize=(8, 6))
ax.scatter(y_true, y_pred, alpha=0.5)
ax.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', linewidth=2)
ax.set_title("LSTM: True vs Predicted Volatility", fontsize=14)
ax.set_xlabel("True Volatility")
ax.set_ylabel("Predicted Volatility")
ax.grid(True)
fig.tight_layout()
plt.show()


In [None]:
# Recompute the test MSE, derive RMSE from it, and print the headline metrics
# together in one report.
mse = mean_squared_error(y_true, y_pred)
rmse = np.sqrt(mse)

summary_lines = [
    f"Test RMSE        : {rmse:.4f}",
    f"R² Score         : {r2:.4f}",
    f"MAPE             : {mape:.2f}%",
    f"Directional Acc. : {directional_acc:.4f}",
]
print("\n".join(summary_lines))


In [None]:
# Permutation importance: shuffle one feature across the test samples and
# measure how much the RMSE gets worse. Errors are computed against y_test,
# i.e. in the scaled target space.

# Make sure dropout is off regardless of which cell ran last, and give
# scikit-learn a plain NumPy array instead of a torch tensor.
model.eval()
y_test_np = y_test.numpy()

with torch.no_grad():
    baseline_preds = model(X_test).numpy()
baseline_rmse = np.sqrt(mean_squared_error(y_test_np, baseline_preds))

importances = {}
with torch.no_grad():
    for i, feat in enumerate(features):
        X_test_permuted = X_test.clone()
        # Reorder whole samples for feature i only; every time step of a
        # sample moves together, the other features stay in place.
        idx = torch.randperm(X_test.shape[0])
        X_test_permuted[:, :, i] = X_test[idx, :, i]
        y_perm = model(X_test_permuted).numpy()
        # A dedicated name keeps the notebook-level `rmse` (the test RMSE)
        # from being overwritten by the last permuted score.
        perm_rmse = np.sqrt(mean_squared_error(y_test_np, y_perm))
        importances[feat] = perm_rmse - baseline_rmse

plt.figure(figsize=(10,5))
plt.bar(list(importances.keys()), list(importances.values()))
plt.title("Permutation Feature Importance (RMSE Increase)")
plt.ylabel("Δ RMSE")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
# MAE of the model on the test set, in original target units.
mae = mean_absolute_error(y_true, y_pred)
print(f"MAE              : {mae:.4f}")

# Permutation importance measured as MAE increase. Errors are computed against
# y_test (scaled target space), using the baseline predictions from the
# RMSE-importance cell.
model.eval()  # dropout must be off for the permuted predictions to be comparable
y_test_np = y_test.numpy()
baseline_mae = mean_absolute_error(y_test_np, baseline_preds)
mae_importances = {}

with torch.no_grad():
    for i, feat in enumerate(features):
        X_test_permuted = X_test.clone()
        # Shuffle feature i across samples while leaving the other features untouched.
        idx = torch.randperm(X_test.shape[0])
        X_test_permuted[:, :, i] = X_test[idx, :, i]
        y_perm = model(X_test_permuted).numpy()
        perm_mae = mean_absolute_error(y_test_np, y_perm)
        mae_importances[feat] = perm_mae - baseline_mae

plt.figure(figsize=(10, 5))
plt.bar(list(mae_importances.keys()), list(mae_importances.values()), color='slateblue')
plt.title("Permutation Feature Importance (MAE Increase)")
# The bars show the increase over the baseline, matching the "Δ RMSE" plot.
plt.ylabel("Δ MAE")
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()


In [None]:
def smape(y_true, y_pred):
    """Symmetric mean absolute percentage error, in percent.

    Each term is ``2 * |y_pred - y_true| / (|y_pred| + |y_true|)``. A term
    whose true and predicted values are both zero is a perfect prediction and
    counts as 0 instead of producing 0/0 = NaN.

    Args:
        y_true: Array-like of true values.
        y_pred: Array-like of predictions, broadcastable against ``y_true``.

    Returns:
        The mean of the per-element terms multiplied by 100.
    """
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    numerator = 2.0 * np.abs(y_pred - y_true)
    denominator = np.abs(y_pred) + np.abs(y_true)
    # Divide only where the denominator is non-zero; the remaining entries
    # keep the 0 they were initialised with.
    terms = np.divide(
        numerator,
        denominator,
        out=np.zeros_like(numerator),
        where=denominator != 0,
    )
    return 100 * np.mean(terms)

# Two further test-set scores, both computed on the de-scaled values.
smape_score = smape(y_true, y_pred)
explained_var = explained_variance_score(y_true, y_pred)

print(f"SMAPE (%)        : {smape_score:.2f}%")
print(f"Explained Var.   : {explained_var:.4f}")