In [None]:
import matplotlib
import matplotlib.pyplot as plt 

from matplotlib.patches import Rectangle
from matplotlib.collections import PatchCollection
from matplotlib.lines import Line2D

from math import *
from random import random

π = pi

In [None]:
matplotlib.style.use(['dark_background', 'bmh'])
%matplotlib widget

Car-trailer diagram (inverted image `res/car-trainer-k.png` available as well):
![car-trailer](res/car-trailer-w.png)

Car-trailer equation:
\begin{align}
\dot x &= s \cos \theta_0 \\
\dot y &= s \sin \theta_0 \\
\dot \theta_0 &= \frac{s}{L} \tan \phi \\
\dot \theta_1 &= \frac{s}{d_1} \sin(\theta_1 - \theta_0)
\end{align}
where $s$: signed speed, $\phi$: negative steering angle,

In [None]:
class Truck:
    def __init__(self, lesson=None, display=False, figure=None):

        self.W = 1  # car and trailer width, for drawing only
        self.L = 1 * self.W  # car length
        self.d = 4 * self.L  # d_1
        self.s = -0.1  # speed
        self.display = display
        
        self.box = [0, 40, -10, 10]
        if self.display:
            if figure is None:
                plt.close('all') # Close any other truck figures
                self.f = plt.figure(figsize=(10, 5), num='The truck backer-upper', facecolor='none')
                self.ax = self.f.add_axes([0.01, 0.01, 0.98, 0.98], facecolor='black')
                self.ax.axis('equal')
                b = self.box
                self.ax.axis([b[0] - 1, b[1], b[2], b[3]])
                self.ax.set_xticks([]); self.ax.set_yticks([])
                self.ax.axhline(); self.ax.axvline()

            # Passing in a figure allows multiple trucks to be displayed
            else:
                self.f = figure
                self.ax = self.f.axes[0]
                
            self.patches = list()
            
        if (lesson):
            self.sample_lesson(lesson)
        else:
            self.reset()

    
    def sample_lesson(self, lesson):
        (x0, x1), (y0, y1), (truckθ0, truckθ1), (trailerθ0, trailerθ1) = lesson
    
        self.θ0 = π / 180 * ((random() * (truckθ1 - truckθ0) + truckθ0)) 
        self.θ1 = π / 180 * ((random() * (trailerθ1 - trailerθ0) + trailerθ0))
        self.x = random() * (x1 - x0) + x0
        self.y = random() * (y1 - y0) + y0
        
    
    def reset(self, ϕ=0):
        self.ϕ = ϕ  # car initial steering angle
        
        # self.θ0 = deg2rad(30)  # car initial direction
        # self.θ1 = deg2rad(-30)  # trailer initial direction
        # self.x, self.y = 20, -5  # initial car coordinates
        
        self.θ0 = (random() - 0.5)  * 2 * π  # -π <= ϑ₀ < π
        self.θ1 = (random() - 0.5) * π + self.θ0  # -π/2 <= ϑ₁ - ϑ₀ < π/2
        self.x = random() * self.box[1]
        self.y = (random() - 0.5) * (self.box[3] - self.box[2])
        
        # If poorly initialise, then re-initialise
        if not self.valid():
            self.reset(ϕ)
        
        # Draw, if display is True
        if self.display: self.draw()
    
    def step(self, ϕ=0, dt=1):
        
        # Check for illegal conditions
        if self.is_jackknifed():
            print('The truck is jackknifed!')
            return
        
        if self.is_offscreen():
            print('The car or trailer is off screen')
            return
        
        self.ϕ = ϕ
        x, y, W, L, d, s, θ0, θ1, ϕ = self._get_atributes()
        
        # Perform state update
        self.x += s * cos(θ0) * dt
        self.y += s * sin(θ0) * dt
        self.θ0 += s / L * tan(ϕ) * dt
        self.θ1 += s / d * sin(θ0 - θ1) * dt
        
        return self.state()
    
    def state(self):
        return (self.x, self.y, self.θ0, self.θ1)
    
    def _get_atributes(self):
        return (
            self.x, self.y, self.W, self.L, self.d, self.s,
            self.θ0, self.θ1, self.ϕ
        )
    
    def _trailer_xy(self):
        x, y, W, L, d, s, θ0, θ1, ϕ = self._get_atributes()
        return x - d * cos(θ1), y - d * sin(θ1)
        
    def is_jackknifed(self):
        x, y, W, L, d, s, θ0, θ1, ϕ = self._get_atributes()
        return abs(θ0 - θ1) * 180 / π > 90
    
    def is_offscreen(self):
        x, y, W, L, d, s, θ0, θ1, ϕ = self._get_atributes()
        
        x1, y1 = x + 1.5 * L * cos(θ0), y + 1.5 * L * sin(θ0)
        x2, y2 = self._trailer_xy()
        
        b = self.box
        return not (
            b[0] <= x1 <= b[1] and b[2] <= y1 <= b[3] and
            b[0] <= x2 <= b[1] and b[2] <= y2 <= b[3]
        )
        
    def valid(self):
        return not self.is_jackknifed() and not self.is_offscreen()
        
    def draw(self):
        if not self.display: return
        if self.patches: self.clear()
        self._draw_car()
        self._draw_trailer()
        self.f.canvas.draw()
            
    def clear(self):
        for p in self.patches:
            p.remove()
        self.patches = list()
        
    def _draw_car(self):
        x, y, W, L, d, s, θ0, θ1, ϕ = self._get_atributes()
        ax = self.ax
        
        x1, y1 = x + L / 2 * cos(θ0), y + L / 2 * sin(θ0)
        bar = Line2D((x, x1), (y, y1), lw=5, color='C2', alpha=0.8)
        ax.add_line(bar)

        car = Rectangle(
            (x1, y1 - W / 2), L, W, color='C2', alpha=0.8, transform=
            matplotlib.transforms.Affine2D().rotate_deg_around(x1, y1, θ0 * 180 / π) +
            ax.transData
        )
        ax.add_patch(car)

        x2, y2 = x1 + L / 2 ** 0.5 * cos(θ0 + π / 4), y1 + L / 2 ** 0.5 * sin(θ0 + π / 4)
        left_wheel = Line2D(
            (x2 - L / 4 * cos(θ0 + ϕ), x2 + L / 4 * cos(θ0 + ϕ)),
            (y2 - L / 4 * sin(θ0 + ϕ), y2 + L / 4 * sin(θ0 + ϕ)),
            lw=3, color='C5', alpha=1)
        ax.add_line(left_wheel)

        x3, y3 = x1 + L / 2 ** 0.5 * cos(π / 4 - θ0), y1 - L / 2 ** 0.5 * sin(π / 4 - θ0)
        right_wheel = Line2D(
            (x3 - L / 4 * cos(θ0 + ϕ), x3 + L / 4 * cos(θ0 + ϕ)),
            (y3 - L / 4 * sin(θ0 + ϕ), y3 + L / 4 * sin(θ0 + ϕ)),
            lw=3, color='C5', alpha=1)
        ax.add_line(right_wheel)
        
        self.patches += [car, bar, left_wheel, right_wheel]
        
    def _draw_trailer(self):
        x, y, W, L, d, s, θ0, θ1, ϕ = self._get_atributes()
        ax = self.ax
            
        x, y = x - d * cos(θ1), y - d * sin(θ1) - W / 2
        trailer = Rectangle(
            (x, y), d, W, color='C0', alpha=0.8, transform=
            matplotlib.transforms.Affine2D().rotate_deg_around(x, y + W/2, θ1 * 180 / π) +
            ax.transData
        )
        ax.add_patch(trailer)
        
        self.patches += [trailer]

