In [51]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
import pandas as pd

In [52]:
class EnhancedTransformer(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_heads, dropout, temperature=1.0):
        super(EnhancedTransformer, self).__init__()
        self.input_proj = nn.Linear(input_size, hidden_size)
        self.pos_encoder = PositionalEncoding(hidden_size, dropout)
        encoder_layers = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=num_heads, dropout=dropout)
        self.transformer_encoder = nn.TransformerEncoder(encoder_layers, num_layers=num_layers)
        self.fc1 = nn.Linear(hidden_size, hidden_size)
        self.activation = nn.GELU()
        self.fc2 = nn.Linear(hidden_size, 2)  # Output size is now 2 for binary classification
        self.temperature = temperature
        self.dropout = nn.Dropout(dropout)

        self.init_weights()

    def init_weights(self):
        def _init_weights(m):
            if isinstance(m, nn.Linear):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.LayerNorm):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
        
        self.apply(_init_weights)

        # Special initialization for the final layer
        nn.init.normal_(self.fc2.weight, mean=0.0, std=0.02)
        if self.fc2.bias is not None:
            nn.init.zeros_(self.fc2.bias)

    def forward(self, x):
        x = self.input_proj(x)
        x = x.unsqueeze(1)  # Add sequence dimension
        x = self.pos_encoder(x)
        x = self.transformer_encoder(x)
        x = x.squeeze(1)
        x = self.fc1(x)
        x = self.activation(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x / self.temperature  # Apply temperature scaling


In [53]:
class TemperatureSoftmax(nn.Module):
    def __init__(self, temperature=1.0):
        super(TemperatureSoftmax, self).__init__()
        self.temperature = temperature

    def forward(self, x):
        return nn.functional.softmax(x / self.temperature, dim=-1)

In [54]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [55]:
def create_binary_X_Y(df):
    X = []
    Y = []
    for row_id in range(len(df) - 1):
        for stock_id in range(1, 51):
            stock_columns = [col for col in df.columns if col.startswith(f'Stock_{stock_id}_')]
            stock_columns.append(f'Stock_{stock_id}')
            x = []
            for column_name in stock_columns:
                x.append(df.iloc[row_id][column_name])
            
            X.append(x)
            # Convert to binary classification: 1 if price increased, 0 if decreased or stayed the same
            price_change = df.iloc[row_id + 1][f'Stock_{stock_id}'] - df.iloc[row_id][f'Stock_{stock_id}']
            Y.append(1 if price_change > 0 else 0)
    
    X = np.array(X, dtype=np.float32)
    Y = np.array(Y, dtype=np.int64) 
    return X, Y

In [56]:
# Modify the training function
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, patience, temperature_softmax):
    best_val_loss = float('inf')
    epochs_without_improvement = 0

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0.0
        for batch in train_loader:
            inputs, targets = batch
            optimizer.zero_grad()
            outputs = model(inputs)
            outputs = temperature_softmax(outputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        model.eval()
        val_loss = 0.0
        correct = 0
        total = 0
        with torch.no_grad():
            for batch in val_loader:
                inputs, targets = batch
                outputs = model(inputs)
                outputs = temperature_softmax(outputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                total += targets.size(0)
                correct += (predicted == targets).sum().item()

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        val_accuracy = 100 * correct / total
        scheduler.step(val_loss)

        print(f'Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.2f}%')

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            epochs_without_improvement = 0
            torch.save(model.state_dict(), f'best_binary_transformer_{round(best_val_loss, 3)}.pth')
        else:
            epochs_without_improvement += 1
            if epochs_without_improvement >= patience:
                print(f'Early stopping after {epoch+1} epochs')
                break


In [57]:
# Main execution
df = pd.read_csv('stock_data_with_indicators.csv')
training_org_df = df[49:375]
testing_org_df = df[375:]

X_train, Y_train = create_binary_X_Y(training_org_df)
X_test, Y_test = create_binary_X_Y(testing_org_df)

# finding ratio
unique, counts = np.unique(Y_test, return_counts=True)

# Create a dictionary of the counts
class_distribution = dict(zip(unique, counts))

# Calculate the ratio
total = len(Y_test)
ratio_0 = class_distribution.get(0, 0) / total
ratio_1 = class_distribution.get(1, 0) / total

print("Class distribution:")
print(f"Class 0: {class_distribution.get(0, 0)} ({ratio_0:.2%})")
print(f"Class 1: {class_distribution.get(1, 0)} ({ratio_1:.2%})")

# Normalize the input data
mean = X_train.mean(axis=0)
std = X_train.std(axis=0)
X_train = (X_train - mean) / std
X_test = (X_test - mean) / std

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

X_train = torch.from_numpy(X_train).float().to(device)
X_val = torch.from_numpy(X_test).float().to(device)
y_train = torch.from_numpy(Y_train).long().to(device)
y_val = torch.from_numpy(Y_test).long().to(device)

train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64)

Class distribution:
Class 0: 3420 (55.16%)
Class 1: 2780 (44.84%)


In [58]:
print(X_train[0])
print(Y_train[0])

tensor([-1.6217, -1.6203, -1.6261, -1.6233, -1.6256, -1.6251, -1.0157, -1.6217,
        -1.6116, -1.6293, -0.3886, -0.3798, -1.6280])
1


In [59]:
# Initialize model
input_size = X_train.shape[1]
hidden_size = 256
num_layers = 5
num_heads = 8
dropout = 0.2
temperature = 1.0  # You can adjust this value

model = EnhancedTransformer(input_size, hidden_size, num_layers, num_heads, dropout, temperature).to(device)
temperature_softmax = TemperatureSoftmax(temperature).to(device)

# Training setup
criterion = nn.CrossEntropyLoss()  # Change to CrossEntropyLoss for 2-class output
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5, min_lr=1e-6)



