In [1]:
import os
import random

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import utm
from sklearn.model_selection import train_test_split

import preprocess as pr
import simpleTrajVisualizer as vis
import Transformer as tr

In [None]:
# --- Notebook configuration: fill these in before running the cells below ---

# Location of the trajectory dataset (read with pd.read_csv further down).
trajectory_data_file_path = ""

# Location of the trained model that torch.load will restore.
trained_model_file_path = ""

# Location of a previously fitted scaler (spelled "scalar" in these names);
# leave empty if there is none.
scalar_file_path = ""

# Set to True when an existing fitted scaler should be reused.
use_existing_scalar = True


In [2]:
# Load the raw trajectory table; later cells read its 'Vehicle_ID', 'X' and 'Y' columns.
traj_data = pd.read_csv(trajectory_data_file_path)

In [3]:
# Build the model inputs/targets and the index lists for the train and test splits.
# The original call placed positional arguments after keyword arguments, which is a
# SyntaxError, so every option is now passed by keyword.
# NOTE(review): the last two keyword names mirror the notebook variables; confirm they
# match the parameter names declared by preprocess.preprocess_dataset (a mismatch
# fails loudly with a TypeError instead of silently binding the wrong parameter).
(
    train_data_inputs,
    test_data_inputs,
    train_data_targets,
    test_data_targets,
    train_indx,
    test_indx,
) = pr.preprocess_dataset(
    traj_data,
    max=100,
    max_len=50,
    input_len=20,
    use_existing_scalar=use_existing_scalar,
    scalar_file_path=scalar_file_path,
)

In [6]:
# Star import pulls every public name of the Transformer module into the notebook namespace.
# NOTE(review): presumably required so the classes of the pickled model resolve during
# torch.load, and possibly the source of names such as `np`/`scaler` used below — confirm.
from Transformer import *
if __name__ == "__main__":
  # Prefer the GPU when one is available. `device` is not referenced again in the visible cells.
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  # torch.load unpickles the file, so only load models from a trusted source.
  # NOTE(review): no map_location is passed; a model saved on a GPU may fail to load on a
  # CPU-only machine — confirm which device the inputs from preprocess live on before changing this.
  best_model = torch.load(trained_model_file_path)


In [7]:
# Loss function read as a module-level global inside evaluate().
criterion = nn.CrossEntropyLoss()
import time
import math

def evaluate(eval_model, data_source_inputs, data_source_targets):
    """
    Compute the mean per-sample loss of ``eval_model`` over a data set.

    Each sample ``i`` is taken as ``data_source_inputs[i, :, :]`` and
    ``data_source_targets[i, :, :]``; the model is called with both, and the
    module-level ``criterion`` compares ``output[0, :, :]`` with ``targets[0, :]``.
    The mean loss is printed and returned.

    Raises ZeroDivisionError when the input holds no samples.
    """
    eval_model.eval()  # put the model into evaluation mode
    sample_count = data_source_inputs.size(0)
    loss_sum = 0.

    # Gradients are not needed while scoring.
    with torch.no_grad():
        for sample_idx in range(sample_count):
            sample_inputs = data_source_inputs[sample_idx, :, :]
            sample_targets = data_source_targets[sample_idx, :, :]
            model_out = eval_model(sample_inputs, sample_targets[:, :])
            sample_loss = criterion(model_out[0, :, :], sample_targets[0, :])
            loss_sum += sample_loss.item()

    mean_loss = loss_sum / sample_count
    print("loss: ", mean_loss)
    return mean_loss


def get_predictions(eval_model, data_source_inputs, data_source_targets):
    """
    Run ``eval_model`` over every sample and collect inputs, targets and predictions.

    For sample ``i`` the model receives ``data_source_inputs[i, :, :]`` together with
    ``data_source_targets[i, :, :]``. The prediction is the argmax over dim 1 of
    ``output[0, :, :]``.

    Returns three parallel lists of plain Python lists, in this order:
    ``inputs_list`` (first row of each input), ``target_list`` (first row of each
    target) and ``preds_list`` (predicted indices).
    """
    eval_model.eval()  # put the model into evaluation mode
    inputs_list, target_list, preds_list = [], [], []

    with torch.no_grad():
        for sample_idx in range(data_source_inputs.size(0)):
            sample_inputs = data_source_inputs[sample_idx, :, :]
            sample_targets = data_source_targets[sample_idx, :, :]
            scores = eval_model(sample_inputs, sample_targets[:, :])
            best_index = torch.argmax(scores[0, :, :], dim=1)

            preds_list.append(best_index.tolist())
            target_list.append(sample_targets[0, :].tolist())
            inputs_list.append(sample_inputs[0, :].tolist())

    return inputs_list, target_list, preds_list

