In [2]:
import torch
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import os
import numpy as np


def create_mask_tensor(max_len, seq_len):
    """Build a 1-D float mask of length ``max_len``.

    The leading ``seq_len`` entries (as selected by the slice ``[:seq_len]``)
    are 1 and the remaining entries are 0.

    Args:
        max_len: Total length of the mask.
        seq_len: Slice bound marking where the valid positions end.

    Returns:
        A tensor of shape ``(max_len,)`` holding zeros and ones.
    """
    mask = torch.zeros(max_len)
    # Slicing returns a view, so the in-place fill writes into ``mask`` itself.
    mask[:seq_len].fill_(1)
    return mask

def _read_index_set(path):
    """Return the whitespace-separated tokens stored in ``path`` as a set of strings."""
    with open(path) as handle:
        return set(handle.read().split())


def _standardize(sequences, mean, std):
    """Z-score every sequence with the given per-feature statistics."""
    scale = std + 1e-6  # epsilon keeps constant features from dividing by zero
    return [(seq - mean) / scale for seq in sequences]


def _build_mask(lengths):
    """Stack one validity mask per sequence, each padded to the longest length."""
    longest = max(lengths)  # computed once rather than once per sequence
    return torch.stack([create_mask_tensor(longest, length) for length in lengths])


train_indices = _read_index_set("train_indices.txt")
test_indices = _read_index_set("test_indices.txt")
val_indices = _read_index_set("val_indices.txt")

train_inputs, val_inputs, test_inputs = [], [], []
train_labels, val_labels, test_labels = [], [], []
train_lengths, val_lengths, test_lengths = [], [], []

# Tuple order encodes precedence: an id listed in several splits goes to the
# first match (train, then val, then test).
split_buckets = (
    (train_indices, train_inputs, train_labels, train_lengths),
    (val_indices, val_inputs, val_labels, val_lengths),
    (test_indices, test_inputs, test_labels, test_lengths),
)

# Sorted so the sample order does not depend on the filesystem's listing order.
for fname in sorted(os.listdir("samples")):
    arr = np.loadtxt(f"samples/{fname}", delimiter=',')
    if arr.size == 0:
        continue
    if arr.ndim == 1:
        # A single-row file is loaded as a 1-D array; restore the row axis
        # without hard-coding the column count.
        arr = arr.reshape(1, -1)
    # The second-to-last column is the per-row label; all other columns are features.
    label = torch.tensor(arr[:, -2], dtype=torch.long)
    arr = torch.tensor(np.delete(arr, -2, axis=1), dtype=torch.float32)
    # Drops the last six characters of the file name to get the sample id.
    # NOTE(review): presumably a fixed suffix plus extension; confirm against the file naming.
    sample_id = fname[:-6]
    for ids, inputs, labels, lengths in split_buckets:
        if sample_id in ids:
            inputs.append(arr)
            labels.append(label)
            lengths.append(arr.shape[0])
            break


# Normalisation statistics come from the training rows only (all sequences
# stacked along the row axis) and are reused for the val and test splits.
all_train_inputs = torch.vstack(train_inputs)
mean = all_train_inputs.mean(dim=0, keepdim=True)
std = all_train_inputs.std(dim=0, keepdim=True)

train_inputs = _standardize(train_inputs, mean, std)
val_inputs = _standardize(val_inputs, mean, std)
test_inputs = _standardize(test_inputs, mean, std)

train_mask = _build_mask(train_lengths)
val_mask = _build_mask(val_lengths)
test_mask = _build_mask(test_lengths)
print(train_mask.shape, val_mask.shape, test_mask.shape)

x_train_norm = pad_sequence(train_inputs, batch_first=True, padding_value=0)
x_val_norm = pad_sequence(val_inputs, batch_first=True, padding_value=0)
x_test_norm = pad_sequence(test_inputs, batch_first=True, padding_value=0)
print(x_train_norm.shape, x_val_norm.shape, x_test_norm.shape)

# Labels are padded with -1 so padded positions match neither class below.
padded_train_labels = pad_sequence(train_labels, batch_first=True, padding_value=-1)
padded_val_labels = pad_sequence(val_labels, batch_first=True, padding_value=-1)
padded_test_labels = pad_sequence(test_labels, batch_first=True, padding_value=-1)
print(padded_train_labels.shape, padded_val_labels.shape, padded_test_labels.shape)

num_ones = (padded_train_labels == 1).sum().item()
num_zeros = (padded_train_labels == 0).sum().item()

# Weight for the positive class (1): ratio of negative to positive training labels.
pos_weight = torch.tensor(num_zeros / (num_ones + 1e-6))


torch.Size([68943, 172, 23]) torch.Size([8841, 162, 23]) torch.Size([8833, 158, 23])
torch.Size([68943, 172]) torch.Size([8841, 162]) torch.Size([8833, 158])


In [3]:
import torch.nn as nn
import torch.optim as optim