In [None]:
truck = Truck(display=True)

In [None]:
ϕ = π/180 * (10)  # positive left, negative right
truck.step(ϕ)
truck.draw()

print(f"Cab angle: {truck.state()[2] * 180 / π:2f}")
print(f"Trailer angle: {truck.state()[3] * 180 / π:2f}")

In [None]:
truck.reset()

In [None]:
import torch
import torch.nn as nn
from torch.optim import SGD
from tqdm import tqdm

In [None]:
# Build expert data set

episodes = 10
inputs = list()
outputs = list()
truck = Truck(); episodes = 10_000  # uncomment for creating the data set

for episode in tqdm(range(episodes)):
    
    truck.reset()
    
    while truck.valid():
        initial_state = truck.state()
        ϕ = (random() - 0.5) * π / 2
        inputs.append((ϕ, *initial_state))
        outputs.append(truck.step(ϕ))
        truck.draw()

In [None]:
len(inputs), len(outputs)

In [None]:
state_size = 4
steering_size = 1
hidden_units_e = 45

emulator = nn.Sequential(
    nn.Linear(steering_size + state_size, hidden_units_e),
    nn.ReLU(),
    nn.Linear(hidden_units_e, state_size)
)

optimiser_e = SGD(emulator.parameters(), lr=0.005)
criterion = nn.MSELoss()

