In [93]:
import numpy as np
import pandas as pd
import os
from fitness import get_fitness
from models.Kriging import Kriging
from models.PR import Polynomial_Regression as PR
# from models.RBF import Model as RBF

In [94]:
# Curve Road
# Run folders are listed by name under one shared data root; entries that are
# commented out are runs that are currently not loaded.
_DATA_ROOT = '../data/'
_RUN_NAMES = (
    # 'routes_short_2023-05-12|17:04:09',  # 814
    # 'routes_short_2023-05-26|17:51:48',  # 721
    'routes_short_2023-06-06|18:33:36',  # 916 95% Route Finish Threshold
    'routes_short_2023-06-07|14:26:32',  # 727 95% Route Finish Threshold
)
# Each entry keeps its trailing slash because later cells append a file name
# directly to it.
data_folders = [_DATA_ROOT + run_name + '/' for run_name in _RUN_NAMES]

In [95]:
# Column names applied to every scenario.csv (passed as names= to read_csv).
scenario_header = ["cloudiness",
                   "precipitation",
                   "precipitation_deposits",
                   "wind_intensity",
                   "sun_azimuth_angle",
                   "sun_altitude_angle",
                   "fog_density",
                   "wetness",
                   "fog_falloff",
                   "vehicle_infront",
                   "vehicle_opposite",
                   "vehicle_side",
                   "start_offset",
                   "end_offset"]

# Read the scenario table of every data folder and stack them with a single
# concat rather than re-concatenating the growing frame once per folder.
# Row labels are kept exactly as read from each file (no ignore_index), so
# they repeat across folders just as they did before.
scenarios = pd.concat(
    [pd.read_csv(folder + 'scenario.csv', names=scenario_header)
     for folder in data_folders]
)
print(scenarios.shape)

(1635, 14)


In [96]:
# Criteria (columns of the get_fitness result) that are kept as targets.
select_criterions = ["RouteCompletionTest",
                     "CollisionTest",
                     "OutsideRouteLanesTest",
                     "Timeout"]

# Load the fitness table of every data folder and stack them with a single
# concat rather than re-concatenating the growing frame once per folder.
# NOTE(review): assumes get_fitness returns a DataFrame per folder — the
# previous code already fed its results to pd.concat; confirm in fitness.py.
fitness = pd.concat([get_fitness(folder) for folder in data_folders])

# Keep only the selected criteria, in the order listed above.
fitness = fitness[select_criterions]
print(fitness.shape)

(1635, 4)


In [116]:
import torch
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt


class FitnessDataset(Dataset):
    """Pairs scenario vectors with their fitness values as float32 tensors.

    Both constructor arguments are converted with ``torch.from_numpy``, so
    they must be NumPy arrays. Row ``k`` of ``scenario_vectors`` is paired
    with row ``k`` of ``fitness_results``.
    """

    def __init__(self, scenario_vectors, fitness_results):
        features = torch.from_numpy(scenario_vectors)
        targets = torch.from_numpy(fitness_results)
        # Cast to float32 so both tensors match the default dtype of nn layers.
        self.data = features.to(torch.float32)
        self.label = targets.to(torch.float32)

    def __len__(self):
        # The number of samples is defined by the label tensor.
        return len(self.label)

    def __getitem__(self, index):
        sample = self.data[index]
        target = self.label[index]
        return sample, target

In [104]:
import numpy as np

import torch
import torchvision
import torchvision.transforms as transforms

import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

class MLP(nn.Module):
    """Small regressor: 14 inputs -> 32 ReLU hidden units -> 1 linear output."""

    def __init__(self):
        super().__init__()
        # Only fc1 and fc4 are active; the intermediate fc2/fc3 layers are
        # disabled, and the attribute names are left unchanged.
        self.fc1 = nn.Linear(14, 32)
        # self.fc2 = nn.Linear(32, 64)
        # self.fc3 = nn.Linear(64, 32)
        self.fc4 = nn.Linear(32, 1)

    def forward(self, x):
        """Map a batch of 14-feature inputs to one unbounded value per row."""
        hidden = self.fc1(x)
        activated = F.relu(hidden)
        # No activation on the output layer: the raw linear value is returned.
        return self.fc4(activated)

In [118]:
from sklearn.metrics import explained_variance_score

