### Model for stock valuation prediction that combines multiple approaches

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, TensorDataset

In [None]:
class StockValuationModel(nn.Module):
    """Bidirectional LSTM + self-attention classifier over feature sequences.

    The input sequence is encoded by a bidirectional LSTM, refined with
    multi-head self-attention, and the attended vector at the final time step
    is mapped by a small MLP to three class probabilities
    (index 0 = undervalued, 1 = fairly valued, 2 = overvalued).

    Note:
        ``forward`` returns softmax *probabilities*, not logits. Losses that
        expect logits (for example ``nn.CrossEntropyLoss``) must not be
        applied to them directly; take the log and use ``nn.NLLLoss`` instead.

    Args:
        input_size: Number of features per time step.
        hidden_size: LSTM hidden units per direction.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability used between LSTM layers, inside the
            attention module and in the MLP head.
        num_heads: Number of attention heads. ``2 * hidden_size`` must be
            divisible by it (a requirement of ``nn.MultiheadAttention``).
    """

    def __init__(self, input_size, hidden_size=128, num_layers=2, dropout=0.3,
                 num_heads=8):
        super().__init__()

        # Forward and backward LSTM states are concatenated, so every layer
        # after the LSTM sees twice the per-direction hidden size.
        encoded_size = hidden_size * 2

        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
            # nn.LSTM applies dropout only *between* stacked layers; with a
            # single layer it has no effect and merely triggers a warning.
            dropout=dropout if num_layers > 1 else 0.0,
            bidirectional=True,
        )

        self.attention = nn.MultiheadAttention(
            embed_dim=encoded_size,
            num_heads=num_heads,
            dropout=dropout,
            batch_first=True,
        )

        # Despite its (kept for state-dict compatibility) name, this is a
        # classification head: it emits one score per valuation class.
        self.regression_head = nn.Sequential(
            nn.Linear(encoded_size, 64),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(32, 3),  # undervalued / fairly valued / overvalued
        )

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Classify a batch of sequences.

        Args:
            x: Float tensor laid out as ``(batch, seq_len, input_size)``
                (both the LSTM and the attention module use
                ``batch_first=True``).

        Returns:
            A tuple ``(probabilities, attention_weights)`` where
            ``probabilities`` has shape ``(batch, 3)`` with rows summing to 1,
            and ``attention_weights`` is whatever ``nn.MultiheadAttention``
            returns as its second output for self-attention over the sequence.
        """
        # Encode the sequence; the final (hidden, cell) states are not needed
        # because the attention layer consumes the per-step outputs.
        lstm_out, _ = self.lstm(x)

        # Self-attention: queries, keys and values are all the LSTM outputs.
        attended_out, attention_weights = self.attention(
            lstm_out, lstm_out, lstm_out
        )

        # Summarise the sequence by the attended vector at the last time step.
        last_step = attended_out[:, -1, :]

        class_scores = self.regression_head(last_step)
        probabilities = self.softmax(class_scores)

        return probabilities, attention_weights

In [None]:
def prepare_features(df, sequence_length=60, scaler=None):
    """Build scaled sliding-window samples and 3-class labels from ``df``.

    For every row ``i >= sequence_length`` one sample is produced: the window
    of scaled feature rows ``[i - sequence_length, i)`` paired with a label
    derived from ``df['ret_fwd_5']`` at row ``i``:

    * ``0`` (undervalued)    if the forward return is above  +2 %
    * ``2`` (overvalued)     if the forward return is below  -2 %
    * ``1`` (fairly valued)  otherwise

    Samples whose window contains a NaN feature, or whose forward return is
    NaN, are dropped: they would otherwise inject NaNs into training or be
    silently labelled "fairly valued".

    Args:
        df: DataFrame containing the feature columns listed below and,
            ideally, ``ret_fwd_5``.
        sequence_length: Number of consecutive rows in each window.
        scaler: Optional already-fitted scaler exposing ``transform``. When
            omitted, a new ``StandardScaler`` is fitted on *all* rows of
            ``df``. NOTE(review): fitting on the full frame lets statistics
            of later rows influence earlier samples; pass a scaler fitted on
            the training period to avoid that.

    Returns:
        ``(X, y, scaler)`` where ``X`` has shape
        ``(n_samples, sequence_length, n_features)``, ``y`` is an int64 array
        of shape ``(n_samples,)`` and ``scaler`` is the scaler that was used.
        If ``df`` has too few rows, ``n_samples`` is 0 but the shapes are
        still well-formed.
    """
    import warnings

    feature_columns = ['open', 'high', 'low', 'close', 'volume', 'ret_1', 'ret_5',
                       'rng_1', 'vol_20', 'vwap_20', 'dist_vwap']

    # Select and normalize features
    features = df[feature_columns].values
    if scaler is None:
        scaler = StandardScaler()
        features_scaled = scaler.fit_transform(features)
    else:
        features_scaled = scaler.transform(features)
    features_scaled = np.asarray(features_scaled)

    # Row indices that receive a label; empty when the frame is too short.
    target_rows = np.arange(sequence_length, len(df))

    if 'ret_fwd_5' in df.columns:
        forward_returns = df['ret_fwd_5'].to_numpy(dtype=float)[target_rows]
    else:
        # Preserve the original best-effort fallback, but make it visible.
        warnings.warn(
            "'ret_fwd_5' column not found; every sample will be labelled "
            "1 (fairly valued)",
            RuntimeWarning,
            stacklevel=2,
        )
        forward_returns = np.zeros(len(target_rows))

    # nan_before[k] = number of rows with a NaN feature among rows [0, k),
    # so a window [i - L, i) is clean iff the count is equal at both ends.
    row_has_nan = np.isnan(features_scaled).any(axis=1)
    nan_before = np.concatenate(([0], np.cumsum(row_has_nan)))
    window_is_clean = (
        nan_before[target_rows] == nan_before[target_rows - sequence_length]
    )

    keep = window_is_clean & ~np.isnan(forward_returns)
    sample_rows = target_rows[keep]
    forward_returns = forward_returns[keep]

    # Gather all windows at once: row j of window_index lists the feature
    # rows that make up sample j.
    window_index = (sample_rows - sequence_length)[:, None] + np.arange(sequence_length)
    X = features_scaled[window_index]

    # Labels: 0=undervalued, 1=fairly valued, 2=overvalued
    y = np.ones(len(sample_rows), dtype=np.int64)
    y[forward_returns > 0.02] = 0   # >2% forward gain
    y[forward_returns < -0.02] = 2  # >2% forward loss

    return X, y, scaler


def train_model(model, train_loader, val_loader, num_epochs=100, learning_rate=0.001):
    """Train a probability-emitting classifier and track validation metrics.

    ``model(batch)`` must return a tuple whose first element is a
    ``(batch, n_classes)`` tensor of class *probabilities* (already
    softmax-normalised). The loss is therefore the negative log-likelihood of
    those probabilities; applying ``nn.CrossEntropyLoss`` to them would run
    softmax a second time and flatten the gradients.

    Optimisation uses Adam with light weight decay, gradient-norm clipping at
    1.0 and a ``ReduceLROnPlateau`` schedule driven by the mean validation
    loss. A progress line is printed every 20 epochs.

    Args:
        model: The network to train (modified in place).
        train_loader: Iterable of ``(inputs, integer_targets)`` batches.
        val_loader: Iterable of ``(inputs, integer_targets)`` batches.
        num_epochs: Number of passes over ``train_loader``.
        learning_rate: Initial Adam learning rate.

    Returns:
        ``(train_losses, val_losses, val_accuracies)``: three lists with one
        entry per epoch (mean batch loss, mean batch loss, accuracy in %).

    Raises:
        ValueError: If either loader yields no batches.
    """
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError("train_loader and val_loader must each contain at least one batch")

    criterion = nn.NLLLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, patience=10, factor=0.5)

    def _nll_from_probabilities(probabilities, targets):
        # Clamp before the log so an exactly-zero probability cannot yield -inf.
        return criterion(torch.log(probabilities.clamp_min(1e-12)), targets)

    train_losses = []
    val_losses = []
    val_accuracies = []

    for epoch in range(num_epochs):
        # Training
        model.train()
        train_loss = 0.0
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            probabilities, _ = model(batch_X)
            loss = _nll_from_probabilities(probabilities, batch_y)
            loss.backward()
            # Recurrent nets are prone to exploding gradients; cap the norm.
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()
            train_loss += loss.item()

        # Validation
        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0

        with torch.no_grad():
            for batch_X, batch_y in val_loader:
                probabilities, _ = model(batch_X)
                val_loss += _nll_from_probabilities(probabilities, batch_y).item()

                predicted = probabilities.argmax(dim=1)
                total += batch_y.size(0)
                correct += (predicted == batch_y).sum().item()

        mean_val_loss = val_loss / len(val_loader)
        train_losses.append(train_loss / len(train_loader))
        val_losses.append(mean_val_loss)
        val_accuracies.append(100 * correct / total if total else 0.0)

        # Step on the same per-batch mean that is reported to the caller.
        scheduler.step(mean_val_loss)

        if (epoch + 1) % 20 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], '
                  f'Train Loss: {train_losses[-1]:.4f}, '
                  f'Val Loss: {val_losses[-1]:.4f}, '
                  f'Val Acc: {val_accuracies[-1]:.2f}%')

    return train_losses, val_losses, val_accuracies


def evaluate_model(model, test_loader):
    """Run the model over a loader and collect its predictions.

    The model is switched to eval mode and called without gradient tracking.
    For each batch the first element of the model's output is taken as the
    per-class scores; the predicted class is the index of the largest score.

    Returns:
        A tuple of NumPy arrays ``(predictions, targets, probabilities)``:
        predicted class indices, the true labels from the loader, and the
        full per-class score rows.
    """
    predicted_labels, true_labels, score_rows = [], [], []

    model.eval()
    with torch.no_grad():
        for inputs, labels in test_loader:
            scores = model(inputs)[0]
            best_class = torch.max(scores, 1)[1]

            predicted_labels.extend(best_class.cpu().numpy())
            true_labels.extend(labels.cpu().numpy())
            score_rows.extend(scores.cpu().numpy())

    return (
        np.array(predicted_labels),
        np.array(true_labels),
        np.array(score_rows),
    )

In [None]:
# Load the prepared dataset from disk. Only a missing file triggers the
# synthetic fallback below; any other read error propagates.
try:
    df = pd.read_parquet("warehouse/sber_1d.parquet")
    print("Data loaded successfully!")
    print(f"Data shape: {df.shape}")
    print(f"Columns: {df.columns.tolist()}")
except FileNotFoundError:
    print("Data file not found. Using sample data generation...")
    # Generate sample data for demonstration.
    # Every column is an independent random draw, so the frame carries no real
    # relationship between prices, returns and labels (e.g. 'high' is not
    # guaranteed to exceed 'low'); it only lets the pipeline run end to end.
    # The draws consume the global NumPy RNG in the order written here, so
    # reordering the columns changes the generated values.
    np.random.seed(42)  # fixed seed -> identical fallback data on every run
    n_samples = 1000
    df = pd.DataFrame({
        'open': np.random.normal(100, 10, n_samples).cumsum() + 1000,
        'high': np.random.normal(105, 12, n_samples).cumsum() + 1000,
        'low': np.random.normal(95, 12, n_samples).cumsum() + 1000,
        'close': np.random.normal(100, 10, n_samples).cumsum() + 1000,
        'volume': np.random.lognormal(10, 1, n_samples),
        'ret_1': np.random.normal(0, 0.02, n_samples),
        'ret_5': np.random.normal(0, 0.05, n_samples),
        'rng_1': np.random.normal(0.03, 0.01, n_samples),
        'vol_20': np.random.normal(0.02, 0.005, n_samples),
        'vwap_20': np.random.normal(100, 10, n_samples).cumsum() + 1000,
        'dist_vwap': np.random.normal(0, 0.01, n_samples),
        'ret_fwd_5': np.random.normal(0, 0.05, n_samples),
        'y_tb_20': np.random.choice([-1, 0, 1], n_samples, p=[0.3, 0.4, 0.3])
    })

# Prepare features and labels: 60-row windows of scaled features (X), one
# class label per window (y), and the scaler used for normalisation.
print("Preparing features...")
X, y, scaler = prepare_features(df, sequence_length=60)

print(f"X shape: {X.shape}, y shape: {y.shape}")
# Counts per class index, in order 0, 1, 2.
print(f"Class distribution: {np.bincount(y)}")

In [None]:
# Split data chronologically: roughly 70% train, 15% validation, 15% test.
# shuffle=False keeps time order, so no random_state is needed (it would be
# ignored anyway).
# NOTE(review): windows are built with stride 1, so samples on either side of
# a split boundary share most of their rows; consider leaving a gap of at
# least one window length between splits if strict separation matters.
X_train, X_temp, y_train, y_temp = train_test_split(
    X, y, test_size=0.3, shuffle=False
)
X_val, X_test, y_val, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, shuffle=False
)

# Seed PyTorch so weight initialisation and training-batch shuffling are
# reproducible, matching the fixed seed used for the NumPy sample data.
torch.manual_seed(42)

# Convert to PyTorch tensors (float32 inputs, int64 class targets)
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)

# Create data loaders
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

# Only the training batches are shuffled; evaluation order stays chronological.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Initialize model
input_size = X.shape[2]  # Number of features
model = StockValuationModel(input_size=input_size)

print(f"Model initialized with {sum(p.numel() for p in model.parameters()):,} parameters")

# Train model
print("Training model...")
train_losses, val_losses, val_accuracies = train_model(
    model, train_loader, val_loader, num_epochs=100
)

# Evaluate model
print("Evaluating model...")
predictions, targets, probabilities = evaluate_model(model, test_loader)

In [None]:
# Calculate accuracy metrics
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

class_names = ['Undervalued', 'Fairly Valued', 'Overvalued']
# Explicit class ids keep the report and the confusion matrix at three
# classes even when one class is absent from both targets and predictions
# (otherwise classification_report rejects the mismatched target_names).
class_labels = list(range(len(class_names)))

accuracy = accuracy_score(targets, predictions)
print("\n=== MODEL PERFORMANCE ===")
print(f"Test Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)")
# Accuracy of always predicting the most frequent test class.
print(f"Baseline Accuracy (majority class): {max(np.bincount(targets)) / len(targets):.4f}")

print("\n=== CLASSIFICATION REPORT ===")
print(classification_report(
    targets,
    predictions,
    labels=class_labels,
    target_names=class_names,
    zero_division=0,  # never-predicted classes score 0 without a warning
))

print("\n=== CONFUSION MATRIX ===")
# Rows are actual classes, columns are predicted classes, in class_names order.
cm = confusion_matrix(targets, predictions, labels=class_labels)
print(cm)

# Plot training history
plt.figure(figsize=(15, 5))

plt.subplot(1, 3, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Val Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.title('Training History')

plt.subplot(1, 3, 2)
plt.plot(val_accuracies)
plt.xlabel('Epoch')
plt.ylabel('Accuracy (%)')
plt.title('Validation Accuracy')

plt.subplot(1, 3, 3)
# Plot actual vs predicted class indices (points overlap heavily because
# both axes only take the values 0, 1 and 2).
plt.scatter(targets, predictions, alpha=0.6)
plt.xlabel('Actual')
plt.ylabel('Predicted')
plt.title('Actual vs Predicted')

plt.tight_layout()
plt.show()

In [None]:
# Print up to ten randomly chosen test samples: true class, predicted class,
# and the largest class probability as the model's confidence.
print("\n=== SAMPLE PREDICTIONS ===")
sample_indices = np.random.choice(
    len(targets),
    min(10, len(targets)),
    replace=False,
)
for idx in sample_indices:
    actual, predicted = class_names[targets[idx]], class_names[predictions[idx]]
    confidence = probabilities[idx].max()
    print(f"Actual: {actual:<15} Predicted: {predicted:<15} Confidence: {confidence:.3f}")