In [1]:
import numpy as np
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import matplotlib.pyplot as plt

In [4]:
def load_data(directory):
    data = []
    vehicle_ids = []

    for i, file_name in enumerate(os.listdir(directory)):

        # Limit elements in Tensor for performance purposes, remove this when ready for full scale training
        if(i == 3000):
            print(f'loaded {i}')
            break
            
        # Extract vehicle IDs from the filename
        vehicle_id = int(file_name.split('_')[0])
        vehicle_ids.append(vehicle_id)

        # Load NPY files
        file_path = os.path.join(directory, file_name)
        loaded_data = np.load(file_path)

        data.append(loaded_data)
        
        # Reshape loaded data and append to a list
        #reshaped_data = loaded_data.reshape((1, 32, 64))
        # data.append(reshaped_data)

    # Converting data and labels to PyTorch Tensors
    data = np.array(data)
    data = torch.from_numpy(data)
    vehicle_ids = torch.tensor(vehicle_ids, dtype=torch.long)
    
    # Adjust labels to be in range 0 to 1361
    if(torch.min(vehicle_ids).item() == 1):
        vehicle_ids = vehicle_ids - 1
    
    return (data, vehicle_ids)

In [5]:
# Load train data and labels from train directory
train_directory = "vehicle-x/train"
test_directory = "vehicle-x/test"

train_data, vehicle_ids_train = load_data(train_directory)
test_data, vehicle_ids_test = load_data(test_directory)

loaded 3000
loaded 3000


In [6]:
# Hyperparameters
num_epochs = 100
batch_size = 32
test_batch_size = 64
learning_rate = 0.01

class CustomDataset():
    def __init__(self, data_array, labels):
        self.data = data_array
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

# Create custom dataset from the tensors
train_dataset = CustomDataset(train_data, vehicle_ids_train)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = CustomDataset(test_data, vehicle_ids_test)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=test_batch_size, shuffle=True)

In [7]:
# Define the constructive cascade network
class CascadeLayer(nn.Module):
    def __init__(self, input_size, output_size):
        super(CascadeLayer, self).__init__()
        self.fc = nn.Linear(input_size, output_size)

    def forward(self, x):
        return torch.tanh(self.fc(x))

class CascadeNN(nn.Module):
    def __init__(self, input_size, output_size, initial_hidden_size=128):
        super(CascadeNN, self).__init__()
        self.cascade_hidden_size = 64
        self.input_size = input_size
        self.output_size = output_size
        self.layers = nn.ModuleList()

        # Initial layer
        self.layers.append(CascadeLayer(input_size, initial_hidden_size))

        # Output layer
        self.output_layer = nn.Linear(initial_hidden_size, output_size)

    def add_cascade_layer(self):
        # The new layer will take input from the last cascade layer and the original input
        new_input_size = self.layers[-1].fc.out_features + self.input_size
        new_layer = CascadeLayer(new_input_size, self.cascade_hidden_size)
        self.layers.append(new_layer)

        # Adjust the output layer to accomodate the new cascade layer
        self.output_layer = nn.Linear(self.output_layer.in_features + self.cascade_hidden_size, self.output_size)

    def forward(self, x):
        outputs = [x]

        for layer in self.layers:
            combined_input = torch.cat(outputs, dim=1)
            layer_output = layer(combined_input)
            outputs.append(layer_output)

        final_combined_input = torch.cat(outputs, dim=1)
        logits = self.output_layer(final_combined_input)
        return nn.functional.softmax(logits, dim=1)


# Device Configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Initialise network
model = CascadeNN(2048, 1362).to(device)

In [8]:
def train():
    model.train()
    correct = 0
    for batch_idx, (batch_data, batch_labels) in enumerate(train_loader):
        batch_data = batch_data.to(device)
        batch_labels = batch_labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(batch_data)

        pred = outputs.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        correct += pred.eq(batch_labels.data.view_as(pred)).long().cpu().sum()
        
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        if (batch_idx + 1) % 500 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], step[{batch_idx + 1}], loss: {loss.item():.4f}')

    print('Train Accuracy:{}/{}'.format(correct, len(train_loader.dataset)))

In [9]:
def test():
    model.eval()
    correct = 0

    for batch_data, batch_labels in test_loader:
        batch_data = batch_data.to(device)
        batch_labels = batch_labels.to(device)
        
        outputs = model(batch_data)
        
        pred = outputs.data.max(1, keepdim=True)[1] # get the index of the max log-probability
        correct += pred.eq(batch_labels.data.view_as(pred)).long().cpu().sum()

    print('Test accuracy:{}/{}\n'.format(correct, len(test_loader.dataset)))

In [10]:
# Train the model
criterion = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    train()
    test()

RuntimeError: mat1 and mat2 shapes cannot be multiplied (32x2176 and 128x1362)