In [None]:
# Download the VGG16 test-feature archive and unpack it into the current working directory.
# NOTE(review): `--no-check` looks like an abbreviation of wget's `--no-check-certificate`,
# which turns off TLS certificate verification for this download — confirm that is intended.
!wget --no-check https://mobility.iiit.ac.in/dataset/rip_test_set/VGG16_test_features.zip
!unzip VGG16_test_features.zip

In [2]:
import os
import glob
import numpy as np

def check_files(rootdir):
    """Load every ``.npy`` file of each camera view and record its array shape.

    Args:
        rootdir: Directory that contains the view sub-folders listed in ``views``.

    Returns:
        dict: Maps each view sub-path to a list of ``(file_path, shape)`` tuples,
        one per file that could be loaded. Files that fail to load are reported
        on stdout and left out of the result.
    """
    views = ['frontal_view/VGG16_features', 'left_view', 'right_view']
    file_shapes = {}

    for view in views:
        loaded = file_shapes.setdefault(view, [])
        pattern = os.path.join(rootdir, view, '*.npy')
        for file_path in glob.glob(pattern):
            try:
                shape = np.load(file_path).shape
            except Exception as e:
                # Best-effort scan: report the unreadable file and keep going.
                print(f"Error loading {file_path}: {e}")
                continue
            loaded.append((file_path, shape))

    return file_shapes

In [3]:
# !ls /content/VGG16_test_features/frontal_view/VGG16_features | wc -l
# Some feature files were corrupt / malformed and crashed the dataloader, so they had to be
# tracked down by hand. The same clip id is deleted from each of the three view folders.
!rm /kaggle/working/VGG16_test_features/frontal_view/VGG16_features/09a30412-fd64-4ba0-8d5d-8fa5bd68b9e0.npy
!rm /kaggle/working/VGG16_test_features/left_view/09a30412-fd64-4ba0-8d5d-8fa5bd68b9e0.npy
!rm /kaggle/working/VGG16_test_features/right_view/09a30412-fd64-4ba0-8d5d-8fa5bd68b9e0.npy

In [4]:
# Root folder of the extracted feature set
# rootdir = '/kaggle/input/riptide-test-new/VGG16_test_features'
rootdir = "/kaggle/working/VGG16_test_features"

file_shapes = check_files(rootdir)

# Build the report once: a header per view followed by one line per readable file
report_lines = []
for view, shapes in file_shapes.items():
    report_lines.append(f"\nView: {view}")
    report_lines.extend(f"File: {file_path}, Shape: {shape}" for file_path, shape in shapes)

# Show the report in the notebook output
for line in report_lines:
    print(line)

# Write the same report to a text file for further analysis if needed
with open('file_shapes.txt', 'w') as f:
    f.writelines(line + "\n" for line in report_lines)


View: frontal_view/VGG16_features
File: /kaggle/working/VGG16_test_features/frontal_view/VGG16_features/bd0e6ef3-0ee3-42d2-ba89-87e27f31da87.npy, Shape: (360, 1, 512)
File: /kaggle/working/VGG16_test_features/frontal_view/VGG16_features/b8cf26e4-7329-4cfa-aca7-00bd40975ff0.npy, Shape: (180, 1, 512)
File: /kaggle/working/VGG16_test_features/frontal_view/VGG16_features/60ad70f9-ecde-4f47-87ec-41bbc1221d94.npy, Shape: (172, 1, 512)
File: /kaggle/working/VGG16_test_features/frontal_view/VGG16_features/7d43bedb-13c3-4b9d-9050-f1652b110a1e.npy, Shape: (180, 1, 512)
File: /kaggle/working/VGG16_test_features/frontal_view/VGG16_features/dec19835-0fb4-4887-8178-19df3b497153.npy, Shape: (254, 1, 512)
File: /kaggle/working/VGG16_test_features/frontal_view/VGG16_features/ab0832d7-1a2b-43a6-bec7-10cb25ca7983.npy, Shape: (240, 1, 512)
File: /kaggle/working/VGG16_test_features/frontal_view/VGG16_features/66e56d2d-d4c4-42d2-b5b7-eb141fbd097f.npy, Shape: (156, 1, 512)
File: /kaggle/working/VGG16_test_f

