In [10]:
import os
import time
import pickle
import keyboard
import numpy as np
import random
import pandas as pd
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt

import tminterface as tmi
from tminterface.interface import TMInterface, Client

# Useful functions

In [11]:
def discrete_to_continuous(n):
    """
    Translate a discrete action index into a TMInterface input-state dict.

    Action table:

    0: no action
    1: left
    2: left + acceleration
    3: acceleration
    4: right + acceleration
    5: right

    Any other value yields the same dict as 0 (no steering, no acceleration).

    Returns a dict with the keys 'sim_clear_buffer', 'steer', 'accelerate'
    and 'brake'; 'brake' is always False.
    """
    # Steering is full-lock in either direction, or centered.
    turns_left = n == 1 or n == 2
    turns_right = n == 4 or n == 5
    if turns_left:
        steering = -65536
    elif turns_right:
        steering = 65536
    else:
        steering = 0

    # Actions 2, 3 and 4 hold the accelerator.
    throttle = bool(n == 2 or n == 3 or n == 4)

    return {
        'sim_clear_buffer': True,
        "steer": steering,
        "accelerate": throttle,
        "brake": False,
    }
    

def distance_3D(x, y, z, x0, y0, z0):
    """
    Euclidean distance between the point(s) (x, y, z) and (x0, y0, z0).

    The arithmetic is delegated to numpy, so scalars and arrays are both
    accepted and follow numpy broadcasting.
    """
    delta_x, delta_y, delta_z = x - x0, y - y0, z - z0
    return np.sqrt(delta_x**2 + delta_y**2 + delta_z**2)

In [12]:
def centerline_objective(track_name):
    """
    Build a smoothed reference curve from a recorded run of a track.

    Reads ``track_data/<track_name>/run-1/positions.pkl``, which must contain
    a sequence of records whose ``'position'`` entry exposes ``to_numpy()``.
    The positions are smoothed with an exponential moving average (com=40),
    consecutive identical samples are nudged apart by 0.01 on every axis, and
    the result is sampled with a piecewise-linear interpolator.

    Parameters
    ----------
    track_name : str
        Name of the sub-folder of ``track_data`` holding the recorded run.

    Returns
    -------
    curve : numpy.ndarray
        One row per recorded sample, one column per coordinate.
    alpha : numpy.ndarray
        Evenly spaced parameter in [0, 1] with one value per row of ``curve``.
    """
    run_folder = os.path.join("track_data", track_name, "run-1")

    # SECURITY: pickle.load can execute arbitrary code. Only load recordings
    # that you produced yourself.
    # The context manager guarantees the file handle is closed.
    with open(os.path.join(run_folder, "positions.pkl"), "rb") as positions_file:
        positions = pickle.load(positions_file)

    raw_points = [list(pos['position'].to_numpy()) for pos in positions]

    # Exponential moving average to smooth the recorded trajectory.
    smoothed_points = pd.DataFrame(raw_points).ewm(com=40).mean().values.tolist()

    # Separate consecutive duplicates: a sample equal to the previously kept
    # one is shifted by 0.01 on each axis before being appended.
    points = [smoothed_points[0]]
    for point in smoothed_points[1:]:
        if point == points[-1]:
            point = [coordinate + 0.01 for coordinate in point]
        points.append(point)
    points = np.array(points)

    # Normalized parameter along the run: 0 at the first sample, 1 at the last.
    alpha = np.linspace(0, 1, len(points))

    interpolator = interp1d(alpha, points, kind='slinear', axis=0)
    curve = interpolator(alpha)

    return curve, alpha

# Abstract Client

