In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import accuracy_score, f1_score
import pandas as pd

# Define the Dataset class
class NpyDataset(Dataset):
    def __init__(self, root_dir, sequence_length=300, test_mode=False, ignore_empty=True):
        self.root_dir = root_dir
        self.sequence_length = sequence_length
        self.test_mode = test_mode
        self.ignore_empty = ignore_empty
        self.files = []

        if self.test_mode:
            for file_name in os.listdir(root_dir):
                if file_name.endswith('.npy'):
                    self.files.append((os.path.join(root_dir, file_name), -1))
        else:
            self.classes = sorted(os.listdir(root_dir))
            for class_idx, class_name in enumerate(self.classes):
                class_folder = os.path.join(root_dir, class_name)
                for file_name in os.listdir(class_folder):
                    if file_name.endswith('.npy'):
                        self.files.append((os.path.join(class_folder, file_name), class_idx))

        # Filter out empty feature files if ignore_empty is True
        if self.ignore_empty:
            self.files = [file for file in self.files if self.check_file(file[0])]

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        file_path, class_idx = self.files[idx]
        data = np.load(file_path, allow_pickle=True).item()
        features = data['resnet']

        if len(features.shape) == 1:
            features = np.expand_dims(features, axis=0)

        if len(features) < self.sequence_length:
            pad_length = self.sequence_length - len(features)
            features = np.pad(features, ((0, pad_length), (0, 0)), mode='constant', constant_values=0)
        elif len(features) > self.sequence_length:
            features = features[:self.sequence_length]

        features = torch.tensor(features, dtype=torch.float32)
        label = class_idx

        return features, label

    def check_file(self, file_path):
        data = np.load(file_path, allow_pickle=True).item()
        features = data['resnet']
        return features.shape[0] > 0

# Set up directories
train_dir = 'dataset_files/resnet_training_features/ResNet_Training_Features/frontal_view'
val_dir = 'dataset_files/ResNet_val_features/ResNet_val_features/frontal_view'
test_dir = 'dataset_files/ResNet_test_features/frontal_view'

# Hyperparameters
batch_size = 32
sequence_length = 300

# Create datasets and dataloaders
train_dataset = NpyDataset(train_dir, sequence_length)
val_dataset = NpyDataset(val_dir, sequence_length)
test_dataset = NpyDataset(test_dir, sequence_length, test_mode=True)

def collate_fn(batch):
    features, labels = zip(*batch)
    max_length = max(feature.size(0) for feature in features)
    padded_features = []

    for feature in features:
        length = feature.size(0)
        if length < max_length:
            pad_length = max_length - length
            padded_feature = torch.cat([feature, torch.zeros(pad_length, feature.size(1))], dim=0)
        else:
            padded_feature = feature[:max_length]
        padded_features.append(padded_feature)

    features_tensor = torch.stack(padded_features)
    labels_tensor = torch.tensor(labels, dtype=torch.long)

    return features_tensor, labels_tensor

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

# Define the model
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(2048, 512)
        self.fc2 = nn.Linear(512, 6)

    def forward(self, x):
        x = x.mean(dim=1)
        x = torch.relu(self.fc1(x))
        x = self.fc2(x)
        return x

model = SimpleNN()

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Define the training function
def train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=10, save_path='best_model.pth'):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    best_val_f1 = 0.0

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        epoch_loss = running_loss / len(train_loader.dataset)
        epoch_acc = correct / total
        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}")

        # Validation
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0
        val_labels = []
        val_preds = []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item() * inputs.size(0)
                _, predicted = torch.max(outputs.data, 1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

                val_labels.extend(labels.cpu().numpy())
                val_preds.extend(predicted.cpu().numpy())

        val_epoch_loss = val_loss / len(val_loader.dataset)
        val_epoch_acc = val_correct / val_total
        val_f1 = f1_score(val_labels, val_preds, average='weighted')
        print(f"Validation Loss: {val_epoch_loss:.4f}, Validation Accuracy: {val_epoch_acc:.4f}, Validation F1: {val_f1:.4f}")

        # Save best model
        if val_f1 > best_val_f1:
            best_val_f1 = val_f1
            torch.save(model.state_dict(), save_path)

    print(f"Best Validation F1: {best_val_f1:.4f}")

# Train the model
train_model(model, criterion, optimizer, train_loader, val_loader, num_epochs=50)

# Load the best model for testing
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.load_state_dict(torch.load('best_model.pth'))
model.to(device)

# Testing and generate results CSV
model.eval()
test_preds = []
test_files = []

with torch.no_grad():
    for inputs, _ in test_loader:
        inputs = inputs.to(device)

        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)

        test_preds.extend(predicted.cpu().numpy())
        test_files.extend(test_loader.dataset.files)

# Create a DataFrame for the results
results = []
for (file_path, _), pred in zip(test_files, test_preds):
    file_name = os.path.basename(file_path)
    one_hot_pred = [0] * 6
    one_hot_pred[pred] = 1
    results.append([file_name] + one_hot_pred)

df = pd.DataFrame(results, columns=['video_name', 'left_turn', 'right_turn', 'lane_change_left', 'lane_change_right', 'straight', 'slow_stop'])
df.to_csv('task1_test_result_format.csv', index=False)