In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import pickle
import numpy as np

# Load the data
with open('train.pkl', 'rb') as f:
    train_data = pickle.load(f)

with open('test_no_target.pkl', 'rb') as f:
    test_data = pickle.load(f)


# Calculate min and max values in the dataset
all_sequences = [seq for seq, _ in train_data]
flat_sequences = np.concatenate(all_sequences)
min_val, max_val = flat_sequences.min(), flat_sequences.max()

# Normalize sequences to be between 0 and 1
def normalize_sequence(sequence, min_val, max_val):
    return (sequence - min_val) / (max_val - min_val)

train_data = [(normalize_sequence(seq, min_val, max_val), label) for seq, label in train_data]
test_data = [normalize_sequence(seq, min_val, max_val) for seq in test_data]


In [None]:

# Custom dataset
class SequenceDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sequence, label = self.data[idx]
        return torch.tensor(sequence, dtype=torch.float), label

# Create dataset
dataset = SequenceDataset(train_data)

# Split dataset
train_size = int(0.7 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])

# Padding function
def pad_collate(batch, pad_value=-1):
    xx, yy = zip(*batch)
    x_lens = [len(x) for x in xx]
    xx_pad = pad_sequence(xx, batch_first=True, padding_value=pad_value)
    yy_pad = torch.tensor(yy)
    return xx_pad, yy_pad, x_lens

# DataLoader
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=pad_collate)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=pad_collate)


In [3]:

# Define the model
class LSTM_Seq_Classifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(LSTM_Seq_Classifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x, lengths, hidden):
        lengths = torch.tensor(lengths, dtype=torch.int64)
        x_packed = pack_padded_sequence(x, lengths, batch_first=True, enforce_sorted=False)
        packed_output, hidden = self.lstm(x_packed, hidden)

        output, _ = pad_packed_sequence(packed_output, batch_first=True)
        out = output[torch.arange(output.size(0)), lengths - 1]

        x = self.fc(out)
        
        return x, hidden

    def init_hidden(self, batch_size):
        hidden = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)
        cell = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)
        return (hidden, cell)

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


In [4]:

# Initialize model, loss, and optimizer
model = LSTM_Seq_Classifier(input_size=1, hidden_size=128, num_layers=2, num_classes=5).to(device)
class_weights = torch.tensor([0.05, 0.16, 0.5, 0.17, 0.3], device=device) 
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [6]:
num_epochs = 101

for epoch in range(num_epochs):
    model.train()
    correct_train = 0
    total_train = 0
    for sequences, labels, lengths in train_loader:
        sequences = sequences.unsqueeze(-1).to(device)
        labels = labels.to(device)
        hidden = model.init_hidden(sequences.size(0))
        
        outputs, hidden = model(sequences, lengths, hidden)
        loss = criterion(outputs, labels)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Training accuracy
        _, predicted = torch.max(outputs.data, 1)
        total_train += labels.size(0)
        correct_train += (predicted == labels).sum().item()
    
    train_accuracy = 100 * correct_train / total_train

    # Validation accuracy
    model.eval()
    correct_val = 0
    total_val = 0
    with torch.no_grad():
        for sequences, labels, lengths in val_loader:
            sequences = sequences.unsqueeze(-1).to(device)
            labels = labels.to(device)
            hidden = model.init_hidden(sequences.size(0))
            
            outputs, hidden = model(sequences, lengths, hidden)
            _, predicted = torch.max(outputs.data, 1)
            total_val += labels.size(0)
            correct_val += (predicted == labels).sum().item()
    
    val_accuracy = 100 * correct_val / total_val

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}, Training Accuracy: {train_accuracy:.2f}%, Validation Accuracy: {val_accuracy:.2f}%")

Epoch [1/51], Loss: 1.4631, Training Accuracy: 36.36%, Validation Accuracy: 55.44%
Epoch [2/51], Loss: 1.4635, Training Accuracy: 40.98%, Validation Accuracy: 45.92%
Epoch [3/51], Loss: 1.3792, Training Accuracy: 46.57%, Validation Accuracy: 48.87%
Epoch [4/51], Loss: 1.4454, Training Accuracy: 44.00%, Validation Accuracy: 44.56%
Epoch [5/51], Loss: 1.1912, Training Accuracy: 49.44%, Validation Accuracy: 46.60%
Epoch [6/51], Loss: 1.3577, Training Accuracy: 51.24%, Validation Accuracy: 51.59%
Epoch [7/51], Loss: 1.5371, Training Accuracy: 52.84%, Validation Accuracy: 52.49%
Epoch [8/51], Loss: 1.0982, Training Accuracy: 55.96%, Validation Accuracy: 60.09%
Epoch [9/51], Loss: 1.4737, Training Accuracy: 57.02%, Validation Accuracy: 56.58%
Epoch [10/51], Loss: 1.6633, Training Accuracy: 57.07%, Validation Accuracy: 59.30%
Epoch [11/51], Loss: 0.7740, Training Accuracy: 59.02%, Validation Accuracy: 59.07%
Epoch [12/51], Loss: 0.9959, Training Accuracy: 58.82%, Validation Accuracy: 58.96%
E

In [10]:

model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for sequences, labels, lengths in train_loader:
        sequences = sequences.unsqueeze(-1).to(device)
        labels = labels.to(device)
        hidden = model.init_hidden(sequences.size(0))
        
        outputs, hidden = model(sequences, lengths, hidden)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Training Accuracy: {100 * correct / total:.2f}%')


Training Accuracy: 98.49%


In [11]:
# Validation
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for sequences, labels, lengths in val_loader:
        sequences = sequences.unsqueeze(-1).to(device)
        labels = labels.to(device)
        hidden = model.init_hidden(sequences.size(0))
        
        outputs, hidden = model(sequences, lengths, hidden)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print(f'Validation Accuracy: {100 * correct / total:.2f}%')




Validation Accuracy: 71.54%


In [12]:
# Predict on test data
test_sequences = [torch.tensor(seq, dtype=torch.float) for seq in test_data]
test_dataset = SequenceDataset([(seq, 0) for seq in test_sequences])  # Dummy labels for the test set
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=pad_collate)

predictions = []
model.eval()
with torch.no_grad():
    for sequences, _, lengths in test_loader:
        sequences = sequences.unsqueeze(-1).to(device)
        hidden = model.init_hidden(sequences.size(0))
        
        outputs, hidden = model(sequences, lengths, hidden)
        _, predicted = torch.max(outputs.data, 1)
        predictions.extend(predicted.cpu().numpy())


  return torch.tensor(sequence, dtype=torch.float), label


In [13]:

import pandas as pd
df = pd.DataFrame(predictions)
df.to_csv('predictions.csv', index=False, header=False)