# Testing

In [None]:
import json
import os
from pathlib import Path

from lapsim.encoder.partition import Partition

from lapsim.normalisation import TransformNormalisation

# Transform settings applied to the loaded normalisation bounds below.
FORESIGHT = 120
SAMPLING = 4
NORMALISATION_BOUNDS_PATH = "bounds.json"

# NOTE: this assumes every track was encoded with partitions left undefined (or set to 0).
SPLICED_DATA_PATH = Path(r"../dataset/spliced/test")
TEST_DATA_PATH = Path(r"../dataset/encoded/test")
# Skip hidden entries (names starting with "."). os.listdir returns names in
# arbitrary order, so sort them to keep the evaluation order - and anything
# that later indexes into the results - reproducible between runs and machines.
TEST_PARTITIONS = sorted(
    name for name in os.listdir(TEST_DATA_PATH) if not name.startswith('.')
)

# Load the stored bounds and override the transform's foresight/sampling
# with the values configured above.
bounds = TransformNormalisation.load(NORMALISATION_BOUNDS_PATH)
bounds.transform.foresight = FORESIGHT
bounds.transform.sampling = SAMPLING


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn import HuberLoss
from torch.optim import NAdam


def hard_sigmoid(x):
    """Piecewise-linear sigmoid approximation.

    Computes ``(x + 2.5) / 5`` and clips the result to ``[0, 1]``, so inputs
    at or below -2.5 map to 0 and inputs at or above 2.5 map to 1.

    Args:
        x: Input tensor.

    Returns:
        Tensor of the same shape as ``x`` with values in ``[0, 1]``.
    """
    scaled = (x + 2.5) / 5
    return torch.clamp(scaled, min=0, max=1)


class LapSimModel(nn.Module):
    """Fully connected network with a shared trunk and two 9-wide output heads.

    The trunk is three sigmoid-activated linear layers (739 -> 450 -> 200 -> 200).
    Two separate linear heads (``d4`` and ``d5``) each map the trunk output to
    9 values, passed through ``hard_sigmoid`` so they lie in ``[0, 1]``.

    A Huber loss and an NAdam optimiser over the model's parameters are
    attached as ``loss`` and ``optimiser``; ``forward`` does not use them.
    """

    def __init__(self):
        super().__init__()

        # Shared trunk. The attribute names are part of the state-dict keys,
        # so they must stay as they are for saved weights to load.
        self.d1 = nn.Linear(739, 450)
        self.d2 = nn.Linear(450, 200)
        self.d3 = nn.Linear(200, 200)

        # Output heads: d4 feeds the first returned tensor, d5 the second.
        self.d4 = nn.Linear(200, 9)
        self.d5 = nn.Linear(200, 9)

        self.loss = HuberLoss()
        self.optimiser = NAdam(self.parameters())

    def forward(self, windows, vehicles):
        """Run the network on one batch.

        Args:
            windows: Tensor that is joined with ``vehicles`` along dimension 1.
            vehicles: Tensor placed first in that join; the joined width must
                be 739 to match the first layer.

        Returns:
            A ``(pos, vel)`` tuple; each tensor has 9 values per row, clipped
            to ``[0, 1]`` by ``hard_sigmoid``.
        """
        hidden = torch.cat((vehicles, windows), dim=1)

        for layer in (self.d1, self.d2, self.d3):
            hidden = torch.sigmoid(layer(hidden))

        return hard_sigmoid(self.d4(hidden)), hard_sigmoid(self.d5(hidden))

def get_device():
    """Pick the torch device to run on.

    Returns:
        ``torch.device("cuda")`` when CUDA is available, otherwise
        ``torch.device("mps")`` when the MPS backend is available, otherwise
        ``torch.device("cpu")``.
    """
    if torch.cuda.is_available():
        device_name = "cuda"
    elif torch.backends.mps.is_available():
        device_name = "mps"
    else:
        device_name = "cpu"

    return torch.device(device_name)


def tensor(x):
    """Convert ``x`` to a float32 tensor on the module-level ``device``.

    Args:
        x: Data accepted by ``torch.tensor``.

    Returns:
        A ``torch.float32`` tensor moved to ``device``.
    """
    as_float32 = torch.tensor(x, dtype=torch.float32)
    return as_float32.to(device)


device = get_device()

model = LapSimModel().to(device)
# This notebook only runs inference, so switch to evaluation mode. It makes no
# difference for plain linear layers, but keeps results correct if
# dropout/batch-norm layers are ever added to the model.
model.eval()
# map_location remaps the stored tensors onto the device selected above, so a
# checkpoint saved on another device (e.g. CUDA) still loads on this machine.
model.load_state_dict(torch.load("ls1.pt", map_location=device))


In [None]:
from toolkit.tracks.models import Track
from lapsim.eval import evaluate

# One entry per test partition:
# (partition_name, evaluation, track carrying the predictions, ground-truth track)
evaluations = []

