In [16]:
import os
import math
import torch
import torch.nn as nn
import numpy as np
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, random_split
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import time
from sklearn.preprocessing import LabelEncoder
import torch
import torch.nn as nn
import math

In [17]:
from sklearn.discriminant_analysis import StandardScaler


class CustomDataset(Dataset):
    def __init__(self, csv_path="data/dataset_train_2024.csv"):
        # Load data from CSV
        data = pd.read_csv(csv_path)
        
        # Extract features
        self.sequences_1 = data.iloc[:, 1:129].values * 100  # Columns 1-128 (1-based indexing)
        self.sequences_2 = data.iloc[:, 129:257].values * 100  # Columns 129-256
        self.extra_feature = data.iloc[:, 257].values.reshape(-1, 1)  # Column 257

        # Combine features
        all_features = np.hstack([self.sequences_1, self.sequences_2, self.extra_feature])
        
        # Normalize features
        self.scaler = StandardScaler()
        self.normalized_features = self.scaler.fit_transform(all_features)
        self.features = torch.tensor(self.normalized_features, dtype=torch.float32)


        # Encode labels
        self.label_encoder = LabelEncoder()
        self.labels = torch.tensor(self.label_encoder.fit_transform(data.iloc[:, -1]), dtype=torch.long)

    def __len__(self):
        return len(self.features)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]
    
    def inverseTransform(self, array):
        return self.label_encoder.inverse_transform(array)

In [18]:
class RNNClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, num_classes, bidirectional=False):
        super(RNNClassifier, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.rnn = nn.GRU(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            bidirectional=bidirectional
        )
        # Adjust the output dimension if bidirectional is used
        self.fc = nn.Linear(hidden_dim * (2 if bidirectional else 1), num_classes)

    def forward(self, x):
        # RNN forward pass
        # Input shape: [batch_size, seq_len, input_dim]
        out, _ = self.rnn(x)  # out: [batch_size, seq_len, hidden_dim]
        # Use the last hidden state for classification
        out = out[:, -1, :]  # [batch_size, hidden_dim]
        out = self.fc(out)  # [batch_size, num_classes]
        return out


In [19]:
def reformat_tensor(tensor):
  """
  Reformats a tensor from shape [32, 257] to [32, 128, 3] for transformer encoder input.
  """
  batch_size = tensor.shape[0]
  # Extract sequences
  seq1 = tensor[:, :128]
  seq2 = tensor[:, 128:256]
  # Extract noise and expand it to the correct shape
  noise = tensor[:, -1].unsqueeze(1).expand(batch_size, 128)
  # Stack the measures and noise along the last dimension
  return torch.stack([seq1, seq2, noise], dim=2)

In [20]:
# Hyperparameters
hidden_dim = 64
num_layers = 2
bidirectional = True  # Set to False for unidirectional RNN
num_classes = 5
learning_rate = 0.001
epochs = 50
batch_size = 32

# Prepare Data
dataset = CustomDataset()
train_size = int(0.8 * len(dataset))  # 80% for training
test_size = len(dataset) - train_size  # 20% for testing
train_data, test_data = random_split(dataset, [train_size, test_size])

train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_data, batch_size=batch_size)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Model Initialization
model = RNNClassifier(
    input_dim=3,  # From reformat_tensor output
    hidden_dim=hidden_dim,
    num_layers=num_layers,
    num_classes=num_classes,
    bidirectional=bidirectional
).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training Loop
print("Training RNN model...")
for epoch in range(epochs):
    model.train()
    total_loss = 0
    for features, labels in train_loader:
        features = reformat_tensor(features).to(device)  # Reshape for RNN
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(train_loader):.4f}")

# Evaluation
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for features, labels in test_loader:
        features = reformat_tensor(features).to(device)
        labels = labels.to(device)

        outputs = model(features)
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

accuracy = accuracy_score(all_labels, all_preds)
print(f"Test Accuracy: {accuracy:.4f}")


Using device: cuda
Training RNN model...
Epoch 1/50, Loss: 1.4985
Epoch 2/50, Loss: 0.8541
Epoch 3/50, Loss: 0.6091


KeyboardInterrupt: 