# Hands-on Lesson 10 - Deep reinforcement learning #

In this hands-on lesson, we will use a neural network to control an autonomous car.   

In [1]:
import numpy as np
import matplotlib.pyplot as plt

# tensorflow 
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras import Model

# To play animations
from matplotlib.animation import FuncAnimation
from IPython.display import HTML, display

## Autonomous car environement ##

In this environement, an autonomous car drives on a loop. Its goal is to finish the loop without touching the road borders.
The road is between $r_{int}$ and $r_{ext}$ defined in polar coordinates as
$$ r_{int} = R_{int}\left(1 + \epsilon \cos n_1\theta + \epsilon \sin n_2\theta\right)$$
$$ r_{ext} = R_{ext}\left(1 + \epsilon \cos n_1\theta + \epsilon \sin n_2\theta\right)$$

In [None]:
# PARAMETERS
N1 = 5
N2 = 3
RINT = 1
REXT = 1.5
REPS = 0.05
NSENSORS = 2

class autonomous_car:  
    def __init__(self):
        self.reset()
        self._fill_archive()
        
    def reset(self):
        self.rcar = (self._rint(0) + self._rext(0)) / 2 
        self.thetacar = 0
        self.phicar = np.pi/2
        self.rcararchive = []
        self.thetacararchive = []
        self.statearchive = []
        self.rarchive = []
        self.tarchive = []
        self._xy()
        state, _, _ = self._state()
        return state 

    def step(self, action):
        # Possible actions
        if action == 0: # STRAIGHT
            pas = 1
        if action == 1: # RIGHT
            self.phicar -= np.pi/10
            pas = 0.5
        if action == 2: # LEFT
            self.phicar += np.pi/10
            pas = 0.5
        if action == 3: # RIGHT
            self.phicar -= np.pi/20
            pas = 0.75
        if action == 4: # LEFT
            self.phicar += np.pi/20
            pas = 0.75
            
        self._xy()
        xnew = self.xcar + 0.1 * pas * np.cos(self.phicar)
        ynew = self.ycar + 0.1 * pas * np.sin(self.phicar)
        rcarnew = np.sqrt(xnew**2 + ynew**2)
        thetacarnew = np.arctan2(ynew,xnew)
        thetacarnew = np.remainder(thetacarnew + 0.3, 2*np.pi) - 0.3

        # Reward
        done = False
        reward = thetacarnew - self.thetacar 
        if thetacarnew > 1.65*np.pi:
            done = True
            reward = 10
        if rcarnew < self._rint(thetacarnew):
            done = True
            reward = -1
        if rcarnew > self._rext(thetacarnew):
            done = True
            reward = -1
            
        self.rcar = rcarnew
        self.thetacar = thetacarnew   
        state = self._fill_archive()
        return state, reward, done
            
    def _rint(self,theta):
        return RINT * (1 + REPS*(np.cos(N1*theta)+np.sin(N2*theta)))

    def _rext(self,theta):
        return REXT * (1 + REPS*(np.cos(N1*theta)+np.sin(N2*theta)))
    
    def _fill_archive(self):
        self.rcararchive.append(self.rcar)
        self.thetacararchive.append(self.thetacar)
        state, r, t = self._state()
        self.statearchive.append(state)
        self.rarchive.append(r)
        self.tarchive.append(t)
        return state
        
    def _xy(self):
        self.xcar = self.rcar * np.cos(self.thetacar)
        self.ycar = self.rcar * np.sin(self.thetacar)

    def _state(self):
        # theta = self.phicar + np.pi/2 + np.pi/6 * np.array((-1,1)).reshape(-1,1)  # SENSOR ANGLES  
        theta = self.phicar + np.pi/2 + np.pi/6 * np.linspace(-1,1, num=NSENSORS).reshape(-1,1)  # SENSOR ANGLES  
        theta_line =  self.thetacar + (theta - np.pi/2 - self.thetacar) * np.arange(0, 0.95, 0.01)
        
        rint = self.rcar * np.cos(self.thetacar - theta) / np.cos(theta_line - theta) - self._rint(theta_line)
        rext =-self.rcar * np.cos(self.thetacar - theta) / np.cos(theta_line - theta) + self._rext(theta_line)
        rint[:,-1] = -1
        rext[:,-1] = -1
        index = np.minimum(np.argmax(rint<0,axis=1), np.argmax(rext<0,axis=1))

        thetaborder = np.zeros(NSENSORS)
        for j in range(NSENSORS):
            thetaborder[j] = theta_line[j,index[j]]
        rborder = self.rcar * np.cos(self.thetacar - theta[:,0]) / np.cos(thetaborder - theta[:,0])
        
        x = rborder * np.cos(thetaborder)
        y = rborder * np.sin(thetaborder)
        d = np.sqrt((x - self.xcar)**2 + (y - self.ycar)**2)
        return d, rborder, thetaborder
    
    #__ rendering animation ________________________________   
    
    def render(self):
        theta = np.linspace(0, 2 * np.pi, num=100)
        fig, ax = plt.subplots(subplot_kw={'projection': 'polar'})
        ax.plot(theta, self._rint(theta),'r')
        ax.plot(theta, self._rext(theta),'r')
        ax.plot(1.65 * np.pi * np.ones(2), [self._rint(1.65 * np.pi), self._rext(1.65 * np.pi)], 'k:')
        fig_car, = ax.plot([], [], 'ok')
        fig_sense = []
        for j in range(NSENSORS):
            data, = ax.plot([], [], 'b')
            fig_sense.append(data)
        ax.grid(False)
        plt.axis('off')

        def animate(i):
            fig_car.set_data(self.thetacararchive[i], self.rcararchive[i])
            ax.set_title(str(self.statearchive[i]))
            for j in range(NSENSORS):
                fig_sense[j].set_data(
                    [self.thetacararchive[i], self.tarchive[i][j]], 
                    [self.rcararchive[i], self.rarchive[i][j]])
            return fig_car, # fig_sense,

        anim = FuncAnimation(
            fig, 
            animate,
            frames=len(self.rarchive),
            interval=100,
            blit=True
        ) 
        return anim

