In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score, roc_curve
from torch.utils.data import TensorDataset, DataLoader

## AI in trading
In this final section, I will explore the role of artificial intelligence (AI) in developing effective trading algorithms. So far, we have focused on traditional approaches such as momentum strategies and mean-reversion, where we applied concepts from physics, including the harmonic oscillator and stochastic differential equations. While these methods are grounded in theory, they rely heavily on assumptions—about market efficiency, distribution of returns, or stationarity—that may not always hold true in real-world financial data. These assumptions can introduce significant limitations and potential sources of error.

In contrast, machine learning models can learn patterns directly from data without relying on rigid predefined assumptions. This data-driven approach allows for greater flexibility and adaptability, making AI a powerful tool for uncovering complex, non-linear relationships in financial markets that traditional models might miss.


## Data prep and feature engineering
Before training any machine learning model, careful data preparation and feature engineering are essential. In the context of trading, raw price data alone is often insufficient. Instead, we derive a variety of features that may capture underlying market dynamics. These include technical indicators such as moving averages, momentum, relative strength index (RSI), Bollinger bands, and volatility measures. The goal is to provide the model with inputs that reflect both short-term and long-term behavior of the asset. To avoid look-ahead bias, all features are computed using only past and present data available at the time of prediction. The data is then standardized to ensure that all features are on comparable scales, which helps neural networks converge more efficiently during training. Additionally, time-series data is split using walk-forward validation rather than random sampling, to better simulate real-world trading conditions and prevent data leakage.

## Bulding a classification model
I will begin by building a classification modlel deciding if the price is going to go up or down on the next day.

In [2]:

ticker = 'AAPL'
df = yf.download(ticker, start='2010-01-01', end='2023-12-31')
df = df[['Close']]

  df = yf.download(ticker, start='2010-01-01', end='2023-12-31')
[*********************100%***********************]  1 of 1 completed


After getting the data we need to engineer our features

In [3]:

# Feature engineering
df['Return'] = df['Close'].pct_change()
df['MA_5'] = df['Close'].rolling(window=5).mean()
df['Volatility_10'] = df['Close'].rolling(window=10).std()

def compute_rsi(series, period=14):
    delta = series.diff()
    gain = delta.where(delta > 0, 0).rolling(window=period).mean()
    loss = -delta.where(delta < 0, 0).rolling(window=period).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

df['RSI_14'] = compute_rsi(df['Close'])

Now we need to define a target to train our neural network, drop all NaN values and define our features, scaling them.

In [4]:
# Target: 1 if next day's close > today's, else 0
df['Target'] = (df['Close'].shift(-1) > df['Close']).astype(int)

# 4. Drop NaNs
df.dropna(inplace=True)

# 5. Features and scaling
features = ['Return', 'MA_5', 'Volatility_10', 'RSI_14']
X = df[features].values
y = df['Target'].values

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)


Now we need to create sequences for our LSTM neural network since it works with them.

In [5]:

def create_sequences(X, y, seq_length=10):
    xs, ys = [], []
    for i in range(len(X) - seq_length):
        x_seq = X[i:i+seq_length]
        y_seq = y[i+seq_length]
        xs.append(x_seq)
        ys.append(y_seq)
    return np.array(xs), np.array(ys)

seq_length = 10
X_seq, y_seq = create_sequences(X_scaled, y, seq_length)

Now we need a train test split and to convert what we have to tensors

In [6]:
#  Train-test split (time based)
split = int(len(X_seq) * 0.8)
X_train, X_test = X_seq[:split], X_seq[split:]
y_train, y_test = y_seq[:split], y_seq[split:]

# 8. Convert to tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.reshape(-1, 1), dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.reshape(-1, 1), dtype=torch.float32)

In [7]:

# DataLoader
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False) 

