In [None]:
from google.colab import drive
drive.mount('/content/drive')

COMMONROAD_GEOMETRIC_PATH = "/content/drive/MyDrive/1 Universität/5. Semester/Autonomous Driving/code/"
REPO_PATH = "/content/drive/MyDrive/1 Universität/5. Semester/Autonomous Driving/code/project"

%matplotlib inline

! /opt/bin/nvidia-smi

# Setup

## CommonRoad Geometric

In [None]:
import torch

# environment setup
def format_pytorch_version(version):
  return version.split('+')[0]

TORCH_version = torch.__version__
TORCH = format_pytorch_version(TORCH_version)

def format_cuda_version(version):
  return 'cu' + version.replace('.', '')

CUDA_version = torch.version.cuda
CUDA = format_cuda_version(CUDA_version)

%cd {COMMONROAD_GEOMETRIC_PATH}

# install dependencies
!pip install torch-scatter     -f https://pytorch-geometric.com/whl/torch-{TORCH}+{CUDA}.html
!pip install torch-sparse      -f https://pytorch-geometric.com/whl/torch-{TORCH}+{CUDA}.html
!pip install torch-cluster     -f https://pytorch-geometric.com/whl/torch-{TORCH}+{CUDA}.html
!pip install torch-spline-conv -f https://pytorch-geometric.com/whl/torch-{TORCH}+{CUDA}.html
!pip install torch-geometric 

In [None]:
# commonroad-io installation
!pip install commonroad-io

In [None]:
# install commonroad geometric
!pip install -e ./commonroad-geometric-io/

## Imports

In [None]:
%cd {REPO_PATH}

In [None]:
import os
from commonroad_geometric_io.common.io import list_files
from torch_geometric.loader import DataLoader
from torch_geometric.data import Data

# Data Loader

Create train loader and test loader by torch_geometric.loader 

In [None]:
data_dir = 'dataset'
dataset_paths = list_files(data_dir, file_type='pth', join_paths=True)

def create_dataloader(dataset_path: str, mb_size: int) -> DataLoader:
            original_loader = torch.load(dataset_path)
            loader = DataLoader(
                original_loader,
                batch_size=mb_size,
                shuffle=True
            )
            return loader

train_loader = create_dataloader(dataset_paths[-1], 64)
test_loader = create_dataloader(dataset_paths[1], 64)
#32 ,16 ,8
print('length_of_training_data', len(train_loader.dataset))
print('length_of_test_data', len(test_loader.dataset))


ModuleNotFoundError: ignored

# Model Architechture

# Common utils

Some utils used in our task

In [None]:
import pickle

"""
storage and load pickle file
"""
def save_variable(v, filename):
    f = open(filename, 'wb')
    pickle.dump(v,f)
    f.close()
    return filename

def load_variable(filename):
    f = open(filename, 'rb')
    r = pickle.load(f)
    f.close()
    return r


# Training and Test utils

Utils used in model training and evaluation

In [None]:
import numpy as np
from torch.nn import SmoothL1Loss, BCELoss
import joblib

def train(data_loader, model, optimizer, epoch, loss_fn_cla, loss_fn_reg):
    """"
    Function for training a single epoch
    """
    train_loss = []
    for idx, data in enumerate(data_loader):
        optimizer.zero_grad()
        x=data.x.cuda()   
        edge_index = data.edge_index.cuda()
        edge_attr = data.edge_attr.cuda()
        y=data.y.cuda()

        cls_out, reg_out, en_cls_out, en_reg_out = model(x,edge_index,edge_attr)#.to(device)
        
        y_occupied_idx = y.gt(0.0)
        loss_cla = loss_fn_cla(cls_out, y_occupied_idx.float())
        
        loss_reg = loss_fn_reg(reg_out[y_occupied_idx], y[y_occupied_idx])
        loss = loss_cla + loss_reg*100
        
        loss.backward()
        optimizer.step()
        train_loss.append(loss.item())
        
        if (idx + 1) % 500 == 0:
            print("Train epoch: %d, iter: %d, loss_cla: %f" % (epoch, idx, loss_cla))
            print("Train epoch: %d, iter: %d, reg_loss: %f" % (epoch, idx, loss_reg*100))

    return np.mean(train_loss)


def val(data_loader, model, epoch, loss_fn_cla, loss_fn_reg):
    """
    Test(or evaluaction) function 
    """
    val_loss = []
    for idx, data in enumerate(data_loader):
        x=data.x.cuda()   
        edge_index = data.edge_index.cuda()
        edge_attr = data.edge_attr.cuda()
        y=data.y.cuda()

        cls_out, reg_out, en_cls_out, en_reg_out = model(x,edge_index,edge_attr)#.to(device)
        
        y_occupied_idx = y.gt(0.0)

        loss_cla = loss_fn_cla(cls_out, y_occupied_idx.float())
        loss_reg = loss_fn_reg(reg_out[y_occupied_idx], y[y_occupied_idx])
      
        loss = loss_cla + loss_reg*100
        
        val_loss.append(loss.item())
        
    return np.mean(val_loss)


Example of training a reconstruction model

