In [1]:
import json
import numpy as np
import os
import random

import torch
import torch.nn as nn
import torch.optim as optim

## Data loader

In [2]:
import torch
from torch.utils.data import Dataset, DataLoader
import numpy as np
import os

class NPYDataset(Dataset):
    def __init__(self, main_directory):
        super().__init__()
        self.main_directory = main_directory
        self.filenames = self._load_filenames()

    def _load_filenames(self):
        # Traverse subdirectories to find all .npy files
        filenames = []
        for root, _, files in os.walk(self.main_directory):
            for file in files:
                if file.endswith('.npy'):
                    full_path = os.path.join(root, file)
                    filenames.append(full_path)
        return filenames

    def __len__(self):
        return len(self.filenames)

    def __getitem__(self, idx):
        file_path = self.filenames[idx]
        data = np.load(file_path, allow_pickle=True)
        keypoints = np.ascontiguousarray(data['data'], dtype=np.float32)
        label = np.float32(data['label'])
        # Convert to tensors
        keypoints = torch.tensor(keypoints)
        label = torch.tensor(label)
        return keypoints, label


In [3]:


# DataLoader setup
data_directory = 'splitted_datapoints_shot/train'
dataset = NPYDataset(data_directory)
dataloader = DataLoader(dataset, batch_size=1, shuffle=True)

val_dataset = NPYDataset('splitted_datapoints_shot/val')
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)


In [4]:
len(dataloader)

5062

In [5]:
len(val_loader)

1267

In [6]:
for keypoints, labels in dataloader:
    # keypoints = keypoints.squeeze(0)
    print(keypoints.shape, labels.shape)

torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20, 33, 2]) torch.Size([1, 1])
torch.Size([1, 1, 20

In [None]:
# len(val_loader)

302

## Model

In [7]:
class ConvLSTMModel(nn.Module):
    def __init__(self, input_shape, hidden_dim, num_classes):
        super(ConvLSTMModel, self).__init__()
        self.segment_length, self.keypoints, self.coords = input_shape
        
        self.conv1d = nn.Conv1d(in_channels=self.keypoints * self.coords, out_channels=32, kernel_size=3, padding=1)
        self.lstm = nn.LSTM(input_size=32, hidden_size=hidden_dim, batch_first=True)
        # Change to output a single value for binary classification
        self.fc = nn.Linear(hidden_dim, 1)  # Only one output neuron for binary classification

    def forward(self, x):
        x = x.view(x.size(0), self.keypoints * self.coords, self.segment_length)
        x = self.conv1d(x)
        x = torch.relu(x)
        x = x.transpose(1, 2)
        x, (hn, cn) = self.lstm(x)
        x = x[:, -1, :]
        x = self.fc(x)
        x = torch.sigmoid(x)  # Sigmoid activation for binary classification
        return x


In [8]:

# Assume 'model' is an instance of your model class
model = ConvLSTMModel(input_shape=(20, 33, 2), hidden_dim=64, num_classes=2)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)


In [9]:
# Define a function for evaluating the model on the validation set
def evaluate(model, val_loader, criterion):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0
    
    with torch.no_grad():  # No need to compute gradients for validation
        for keypoints, labels in val_loader:
            # Forward pass
            outputs = model(keypoints)  # outputs shape should be [batch_size, 1]
            
            # Compute loss
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            
            # Calculate accuracy
            predicted_labels = outputs.round()  # Round the outputs to get the final predictions for binary classification
            correct_predictions += (predicted_labels == labels).sum().item()
            total_predictions += labels.size(0)
    
    # Calculate average loss and accuracy
    avg_loss = running_loss / len(val_loader)
    accuracy = correct_predictions / total_predictions
    return avg_loss, accuracy

In [10]:
import torch
from torch.utils.tensorboard import SummaryWriter

# Initialize TensorBoard
writer = SummaryWriter(log_dir='./runs/convLSTM')
# Variables to keep track of the best validation accuracy
best_val_acc = 0.0
best_model_path = 'shot_classifier_weights/best_model.pth'
last_model_path = 'shot_classifier_weights/last_model.pth'
num_epochs = 100
model.train()  # Set the model to training mode
for epoch in range(num_epochs):
    running_loss = 0.0
    correct_predictions = 0
    total_predictions = 0

    for keypoints, labels in dataloader:
        optimizer.zero_grad()
        # Forward pass
        outputs = model(keypoints)  # outputs shape should be [batch_size, 1]
        
        # Compute loss
        loss = criterion(outputs, labels)
        running_loss += loss.item()

        # Backward and optimize
        loss.backward()
        optimizer.step()
        # print('outputs', outputs)
        # Calculate accuracy
        predicted_labels = outputs.round()  # Round the outputs to get the final predictions for binary classification
        # print('predicted_labels',predicted_labels, 'True labels:', labels)
        correct_predictions += (predicted_labels == labels).sum().item()
        total_predictions += labels.size(0)

        # Calculate accuracy and loss for the epoch
    accuracy = correct_predictions / total_predictions * 100
    epoch_loss = running_loss / len(dataloader)

    # Perform validation
    val_loss, val_accuracy = evaluate(model, val_loader, criterion)
    
    # Log loss and accuracy to TensorBoard
    writer.add_scalar('Loss/train', epoch_loss, epoch)
    writer.add_scalar('Accuracy/train', accuracy, epoch)
    writer.add_scalar('Accuracy/val', val_accuracy, epoch)
    writer.add_scalar('Loss/val', val_loss, epoch)
    
    # Print validation metrics
    print(f'Epoch [{epoch+1}/{num_epochs}], Train Loss: {epoch_loss:.4f}, Train Accuracy: {accuracy:.2f}%')
    print(f'Epoch {epoch+1}/{num_epochs} - Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')

    # Save the last model weights
    torch.save(model.state_dict(), last_model_path)
    
    # Save the best model weights based on validation accuracy
    if val_accuracy > best_val_acc:
        best_val_acc = val_accuracy
        torch.save(model.state_dict(), best_model_path)
        print(f'Best model saved at epoch {epoch+1} with val acc: {val_accuracy:.4f}')

print(f'Best validation accuracy: {best_val_acc:.4f}')
# Close the TensorBoard writer
writer.close()


Epoch [1/100], Train Loss: 0.6234, Train Accuracy: 68.43%
Epoch 1/100 - Validation Loss: 0.6181, Validation Accuracy: 0.6843
Best model saved at epoch 1 with val acc: 0.6843
Epoch [2/100], Train Loss: 0.6208, Train Accuracy: 68.43%
Epoch 2/100 - Validation Loss: 0.6197, Validation Accuracy: 0.6843
Epoch [3/100], Train Loss: 0.6187, Train Accuracy: 68.43%
Epoch 3/100 - Validation Loss: 0.6265, Validation Accuracy: 0.6843
Epoch [4/100], Train Loss: 0.6067, Train Accuracy: 68.43%
Epoch 4/100 - Validation Loss: 0.5871, Validation Accuracy: 0.6843
Epoch [5/100], Train Loss: 0.5916, Train Accuracy: 69.30%
Epoch 5/100 - Validation Loss: 0.5902, Validation Accuracy: 0.6882
Best model saved at epoch 5 with val acc: 0.6882
Epoch [6/100], Train Loss: 0.5776, Train Accuracy: 69.02%
Epoch 6/100 - Validation Loss: 0.5708, Validation Accuracy: 0.7609
Best model saved at epoch 6 with val acc: 0.7609
Epoch [7/100], Train Loss: 0.5530, Train Accuracy: 74.73%
Epoch 7/100 - Validation Loss: 0.5261, Valida

In [24]:
import torch
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score

# Define a function for evaluating the model on the validation set
def evaluate_with_confusion_matrix(model, val_loader, criterion):
    model.eval()  # Set the model to evaluation mode
    running_loss = 0.0
    all_labels = []
    all_predictions = []
    
    with torch.no_grad():  # No need to compute gradients for validation
        for keypoints, labels in val_loader:
            # Forward pass
            outputs = model(keypoints)  # outputs shape should be [batch_size, 1]
            # outputs = outputs.squeeze()
            # labels = labels.squeeze().long()
            # Compute loss
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            
            # Get predictions
            predicted_labels = outputs.round()  # This should be along dimension 1 for batch data
            all_labels.append(labels.tolist()[0][0])
            all_predictions.append(predicted_labels.tolist()[0][0])
            
        # Calculate average loss
    avg_loss = running_loss / len(val_loader)
    
    # Calculate confusion matrix
    cm = confusion_matrix(all_labels, all_predictions)
    accuracy = np.trace(cm) / np.sum(cm)
    precision = precision_score(all_labels, all_predictions, average='binary')
    recall = recall_score(all_labels, all_predictions, average='binary')
    f1 = f1_score(all_labels, all_predictions, average='binary')
    print('Precision: ', precision)
    print('Recall: ', recall)
    print('f1-score: ', f1)
    return avg_loss, accuracy, cm

# Use the function
avg_loss, accuracy, cm = evaluate_with_confusion_matrix(model, val_loader, criterion)
print(f"Average Loss: {avg_loss}, Accuracy: {accuracy}")
print("Confusion Matrix:\n", cm)


Precision:  0.9045092838196287
Recall:  0.8525
f1-score:  0.8777348777348777
Average Loss: 0.2256295083247966, Accuracy: 0.9250197316495659
Confusion Matrix:
 [[831  36]
 [ 59 341]]


In [23]:
341/(59+341)

0.8525

In [21]:
831/(36+831)

0.9584775086505191

## Model simple LSTM

In [17]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

# Define LSTM classifier model
class LSTMClassifier(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMClassifier, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = x.view(x.size(0), x.size(1), -1)
        
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])
        return out