In [None]:
tensor_inputs = torch.Tensor(inputs)
tensor_outputs = torch.Tensor(outputs)

mean = tensor_inputs.mean(0)
std = tensor_inputs.std(0)
tensor_inputs = (tensor_inputs - mean) / std
tensor_outputs = (tensor_outputs - mean[1:]) / std[1:]

In [None]:
# Split the data into 80:20 for test:train.
test_size = int(len(tensor_inputs) * 0.8)
print(len(tensor_inputs), test_size)

train_inputs = tensor_inputs[:test_size]
train_outputs = tensor_outputs[:test_size]
test_inputs = tensor_inputs[test_size:]
test_outputs = tensor_outputs[test_size:]

In [None]:
len(train_inputs)

In [None]:
# Emulator training
cnt = 0
for i in torch.randperm(len(train_inputs)):
    ϕ_state = train_inputs[i]
    next_state_prediction = emulator(ϕ_state)
    next_state = train_outputs[i]
    
    loss = criterion(next_state_prediction, next_state)
    
    optimiser_e.zero_grad()
    loss.backward()
    optimiser_e.step()
    
    if cnt == 0 or (cnt + 1) % 10000 == 0:
        print(f'{cnt + 1:4d} / {len(train_inputs)}, {loss.item():.10f}')

    cnt += 1

In [None]:
# Test
total_loss = 0
with torch.no_grad():
    for idx, ϕ_state in enumerate(test_inputs):
        next_state_prediction = emulator(ϕ_state)
        next_state_prediction = next_state_prediction
        next_state = test_outputs[idx]
        total_loss += criterion(next_state_prediction, next_state).item()

ave_test_loss = total_loss/test_size

print(f'Test loss: {ave_test_loss:.10f}')

In [None]:
# Visual test. Initialize 2 trucks, step them, 1 with emulator one with actual dynamics
truck1 = Truck(display=True)
truck2 = Truck(display=True, figure=truck1.f)

truck2.θ0 = truck1.θ0
truck2.θ1 = truck1.θ1
truck2.x = truck1.x
truck2.y = truck1.y

truck2.draw()

In [None]:
ϕ = π/180 * (-0)  # positive left, negative right

truck1.step(ϕ)
truck1.draw()

def emulate_step(t, emulator):
    state_tensor = torch.cat((torch.tensor([ϕ]), torch.tensor(t.state())))
    state_tensor = (state_tensor - mean) / std # normalize
    em_state = emulator(state_tensor)
    em_state = em_state * std[1:] + mean[1:]  # unnormalize

    t.θ0 = em_state[2].item()
    t.θ1 = em_state[3].item()
    t.x = em_state[0].item()
    t.y = em_state[1].item()

    
    # print(f"Calculated trailer x, y {t.state()[3]:.2f} {t.state()[4]:.2f}")
    # print(f"Emulated trailer x, y {em_state[3]:.2f} {em_state[4]:.2f}")

truck2.ϕ = ϕ
emulate_step(truck2, emulator)
truck2.draw()

criterion = torch.nn.MSELoss()
loss = criterion(torch.tensor(truck2.state()), torch.tensor(truck1.state()))
print(f"Loss: {loss.item():.5f}")

In [None]:
# Setup a curriculum for training a controller that starts with easier examples and moves to harder ones
# Each "lesson specifies the xrange, yrange, cabin angle relative to x axis, and trailer angle relative to cabin
# Using https://tifu.github.io/truck_backer_upper/ as a reference for the following parts

import numpy as np


def generate_lessons(start_lesson, end_lesson, n=41):
    '''
    Interpolates between a starting lesson and ending lesson to generate n distint lessons
    '''

    lessons = []
    
    for srange, erange in zip(start_lesson, end_lesson):
        (s0, s1), (e0, e1) = srange, erange
        starts = np.round(np.linspace(s0, e0, n), decimals=2)
        ends = np.round(np.linspace(s1, e1, n), decimals=2)

        lessons.append(list(zip(starts, ends)))

    # Little trick to go from [ [xranges], [yranges], ... ] to [(xrange_1, yrange_1,...), (xrange2, yrange2,...),...]
    return list(zip(*lessons))


