In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, balanced_accuracy_score
from sklearn.utils import class_weight
import math
import gc
import os
import pickle
import random
from typing import List, Tuple
import tqdm
from sklearn.model_selection import train_test_split
import torch.optim as optim
import matplotlib.pyplot as plt
from IPython.display import clear_output
from res_pred_utils import get_smooth_egovel

os.chdir("/root/shared/Anytime-Lidar/tools")

In [None]:
from nuscenes import NuScenes

dataset_version = 'v1.0-trainval'
root_path = "../data/nuscenes/" + dataset_version
nusc = NuScenes(version=dataset_version, dataroot=root_path, verbose=True)

In [None]:
# CONSTANTS
do_classification = True

with open('resolution_dataset.pkl', 'rb') as f:
    io_dict = pickle.load(f)
    print(io_dict['fields'])
    io_tuples = io_dict['data']

# Each tuple has: 'coords', 'features', 'resolution', 'sample_tkn'
# remove duplicates first
scores = [tuple(io_tpl[1][:, 6].ravel()) for io_tpl in io_tuples]
mask = np.ones(len(scores), dtype=bool)
scores_set = set()
for i, scr in enumerate(scores):
    if scr in scores_set:
        mask[i] = False
    else:
        scores_set.add(scr)

io_tuples = [io_tpl for m, io_tpl in zip(mask, io_tuples) if m]
print('Number of samples in dataset after removing duplicates:', len(io_tuples))

labels = [torch.tensor(io_tpl[1], dtype=torch.int)[:, -1] for io_tpl in io_tuples]
label_dists = torch.stack([torch.bincount(l - 1, minlength=10) for l in labels])
num_labels_normalizer = 100 #max(100, label_dists.max())
label_dists = label_dists.float() / num_labels_normalizer
print('label_dists', label_dists.size())

sample_tokens = [io_tpl[3] for io_tpl in io_tuples]
egovels = torch.tensor([np.linalg.norm(get_smooth_egovel(nusc, sample_tkn)[1][:2]) \
           for sample_tkn in sample_tokens]).float()
# mask = torch.logical_not(torch.isnan(egovels))
egovels[torch.isnan(egovels)] = 0.
egovels /= 15.0 # normalize
# print('egovels', egovels.size(), egovels.min(), egovels.max())
# print(egovels[:10])

inp_data = torch.cat((label_dists, egovels.unsqueeze(-1)), dim=1)
print('Inputs dataset:', inp_data.size())
print(inp_data[:10])

ap_scores = torch.tensor([io_tpl[2] for io_tpl in io_tuples], dtype=torch.float)

if do_classification:
    outp_data = torch.argmax(ap_scores, dim=1)
else:
    mins = ap_scores.min(1).values.unsqueeze(-1).repeat(1,5)
    maxs = ap_scores.max(1).values.unsqueeze(-1).repeat(1,5)
    outp_data = (ap_scores - mins) / (maxs-mins)
print('outp_data', outp_data.size())
print(outp_data[:10])

In [None]:
# Define the neural network architecture
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.layers = nn.Sequential(
            nn.Linear(11, 256),
            nn.BatchNorm1d(256),
            nn.ReLU(),
            # nn.Dropout(p=0.5),
            nn.Linear(256, 128),
            nn.BatchNorm1d(128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.BatchNorm1d(64),
            nn.ReLU(),
            nn.Linear(64, 5))
            
    def forward(self, x):
        return self.layers(x)
# Initialize the model, loss function, and optimizer
model = SimpleNN().cuda()
model.train()
if do_classification:
    criterion = nn.CrossEntropyLoss()
else:
    criterion = nn.MSELoss()  # Using MSE as we have continuous values
optimizer = optim.Adam(model.parameters(), lr=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=5)

dataset = TensorDataset(inp_data, outp_data)
dataloader = DataLoader(dataset, batch_size=512, shuffle=True)

epochs = 500
for epoch in range(epochs):
    running_loss = 0.0
    for batch_inputs, batch_outputs in dataloader:
        # Zero the gradients
        optimizer.zero_grad()
        
        # Forward pass
        predictions = model(batch_inputs.cuda())
        loss = criterion(predictions, batch_outputs.cuda())
        
        # Backward pass and optimization
        loss.backward()
        optimizer.step()
        
        running_loss+=loss.item()
    
    # Print loss every 10 epochs
    scheduler.step(running_loss)
    if (epoch+1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")


In [None]:
model.eval()
with torch.no_grad():
    outputs = model(inp_data.cuda())
    _, predictions = torch.max(outputs, 1)
    print((predictions == outp_data.cuda()).sum() / outp_data.size(0))

In [None]:
#ONNX export
model.eval()
input_names=['objcount_and_egovel']
print('Input shape:', inp_data[:1, :].size())
# dynamic_axes = {
#     "objcount_and_egovel": {
#         0: "batch"
#     }
# }

torch.onnx.export(
        model,
        inp_data[:1, :].cuda(),
        'resolution_pred_mdl.onnx',
        input_names=input_names,
        output_names=["res_scores"],
#         dynamic_axes=dynamic_axes,
        opset_version=17,
#         custom_opsets={"cuda_slicer": 17},
)
print('Done!')