In [16]:
from features_labels_extract import extract_features_and_labels
import torch
from torch.utils.data import Dataset
import numpy as np
from sklearn.metrics import (accuracy_score, classification_report, confusion_matrix, 
                             hamming_loss, ConfusionMatrixDisplay, 
                             precision_score, recall_score, f1_score, roc_curve, auc)

In [25]:
import yfinance as yf
# Load Apple Stock Data (Ensure Close & Volume columns)
data = yf.download("TGT", start="2000-01-01", end="2024-01-01")[["Close", "Volume"]].dropna()

# Extract features and labels for Momentum strategy
X, y = extract_features_and_labels("TGT", data, strategy="momentum")

# Check Output Shapes
print("Feature Shape:", X.shape)  # Expected: (num_samples, num_features, num_days)
print("Label Shape:", y.shape)  # Expected: (num_samples,)


[*********************100%***********************]  1 of 1 completed


Final dataset size: 591 samples
Feature Shape: (591, 3, 30)
Label Shape: (591,)


In [28]:
np.unique(y,return_counts=True)

(array([0, 1]), array([427, 164], dtype=int64))

In [None]:
class TradingDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32).view(-1, 1)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class CNNBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3):
        super(CNNBlock, self).__init__()
        self.conv = nn.Conv1d(in_channels, out_channels, kernel_size=kernel_size, padding='same')
        self.bn = nn.BatchNorm1d(out_channels)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = F.relu(self.bn(self.conv(x)))  # Conv -> BN -> ReLU
        x = self.pool(x)  # Max pooling
        x = self.dropout(x)  # Dropout
        return x

class TradingCNN(nn.Module):
    def __init__(self, input_channels, input_length):
        super(TradingCNN, self).__init__()

        # Feature Extraction: 3 CNN layers
        self.conv_layers = nn.Sequential(
            CNNBlock(input_channels, 64),
            CNNBlock(64, 128),
            CNNBlock(128, 256)
        )

        # Fully Connected Layers
        self.fc_layers = nn.Sequential(
            nn.Flatten(),
            nn.Linear(256 * (input_length // 8), 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 1)  # Output: Binary classification
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = self.fc_layers(x)
        return x

In [None]:
import torch.optim as optim
from torch.utils.data import DataLoader
from ignite.engine import Engine, Events, create_supervised_trainer, create_supervised_evaluator
from ignite.metrics import Accuracy, Loss, Precision, Recall
from ignite.handlers import EarlyStopping, ModelCheckpoint, TerminateOnNan
from ignite.contrib.handlers import ProgressBar

# Load your data
X, y = extract_features_and_labels("TGT", data, strategy="momentum")

# Create datasets and dataloaders
train_dataset = TradingDataset(X, y)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Define device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Instantiate the model
input_channels = X.shape[1]
input_length = X.shape[2]
model = TradingCNN(input_channels, input_length).to(device)

# Define loss function and optimizer
criterion = nn.BCEWithLogitsLoss().to(device)
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Create the trainer and evaluators
trainer = create_supervised_trainer(model, optimizer, criterion, device=device)
def thresholded_output_transform(output):
    y_pred, y = output
    y_pred = torch.round(torch.sigmoid(y_pred))  # Apply sigmoid and round to get binary output
    return y_pred, y
val_metrics = {
    "accuracy": Accuracy(output_transform=thresholded_output_transform),
    "precision": Precision(output_transform=thresholded_output_transform),
    "recall": Recall(output_transform=thresholded_output_transform),
    "loss": Loss(criterion)
}
train_evaluator = create_supervised_evaluator(model, metrics=val_metrics, device=device)
val_evaluator = create_supervised_evaluator(model, metrics=val_metrics, device=device)

# Attach progress bar to the trainer
pbar = ProgressBar(persist=True)
pbar.attach(trainer, output_transform=lambda x: {"loss": x})

# Validation after each epoch
@trainer.on(Events.EPOCH_COMPLETED)
def run_validation(engine):
    val_evaluator.run(train_loader)
    metrics = val_evaluator.state.metrics
    avg_accuracy = metrics['accuracy']
    avg_loss = metrics['loss']
    print(f"\nValidation - Epoch: {engine.state.epoch} | Avg accuracy: {avg_accuracy:.4f} | Avg loss: {avg_loss:.4f}")

# Early stopping and model checkpointing
early_stopping = EarlyStopping(patience=10, score_function=lambda engine: -engine.state.metrics["loss"], trainer=trainer)
val_evaluator.add_event_handler(Events.COMPLETED, early_stopping)
trainer.add_event_handler(Events.ITERATION_COMPLETED, TerminateOnNan())

checkpoint_handler = ModelCheckpoint(dirname='checkpoints', filename_prefix='trading_cnn', n_saved=1, create_dir=True, require_empty=False, score_function=lambda e: -e.state.metrics["loss"], score_name="val_loss", global_step_transform=lambda e, _: e.state.epoch)
val_evaluator.add_event_handler(Events.COMPLETED, checkpoint_handler, {'best_model': model})


In [None]:

# Run the training loop
trainer.run(train_loader, max_epochs=100)

In [22]:
data = yf.download("TGT", start="1990-01-01", end="2000-01-01")[["Close", "Volume"]].dropna()

# Extract features and labels for Momentum strategy
X, y = extract_features_and_labels("TGT", data, strategy="momentum")
test_dataset = TradingDataset(X, y)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=True)

[*********************100%***********************]  1 of 1 completed


Final dataset size: 240 samples


In [23]:
logits = []
labels_list = []

def evaluate_step(engine, batch):
    model.eval()
    with torch.no_grad():
        images, label = batch
        images = images.to(device)
        label = label.to(device)
        outputs = model(images)
        logits.append(outputs.cpu().numpy())
        labels_list.append(label.cpu().numpy())
    return outputs, label

evaluator = Engine(evaluate_step)
evaluator.run(test_loader)

# Concatenate results from test evaluation
logits_array = np.concatenate(logits, axis=0)
labels_array = np.concatenate(labels_list, axis=0)

# Convert logits to probabilities using sigmoid
probs = torch.sigmoid(torch.tensor(logits_array)).numpy()
preds = (probs > 0.5).astype(int)
y_true = labels_array.astype(int)
y_pred = preds.astype(int)

# Print classification report
print("\nClassification Report (Per-Class):")
print(classification_report(y_true, y_pred))


Classification Report (Per-Class):
              precision    recall  f1-score   support

           0       1.00      0.50      0.66       233
           1       0.06      1.00      0.11         7

    accuracy                           0.51       240
   macro avg       0.53      0.75      0.39       240
weighted avg       0.97      0.51      0.65       240



In [24]:
y

array([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1,
       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0])

In [19]:
X.shape

(13, 3, 30)