In [60]:
# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs=100, patience=30, temperature_softmax=temperature_softmax)

Epoch 1/100, Train Loss: 0.6948, Val Loss: 0.6915, Val Accuracy: 55.16%
Epoch 2/100, Train Loss: 0.6932, Val Loss: 0.6894, Val Accuracy: 55.16%
Epoch 3/100, Train Loss: 0.6925, Val Loss: 0.6908, Val Accuracy: 55.16%
Epoch 4/100, Train Loss: 0.6926, Val Loss: 0.6940, Val Accuracy: 44.84%
Epoch 5/100, Train Loss: 0.6927, Val Loss: 0.6894, Val Accuracy: 55.16%
Epoch 6/100, Train Loss: 0.6925, Val Loss: 0.6917, Val Accuracy: 55.16%
Epoch 7/100, Train Loss: 0.6930, Val Loss: 0.6910, Val Accuracy: 55.16%
Epoch 8/100, Train Loss: 0.6925, Val Loss: 0.6916, Val Accuracy: 55.16%
Epoch 9/100, Train Loss: 0.6926, Val Loss: 0.6897, Val Accuracy: 55.16%
Epoch 10/100, Train Loss: 0.6925, Val Loss: 0.6898, Val Accuracy: 55.16%
Epoch 11/100, Train Loss: 0.6923, Val Loss: 0.6891, Val Accuracy: 55.16%
Epoch 12/100, Train Loss: 0.6924, Val Loss: 0.6895, Val Accuracy: 55.16%
Epoch 13/100, Train Loss: 0.6924, Val Loss: 0.6897, Val Accuracy: 55.16%


KeyboardInterrupt: 

In [None]:
# Evaluation
model.eval()
with torch.no_grad():
    test_sequence = X_val[:3]
    logits = model(test_sequence)
    probabilities = temperature_softmax(logits)
    _, predictions = torch.max(probabilities, 1)
    print(f'Probabilities: {probabilities}')
    print(f'Predictions: {predictions}')
    print(f'Actual labels: {y_val[:3]}')

Probabilities: tensor([[0.5401, 0.4599],
        [0.5401, 0.4599],
        [0.5401, 0.4599]])
Predictions: tensor([0, 0, 0])
Actual labels: tensor([0, 1, 1])