In [18]:
# # Generate sample data
# X_train = np.random.rand(100, 10, 50)  # 100 sequences of length 10 with 50 features each
# y_train = np.random.randint(2, size=(100,))  # Binary labels (0 or 1)

# # Convert data to PyTorch tensors
# X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
# y_train_tensor = torch.tensor(y_train, dtype=torch.long)

# Define model parameters
input_size = 33*2
hidden_size = 64
num_layers = 2
output_size = 2


In [19]:
# Instantiate the model
model = LSTMClassifier(input_size, hidden_size, num_layers, output_size)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)


In [None]:
# Train the model
num_epochs = 500
for epoch in range(num_epochs):
    correct = 0
    total = 0
    running_loss = 0.0
    for keypoints, labels in dataloader:
        # Squeeze unnecessary dimensions
        keypoints = keypoints.squeeze(0)  # Now keypoints shape should be [1, 20, 17, 2]
        labels = labels.squeeze(0)        # Now labels shape should be [1]
        # print('keypoints:',keypoints.shape,'labels:', labels.shape)
        optimizer.zero_grad()
        outputs = model(keypoints)
        # print('outputs before',outputs, 'labels before',labels)

        outputs = outputs.squeeze()
        labels = labels.squeeze().long()
        # print('outputs after',outputs, 'labels after',labels)
        # break
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(outputs.data, 0)
        # Total number of labels
        total += 1
        
        # Total correct predictions
        correct += (predicted == labels).sum().item()
    accuracy =  correct / total * 100
    epoch_loss = running_loss / len(dataloader)        
            # Print Loss
    print(' Loss: {}. Accuracy: {}'.format( epoch_loss, accuracy))


 Loss: 0.7066145324707032. Accuracy: 46.0
 Loss: 0.646399582028389. Accuracy: 74.0
 Loss: 0.6253186723589897. Accuracy: 62.0
 Loss: 0.6004764226451517. Accuracy: 66.0
 Loss: 0.5673007851839066. Accuracy: 64.0
 Loss: 0.5666879870928824. Accuracy: 68.0
 Loss: 0.5529879846051335. Accuracy: 68.0
 Loss: 0.5546136916801333. Accuracy: 70.0
 Loss: 0.5598513038922102. Accuracy: 68.0
 Loss: 0.5483754526730626. Accuracy: 70.0
 Loss: 0.5462060511251912. Accuracy: 68.0
 Loss: 0.5345488362945616. Accuracy: 74.0
 Loss: 0.6796253682486713. Accuracy: 60.0
 Loss: 0.5924330477602779. Accuracy: 68.0
 Loss: 0.5473819950129837. Accuracy: 70.0
 Loss: 0.5350996433291584. Accuracy: 72.0
 Loss: 0.5204374854825438. Accuracy: 74.0
 Loss: 0.6699774176906794. Accuracy: 68.0
 Loss: 0.5793360046669841. Accuracy: 68.0
 Loss: 0.5573308748006821. Accuracy: 66.0
 Loss: 0.5372575928643346. Accuracy: 76.0
 Loss: 0.5082164804823697. Accuracy: 74.0
 Loss: 0.5007441286928952. Accuracy: 74.0
 Loss: 0.5154678407032043. Accuracy

In [27]:
import torch
from torch.utils.tensorboard import SummaryWriter

# Initialize TensorBoard
writer = SummaryWriter(log_dir='./runs/convLSTM')
num_epochs = 100
for epoch in range(num_epochs):
    correct = 0
    total = 0
    running_loss = 0.0
    
    for keypoints, labels in dataloader:
        # Squeeze unnecessary dimensions
        keypoints = keypoints.squeeze(0)  # Now keypoints shape should be [1, 20, 17, 2]
        labels = labels.squeeze(0)        # Now labels shape should be [1]
        # print('keypoints:',keypoints.shape,'labels:', labels.shape)
        
        optimizer.zero_grad()
        outputs = model(keypoints)
        # print('outputs before',outputs, 'labels before',labels)

        outputs = outputs.squeeze()
        labels = labels.squeeze().long()
        # print('outputs after',outputs, 'labels after',labels)
        # break
        
        loss = criterion(outputs, labels)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()
        
        _, predicted = torch.max(outputs.data, 0)
        # Total number of labels
        total += 1
        
        # Total correct predictions
        correct += (predicted == labels).sum().item()
    
    # Calculate accuracy and loss for the epoch
    accuracy = correct / total * 100
    epoch_loss = running_loss / len(dataloader)
    
    # Log loss and accuracy to TensorBoard
    writer.add_scalar('Loss/train', epoch_loss, epoch)
    writer.add_scalar('Accuracy/train', accuracy, epoch)
    
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.2f}%')

# Close the TensorBoard writer
writer.close()


RuntimeError: The size of tensor a (2) must match the size of tensor b (32) at non-singleton dimension 0