In [17]:
class AbstractClient(Client):
    """
    Base TMInterface client that plays back a sequence of discrete actions.

    ``self.dna`` holds one action index per period of ``self.period`` seconds.
    On every simulation step the action for the current period is applied;
    the run is ended (via ``finish``) on a detected reset condition or when
    the sequence is exhausted.
    """

    def __init__(self):
        super().__init__()
        # Action sequence and its timing.
        self.dna = None
        self.period = 1.0
        self.period_ms = np.floor(self.period * 1000)
        # Simulation snapshots: where to rewind to / where the last run ended.
        self.start_state = None
        self.final_state = None
        # Set once the last checkpoint has been reached.
        self.finish_dna = None
        self.is_finished = False
        # Result of the most recent reset detection.
        self.crashed = False

    def on_registered(self, iface: TMInterface) -> None:
        """Issue the "press delete" command and log the server name."""
        iface.execute_command("press delete")
        print(f'Registered to {iface.server_name}')

    def on_run_step(self, iface, _time: int):
        """Per-step callback: apply the action scheduled for ``_time``."""
        self.action(iface, _time)

    def on_checkpoint_count_changed(self, iface, current: int, target: int):
        """
        When the checkpoint count reaches its target, flag the run as
        finished, keep a copy of the DNA that achieved it, and stop the
        simulation from finishing.
        """
        reached_target = current >= 1 and current == target
        if not reached_target:
            return

        self.is_finished = True
        self.finish_dna = self.dna.copy()
        iface.prevent_simulation_finish()
        print(iface.get_simulation_state().position)

    def reset_detection(self, _time, state):
        """
        Return True when the current attempt should be aborted.

        Three conditions are checked in order:
        - the second position component is below 9.2
          (presumably the car dropped below track level -- confirm);
        - from 500 onward (same unit as ``_time``), the third component of
          the local speed, scaled by 3.6, is below 1 (presumably a km/h
          forward-speed threshold -- confirm);
        - the car reports a lateral contact.
        """
        if state.position[1] < 9.2:
            return True

        if _time >= 500:
            scaled_speed = np.array(list(state.scene_mobil.current_local_speed.to_numpy())) * 3.6
            if scaled_speed[2] < 1:
                return True

        return True if state.scene_mobil.has_any_lateral_contact else False

    def action(self, iface, _time: int):
        """
        Apply the DNA entry covering ``_time`` and end the run if needed.

        While ``_time`` lies inside the DNA's time span, the matching action
        is sent to the game and the reset conditions are evaluated. When
        ``_time`` is exactly the end of the span, the run is finished.
        """
        if 0 <= _time < len(self.dna)*self.period_ms:
            period_index = int(np.floor(_time/self.period_ms))
            chosen = self.dna[period_index]
            inputs = discrete_to_continuous(chosen)
            print(chosen, _time)
            iface.set_input_state(**inputs)

            # Record the verdict first so finish() can read self.crashed.
            self.crashed = True if self.reset_detection(_time, iface.get_simulation_state()) else False
            if self.crashed:
                self.finish(iface, _time)

        # The DNA length is re-read here on purpose: finish() may be
        # overridden and change self.dna.
        if _time == len(self.dna)*self.period_ms:
            self.finish(iface, _time)

    def finish(self, iface,  _time):
        """Store the final simulation state and rewind to the start state."""
        self.final_state = iface.get_simulation_state()
        iface.rewind_to_state(self.start_state)

# Training Client and Replay Client