# LSTM Model
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.2)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.ReLU(),
            nn.Linear(32, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        # x shape: (batch, seq_len, features)
        out, _ = self.lstm(x)
        out = out[:, -1, :]  # take the output of the last time step
        out = self.fc(out)
        return out

model = LSTMClassifier(input_size=len(features))
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 11. Training loop
epochs = 30
for epoch in range(epochs):
    model.train()
    running_loss = 0
    correct = 0
    total = 0

    for X_batch, y_batch in train_loader:
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * X_batch.size(0)
        preds = (outputs > 0.5).float()
        correct += (preds == y_batch).sum().item()
        total += y_batch.size(0)

    train_loss = running_loss / total
    train_acc = correct / total

    # Validation
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_test_tensor)
        val_loss = criterion(val_outputs, y_test_tensor).item()
        val_preds = (val_outputs > 0.5).float()
        val_acc = (val_preds == y_test_tensor).float().mean().item()

    if (epoch + 1) % 5 == 0 or epoch == 0:
        print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

# 12. Final test accuracy
model.eval()
with torch.no_grad():
    test_outputs = model(X_test_tensor)
    test_preds = (test_outputs > 0.5).float()
    test_acc = (test_preds == y_test_tensor).float().mean().item()
print(f"Test Accuracy: {test_acc:.4f}")


Epoch 1/30 | Train Loss: 0.6918, Train Acc: 0.5298 | Val Loss: 0.6920, Val Acc: 0.5257
Epoch 5/30 | Train Loss: 0.6907, Train Acc: 0.5316 | Val Loss: 0.6948, Val Acc: 0.4929
Epoch 10/30 | Train Loss: 0.6889, Train Acc: 0.5395 | Val Loss: 0.6995, Val Acc: 0.4914
Epoch 15/30 | Train Loss: 0.6892, Train Acc: 0.5363 | Val Loss: 0.6935, Val Acc: 0.5386
Epoch 20/30 | Train Loss: 0.6843, Train Acc: 0.5498 | Val Loss: 0.7109, Val Acc: 0.5414
Epoch 25/30 | Train Loss: 0.6808, Train Acc: 0.5570 | Val Loss: 0.7089, Val Acc: 0.5357
Epoch 30/30 | Train Loss: 0.6767, Train Acc: 0.5502 | Val Loss: 0.7164, Val Acc: 0.5357
Test Accuracy: 0.5357


After creating the network and evaluating it we find out that the accuracy is no better than a random choice.

Despite our efforts, the AI model failed to consistently predict stock movements with meaningful accuracy. This outcome highlights the inherent difficulty of applying machine learning to financial markets. Unlike many other domains, markets are highly noisy, non-stationary, and influenced by countless external, often unpredictable, factors. Even sophisticated models like LSTMs struggle because patterns in financial data are weak, unstable, and can change over time(regime shifts). Additionally, the historical data we use may contain biases or may simply not carry enough predictive signal. Without access to high-quality, diverse data and advanced techniques like ensemble learning, feature selection, or reinforcement learning, building a reliable trading AI is extremely challenging. This failure is not just due to the model or features but reflects a deeper truth: financial prediction is one of the hardest real-world tasks in AI.


## Adding more evaluation, L2 regularization, lowering learn rate, batch learning

Now I am going to improve the model. I am going to rewrite some code for the sake of making the changes visible. **In general we would improve on the first model directly**

In [13]:
# Load data
ticker = 'AAPL'
df = yf.download(ticker, start='2010-01-01', end='2023-12-31')
# Use Close, and optionally Volume, High, Low for additional features
df = df[['Close', 'Volume', 'High', 'Low']]  # Adjust if only Close is needed


  df = yf.download(ticker, start='2010-01-01', end='2023-12-31')
[*********************100%***********************]  1 of 1 completed


The next thing we will do is probably one of the most important. We are going to select our features. **We know that it does not matter how good our model is if we are using poor data and have engineered our features badly. Trash in trash out.**

In [14]:

# Enhanced Features
df['Return'] = df['Close'].pct_change()
df['MA_5'] = df['Close'].rolling(window=5).mean()
df['Volatility_10'] = df['Close'].rolling(window=10).std()