start = ((4, 6), (0, 0), (0, 0), (0, 0))
end = (( 20, 40 ), ( -10, 10 ), ( -90, 90 ), ( -90, 90 ))
    
lessons = generate_lessons(start, end)

lessons

In [None]:
# Display 20 random initial position(s) of given lessons
truck = Truck(display=True)
trucks = [truck] + [Truck(display=True, figure=truck.f) for _ in range(19)]
l = 0

In [None]:
import time

print(f"Showing truck(s) from lesson {l+1}")

(x0, x1), (y0, y1), (truckθ0, truckθ1), (trailerθ0, trailerθ1) = lessons[l]

for truck in trucks:
    truck.sample_lesson(lessons[l])
    truck.draw()

l = ( l + 1) % 21 # Repeatedly running this cell loops through the lessons


In [None]:
S = 1000       # Samples per lesson
K = 1000       # Max number of steps

In [None]:
# Ok now let's try training a controller using these lessons
# Controller architecture
hidden_units_c = 50
controller = nn.Sequential(
    nn.Linear(state_size, hidden_units_c),
    nn.Tanh(),
    nn.Linear(hidden_units_c, 1),
    nn.Tanh()
)

In [None]:
# i.e. forward pass
def use_controller(truck, training=True, use_emulator=True):
    init_state = torch.tensor(truck.state()).float()
    state = (init_state - mean[1:]) / std[1:] # normalize 

    for step in range(K):
        output = controller(state)
        
        ϕ = (output * π / 4 - mean[0]) / std[0] # Transform output to normalized(-π/4) to normalized(π/4)
        unnorm_ϕ = ϕ * std[0] + mean[0]
        
        if use_emulator:
            # Takes in normalized state, including steering
            state = emulator(torch.cat((ϕ, state)))

            with torch.no_grad():
                unnorm_state = state * std[1:] + mean[1:]  # unnormalize                
                truck.ϕ = unnorm_ϕ.item() 
                truck.θ0 = unnorm_state[2].item()
                truck.θ1 = unnorm_state[5].item()
                truck.x = unnorm_state[0].item()
                truck.y = unnorm_state[1].item()
                
        else:
            print(unnorm_ϕ)
            truck.step(unnorm_ϕ)
            truck.ϕ = unnorm_ϕ
            state = torch.Tensor(truck.state())
            state = (state - mean[1:]) / std[1:] # normalize 

        if (truck.display):
            truck.draw()
            time.sleep(0.1)
                
        if not truck.valid(): # Jacknifed or reached offscreen (could be dock)
            # TODO reference implementation did not backprop if end reached?
            # if (truck.is_jackknifed()):
            #     return None
            break
            
    state = state * std[1:] + mean[1:]  # unnormalize
    return state


In [None]:
truck = Truck()

In [None]:
optimiser_c = SGD(controller.parameters(), lr=0.0001, momentum=0.9, nesterov=True)
criterion = nn.MSELoss()
target = torch.zeros(4).float() # Target of (trailer x, trailer y, trailer θ, cab trailer angle)


def train_sample(lesson):
    truck.sample_lesson(lesson)
    final_state = use_controller(truck)

    # Tune loss function to make angles a bit better
    angle_diff = (final_state[2] - final_state[5]) * 2
    final_state = torch.cat((final_state[3:], angle_diff.unsqueeze(0)))
    # Loss is only calculated with trailer xy and angle
    loss = criterion(final_state[3:], target, )
    
    optimiser_c.zero_grad()
    loss.backward()
    optimiser_c.step()
    return loss.item(), 1 if truck.is_jackknifed() else 0

diverged = False

for (i, lesson) in enumerate(lessons[:8]):
    print(f"Lesson {i + 1}")
    total_loss = 0
    num_jk = 0
    
    for i in range(S):
        loss, jk = train_sample(lesson)
        total_loss += loss
        num_jk += jk

        if ((i + 1) % 100 == 0):
            if (num_jk == 100):
                diverged = True
                print("model diverged :(")
                break

            
            print(f"Avg loss from iteration {i - 100 + 1}-{i + 1} is {total_loss/ 100:.2f}. Jackknifed: {num_jk}")
            total_loss = 0
            num_jk = 0
            
    if diverged:
        break



In [None]:
truck = Truck(display=True)


In [None]:
truck.sample_lesson(lessons[0])
final_state = use_controller(truck, use_emulator=True)