Let's play with this environment with a random walk

In [None]:
myenv = autonomous_car()
myenv.reset()
for _ in range(20):
    action = np.random.randint(5)
    state, reward, done = myenv.step(action)
    print("state, action, reward, done", state, action, reward, done)
    if done: break

In [None]:
anim = myenv.render()
display(HTML(anim.to_jshtml(default_mode='reflect')))
plt.close() 

## Deep Q-learning algorithm

In [None]:
class Qnetwork(Model):
    def __init__(self, Nstates, Nhidden, Nactions):
        super(Qnetwork, self).__init__()
        self.input_layer = layers.InputLayer(input_shape = (Nstates,))
        self.dense1 = layers.Dense(Nhidden, activation='tanh')
        self.dense2 = layers.Dense(Nactions, activation='linear')

    @tf.function
    def call(self, inputs, **kwargs):
        x = self.input_layer(inputs)
        x = self.dense1(x)
        x = self.dense2(x)
        return x

    
class Qlearning():
    def __init__(self, Nstates, Nhidden, Nactions, LearningRate):
        self.model = Qnetwork(Nstates, Nhidden, Nactions)
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=LearningRate, )
        self.loss = tf.keras.losses.MeanSquaredError()
        
    def Qvalues(self, state):        
        return self.model(state)
    
    def action(self, state):        
        q_values = self.model(state)
        return np.argmax(q_values[0])  
    
    @tf.function
    def train(self, state, Qtarget):
        train_variables = self.model.trainable_variables
        with tf.GradientTape() as tape:
            Qoutput = self.model(state)
            loss = self.loss(Qtarget, Qoutput)
        
        gradients = tape.gradient(loss, train_variables)
        self.optimizer.apply_gradients(zip(gradients, train_variables))

Now, let's try to train this network to control the car. 

In [None]:
N_STATES = NSENSORS
N_ACTIONS = 3
N_HIDDEN = 4
LEARNING_RATE = 0.003
DISCOUNT_FACTOR = 0.99
N_EPISODES = 1001
EPSILON = 0.9

myenv = autonomous_car()
myqlearning = Qlearning(
    Nstates=N_STATES, 
    Nhidden=N_HIDDEN, 
    Nactions=N_ACTIONS, 
    LearningRate=LEARNING_RATE)
step = 0
list_fullrewards = []

for episode in range(N_EPISODES):
    # initial state
    s = myenv.reset()
    total_reward = 0
    EPSILON *= 0.9975
    
    while True:
        # epsilon-greedy policy
        a = myqlearning.action(s[np.newaxis,:])
        if np.random.rand(1) < EPSILON:
            a = np.random.randint(N_ACTIONS)
        s_, r, done = myenv.step(a)
        
        Qtarget = myqlearning.Qvalues(s[np.newaxis,:])
        Q_ = myqlearning.Qvalues(s_[np.newaxis,:])
        maxQ_ = np.max(Q_[0])
        Qtarget = np.array(Qtarget)
        Qtarget[0,a] = r + DISCOUNT_FACTOR*maxQ_
        myqlearning.train(s[np.newaxis,:],Qtarget)
        s = s_
        total_reward += r
        if done:
            break
    
    list_fullrewards.append(total_reward)

    if episode % 100 == 0:
        print("Episode: ", episode, "  // epsilon: ", EPSILON, "  // Total reward: ", total_reward)

## Some questions now
- Plot an animation of the greedy policy
- Plot the function $a = \pi^*(s)$
- Plot the evolution of the loss/total reward as learning is implemented
- Play with the parameters (learning rate, hidden layers, number of neurons, etc.)
- Try to see if learning is better/faster with more sensors
- Try to play with the rewards to see if learning is better/faster
- Try to see if the car can learn different tracks
- Try to see if the car can learn random tracks
- Could you code learning with "experience replay" (i.e. mini-batch descent)?