In [None]:
import pickle
import numpy as np
from sklearn.model_selection import train_test_split

## Load the dataset

In [None]:
# 1-based selector: the next cell uses `experiment - 1` to index its list of experiment folders
experiment = 1

In [None]:
# Folder names of the available experiments; `experiment` (1-based) picks one of them
experimentData = ['experiment6', 'experiment7']
folder = experimentData[experiment - 1]
# Load the pre-split train/test features and labels from the selected folder
# NOTE(review): np.loadtxt parses plain-text files, yet these carry a .npy extension —
# confirm they were written with np.savetxt rather than np.save (which would need np.load).
X_train = np.loadtxt(folder + "/X_train.npy")
X_test = np.loadtxt(folder + "/X_test.npy")
y_train = np.loadtxt(folder + "/y_train.npy")
y_test = np.loadtxt(folder + "/y_test.npy")

In [None]:
# Build the collection of class ids for the selected experiment.
# Later cells size the models and the per-class counters with len(label_keys),
# so label_keys has to be a sized collection and must always be defined.
if experiment == 6:
    num_labels = 10
elif experiment == 3:
    num_labels = 11
else:
    # No hard-coded class count for this selector: the labels are used as class
    # indices by nn.CrossEntropyLoss, so the model needs (largest id + 1) outputs.
    num_labels = int(max(y_train.max(), y_test.max())) + 1
label_keys = list(range(num_labels))

# Model Training

In [1]:
import torch
import torch.nn as nn
import torchvision
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet18  # ResNet-18 architecture

### Select the device (GPU if available, otherwise CPU)

In [None]:
# Use the CUDA device when PyTorch can see one; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

## Convert X and y into PyTorch tensors