for i, partition_name in enumerate(TEST_PARTITIONS):
    # "\r" plus trailing padding rewrites the same console line as progress output
    print(f"\r{i} - {partition_name}" + " " * 20, end="")

    partition = Partition.load(TEST_DATA_PATH / partition_name)
    # The (y_pos, y_vel) targets returned in the middle are not used here.
    x, _targets, vehicles = bounds.normalise_and_transform(partition)

    # Predict data. This is inference only, so skip building the autograd graph.
    with torch.no_grad():
        pred_pos, pred_vel = model(tensor(x), tensor(vehicles))
    pred_pos, pred_vel = bounds.detransform_and_denormalise(
        len(partition.angles[0]),
        position=pred_pos.cpu().numpy(),
        velocity=pred_vel.cpu().numpy()
    )

    # Load up the segmentation lines from the spliced track outputs
    with open(SPLICED_DATA_PATH / partition_name, "r", encoding="utf-8") as f:
        track_data = json.load(f)['track']

    # Build two independent Track objects from the same data. The copy is made
    # before any value is overwritten, so it keeps the values stored in the file.
    predicted_track = Track(**track_data)
    ground_truth_track = Track(**predicted_track.model_dump())

    # Overwrite every segmentation of the first track with the model's prediction.
    # Indexing (rather than zip) keeps the IndexError if there are fewer
    # predictions than segmentations.
    for seg_index, segmentation in enumerate(predicted_track.segmentations):
        segmentation.pos = pred_pos[seg_index]
        segmentation.vel = pred_vel[seg_index]

    # Evaluate the model
    evaluation = evaluate(predicted_track, ground_truth_track)
    evaluations.append((partition_name, evaluation, predicted_track, ground_truth_track))

print("Done")


In [None]:
from lapsim.render import RenderItem, plot_full

# Plot the first evaluated partition. In each entry, index 2 is the track that
# carries the model's predictions and index 3 is the untouched copy.
first_result = evaluations[0]

predicted_item = RenderItem(track=first_result[2], label="Predicted", color="red")
ground_truth_item = RenderItem(track=first_result[3], label="Ground Truth", color="green")

plot_full(tracks=[predicted_item, ground_truth_item], title="...")

In [None]:
from lapsim.eval import Evaluation
import pandas as pd

# Skip evaluations whose apex error is None, as they currently result in an issue.
evaluatable_comparisons = [
    evaluation
    for _, evaluation, _, _ in evaluations
    if evaluation.position.apex_mean is not None
]

combined = Evaluation.combine(evaluatable_comparisons)

# (printed label, attribute name) pairs, in the order they are printed.
laptime_rows = (
    ("Abs Error", "abs_error"),
    ("Percentage", "percentage"),
    ("Error per minute", "error_per_minute"),
)
# Position and velocity expose the same set of statistics.
series_rows = (
    ("Max", "max"),
    ("Mean", "mean"),
    ("Mean Abs", "mean_absolute"),
    ("RMSE", "rmse"),
    ("Ci95", "ci95"),
    ("Percentage Mean", "percentage_mean"),
    ("Percentage Max", "percentage_max"),
    ("Percentage Ci95", "percentage_ci95"),
    ("Apex Mean", "apex_mean"),
    ("Apex Mean Abs", "apex_mean_absolute"),
    ("Apex Max", "apex_max"),
)

report = ["Laptime"]
report += [f" - {label}: {getattr(combined.laptime, attr)}" for label, attr in laptime_rows]
for heading, section in (("Position", combined.position), ("Velocity", combined.velocity)):
    report.append(heading)
    report += [f" - {label}: {getattr(section, attr)}" for label, attr in series_rows]

print("\n".join(report))


## Combine Results 
This groups the per-partition evaluations by track and by vehicle, then combines each group into a single result.

In [None]:
errors_by_track = {}
errors_by_vehicle = {}

for partition_name, evaluation, _, _ in evaluations:
    # Names are split on " - ": the text before the first separator is the
    # vehicle, everything after it (which may contain further " - ") is the track.
    vehicle, _sep, track = partition_name.partition(" - ")

    # Register both groups even when this evaluation ends up being skipped.
    track_group = errors_by_track.setdefault(track, [])
    vehicle_group = errors_by_vehicle.setdefault(vehicle, [])

    # Evaluations without an apex error are left out of the groups.
    if evaluation.position.apex_mean is None:
        continue

    track_group.append(evaluation)
    vehicle_group.append(evaluation)

# Collapse each group's list of evaluations into one combined evaluation.
errors_by_vehicle = {
    name: Evaluation.combine(group) for name, group in errors_by_vehicle.items()
}
errors_by_track = {
    name: Evaluation.combine(group) for name, group in errors_by_track.items()
}


In [None]:
def errors_to_df(errors):
    """Tabulate combined evaluations, one row per key of ``errors``.

    Args:
        errors: Mapping from a name to an evaluation object exposing
            ``laptime``, ``position`` and ``velocity`` attributes.

    Returns:
        A ``pandas.DataFrame`` with a ``Type`` column holding the keys,
        followed by the lap-time, position and velocity error columns, in
        the mapping's iteration order.
    """
    names = list(errors)
    results = [errors[name] for name in names]

    def column(section, field):
        # Pull ``<result>.<section>.<field>`` from every result, in row order.
        return [getattr(getattr(result, section), field) for result in results]

    table = {"Type": names}

    table["LapTime Error"] = column("laptime", "abs_error")
    table["LapTime Percentage"] = column("laptime", "percentage")
    table["LapTime Error Per Minute"] = column("laptime", "error_per_minute")

    table["Position Error"] = column("position", "mean_absolute")
    table["Position ci95"] = column("position", "ci95")

    table["Velocity Error"] = column("velocity", "mean_absolute")
    table["Velocity ci95"] = column("velocity", "ci95")

    return pd.DataFrame(table)

# Build the per-vehicle and per-track summary tables.
vehicles_df, track_df = map(errors_to_df, (errors_by_vehicle, errors_by_track))


In [None]:
# Show the per-track error table (presumably rendered as this cell's output).
track_df