def train_loop(dataloader, model, loss_fn, optimizer, log=True):
    """Run one pass over ``dataloader``, taking one optimizer step per batch.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches that also
            exposes a ``dataset`` attribute supporting ``len()``.
        model: Callable mapping a batch of inputs to predictions.
        loss_fn: Callable ``loss_fn(pred, target)`` returning a scalar tensor.
        optimizer: Optimizer updating the model's parameters.
        log: When true, print the loss on every 50th batch, starting with
            the first one.

    Returns:
        None. The model's parameters are updated in place.
    """
    total_samples = len(dataloader.dataset)
    for step, (features, targets) in enumerate(dataloader, start=1):
        # Forward pass and loss for this mini-batch.
        batch_loss = loss_fn(model(features), targets)

        # Clear stale gradients, backpropagate, then apply the update.
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Progress line on the 1st, 51st, 101st, ... batch only.
        if log and (step - 1) % 50 == 0:
            loss_value = batch_loss.item()
            seen = step * len(features)
            print(f"loss: {loss_value:>7f}  [{seen:>5d}/{total_samples:>5d}]")


def test_loop(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            # print(pred.argmax(axis=1))
            # print(y.argmax(axis=1))
            # print()
            # correct += explained_variance_score(pred.item(), y.item())

    test_loss /= num_batches
    # correct /= size
    # print(f"Test Error: \n EVS: {(correct):>0.3f}, Avg loss: {test_loss:>8f} \n")
    print(f"Avg loss: {test_loss:>8f} \n")

def test_all(data, label, model, loss_fn, log=True):
    """Evaluate ``model`` on a whole data split with one forward pass.

    The loss is computed on the raw model outputs. The explained-variance
    score and the Pearson correlation are computed on the outputs after they
    have been clamped to ``[0, 1]``.

    Args:
        data: Input samples, converted with ``torch.tensor(data).float()``.
        label: Targets with one value per sample. Must expose ``.shape`` and
            ``.reshape`` (e.g. a NumPy array of shape ``(N, 1)`` or ``(N,)``).
        model: Callable mapping the input tensor to one prediction per sample.
        loss_fn: Callable ``loss_fn(pred, target)`` returning a scalar tensor.
        log: When true, print the three metrics.

    Returns:
        Tuple ``(loss, EVS, corr)``; ``EVS`` and ``corr`` are rounded to three
        decimals. ``corr`` comes from ``np.corrcoef`` and is therefore NaN
        when either series has zero variance.
    """
    n_samples = label.shape[0]
    # The metrics below expect flat 1-D series.
    target = label.reshape(n_samples)

    with torch.no_grad():
        pred = model(torch.tensor(data).float())
        test_loss = loss_fn(pred, torch.tensor(label).float()).item()

    # Clamp the predictions to [0, 1] before scoring them.
    pred = pred.detach().numpy().reshape(n_samples)
    pred[pred > 1] = 1
    pred[pred < 0] = 0

    # np.round accepts both NumPy scalars and plain Python floats, so this
    # does not depend on which of the two the metric function returns.
    EVS = np.round(explained_variance_score(target, pred), 3)
    corr = np.round(np.corrcoef(target, pred)[0, 1], 3)

    if log:
        print(f"MSE loss: {test_loss:>8f}, EVS: {EVS:>4f}, corr: {corr:>4f} \n")

    return test_loss, EVS, corr

In [126]:
import copy

from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

# Scenario parameters and fitness criteria side by side. The join does not
# depend on the criterion being fitted, so it is built once up front.
scenario_criterion = pd.concat([scenarios, fitness],axis=1)
scenario_columns = scenario_criterion.columns.tolist()
# The first 14 columns form the scenario vector (this is also the MLP input
# width); every remaining column is one fitness criterion.
scenario_vectors = scenario_criterion[scenario_columns[:14]].to_numpy()
all_fitness_results = scenario_criterion[scenario_columns[14:]].to_numpy()

# One entry per criterion: (model, (EVS, corr)), or None when no epoch reached
# a test correlation above 0.
models = []

for i in range(4):

    # Regression target: the i-th criterion, kept 2-D (N, 1) to match the
    # single-output MLP.
    fitness_results = all_fitness_results[:,[i]]

    X_train, X_test, y_train, y_test = train_test_split(scenario_vectors, fitness_results, test_size=0.33, random_state=14159)
    train_data = FitnessDataset(X_train, y_train)
    test_data = FitnessDataset(X_test, y_test)

    train_dataloader = DataLoader(train_data, batch_size=16, shuffle=True)
    # Only needed by test_loop; the evaluation below uses test_all on the raw arrays.
    test_dataloader = DataLoader(test_data, batch_size=16, shuffle=True)

    # Sanity check: show the shapes of one training batch and its first label.
    train_features, train_labels = next(iter(train_dataloader))
    print(f"Feature batch shape: {train_features.size()}")
    print(f"Labels batch shape: {train_labels.size()}")
    label = train_labels[0]
    print(f"Label: {label}")

    mlp = MLP()
    criterion = nn.MSELoss()
    optimizer = optim.SGD(mlp.parameters(), lr=0.005, momentum=0.9)

    epochs = 200
    best_model = None
    best_result = (0, 0)

    for t in range(epochs):
        # Verbose output on every 100th epoch only.
        log = t % 100 == 0
        if log:
            print(f"Epoch {t+1}\n-------------------------------")
        train_loop(train_dataloader, mlp, criterion, optimizer, log)
        _,EVS,corr = test_all(X_test, y_test, mlp, criterion, log)
        if corr > best_result[1]:
            best_result = (EVS,corr)
            # Snapshot the weights. `mlp` keeps training in place, so storing
            # the live object would leave every "best" entry pointing at the
            # final-epoch model rather than the epoch that scored best_result.
            best_model = (copy.deepcopy(mlp),best_result)

    print(best_result)
    models.append(best_model)
    print("Done!\n\n")



Feature batch shape: torch.Size([16, 14])
Labels batch shape: torch.Size([16, 1])
Label: tensor([1.])
Epoch 1
-------------------------------
loss: 0.911556  [   16/ 1095]
loss: 0.125044  [  816/ 1095]
MSE loss: 0.077414, EVS: -0.149000, corr: -0.100000 

Epoch 101
-------------------------------
loss: 0.054645  [   16/ 1095]
loss: 0.035071  [  816/ 1095]
MSE loss: 0.053767, EVS: 0.211000, corr: 0.464000 

(0.237, 0.502)
Done!


Feature batch shape: torch.Size([16, 14])
Labels batch shape: torch.Size([16, 1])
Label: tensor([0.3980])
Epoch 1
-------------------------------
loss: 0.697140  [   16/ 1095]
loss: 0.150407  [  816/ 1095]
MSE loss: 0.118869, EVS: 0.042000, corr: 0.205000 

Epoch 101
-------------------------------
loss: 0.046048  [   16/ 1095]
loss: 0.027678  [  816/ 1095]
MSE loss: 0.063095, EVS: 0.506000, corr: 0.712000 

(0.52, 0.721)
Done!


Feature batch shape: torch.Size([16, 14])
Labels batch shape: torch.Size([16, 1])
Label: tensor([0.8005])
Epoch 1
-------------------

In [127]:
models

[(MLP(
    (fc1): Linear(in_features=14, out_features=32, bias=True)
    (fc4): Linear(in_features=32, out_features=1, bias=True)
  ),
  (0.237, 0.502)),
 (MLP(
    (fc1): Linear(in_features=14, out_features=32, bias=True)
    (fc4): Linear(in_features=32, out_features=1, bias=True)
  ),
  (0.52, 0.721)),
 (MLP(
    (fc1): Linear(in_features=14, out_features=32, bias=True)
    (fc4): Linear(in_features=32, out_features=1, bias=True)
  ),
  (0.336, 0.582)),
 (MLP(
    (fc1): Linear(in_features=14, out_features=32, bias=True)
    (fc4): Linear(in_features=32, out_features=1, bias=True)
  ),
  (0.076, 0.279))]

In [None]:
import joblib

# Write one file per criterion containing only that criterion's entry of
# `models` (the tuple appended by the training loop), not the whole list.
# The label order matches the column order the models were trained on.
for i, criterion_label in enumerate(["RouteCompletionTest", "CollisionTest", "OutsideRouteLanesTest", "Timeout"]):
    joblib.dump(models[i], 'models/regression-MLP-{}.pkl'.format(criterion_label))