In [None]:
# Features become float32 tensors, labels become int64 class indices;
# every tensor is created directly on the selected device.
X_train_tensor = torch.tensor(X_train, dtype=torch.float32, device=device)
X_test_tensor = torch.tensor(X_test, dtype=torch.float32, device=device)
y_train_tensor = torch.tensor(y_train, dtype=torch.int64, device=device)
y_test_tensor = torch.tensor(y_test, dtype=torch.int64, device=device)

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset that pairs ``X[idx]`` with ``Y[idx]``.

    Args:
        X: indexable container of samples; its length defines the dataset size.
        Y: indexable container of labels, indexed with the same ``idx`` as ``X``.
    """

    def __init__(self, X, Y):
        self.X, self.Y = X, Y

    def __len__(self):
        # Only the sample container determines the reported length.
        return len(self.X)

    def __getitem__(self, idx):
        sample = self.X[idx]
        label = self.Y[idx]
        return sample, label

In [None]:
# Mini-batch size used by both the train and the test DataLoader
batch_size = 32

In [None]:
# Wrap the tensors so DataLoader can serve (sample, label) mini-batches.
train_dataset = CustomDataset(X_train_tensor, y_train_tensor)
test_dataset = CustomDataset(X_test_tensor, y_test_tensor)
# Training batches are reshuffled every epoch.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
# Evaluation does not benefit from shuffling; a fixed order keeps runs repeatable.
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

## Neural Network using Pytorch

In [None]:
class BasicNeuralModel(nn.Module):
    """Fully connected classifier: two ReLU hidden layers followed by a linear output layer.

    Args:
        input_dim: number of features per sample after flattening.
        hidden_dim: width of both hidden layers.
        output_dim: number of output logits.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Flatten every sample to one vector and return the raw logits."""
        # Keep the batch axis and collapse all remaining axes into one.
        flat = x.view(x.size(0), -1)
        hidden = self.relu(self.fc1(flat))
        hidden = self.relu(self.fc2(hidden))
        return self.fc3(hidden)

In [None]:
# Define the dimensions based on your data
input_dim = 40 * 33 * 3  # 40 frames, 33 points, 3 dimensions
hidden_dim = 256  # width of both hidden layers of BasicNeuralModel
output_dim = len(label_keys)  # one output logit per class; label_keys must support len()

In [None]:
# Build the fully connected baseline and place its parameters on the selected device
model = BasicNeuralModel(input_dim, hidden_dim, output_dim).to(device)

In [None]:
# Training hyper-parameters for BasicNeuralModel
num_epochs =  600  # number of full passes over train_loader
learning_rate = 0.001  # step size handed to the Adam optimizer

In [None]:
# Adam updates every parameter of the model; cross-entropy is computed on the raw logits.
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

In [None]:
# Set the model in training mode
model.train()

for epoch in range(num_epochs):
    # Running totals so the value reported below covers the whole epoch,
    # not just whichever mini-batch happened to come last.
    epoch_loss_sum = 0.0
    epoch_samples = 0

    for batch_inputs, batch_targets in train_loader:
        batch_inputs = batch_inputs.float().to(device)
        batch_targets = batch_targets.to(device)

        # Zero the gradients left over from the previous step
        optimizer.zero_grad()

        # Forward pass and loss
        outputs = model(batch_inputs)
        loss = criterion(outputs, batch_targets)

        # Backpropagation and weight update
        loss.backward()
        optimizer.step()

        # The criterion returns a per-batch mean; weight it by the batch size
        # so that a smaller final batch does not skew the epoch average.
        batch_count = batch_targets.size(0)
        epoch_loss_sum += loss.item() * batch_count
        epoch_samples += batch_count

    # Print the mean loss over all samples seen in this epoch
    # (max(..., 1) avoids a division by zero on an empty loader)
    epoch_loss = epoch_loss_sum / max(epoch_samples, 1)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}')

### Save model

In [None]:
# Persist only the learned weights (state_dict) inside the experiment folder
torch.save(model.state_dict(), folder + '/BasicNeuralModel.pth')

## Basic Neural Network Accuracy

In [None]:
# Put the model in evaluation mode before scoring the test split
model.eval()

# Score the whole test tensor in a single forward pass, without tracking gradients
with torch.no_grad():
    test_predictions = model(X_test_tensor)

# The predicted class of each sample is the index of its largest logit
test_predictions_labels = test_predictions.argmax(dim=1)

# Accuracy = matching predictions / number of test samples
correct = test_predictions_labels.eq(y_test_tensor).sum().item()
total = y_test_tensor.size(0)
accuracy = correct / total
print(f"Accuracy: {accuracy * 100:.2f}%")

Accuracy: 17.08%


In [None]:
model.eval()  # Set the model to evaluation mode
with torch.no_grad():

    correct = 0
    total = 0

    # Per-class tallies keyed by class index: hits and number of samples seen
    class_correct = {i: 0 for i in range(len(label_keys))}
    class_total = {i: 0 for i in range(len(label_keys))}

    for batch in test_loader:
        test_X, test_y = batch
        test_X, test_y = test_X.to(device), test_y.to(device)

        # Forward pass; the predicted class is the index of the largest logit
        test_outputs = model(test_X)
        _, predicted = torch.max(test_outputs, 1)
        # Compute overall accuracy
        correct += (predicted == test_y).sum().item()
        total += test_y.size(0)
        # Compute class-wise accuracy: among samples whose true label is i,
        # count how many were also predicted as i
        for i in range(len(label_keys)):
            class_total[i] += (test_y == i).sum().item()
            class_correct[i] += (predicted == i)[test_y == i].sum().item()

    overall_accuracy = correct / total
    print(f"Overall Test Accuracy: {overall_accuracy:.4f}")

    # Print class-wise accuracy; classes absent from the test split have no defined accuracy
    for i in range(len(label_keys)):
        if class_total[i] == 0:
            print(f"Class {i} Accuracy: Nan")
        else:
            class_accuracy = class_correct[i] / class_total[i]
            print(f"Class {i} Accuracy: {class_accuracy:.4f}")

## Residual Neural Network using Pytorch

In [None]:
class ResidualBlock(nn.Module):
    """Two 3x3 conv + batch-norm layers with an additive shortcut around them.

    Args:
        in_channels: channels of the incoming feature map.
        out_channels: channels produced by both convolutions.
        stride: stride of the first convolution (and of the projection shortcut).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main path: conv -> bn -> relu -> conv -> bn
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # The shortcut is the identity unless the main path changes the tensor shape,
        # in which case a 1x1 convolution + batch-norm projects the input to match.
        needs_projection = stride != 1 or in_channels != out_channels
        if needs_projection:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Return relu(main_path(x) + shortcut(x))."""
        residual = self.shortcut(x)

        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # Merge both paths, then apply the final activation
        out += residual
        return self.relu(out)

In [None]:
class ResNet(nn.Module):
    """ResNet-style classifier: a stem convolution, four residual stages, global pooling, linear head.

    Args:
        block: residual block class, called as ``block(in_channels, out_channels, stride)``.
        num_blocks: four integers, the number of blocks in each stage.
        num_classes: number of output logits.
        input_channels: channels expected by the stem convolution (default 40, one per frame).
        input_size: ``(height, width)`` used to un-flatten 2-D input batches
            (default ``(33, 3)``: 33 points x 3 dimensions).
    """

    def __init__(self, block, num_blocks, num_classes=10, input_channels=40, input_size=(33, 3)):
        super(ResNet, self).__init__()
        # Running channel count consumed and updated by make_layer
        self.in_channels = 64
        # Remembered so forward() can restore the image layout of flattened batches
        self.input_channels = input_channels
        self.input_size = tuple(input_size)

        # Initial convolutional layer
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU(inplace=True)

        # Create layers with residual blocks
        self.layer1 = self.make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self.make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self.make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self.make_layer(block, 512, num_blocks[3], stride=2)

        # Global average pooling and fully connected layer
        self.avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, num_classes)

    def make_layer(self, block, out_channels, num_blocks, stride):
        """Build one stage: the first block may change channels/stride, the rest keep the shape."""
        layers = []
        layers.append(block(self.in_channels, out_channels, stride))
        self.in_channels = out_channels
        for _ in range(1, num_blocks):
            layers.append(block(out_channels, out_channels, stride=1))
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return class logits for a batch.

        ``x`` may be ``(N, C, H, W)`` or a flattened ``(N, C*H*W)`` batch; the latter
        is reshaped to ``(N, input_channels, *input_size)`` because Conv2d rejects 2-D input.
        """
        if x.dim() == 2:
            x = x.reshape(x.size(0), self.input_channels, *self.input_size)

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)

        # Pool every feature map to a single value, then classify
        out = self.avg_pool(out)
        out = out.view(out.size(0), -1)
        out = self.fc(out)

        return out