In [None]:
def custom_accuracy(predictions, targets):
    """
    Fraction of samples whose predicted label equals the ground-truth label.
    Args:
        predictions (Tensor): The predicted labels.
        targets (Tensor): The ground truth labels; its first dimension is
            used as the sample count.
    Returns:
        float: Number of matching positions divided by ``targets.size(0)``.
    """
    matches = predictions == targets
    num_correct = matches.sum().item()
    num_samples = targets.size(0)
    return num_correct / num_samples

In [None]:
def custom_f1_score(predictions, labels):
    """Compute an F1 score from per-sample class predictions.

    Counters accumulated over the samples:
        tp  -- prediction equals the label.
        fp  -- prediction differs from the label.
        mp  -- (missed prediction) mismatch in which class 1 was predicted.
        fpp -- (false positive prediction) mismatch in which the label is class 1.

    ``precision = tp / (tp + fp + fpp)``, ``recall = tp / (tp + fp + mp)`` and
    the result is their harmonic mean.

    NOTE(review): the counter names only make sense if class index 1 is the
    "driving straight" class (the inference cell lists 'Straight' as the second
    output column) — confirm against the label encoding used for training.

    Args:
        predictions: Sequence (list, array or 1-D tensor) of predicted class ids.
        labels: Sequence of ground-truth class ids, indexed in parallel with
            ``predictions``.

    Returns:
        The F1 score as a float, or ``0`` when there are no samples or no
        correct predictions.
    """
    # Convert tensors/arrays to plain Python lists once, so the loop below does
    # not index (and, on GPU, synchronise on) a tensor for every element.
    if hasattr(predictions, "tolist"):
        predictions = predictions.tolist()
    if hasattr(labels, "tolist"):
        labels = labels.tolist()

    tp = fp = fpp = mp = 0
    for i in range(len(predictions)):
        predicted, actual = predictions[i], labels[i]
        if predicted == actual:
            tp += 1
            continue

        fp += 1
        # Class 1 predicted although a different class was performed.
        if predicted == 1:
            mp += 1
        # A different class predicted although class 1 was performed.
        if actual == 1:
            fpp += 1

    # fpp and mp are only ever incremented together with fp, so both
    # denominators are zero exactly when there were no samples at all.
    if tp + fp == 0:
        return 0

    precision = tp / (tp + fp + fpp)
    recall = tp / (tp + fp + mp)

    if precision + recall == 0:
        return 0
    return 2 * precision * recall / (precision + recall)

In [5]:
import os
import glob
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class TestDataset(Dataset):
    """Dataset of per-clip feature arrays stored as ``.npy`` files, one folder per view.

    Each item is a dict with:
        "features":  float tensor. With ``merge_views=True`` the three views are
                     stacked into ``(seq_len, num_views, feature_dim)``; otherwise
                     the frontal array is returned as loaded (after optional
                     length normalisation).
        "filenames": list with the base name of every file that went into the
                     item (three names when merging views, one otherwise).

    Args:
        rootdir: Folder containing the view sub-folders listed in ``self.views``.
        seq_len: Target number of frames when ``normalize_len`` is true.
        normalize_len: Pad / crop every array to exactly ``seq_len`` frames.
        merge_views: Return all three views per item instead of the frontal view only.

    Raises:
        AssertionError: With ``merge_views=True``, if the views do not contain
            the same number of files or the same file names.
    """

    def __init__(self, rootdir, seq_len=400, normalize_len=True, merge_views=False):
        self.rootdir = rootdir
        self.seq_len = seq_len
        self.normalize_len = normalize_len
        self.merge_views = merge_views
        self.views = ['frontal_view/VGG16_features', 'left_view', 'right_view']

        # glob returns files in arbitrary order. Sort by file name so the item
        # order is reproducible and index i refers to the same clip in every view.
        self.file_paths = {
            view: sorted(glob.glob(os.path.join(rootdir, view, '*.npy')), key=os.path.basename)
            for view in self.views
        }

        if merge_views:
            num_files = len(self.file_paths['frontal_view/VGG16_features'])
            assert all(len(self.file_paths[view]) == num_files for view in self.views), "All views must have the same number of files."
            self.all_paths = list(zip(*(self.file_paths[view] for view in self.views)))
            # Equal counts are not enough: every tuple must describe one and the same clip.
            assert all(
                len({os.path.basename(path) for path in paths}) == 1 for paths in self.all_paths
            ), "All views must contain the same file names."
        else:
            self.all_paths = self.file_paths['frontal_view/VGG16_features']

    def __len__(self):
        return len(self.all_paths)

    def resize_sequence(self, features):
        """Return ``features`` padded or cropped along axis 0 to exactly ``self.seq_len`` rows.

        Shorter arrays are zero-padded at the end; longer arrays keep a window
        centred on the middle frame. The window is ``seq_len`` rows long for odd
        ``seq_len`` as well.
        """
        length = features.shape[0]
        if length < self.seq_len:
            # Pad in the input's dtype so concatenation does not upcast to float64.
            pad = np.zeros((self.seq_len - length,) + features.shape[1:], dtype=features.dtype)
            return np.concatenate((features, pad), axis=0)

        start = length // 2 - self.seq_len // 2
        return features[start:start + self.seq_len]

    def __getitem__(self, idx):
        if self.merge_views:
            paths = self.all_paths[idx]
            features_list = [np.load(path) for path in paths]
            filenames = [os.path.basename(path) for path in paths]
            if self.normalize_len:
                features_list = [self.resize_sequence(features) for features in features_list]
            # Each view is expected to be (seq_len, 1, feature_dim); stacking on
            # axis 2 gives (seq_len, 1, num_views, feature_dim).
            features = np.stack(features_list, axis=2)
            features = features.squeeze(1)  # -> (seq_len, num_views, feature_dim)
            features = torch.from_numpy(features).float()
            return {"features": features, "filenames": filenames}
        else:
            path = self.all_paths[idx]
            features = np.load(path)
            filename = os.path.basename(path)
            if self.normalize_len:
                features = self.resize_sequence(features)
            features = torch.from_numpy(features).float()
            return {"features": features, "filenames": [filename]}