class LSTMClassifier(nn.Module):
    """LSTM followed by a linear layer that emits one logit per timestep.

    Args:
        input_dim: Number of features per timestep.
        hidden_dim: LSTM hidden size per direction.
        num_layers: Number of stacked LSTM layers.
        bidirectional: If True, the LSTM runs in both directions and the
            linear layer's input width is doubled to match.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, bidirectional=True):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, bidirectional=bidirectional)
        self.fc = nn.Linear(hidden_dim * (2 if bidirectional else 1), 1)  # Adjust for bidirectional

    def forward(self, x, lengths=None):
        """Return per-timestep logits of shape (batch_size, seq_length).

        Args:
            x: Padded batch of shape (batch_size, seq_length, input_dim).
            lengths: Optional true (positive) length of each sequence. When
                given, the batch is packed so padded timesteps are never fed
                to the LSTM; this matters for the backward direction of a
                bidirectional model, which would otherwise start from the
                padding. When omitted, the padded batch is processed as-is.
        """
        if lengths is None:
            lstm_out, _ = self.lstm(x)  # (batch_size, seq_length, hidden_dim * num_directions)
        else:
            # pack_padded_sequence needs the lengths as an int64 tensor on the CPU.
            cpu_lengths = torch.as_tensor(lengths, dtype=torch.int64).cpu()
            packed = pack_padded_sequence(x, cpu_lengths, batch_first=True, enforce_sorted=False)
            packed_out, _ = self.lstm(packed)
            # total_length keeps the output as long as the input even when the
            # longest sequence in this batch is shorter than the padding.
            lstm_out, _ = pad_packed_sequence(packed_out, batch_first=True, total_length=x.size(1))
        logits = self.fc(lstm_out)  # (batch_size, seq_length, 1)
        return logits.squeeze(-1)   # (batch_size, seq_length)

class GRUClassifier(nn.Module):
    """GRU followed by a linear layer that emits one logit per timestep.

    Args:
        input_dim: Number of features per timestep.
        hidden_dim: GRU hidden size per direction.
        num_layers: Number of stacked GRU layers.
        bidirectional: If True, the GRU runs in both directions and the
            linear layer's input width is doubled to match.
    """

    def __init__(self, input_dim, hidden_dim, num_layers, bidirectional=True):
        super().__init__()
        self.gru = nn.GRU(input_dim, hidden_dim, num_layers, batch_first=True, bidirectional=bidirectional)
        self.fc = nn.Linear(hidden_dim * (2 if bidirectional else 1), 1)  # Adjust for bidirectional

    def forward(self, x, lengths=None):
        """Return per-timestep logits of shape (batch_size, seq_length).

        Args:
            x: Padded batch of shape (batch_size, seq_length, input_dim).
            lengths: Optional true (positive) length of each sequence. When
                given, the batch is packed so padded timesteps are never fed
                to the GRU; this matters for the backward direction of a
                bidirectional model, which would otherwise start from the
                padding. When omitted, the padded batch is processed as-is.
        """
        if lengths is None:
            gru_out, _ = self.gru(x)  # (batch_size, seq_length, hidden_dim * num_directions)
        else:
            # pack_padded_sequence needs the lengths as an int64 tensor on the CPU.
            cpu_lengths = torch.as_tensor(lengths, dtype=torch.int64).cpu()
            packed = pack_padded_sequence(x, cpu_lengths, batch_first=True, enforce_sorted=False)
            packed_out, _ = self.gru(packed)
            # total_length keeps the output as long as the input even when the
            # longest sequence in this batch is shorter than the padding.
            gru_out, _ = pad_packed_sequence(packed_out, batch_first=True, total_length=x.size(1))
        logits = self.fc(gru_out)  # (batch_size, seq_length, 1)
        return logits.squeeze(-1)  # (batch_size, seq_length)

def masked_bce_loss(logits, labels, mask, pos_weight):
    """
    Computes binary cross-entropy loss with masking.

    logits: (batch_size, seq_length)
    labels: (batch_size, seq_length)
    mask:   (batch_size, seq_length)
    pos_weight: tensor weight applied to the positive-class term

    Returns the mask-weighted mean of the per-element loss. Elements whose
    mask is 0 contribute nothing, so any placeholder label stored there is
    ignored. If the mask is all zeros the result is 0 rather than NaN.
    """
    per_element = nn.functional.binary_cross_entropy_with_logits(
        logits, labels.float(), pos_weight=pos_weight, reduction='none'
    )
    masked_total = (per_element * mask).sum()
    # Guard the denominator so a batch with no valid elements yields 0/eps = 0.
    valid_count = mask.sum().to(per_element.dtype).clamp(min=1e-8)
    return masked_total / valid_count


In [4]:
# Reproducibility: seed before the model is initialised and before the
# DataLoader draws its shuffling order.
SEED = 42
torch.manual_seed(SEED)

# Hyperparameters
input_dim = x_train_norm.shape[-1]  # feature count taken from the padded training tensor
hidden_dim = 32
num_layers = 1
num_epochs = 50
learning_rate = 0.001
batch_size = 16

# Initialize model, optimizer
# (GRUClassifier takes the same constructor arguments and can be swapped in here.)
model = LSTMClassifier(input_dim, hidden_dim, num_layers, bidirectional=False)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Convert to DataLoader: each item is (inputs, labels, mask) for one sequence
dataset = torch.utils.data.TensorDataset(x_train_norm, padded_train_labels, train_mask)
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)


In [5]:
# Training loop
for epoch in range(num_epochs):
    model.train()  # validation below switches to eval mode, so switch back each epoch
    losses = []
    batch_weights = []
    for x_batch, y_batch, mask_batch in dataloader:
        optimizer.zero_grad()
        logits = model(x_batch)  # Forward pass
        loss = masked_bce_loss(logits, y_batch, mask_batch, pos_weight)  # Compute masked loss
        loss.backward()
        optimizer.step()
        losses.append(loss.item())
        # Store plain floats: each batch loss is a mean over its valid elements,
        # so it is re-weighted by that count when averaging over the epoch.
        batch_weights.append(mask_batch.sum().item())
    train_loss = sum(a * b for a, b in zip(losses, batch_weights)) / sum(batch_weights)
    print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}", end = " ")
    # Validation needs no gradients; skipping autograd avoids building a graph
    # over the whole validation tensor.
    model.eval()
    with torch.no_grad():
        val_logits = model(x_val_norm)
        val_loss = masked_bce_loss(val_logits, padded_val_labels, val_mask, pos_weight)
    print(f"Val Loss: {val_loss.item():.4f}")


Epoch 1/50, Train Loss: 0.8921 Val Loss: 0.8260
Epoch 2/50, Train Loss: 0.8309 Val Loss: 0.8051
Epoch 3/50, Train Loss: 0.7987 Val Loss: 0.7833
Epoch 4/50, Train Loss: 0.7858 Val Loss: 0.7709
Epoch 5/50, Train Loss: 0.7787 Val Loss: 0.7652
Epoch 6/50, Train Loss: 0.7715 Val Loss: 0.7592
Epoch 7/50, Train Loss: 0.7681 Val Loss: 0.7705
Epoch 8/50, Train Loss: 0.7631 Val Loss: 0.7539
Epoch 9/50, Train Loss: 0.7592 Val Loss: 0.7515
Epoch 10/50, Train Loss: 0.7570 Val Loss: 0.7540
Epoch 11/50, Train Loss: 0.7547 Val Loss: 0.7514
Epoch 12/50, Train Loss: 0.7520 Val Loss: 0.7459
Epoch 13/50, Train Loss: 0.7495 Val Loss: 0.7621
Epoch 14/50, Train Loss: 0.7482 Val Loss: 0.7435
Epoch 15/50, Train Loss: 0.7463 Val Loss: 0.7390
Epoch 16/50, Train Loss: 0.7451 Val Loss: 0.7570
Epoch 17/50, Train Loss: 0.7433 Val Loss: 0.7396
Epoch 18/50, Train Loss: 0.7424 Val Loss: 0.7358
Epoch 19/50, Train Loss: 0.7406 Val Loss: 0.7461
Epoch 20/50, Train Loss: 0.7392 Val Loss: 0.7386
Epoch 21/50, Train Loss: 0.73

In [6]:
# Inference on the test split: eval mode and no autograd bookkeeping.
model.eval()
with torch.no_grad():
    logits = model(x_test_norm)
    probs = torch.sigmoid(logits)
    # Threshold the per-timestep probabilities at 0.5 to get hard 0/1 predictions.
    predictions = (probs > 0.5).long()


In [7]:
# 1.0 wherever the prediction matches the padded test label, 0.0 elsewhere
correct = predictions.eq(padded_test_labels).to(torch.float32)

# Zero out the positions that test_mask marks as padding
correct_masked = test_mask * correct

# Accuracy = matching valid positions / total valid positions
accuracy = correct_masked.sum() / test_mask.sum()
accuracy

tensor(0.7560)

In [8]:
# Cast everything to float before comparing
predictions = predictions.float()
padded_test_labels = padded_test_labels.float()
test_mask = test_mask.float()

# Boolean indicators reused by the confusion counts below
pred_pos = predictions == 1
pred_neg = predictions == 0
label_pos = padded_test_labels == 1
label_neg = padded_test_labels == 0
is_valid = test_mask == 1

# True positives, false positives and false negatives, counted over valid positions only
tp = (pred_pos & label_pos & is_valid).sum()
fp = (pred_pos & label_neg & is_valid).sum()
fn = (pred_neg & label_pos & is_valid).sum()

# The small constant in each denominator avoids division by zero
predicted_positive_total = tp + fp + 1e-8
actual_positive_total = tp + fn + 1e-8
precision = tp / predicted_positive_total
recall = tp / actual_positive_total

f1_score = 2 * (precision * recall) / (precision + recall + 1e-8)

print(f'Precision: {precision.item():.4f}')
print(f'Recall: {recall.item():.4f}')
print(f'F1 Score: {f1_score.item():.4f}')


Precision: 0.4950
Recall: 0.7929
F1 Score: 0.6095