# RSI
def compute_rsi(series, period=14):
    delta = series.diff()
    gain = delta.where(delta > 0, 0).rolling(window=period).mean()
    loss = -delta.where(delta < 0, 0).rolling(window=period).mean()
    rs = gain / loss
    return 100 - (100 / (1 + rs))

df['RSI_14'] = compute_rsi(df['Close'])

# Momentum 5-day
df['Momentum_5'] = df['Close'].diff(5)

# MACD
df['EMA_12'] = df['Close'].ewm(span=12, adjust=False).mean()
df['EMA_26'] = df['Close'].ewm(span=26, adjust=False).mean()
df['MACD'] = df['EMA_12'] - df['EMA_26']

# Bollinger Bands
df['BB_Middle'] = df['Close'].rolling(window=20).mean()
df['BB_Std'] = df['Close'].rolling(window=20).std()
df['BB_Upper'] = df['BB_Middle'] + 2 * df['BB_Std']
df['BB_Lower'] = df['BB_Middle'] - 2 * df['BB_Std']
df['BB_Width'] = df['BB_Upper'] - df['BB_Lower']  # Width of the bands

# Normalizing because we are using DL
df['Volume_Norm'] = df['Volume'] / df['Volume'].rolling(window=20).mean()

# Lagged features
df['Lag_1'] = df['Close'].shift(1)
df['Lag_3'] = df['Close'].shift(3)
df['Lag_5'] = df['Close'].shift(5)

# ATR
def compute_atr(high, low, close, period=14):
    tr1 = high - low
    tr2 = abs(high - close.shift())
    tr3 = abs(low - close.shift())
    tr = pd.concat([tr1, tr2, tr3], axis=1).max(axis=1)
    return tr.rolling(window=period).mean()

df['ATR_14'] = compute_atr(df['High'], df['Low'], df['Close'])

now we need a target, to drop all NaN and also scale, we are repeating steps from before

In [15]:

# 3. Target: 1 if next day's close > today's, else 0
df['Target'] = (df['Close'].shift(-1) > df['Close']).astype(int)

# 4. Drop NaNs
df.dropna(inplace=True)

# 5. Features and scaling
features = ['Return', 'MA_5', 'Volatility_10', 'RSI_14', 'Momentum_5', 'MACD', 
            'BB_Width', 'Volume_Norm', 'Lag_1', 'Lag_3', 'Lag_5', 'ATR_14']
X = df[features].values
y = df['Target'].values

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

These sequences are needed because we are working with time series.

In [None]:

# Create sequences
def create_sequences(X, y, seq_length=10):
    xs, ys = [], []
    for i in range(len(X) - seq_length):
        x_seq = X[i:i+seq_length]
        y_seq = y[i+seq_length]
        xs.append(x_seq)
        ys.append(y_seq)
    return np.array(xs), np.array(ys)

seq_length = 10
X_seq, y_seq = create_sequences(X_scaled, y, seq_length)


What we we have to do now is to split the data and then convert it into tensors.

In [7]:



 #Train-test split (time-based)
split = int(len(X_seq) * 0.8)
X_train, X_test = X_seq[:split], X_seq[split:]
y_train, y_test = y_seq[:split], y_seq[split:]

# Convert to tensors
X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train.reshape(-1, 1), dtype=torch.float32)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test.reshape(-1, 1), dtype=torch.float32)

# DataLoader. This is needed for batches
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=False)



Now the interesting part, we will again define and train an LSTM-based binary classifier. The LSTMClassifier processes input sequences to capture temporal dependencies and predicts a probability using a sigmoid output. The ```train_and_evaluate``` function handles model training with class weighting (to address class imbalance), validation, early stopping, and evaluation using key metrics like accuracy, precision, recall, F1-score, and AUC. It also visualizes training/validation loss and plots the ROC curve to assess classification performance. The purpose is to try and make the model better while evaluating and seeing graphically what could be done better.