In [18]:
class TrainingClient(AbstractClient):
    """
    Random-search trainer that grows an action sequence a few steps at a time.

    The played sequence (``dna``) is ``memory + gene``:

    - ``memory`` holds the actions already committed;
    - ``gene`` is a random candidate of ``horizon`` actions appended to it.

    Each trial plays the DNA and scores it with ``objective_function``. After
    ``n_trials`` trials the first ``n_steps`` actions of the best non-crashed
    gene are appended to ``memory`` and the search restarts from there.

    Parameters
    ----------
    period : float
        Duration of one action, in seconds.
    n_steps : int
        Number of actions of the best gene committed to memory per round.
    horizon : int
        Number of actions in each candidate gene.
    n_trials : int
        Number of trials evaluated per round.
    training_track_name : str
        Track folder name passed to ``centerline_objective``.
    """

    # Genes are drawn from the action indices 1..N_ACTIONS
    # (index 0, "no action", is never sampled).
    N_ACTIONS = 5

    def __init__(self, period=0.5, n_steps=1, horizon=4, n_trials=200, training_track_name="Deterministic_Proof"):
        super().__init__()
        # DNA = memory + gene
        # Integer dtype keeps the concatenated DNA made of integer action
        # indices instead of silently promoting them to floats.
        self.memory = np.array([], dtype=int)
        # Genes already tried in the current round; the leading [] is a
        # placeholder so the list is never empty.
        self.past_tries = [[]]
        self.n_steps = n_steps
        self.horizon = horizon
        self.max_trials = n_trials
        # Number of trials completed in the current round.
        self.n_trial = 0

        self.gene = None
        self.generate_dna()
        self.period = period
        self.period_ms = np.floor(1000*self.period)

        centerline, alpha = centerline_objective(training_track_name)
        self.centerline = centerline
        self.alpha = alpha

        self.centerline_x = self.centerline[:,0]
        self.centerline_y = self.centerline[:,1]
        self.centerline_z = self.centerline[:,2]

        # No gene has been evaluated yet, so there is no best candidate.
        self.best_gene = None
        self.best_perf = 0
        # Time at which the not-yet-committed part of the DNA starts.
        self.anchor = 0

    def generate_dna(self):
        """
        Draw a new random gene and rebuild ``self.dna`` as memory + gene.

        The previous gene (if any) is recorded in ``past_tries`` and the new
        gene is re-drawn until it differs from every recorded one. When every
        possible gene has already been recorded, repeats are accepted so that
        the search cannot loop forever.
        """
        if self.gene is not None:
            self.past_tries.append(list(self.gene))

        # Count distinct full-length genes already tried; once all
        # N_ACTIONS**horizon combinations are used up, uniqueness cannot be
        # satisfied any more.
        tried = {tuple(t) for t in self.past_tries if len(t) == self.horizon}
        exhausted = len(tried) >= self.N_ACTIONS ** self.horizon

        self.gene = np.random.randint(low=1, high=self.N_ACTIONS + 1, size=self.horizon)
        while not exhausted and list(self.gene) in self.past_tries:
            self.gene = np.random.randint(low=1, high=self.N_ACTIONS + 1, size=self.horizon)
        self.dna = np.concatenate([self.memory, self.gene])

    def objective_function(self):
        """
        Score the last trial by its progress along the centerline.

        Returns the ``alpha`` value of the centerline sample that is closest
        (3D distance) to the position stored in ``self.final_state``.
        """
        position = self.final_state.position

        # Distance from the final position to every centerline sample.
        dis = distance_3D(self.centerline_x, self.centerline_y, self.centerline_z,
                          position[0], position[1], position[2])
        # Index of the closest centerline sample.
        glob_min_idx = np.argmin(dis)
        associated_time = self.alpha[glob_min_idx]
        return associated_time

    def on_run_step(self, iface, _time: int):
        """
        Apply the scheduled action, then snapshot the simulation state one
        step (10 in ``_time`` units) before the anchor so later trials can
        rewind to it.
        """
        self.action(iface, _time)
        if _time == self.anchor - 10: # IMPORTANT: 1 step offset to prevent missing inputs
            self.start_state = iface.get_simulation_state()

    def finish(self, iface, _time):
        """
        Close the current trial: score it, update the best gene, commit the
        best gene to memory at the end of a round, then rewind and start a
        new trial with a fresh gene.
        """
        self.n_trial += 1
        self.final_state = iface.get_simulation_state()

        # OBJECTIVE FUNCTION CONDITION
        # Evaluate once and reuse; the score only depends on final_state.
        performance = self.objective_function()
        if performance > self.best_perf and self.crashed is False:
            self.best_gene = self.gene
            self.best_perf = performance

        # End of round. ">=" also terminates when max_trials is below 1.
        if self.n_trial >= self.max_trials:
            if self.best_gene is not None:
                self.memory = np.concatenate([self.memory, self.best_gene[:self.n_steps]])
                self.anchor += self.period_ms*self.n_steps
                print(self.best_perf)

            # Reset the per-round bookkeeping.
            self.n_trial = 0
            self.past_tries = [[]]
            self.best_perf = 0
            self.best_gene = None

        iface.rewind_to_state(self.start_state)
        self.generate_dna()
        
class ReplayClient(AbstractClient):
    """
    Client that plays back a fixed action sequence in a loop.

    Parameters
    ----------
    period : float
        Duration of one action, in seconds.
    dna : sequence
        Action indices to replay, one per period.
    """

    def __init__(self, period, dna):
        super().__init__()
        self.dna = dna
        self.period = period
        self.period_ms = np.floor(self.period * 1000)

    def on_run_step(self, iface, _time: int):
        """Apply the scheduled action; snapshot the state at time -10."""
        self.action(iface, _time)
        at_snapshot_time = _time == -10
        if at_snapshot_time:
            self.start_state = iface.get_simulation_state()

    def finish(self, iface, _time):
        """Keep the final state, then rewind to the stored start state."""
        end_state = iface.get_simulation_state()
        self.final_state = end_state
        iface.rewind_to_state(self.start_state)