def test_collate_fn(batch):
    features_list = [res["features"] for res in batch]
    filenames_list = [res["filenames"] for res in batch]

    features = torch.stack(features_list, dim=0)
    filenames = [filename for sublist in filenames_list for filename in sublist]
    return {
        "features": features,
        "filenames": filenames
    }

In [6]:
# Build the test DataLoaders
# rootdir = '/kaggle/input/riptide-test-new/VGG16_test_features'
rootdir = "/kaggle/working/VGG16_test_features"

# Multi-view: all three views merged into one sample
test_dataset_multi = TestDataset(rootdir=rootdir, merge_views=True)
test_loader_multi = DataLoader(
    test_dataset_multi,
    batch_size=32,
    shuffle=False,
    collate_fn=test_collate_fn,
)

# Single-view: frontal view only
test_dataset_frontal = TestDataset(rootdir=rootdir, merge_views=False)
test_loader_frontal = DataLoader(
    test_dataset_frontal,
    batch_size=32,
    shuffle=False,
    collate_fn=test_collate_fn,
)

# Smoke test: walk both loaders (multi-view first) and show each batch's keys and feature shape
for loader in (test_loader_multi, test_loader_frontal):
    for batch in loader:
        print(batch.keys())
        print(batch['features'].shape)
    

dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([11, 400, 3, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 1, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 1, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 1, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 1, 512])
dict_keys(['features', 'filenames'])
torch.Size([32, 400, 1, 5

In [7]:
import torch.optim as optim
import wandb
import torch.nn as nn

class Model(nn.Module):
    """Base class that gives the classifiers in this notebook a shared training loop.

    Subclasses override ``forward``; the base ``forward`` returns its input unchanged.
    """

    def __init__(self, num_classes=6):
        super().__init__()
        self.num_classes = num_classes

    def forward(self, x):
        return x

    def fit(self, train_loader, valid_loader, epochs, lr, weight_decay, early_stopping_patience, is_wandb=False, device='cuda', use_scheduler=False, save_name="model"):
        """Train with Adam + cross-entropy, validate every epoch, and checkpoint.

        Args:
            train_loader: Iterable of batches; each batch is a dict with
                ``"features"`` and ``"labels"`` tensors.
            valid_loader: Iterable of validation batches in the same format.
            epochs: Maximum number of epochs.
            lr: Adam learning rate.
            weight_decay: Adam weight decay.
            early_stopping_patience: Stop after this many consecutive epochs
                without an improvement of the mean validation accuracy.
            is_wandb: Accepted for compatibility; not used by this method.
            device: Device the batches are moved to. The model itself is NOT
                moved here — the caller must place it on the same device.
            use_scheduler: When true, multiply the learning rate by 0.1 every
                50 epochs (``StepLR``).
            save_name: Prefix for the checkpoint files ``{save_name}_best.pt``
                and ``{save_name}_last_epoch.pt``.

        Raises:
            ZeroDivisionError: If ``valid_loader`` yields no batches (the
                per-batch validation scores are averaged).
        """
        optimizer = optim.Adam(self.parameters(), lr=lr, weight_decay=weight_decay)
        # NOTE(review): CrossEntropyLoss applies log-softmax itself and expects raw
        # logits, while the CNN_LSTM_* subclasses in this notebook already end in a
        # Softmax layer — confirm the double normalisation is intended.
        criterion = nn.CrossEntropyLoss()
        scheduler = None

        if use_scheduler:
            scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

        best_val_acc = 0
        patience_counter = 0

        for epoch in range(epochs):
            self.train()
            for batch_idx, data in enumerate(train_loader):
                features = data["features"].to(device)
                label = data["labels"].to(device)

                optimizer.zero_grad()
                output = self(features)
                loss = criterion(output, label)
                loss.backward()

                # Clip the global gradient norm before the update.
                torch.nn.utils.clip_grad_norm_(self.parameters(), max_norm=1.0)

                optimizer.step()

                # The training metrics are only ever printed, so compute them just
                # for the batches that are logged instead of on every step.
                if batch_idx % 100 == 0:
                    predictions = torch.argmax(output, dim=1)
                    train_acc = custom_accuracy(predictions, label)
                    train_f1 = custom_f1_score(predictions, label)
                    print(f"Epoch: {epoch+1} | Batch: {batch_idx} | CrossEntropyLoss: {loss.item():.3f} | Accuracy: {train_acc:.3f} | F1Score: {train_f1:.3f}")

            if scheduler is not None:
                scheduler.step()

            self.eval()
            with torch.no_grad():
                val_f1_scores = []
                val_acc_scores = []
                for val_idx, val_data in enumerate(valid_loader):
                    val_features = val_data["features"].to(device)
                    val_label = val_data["labels"].to(device)

                    val_output = self(val_features)

                    val_predictions = torch.argmax(val_output, dim=1)
                    val_acc_scores.append(custom_accuracy(val_predictions, val_label))
                    val_f1_scores.append(custom_f1_score(val_predictions, val_label))

                # Unweighted mean over batches (a smaller last batch counts as much as a full one).
                avg_val_f1_score = sum(val_f1_scores) / len(val_f1_scores)
                avg_val_acc_score = sum(val_acc_scores) / len(val_acc_scores)

                print(f"Validation -> Epoch: {epoch} | Accuracy: {avg_val_acc_score:.3f} | F1Score: {avg_val_f1_score:.3f}")

                # Early stopping check
                if avg_val_acc_score > best_val_acc:
                    best_val_acc = avg_val_acc_score
                    torch.save(self.state_dict(), f"{save_name}_best.pt")
                    print(f"Model saved with best validation accuracy: {best_val_acc:.3f} at epoch {epoch}")
                    patience_counter = 0  # Reset patience counter
                else:
                    patience_counter += 1

                if patience_counter >= early_stopping_patience:
                    print("Early stopping triggered")
                    break

            # Only reached when the epoch was not cut short by early stopping.
            if epoch == epochs - 1:
                torch.save(self.state_dict(), f"{save_name}_last_epoch.pt")
                print(f"Model saved at the last epoch {epoch}")

In [8]:
class CNN_LSTM_MULTI(Model):
    """Multi-view classifier: a shared 1-D conv encoder per view, then a bidirectional LSTM.

    Input is ``[batch, seq_len, num_views, 512]``. Every view goes through the
    same conv / batch-norm / LeakyReLU / dropout stack, the per-view outputs are
    concatenated on the channel axis (the LSTM is sized for 3 views), and the
    LSTM output at the last time step is mapped to class probabilities.
    """

    def __init__(self, num_classes=6, lstm_layers=2, hidden_size=64, dropout_rate=0.25):
        super().__init__(num_classes)
        self.hidden_size = hidden_size
        self.lstm_layers = lstm_layers

        # Temporal encoder shared by all views
        self.conv1 = nn.Conv1d(in_channels=512, out_channels=64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.LeakyReLU()
        self.dropout = nn.Dropout(dropout_rate)

        # Sequence model over the concatenated view encodings, plus classifier head
        self.lstm = nn.LSTM(
            64 * 3,
            hidden_size,
            batch_first=True,
            num_layers=lstm_layers,
            bidirectional=True,
            dropout=dropout_rate,
        )
        self.fc = nn.Linear(hidden_size * 2, self.num_classes)
        self.softmax = nn.Softmax(dim=1)

    def _encode_view(self, view_x):
        """Encode one view: ``[batch, seq_len, 512]`` -> ``[batch, seq_len, 64]``."""
        channels_first = view_x.transpose(1, 2)  # Conv1d wants [batch, channels, seq_len]
        encoded = self.dropout(self.relu(self.bn1(self.conv1(channels_first))))
        return encoded.transpose(1, 2)  # back to [batch, seq_len, channels]

    def forward(self, x):
        # x: [batch, seq_len, num_views, 512]
        _, _, num_views, _ = x.size()

        # Encode the views one after another with the shared encoder
        per_view = [self._encode_view(x[:, :, view, :]) for view in range(num_views)]

        # fused: [batch, seq_len, 64 * num_views]
        fused = torch.cat(per_view, dim=2)

        self.lstm.flatten_parameters()
        # sequence_out: [batch, seq_len, hidden_size * 2]
        sequence_out, _ = self.lstm(fused)

        # Keep only the last time step: [batch, hidden_size * 2]
        final_step = sequence_out[:, -1, :]

        # Class scores, then softmax over classes: [batch, num_classes]
        return self.softmax(self.fc(final_step))

In [9]:
class CNN_LSTM_FRONTAL(Model):
    """Single-view classifier: 1-D conv encoder followed by a bidirectional LSTM.

    Input is ``[batch, seq_len, 1, 512]``. The singleton view axis is squeezed
    out, the sequence goes through conv / batch-norm / LeakyReLU / dropout, and
    the LSTM output at the last time step is mapped to class probabilities.
    """

    def __init__(self, num_classes=6, lstm_layers=2, hidden_size=64, dropout_rate=0.25):
        super().__init__(num_classes)
        self.hidden_size = hidden_size
        self.lstm_layers = lstm_layers

        # Temporal encoder
        self.conv1 = nn.Conv1d(in_channels=512, out_channels=64, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.LeakyReLU()
        self.dropout = nn.Dropout(dropout_rate)

        # Sequence model and classifier head
        self.lstm = nn.LSTM(
            64,
            hidden_size,
            batch_first=True,
            num_layers=lstm_layers,
            bidirectional=True,
            dropout=dropout_rate,
        )
        self.fc = nn.Linear(hidden_size * 2, self.num_classes)
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        # x: [batch, seq_len, 1, 512]; the unpacking requires a 4-D input
        _, _, _, _ = x.size()

        # Drop the view axis and move channels first for Conv1d: [batch, 512, seq_len]
        channels_first = x.squeeze(2).transpose(1, 2)

        # encoded: [batch, 64, seq_len]
        encoded = self.dropout(self.relu(self.bn1(self.conv1(channels_first))))

        # The LSTM is batch_first: [batch, seq_len, 64]
        sequence_in = encoded.transpose(1, 2)

        self.lstm.flatten_parameters()
        # sequence_out: [batch, seq_len, hidden_size * 2]
        sequence_out, _ = self.lstm(sequence_in)

        # Keep only the last time step: [batch, hidden_size * 2]
        final_step = sequence_out[:, -1, :]

        # Class scores, then softmax over classes: [batch, num_classes]
        return self.softmax(self.fc(final_step))

In [12]:
import pandas as pd
import torch

In [13]:
def run_inference_and_save_results(model, dataloader, output_csv_path):
    """Run the model over a dataloader and write one-hot predictions to a CSV file.

    The CSV has one row per sample: the sample's filename followed by one 0/1
    column per class, with a 1 in the column of the arg-max class.

    Args:
        model: ``nn.Module`` returning per-class scores of shape
            ``[batch, num_classes]``. Batches are moved to the device that holds
            the model's parameters.
        dataloader: Iterable of batches, each a dict with a ``"features"``
            tensor and a flat ``"filenames"`` list. The list may carry several
            names per sample (the multi-view dataset contributes one per view);
            the first name of each sample is used.
        output_csv_path: Destination path of the CSV file.

    Raises:
        ValueError: If a batch's number of filenames is not a positive multiple
            of its number of predictions, so names cannot be matched to samples.
    """
    class_columns = ['Right Lane Change', 'Straight', 'Left Turn', 'Slow-Stop', 'Right Turn', 'Left Lane Change']

    model.eval()
    # Follow the model's own device rather than a notebook-level global.
    first_param = next(model.parameters(), None)
    results = []

    with torch.no_grad():
        for data in dataloader:
            features = data["features"]
            filenames = data["filenames"]

            if first_param is not None:
                features = features.to(first_param.device)
            outputs = model(features)

            predictions = torch.argmax(outputs, dim=1).cpu().numpy()
            if len(predictions) == 0:
                continue

            # The collate function flattens the per-sample filename lists, so a
            # multi-view batch holds several names per prediction. Zipping the
            # flat list directly would pair predictions with the wrong samples
            # and silently drop the remainder; take one name per sample instead.
            if len(filenames) == 0 or len(filenames) % len(predictions) != 0:
                raise ValueError(
                    f"Cannot match {len(filenames)} filenames to {len(predictions)} predictions."
                )
            names_per_sample = len(filenames) // len(predictions)
            sample_names = filenames[::names_per_sample]

            for filename, prediction in zip(sample_names, predictions):
                one_hot = [1 if i == prediction else 0 for i in range(len(class_columns))]
                results.append([filename] + one_hot)

    # Convert results to DataFrame
    results_df = pd.DataFrame(results, columns=['filename'] + class_columns)

    # Save to CSV
    results_df.to_csv(output_csv_path, index=False)

In [16]:
# Load the models
# Checkpoint files for the multi-view and the frontal-only classifier.
model_path_multi = '/kaggle/input/icpr-multi-25th-july/pytorch/default/1/cnn_lstm_multi_model_final_lr_0.001_dropout_0.25 (1) 64 66.pt'
model_path_frontal = '/kaggle/input/icpr-frontal-25th-july/pytorch/default/1/cnn_lstm_frontal_model_final_lr_0.001_dropout_0.25 68 71.pt'

# Use the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# The constructor arguments define the layer shapes, so they have to match the
# architecture the checkpoint was saved from for load_state_dict to succeed.
# NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
model_multi = CNN_LSTM_MULTI(num_classes=6, lstm_layers=2, hidden_size=128, dropout_rate=0.25)
model_multi.load_state_dict(torch.load(model_path_multi, map_location=device))
model_multi.to(device)

CNN_LSTM_MULTI(
  (conv1): Conv1d(512, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): LeakyReLU(negative_slope=0.01)
  (dropout): Dropout(p=0.25, inplace=False)
  (lstm): LSTM(192, 128, num_layers=2, batch_first=True, dropout=0.25, bidirectional=True)
  (fc): Linear(in_features=256, out_features=6, bias=True)
  (softmax): Softmax(dim=1)
)

In [17]:
# Build the frontal-only classifier, restore its weights from the checkpoint and move it to `device`.
# NOTE(review): torch.load unpickles the file — only load checkpoints from a trusted source.
model_frontal = CNN_LSTM_FRONTAL(num_classes=6, lstm_layers=2, hidden_size=128, dropout_rate=0.25)
model_frontal.load_state_dict(torch.load(model_path_frontal, map_location=device))
model_frontal.to(device)

CNN_LSTM_FRONTAL(
  (conv1): Conv1d(512, 64, kernel_size=(3,), stride=(1,), padding=(1,))
  (bn1): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): LeakyReLU(negative_slope=0.01)
  (dropout): Dropout(p=0.25, inplace=False)
  (lstm): LSTM(64, 128, num_layers=2, batch_first=True, dropout=0.25, bidirectional=True)
  (fc): Linear(in_features=256, out_features=6, bias=True)
  (softmax): Softmax(dim=1)
)

In [18]:
# Paths for saving results: the multi-view predictions go to the task2 file,
# the frontal-only predictions to the task1 file.
output_csv_path_multi = '/kaggle/working/task2_test_result.csv'
output_csv_path_frontal = '/kaggle/working/task1_test_result.csv'

In [19]:
# Run inference and save results for multi-view (writes output_csv_path_multi)
run_inference_and_save_results(model_multi, test_loader_multi, output_csv_path_multi)

# Run inference and save results for single-view (frontal) (writes output_csv_path_frontal)
run_inference_and_save_results(model_frontal, test_loader_frontal, output_csv_path_frontal)

print("Inference completed and results saved.")

Inference completed and results saved.