In [None]:
# Build the residual network (four stages of two blocks each) with one logit per class
num_classes = len(label_keys)
model = ResNet(block=ResidualBlock, num_blocks=[2, 2, 2, 2], num_classes=num_classes).to(device)

In [None]:
# Number of full passes over train_loader for the ResNet run
num_epochs =  100

# Handed to SGD as its lr and to CyclicLR as max_lr in the next cell
learning_rate = 0.1
# Earlier loss/optimizer choice, kept for reference only (not executed):
# criterion = nn.MSELoss()
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=5e-4)

# Piecewise linear schedule: a triangular cyclic phase, then (from scheduler step 30 on) a linear decay.
# The `verbose` argument is no longer passed: it is deprecated in recent PyTorch releases and
# False was already the default, so behaviour is unchanged.
# NOTE(review): step_size_up is left at the library default while step_size_down=15, and the
# LinearLR factors are relative to the optimizer's initial lr — confirm the resulting shape is intended.
sched_linear_1 = torch.optim.lr_scheduler.CyclicLR(optimizer, base_lr=0.005, max_lr=learning_rate, step_size_down=15, mode='triangular')
sched_linear_3 = torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.005/learning_rate, end_factor=0.005/5)
scheduler = torch.optim.lr_scheduler.SequentialLR(optimizer, schedulers=[sched_linear_1, sched_linear_3], milestones=[30])

# Number of mini-batches per epoch
n_total_steps = len(train_loader)

In [None]:
# Set the model in training mode
model.train()

for epoch in range(num_epochs):
    # Running totals so the reported value covers the whole epoch
    epoch_loss_sum = 0.0
    epoch_samples = 0

    for batch_inputs, batch_targets in train_loader:
        batch_inputs = batch_inputs.float().to(device)
        batch_targets = batch_targets.to(device).to(torch.int64)

        # Forward pass
        outputs = model(batch_inputs)
        loss = criterion(outputs, batch_targets)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The criterion returns a per-batch mean; weight it by the batch size
        batch_count = batch_targets.size(0)
        epoch_loss_sum += loss.item() * batch_count
        epoch_samples += batch_count

    # Advance the learning-rate schedule; without this call the scheduler never changes the lr.
    # NOTE(review): one step per epoch is assumed from milestones=[30] against num_epochs — confirm.
    scheduler.step()

    # Print the mean loss over all samples seen in this epoch
    epoch_loss = epoch_loss_sum / max(epoch_samples, 1)
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}')

In [None]:
# Persist only the learned weights (state_dict) inside the experiment folder.
# The path separator is required: without it the file lands next to the folder
# under the name '<folder>ResNetModel.pth'.
torch.save(model.state_dict(), folder + '/ResNetModel.pth')

## Accuracy

In [None]:
model.eval()  # Set the model to evaluation mode
with torch.no_grad():

    correct = 0
    total = 0

    # Per-class tallies keyed by class index: hits and number of samples seen
    class_correct = {i: 0 for i in range(len(label_keys))}
    class_total = {i: 0 for i in range(len(label_keys))}

    # Score the held-out split: the figures below are reported as *test* accuracy,
    # so they must not be computed on the data the model was trained on.
    for batch in test_loader:
        test_X, test_y = batch
        test_X, test_y = test_X.to(device), test_y.to(device)
        # Forward pass; the predicted class is the index of the largest logit
        test_outputs = model(test_X)
        _, predicted = torch.max(test_outputs, 1)
        # Compute overall accuracy
        correct += (predicted == test_y).sum().item()
        total += test_y.size(0)
        # Compute class-wise accuracy: among samples whose true label is i,
        # count how many were also predicted as i
        for i in range(len(label_keys)):
            class_total[i] += (test_y == i).sum().item()
            class_correct[i] += (predicted == i)[test_y == i].sum().item()

    overall_accuracy = correct / total
    print(f"Overall Test Accuracy: {overall_accuracy:.4f}")

    # Print class-wise accuracy; classes absent from the split have no defined accuracy
    for i in range(len(label_keys)):
        if class_total[i] == 0:
            print(f"Class {i} Accuracy: Nan")
        else:
            class_accuracy = class_correct[i] / class_total[i]
            print(f"Class {i} Accuracy: {class_accuracy:.4f}")