# Training Client

In [19]:
# Run the training client until the track is finished or "q" is pressed.
interface = TMInterface()
client = TrainingClient(period=0.5, n_steps=2, horizon=4, n_trials=50)

interface.register(client)
print("Start")

try:
    # The client is driven by TMInterface callbacks; this loop only waits
    # for completion or for a manual stop.
    while client.is_finished is False:
        time.sleep(0.001)

        if keyboard.is_pressed("q"):
            print("Keybord Interrupt")
            break
finally:
    # Always release the interface, even if the wait loop raises
    # (e.g. on a kernel interrupt).
    interface.close()

# A finished run exposes the full winning sequence; otherwise fall back to
# the actions committed so far.
if client.is_finished:
    best_memory = client.finish_dna
else:
    best_memory = client.memory

print(best_memory)

Start
Registered to TMInterface0
2.0 0
2.0 10
2.0 20
2.0 30
2.0 40
2.0 50
2.0 60
2.0 70
2.0 80
2.0 90
2.0 100
2.0 110
2.0 120
2.0 130
2.0 140
2.0 150
2.0 160
2.0 170
2.0 180
2.0 190
2.0 200
2.0 210
2.0 220
2.0 230
2.0 240
2.0 250
2.0 260
2.0 270
2.0 280
2.0 290
2.0 300
2.0 310
2.0 320
2.0 330
2.0 340
2.0 350
2.0 360
2.0 370
2.0 380
2.0 390
2.0 400
2.0 410
2.0 420
2.0 430
2.0 440
2.0 450
2.0 460
2.0 470
2.0 480
2.0 490
1.0 500
1.0 510
1.0 520
1.0 530
1.0 540
1.0 550
1.0 560
1.0 570
1.0 580
1.0 590
1.0 600
1.0 610
1.0 620
1.0 630
1.0 640
1.0 650
1.0 660
1.0 670
1.0 680
1.0 690
1.0 700
1.0 710
1.0 720
3.0 0
3.0 10
3.0 20
3.0 30
3.0 40
3.0 50
3.0 60
3.0 70
3.0 80
3.0 90
3.0 100
3.0 110
3.0 120
3.0 130
3.0 140
3.0 150
3.0 160
3.0 170
3.0 180
3.0 190
3.0 200
3.0 210
3.0 220
3.0 230
3.0 240
3.0 250
3.0 260
3.0 270
3.0 280
3.0 290
3.0 300
3.0 310
3.0 320
3.0 330
3.0 340
3.0 350
3.0 360
3.0 370
3.0 380
3.0 390
3.0 400
3.0 410
3.0 420
3.0 430
3.0 440
3.0 450
3.0 460
3.0 470
3.0 480
3.0 490
4.0 5

# Replay Client

In [16]:
# Replay the sequence produced by the training cell until "q" is pressed.
# `best_memory` must already exist (run the training cell first), or
# uncomment the hard-coded example below.
interface = TMInterface()
# best_memory = np.array([2, 4, 3, 3, 2, 1, 4, 3, 2, 3, 3, 4, 4, 4, 4])
client = ReplayClient(period=0.5, dna=best_memory)

interface.register(client)

print("Start")

try:
    while True:
        time.sleep(0.001)

        if keyboard.is_pressed("q"):
            print("Keybord Interrupt")
            break
finally:
    # Always release the interface, even if the wait loop raises
    # (e.g. on a kernel interrupt).
    interface.close()

Start
Registered to TMInterface0
[169.56076049804688, 9.356457710266113, 304.5128479003906]
[169.56076049804688, 9.356457710266113, 304.5128479003906]
[169.56076049804688, 9.356457710266113, 304.5128479003906]
Keybord Interrupt


# Testing

In [12]:
# Reference values kept for manual testing.
# NOTE(review): presumably the finish time (seconds) and the car position at
# the finish line of the training track -- confirm against the recorded run.
finish_pos = [
    809.48828125,
    9.372119903564453,
    249.02342224121094,
]
finish_time = 42.17