def calculate_bin_acc(target_list, preds_list):
    """
    Print and return the bin-level accuracy of the predictions.

    Parameters:
        target_list: list of target sequences (one list of bin indices per sample).
        preds_list:  list of predicted sequences, parallel to ``target_list``.

    Returns:
        ``(accuracy, perfect_ratio)``: the mean fraction of matching positions per
        sequence, and the fraction of sequences predicted without any error. Both
        are ``nan`` when ``target_list`` is empty. Note that the second printed
        value is a fraction in [0, 1] even though its label reads "%".
    """
    n_sequences = len(target_list)
    if n_sequences == 0:
        # No sequences: the metrics are undefined; avoid a ZeroDivisionError.
        mean_acc = perfect_ratio = float("nan")
        print("Accuracy: ", mean_acc)
        print("% of perfect predictions: ", perfect_ratio)
        return mean_acc, perfect_ratio

    acc_sum = 0.
    nr_of_perfect_preds = 0
    for ind in range(n_sequences):
        target_seq = target_list[ind]
        pred_seq = preds_list[ind]
        matches = sum(1 for x, y in zip(target_seq, pred_seq) if x == y)
        # Normalise by THIS sequence's length (the original always divided by the
        # length of the first prediction). Using the longer of the two sequences
        # makes any length mismatch count as errors.
        seq_len = max(len(target_seq), len(pred_seq))
        if seq_len == 0:
            continue  # nothing to score; contributes an accuracy of 0
        acc_sum += matches / seq_len
        # Integer comparison instead of testing a float ratio against 1.
        if matches == seq_len:
            nr_of_perfect_preds += 1

    mean_acc = acc_sum / n_sequences
    perfect_ratio = nr_of_perfect_preds / n_sequences
    print("Accuracy: ", mean_acc)
    print("% of perfect predictions: ", perfect_ratio)
    return mean_acc, perfect_ratio