In [None]:
# LSTM Model
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size=64, num_layers=2):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=0.3)
        self.fc = nn.Sequential(
            nn.Linear(hidden_size, 32),
            nn.ReLU(), # After the first linear layer
            nn.Linear(32, 1),
            nn.Sigmoid() # After the last linear layer
        )

    def forward(self, x):
        out, _ = self.lstm(x)
        out = out[:, -1, :]  # Last time step
        out = self.fc(out)
        return out

# Training and evaluation function (with early stopping, class weighting)
def train_and_evaluate(model, train_loader, X_test_tensor, y_test_tensor, criterion, optimizer, epochs, patience=5):
    train_losses, val_losses = [], []
    best_val_loss = float('inf')
    epochs_no_improve = 0
    best_model_state = None

    for epoch in range(epochs):
        model.train()
        running_loss = 0
        correct = 0
        total = 0

        for X_batch, y_batch in train_loader:
            optimizer.zero_grad()
            outputs = model(X_batch)
            loss = criterion(outputs, y_batch) # loss function telling us how wrong we are
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * X_batch.size(0)
            preds = (outputs > 0.5).float()
            correct += (preds == y_batch).sum().item()
            total += y_batch.size(0)

        train_loss = running_loss / total
        train_acc = correct / total
        train_losses.append(train_loss)

        # Validation
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_test_tensor)
            val_loss = criterion(val_outputs, y_test_tensor).item()
            val_preds = (val_outputs > 0.5).float()
            val_probs = val_outputs
            val_acc = (val_preds == y_test_tensor).float().mean().item()
            val_precision = precision_score(y_test_tensor.numpy(), val_preds.numpy(), zero_division=0)
            val_recall = recall_score(y_test_tensor.numpy(), val_preds.numpy(), zero_division=0)
            val_f1 = f1_score(y_test_tensor.numpy(), val_preds.numpy(), zero_division=0)
            val_auc = roc_auc_score(y_test_tensor.numpy(), val_probs.numpy())
            val_losses.append(val_loss)

        if (epoch + 1) % 5 == 0 or epoch == 0:
            print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}, Precision: {val_precision:.4f}, "
                  f"Recall: {val_recall:.4f}, F1: {val_f1:.4f}, AUC: {val_auc:.4f}")

        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_model_state = model.state_dict()
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f"Early stopping at epoch {epoch+1}")
                model.load_state_dict(best_model_state)
                break

    # Final test evaluation and ROC curve
    model.eval()
    with torch.no_grad():
        test_outputs = model(X_test_tensor)
        test_preds = (test_outputs > 0.5).float()
        test_probs = test_outputs
        test_acc = (test_preds == y_test_tensor).float().mean().item()
        test_precision = precision_score(y_test_tensor.numpy(), test_preds.numpy(), zero_division=0)
        test_recall = recall_score(y_test_tensor.numpy(), test_preds.numpy(), zero_division=0)
        test_f1 = f1_score(y_test_tensor.numpy(), test_preds.numpy(), zero_division=0)
        test_auc = roc_auc_score(y_test_tensor.numpy(), test_probs.numpy())

        # Plot ROC curve
        fpr, tpr, _ = roc_curve(y_test_tensor.numpy(), test_probs.numpy())
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {test_auc:.4f})')
        plt.plot([0, 1], [0, 1], 'k--')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC Curve')
        plt.legend(loc='lower right')
        plt.grid(True)
        plt.show()

    print(f"Test Accuracy: {test_acc:.4f}, Precision: {test_precision:.4f}, Recall: {test_recall:.4f}, "
          f"F1: {test_f1:.4f}, AUC: {test_auc:.4f}")

    # Plot training and validation loss
    plt.figure(figsize=(8, 5))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.grid(True)
    plt.show()

    return model, test_preds, train_losses, val_losses