In [None]:
from model.traffic_representation_net import EGATConvs, EdgeConv, MLP, TrafficRepresentationNet

model_f = TrafficRepresentationNet(in_node=2,in_edge=3,out_channels=8,num_heads=1,out_conv=8) 
loss_cla = BCELoss()
loss_reg = SmoothL1Loss()
model_f = model_f.cuda()

optimizer = torch.optim.Adam(model_f.parameters(), lr=0.001, weight_decay=0)
lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer,0.96, last_epoch=-1)

for epoch in range(50):   
  model_f.train()
  train_loss = train(train_loader, model_f, optimizer, epoch, loss_cla, loss_reg)

  model_f.eval()
  val_loss = val(test_loader, model_f, epoch, loss_fn_cla=loss_cla, loss_fn_reg=loss_reg)
  lr_scheduler.step()
  print("Train epoch: %d, lr: %f, loss: %f" % (epoch, lr_scheduler.get_lr()[0], train_loss))
  print("Test epoch: %d, loss: %f" % (epoch, val_loss))

#save model in model_dir
# model_dir = ''
# torch.save(model_f, model_dir)

In [None]:
def get_loss_en_dim(min_model_dir, dim_list, epochs):
  """
  Train model with different encoder output dimenstions 
  and get corresponding val_loss 

  min_model_dir: model path
  dim_list: encoder output dimenstion

  return val. loss
  """
  vals_loss = []
  for idx, dim in enumerate(dim_list):
    print('dim',dim)
    model_f = TrafficRepresentationNet(in_node=2,in_edge=3,out_channels=8,num_heads=1,out_conv=dim) 
    loss_cla = BCELoss()
    loss_reg = SmoothL1Loss()
    model_f = model_f.cuda()

    optimizer = torch.optim.Adam(model_f.parameters(), lr=0.001, weight_decay=0)
    lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer,0.96, last_epoch=-1)

    vals = []
    for epoch in range(epochs):   
      model_f.train()
      train_loss = train(train_loader, model_f, optimizer, epoch, loss_cla, loss_reg)

      model_f.eval()
      val_loss = val(test_loader, model_f, epoch, loss_fn_cla=loss_cla, loss_fn_reg=loss_reg)
      vals.append(val_loss)
      lr_scheduler.step()
      print("Train epoch: %d, lr: %f, loss: %f" % (epoch, lr_scheduler.get_lr()[0], train_loss))
      print("Test epoch: %d, loss: %f" % (epoch, val_loss))
    
    min_save_dir = min_model_dir + str(idx) + '_' + str(dim) + '.pth'
    torch.save(model_f, min_save_dir)
    vals_loss.append(vals)

  return vals_loss

# PCA 

Utils of PCA analysis

In [None]:
def get_en_out(data_loader, model):
  """
  Get the encoder outputs of the given data from the model
  """
  flag = 1
  for idx, data in enumerate(data_loader):
    #if idx%5==0: 
      x=data.x.cuda()   
      edge_index = data.edge_index.cuda()
      edge_attr = data.edge_attr.cuda()
      y=data.y.cuda()
      cls_, reg_, en_cls_out, en_reg_out = model(x,edge_index,edge_attr)#.to(device)
        
      en_out = torch.cat((en_cls_out,en_reg_out),-1)
      if flag:
        en_outs = en_out
        flag = 0
      else:
        en_outs = torch.cat((en_outs,en_out),0)

  return en_outs

def vis_pca_heatmap(pca_model, en_outs):
    """"
    visualize pca heatmap
    """
    X_new = pca_model.transform(en_outs)
    x = X_new[:,0]
    y = X_new[:,1]

    xy = np.vstack([x,y])
    z = gaussian_kde(xy)(xy)
    z = preprocessing.maxabs_scale(z,axis=0, copy=True)

    idx1 = z.argsort()
    x2, y2, z2 = x[idx1], y[idx1], z[idx1]

    fig2, ax2 = plt.subplots()

    ax2.set_xlabel('PCA_dim1', fontsize=10)
    ax2.set_ylabel('PCA_dim2', fontsize=10)

    img = ax2.scatter(x2, y2, c=z2, s=0.5, edgecolors="none",cmap="inferno")#cmap='Reds')
    cbar = plt.colorbar(img, ax=ax2)

    plt.grid()

    plt.show()

Load model and get encoder output

In [None]:
model_dir1 = f'{REPO_PATH}/min_dim_model/128.pth'
model_l_ = torch.load(model_dir1)

en_outs = get_en_out(test_loader, model_l_)
en_outs_n = en_outs.cpu().detach().numpy() 

2D PCA decompostion on encoder output 

In [None]:
from sklearn.decomposition import PCA
import joblib
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import gaussian_kde
from sklearn import preprocessing

pca2 = PCA(n_components=2)
pca2.fit(en_outs_n)

print('n_components', pca2.n_components)
print('explained_variance_ratio_', pca2.explained_variance_ratio_)
print('explained_variance_', pca2.explained_variance_)
print('get_params',pca2.get_params)

#save PCA model in pca_dir
# pca_dir = ''
# joblib.dump(pca2, pca_dir+'pca2.m')