def _bins_to_grid_xy(bin_indices, grid_width=30):
    """Decode flat bin indices into parallel lists of column (x) and row (y) indices."""
    x_cols = [b % grid_width for b in bin_indices]
    # Floor division: true division would leak the column offset into the row value.
    y_rows = [b // grid_width for b in bin_indices]
    return x_cols, y_rows


def get_real_coords(target_list, preds_list, inputs_list, test_indx, grid_width=30):
    """
    Get the real world coordinates of trajectories starting from bins.

    Parameters:
        target_list, preds_list, inputs_list: lists of lists of bin indices, all of
            the same length (one entry per sample).
        test_indx: per-sample collection of sequences; every element except the last
            of each sequence is used as the vehicle ID of one output row.
        grid_width: number of bins per grid row used to decode a flat bin index into
            (x, y). Defaults to 30, the value previously hard-coded.

    Returns:
        A dataframe with the columns Vehicle_ID, X_REAL, Y_REAL, X_PRED, Y_PRED. For
        every sample the input steps come first (identical in the REAL and PRED
        columns), followed by the target steps (REAL) and predicted steps (PRED).

    Relies on the notebook-level globals ``scaler`` (its ``inverse_transform`` maps
    grid values back to offsets) and ``traj_data`` (the first X/Y row of each vehicle
    is added back as the origin).
    NOTE(review): ``scaler`` is not defined in the visible cells — confirm where it
    is created.

    Raises:
        ValueError: if a vehicle ID does not occur in ``traj_data``.
    """
    vehicle_ids = []
    x_cords_real, y_cords_real = [], []
    x_cords_pred, y_cords_pred = [], []

    for ind in range(len(test_indx)):
        for id_seq in test_indx[ind]:
            vehicle_ids.extend(id_seq[:-1])

        in_x, in_y = _bins_to_grid_xy(inputs_list[ind], grid_width)
        tgt_x, tgt_y = _bins_to_grid_xy(target_list[ind], grid_width)
        pred_x, pred_y = _bins_to_grid_xy(preds_list[ind], grid_width)

        # The observed input steps are shared by the real and predicted tracks.
        x_cords_real += in_x + tgt_x
        y_cords_real += in_y + tgt_y
        x_cords_pred += in_x + pred_x
        y_cords_pred += in_y + pred_y

    dict_pd = {'Vehicle_ID': vehicle_ids, 'X_REAL': x_cords_real, 'Y_REAL': y_cords_real, 'X_PRED': x_cords_pred, 'Y_PRED': y_cords_pred}
    output_df = pd.DataFrame(dict_pd)
    output_df[['X_REAL', 'Y_REAL']] = scaler.inverse_transform(output_df[['X_REAL', 'Y_REAL']])
    output_df[['X_PRED', 'Y_PRED']] = scaler.inverse_transform(output_df[['X_PRED', 'Y_PRED']])

    # Look up the first recorded position of every vehicle once, instead of
    # re-filtering traj_data and output_df for each vehicle ID.
    first_rows = traj_data.drop_duplicates(subset='Vehicle_ID').set_index('Vehicle_ID')
    unknown = ~output_df['Vehicle_ID'].isin(first_rows.index)
    if unknown.any():
        missing_ids = output_df.loc[unknown, 'Vehicle_ID'].unique().tolist()
        raise ValueError(f"Vehicle IDs not present in traj_data: {missing_ids}")

    x_origin = output_df['Vehicle_ID'].map(first_rows['X'])
    y_origin = output_df['Vehicle_ID'].map(first_rows['Y'])
    output_df[['X_REAL', 'X_PRED']] = output_df[['X_REAL', 'X_PRED']].add(x_origin, axis=0)
    output_df[['Y_REAL', 'Y_PRED']] = output_df[['Y_REAL', 'Y_PRED']].add(y_origin, axis=0)
    return output_df

def calc_meter_dist(xt, yt, xp, yp):
    """
    Distance in meters between a target and a predicted position.

    Converts the real-world Latitude/Longitude coordinates into UTM coordinates and
    returns the Euclidean distance between the two projected points.

    Parameters:
        xt, yt: target position; ``yt`` is passed to utm as latitude and ``xt`` as longitude.
        xp, yp: predicted position, same convention.

    Returns:
        The distance between target and prediction in meters.
    """
    tar_utm = utm.from_latlon(yt, xt)
    # Project the prediction into the target's UTM zone: eastings from two
    # different zones are measured from different central meridians and cannot
    # be subtracted from each other.
    pred_utm = utm.from_latlon(yp, xp, force_zone_number=tar_utm[2])

    dist_x = tar_utm[0] - pred_utm[0]
    dist_y = tar_utm[1] - pred_utm[1]

    return np.hypot(dist_x, dist_y)

def calculate_performance_metrics(df, input_len=20):
    """
    Calculates Average Displacement Error (ADE) and Final Displacement Error (FDE)
    for a given data frame containing the features:
        Vehicle_ID, X_REAL, Y_REAL, X_PRED, Y_PRED

    Parameters:
        df: one row per trajectory step; rows of a vehicle are taken in frame order.
        input_len: number of leading rows per vehicle that are skipped before errors
            are measured. Defaults to 20, the value previously hard-coded.
            NOTE(review): presumably the observed input steps (input_len = 20 in the
            preprocessing call) — confirm.

    Both metrics are printed. ADE is the mean distance over all remaining rows; FDE
    is the mean distance of the last remaining row of each vehicle. Vehicles with no
    rows beyond ``input_len`` are left out (they previously made the FDE computation
    fail). When no rows remain at all, both metrics are ``nan``.

    Returns:
        The unmodified input ``df``.
    """
    # Position of every row inside its vehicle's track (0, 1, 2, ...).
    step_in_track = df.groupby('Vehicle_ID').cumcount()
    # Copy so that adding the DIST column never touches the caller's frame.
    res_df = df[step_in_track >= input_len].copy()

    if res_df.empty:
        ADE = FDE = float('nan')
    else:
        res_df['DIST'] = res_df.apply(
            lambda row: calc_meter_dist(row['X_REAL'], row['Y_REAL'], row['X_PRED'], row['Y_PRED']),
            axis=1,
        )
        ADE = float(res_df['DIST'].mean())
        # Last remaining step of each vehicle, averaged over vehicles.
        FDE = float(res_df.groupby('Vehicle_ID').tail(1)['DIST'].mean())

    print("Average Displacement Error (meters): ", ADE)
    print("Average Final Displacement Error: (meters)", FDE)
    return df


In [None]:
def create_output_file_for_predictions(model, data_inputs, data_targets, indx=None):
    """
    Predict with ``model`` and return the real-world coordinates of the results.

    Runs ``get_predictions`` on the given inputs/targets and converts the resulting
    bins with ``get_real_coords`` (this function previously carried a copy of that
    conversion code). Despite its name, nothing is written to disk here; the caller
    receives the dataframe.

    Parameters:
        model: model handed to ``get_predictions``.
        data_inputs, data_targets: data handed to ``get_predictions``.
        indx: per-sample vehicle-ID sequences belonging to the data. When omitted,
            the notebook-level ``test_indx`` is used, as before.

    Returns:
        The dataframe built by ``get_real_coords`` (Vehicle_ID, X_REAL, Y_REAL,
        X_PRED, Y_PRED).
    """
    if indx is None:
        indx = test_indx  # fall back to the notebook-level test split indices
    input_list, target_list, preds_list = get_predictions(model, data_inputs, data_targets)
    return get_real_coords(target_list, preds_list, input_list, indx)

In [8]:
# Evaluate the loaded model on the test split:
#   1. collect inputs, targets and predicted bins,
#   2. print the bin-level accuracy,
#   3. convert the bins into coordinates,
#   4. print ADE / FDE.
input_list, target_list, preds_list = get_predictions(best_model, test_data_inputs, test_data_targets)
calculate_bin_acc(target_list, preds_list)
coords = get_real_coords(target_list, preds_list, input_list, test_indx)
# calculate_performance_metrics returns the frame it was given, so `d` refers to `coords`.
d = calculate_performance_metrics(coords)

0.01561121226717566

In [None]:
# NOTE(review): `vis_trajectory_scatter` is not defined in the visible cells. It presumably
# arrives through `from Transformer import *`, or was meant to be called on the imported
# `vis` module (simpleTrajVisualizer) — confirm before relying on this cell.
vis_trajectory_scatter(best_model, test_data_inputs, test_data_targets, test_indx)

In [None]:
# Build the table of real and predicted coordinates for the test split.
# The function returns a dataframe; no file is written by the visible code.
output_file = create_output_file_for_predictions(best_model, test_data_inputs, test_data_targets)