# Initialize and train model
model = LSTMClassifier(input_size=len(features))
# Class weights for imbalance
class_weights = torch.tensor([1.0 / (y_train.mean() + 1e-6), 1.0 / (1 - y_train.mean() + 1e-6)]).mean()
criterion = nn.BCELoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-5)
epochs = 30
patience = 5

model, test_preds, train_losses, val_losses = train_and_evaluate(model, train_loader, X_test_tensor, y_test_tensor, criterion, optimizer, epochs, patience)

Epoch 1/30 | Train Loss: 1.3882, Train Acc: 0.5298 | Val Loss: 1.3892, Val Acc: 0.5257, Precision: 0.5257, Recall: 1.0000, F1: 0.6891, AUC: 0.4781


From the results it is vissible that we were not able to create a very reliable model. Despite our efforts the AUC is below 0.5 which is worse than a random choice. 

## Applying unsupervised learning methods for financial data analysis
In this part I am going to try and implement a well-know unsupervised machine learnign technique - **Principal component anaysis** for dimensionality reduction. I will try and analyze the daily returns of some tech stocks, identifying the **main sources of variation** in the data. , PCA helps identify the main sources of variation in the data. Each principal component represents a latent factor that explains a portion of the overall market behavior, allowing us to summarize complex relationships between assets into a few interpretable components. 

For the sake of this we will:
- download data
- extract ```adj close```
- calculate the returns
- standardize the returns
- apply PCA
- plot the cumulative explained variance
- show how the principal components contribute

In [None]:

tickers = ['MSFT', 'AAPL', 'GOOGL', 'AMZN', 'META']

data = yf.download(tickers, start="2022-01-01", end="2023-01-01", auto_adjust=False)

adj_close = data['Adj Close']

returns = adj_close.pct_change().dropna()

returns_scaled = (returns - returns.mean()) / returns.std()

pca = PCA()
pca.fit(returns_scaled)

plt.figure(figsize=(8, 5))
plt.plot(np.cumsum(pca.explained_variance_ratio_), marker='o')
plt.title("Cumulative Explained Variance by PCA Components")
plt.xlabel("Number of Components")
plt.ylabel("Cumulative Variance Explained")
plt.grid(True)
plt.show()

components_df = pd.DataFrame(pca.components_, columns=returns.columns)
print("Principal Components (Eigenvectors):")
print(components_df)


What do we see here?
The cumulative variance graph shows us how much of the overall variation in the data is captured by each principal component. As we can see, the first component explains the majority of the variance - over 70% - meaning that most of the movement across these stocks can be described by this single factor. The remaining components each explain a smaller portion, with rapidly diminishing contribution.

Next, let’s look at the principal components table, which shows how each stock contributes to each component. These are essentially the directions in the data that capture the most variation, and each row is a new axis - a linear combination of the original stocks.

- In the first component, all five stocks - AAPL, AMZN, GOOGL, META, and MSFT - have similar positive weights. This suggests that this component captures the overall market or tech-sector trend, since all stocks tend to move in the same direction here.
- In the second component, META has a very high loading (about 0.89), while the others are close to zero or negative. This implies that this component is largely driven by META-specific behavior, such as volatility or unique events that affected only Meta during this time period.
- The third component is dominated by Amazon (AMZN), indicating another stock-specific factor.
- In the later components, we start seeing more contrasting behavior. For example, the fourth component shows a strong positive contribution from Apple and a strong negative one from Google, meaning it captures scenarios where those two stocks diverge in price movement

## Resources
- https://builtin.com/artificial-intelligence/ai-trading-stock-market-tech#:~:text=AI%20stock%20trading%20uses%20machine%20learning%2C%20sentiment%20analysis,efficiency%20to%20mitigate%20risks%20and%20provide%20higher%20returns.
- https://www.sciencedirect.com/science/article/pii/S2772662221000102#:~:text=The%20stock%20market%20is%20turbulent%2C%20yet%20using%20artificial,as%20predictive%20analytics%20tools%20in%20the%20stock%20market.