In [None]:
import numpy as np
from matplotlib import pyplot as plt

In [None]:
def discretize(Z, Zdim, Zmin, Zstep):
    """Map a continuous vector onto integer grid coordinates.

    For each of the first ``Zdim`` components, the offset from the axis
    minimum is floor-divided by the axis step and cast to ``int``.

    Args:
        Z: indexable holding at least ``Zdim`` values.
        Zdim: number of leading components to discretize.
        Zmin: per-axis lower bound (indexable, at least ``Zdim`` long).
        Zstep: per-axis bin width (indexable, at least ``Zdim`` long).

    Returns:
        A tuple of ``Zdim`` ints, suitable for indexing an n-dimensional array.
        No bounds check is performed: a value below ``Zmin`` yields a negative
        index.
    """
    return tuple(
        int((Z[axis] - Zmin[axis]) // Zstep[axis])
        for axis in range(Zdim)
    )

def DKL(f, g, size=51):
    """Kullback-Leibler divergence D_KL(f || g) of two 2-D discrete distributions.

    Computes ``sum_ij f[i][j] * log(f[i][j] / g[i][j])`` over the leading
    ``size x size`` cells. Cells where either ``f`` or ``g`` is zero are
    skipped, so the result stays finite even when the supports differ.

    Args:
        f: 2-D indexable (``f[i][j]``) with at least ``size`` rows and columns.
        g: 2-D indexable with the same layout as ``f``.
        size: number of rows and columns to sum over (defaults to the 51x51
            grid that was previously hard-coded).

    Returns:
        The accumulated divergence (``0`` when no cell contributes).
    """
    div = 0
    for i in range(size):
        for j in range(size):
            p = f[i][j]
            q = g[i][j]
            if q != 0 and p != 0:
                # The log-ratio weighting is what makes this a divergence;
                # summing p alone would just return the shared-support mass.
                div = div + p*np.log(p/q)
    return(div)

In [None]:
# Discrete control grid: centres of 20 bins of width 0.25 starting at -2.5
u_axis = [0.25*(n + 0.5) - 2.5 for n in range(20)]

In [None]:
class Pendulum():
    """Noisy discrete-time simulation of a torque-driven pendulum.

    The state is ``np.array([angle, speed])``. The angle is folded back into
    [-pi, pi] and the speed is clipped to [-max_speed, max_speed].
    """

    def __init__(self):
        """Set the physical constants, noise levels and a zero initial state."""
        self.l = 0.6 #Length used in the dynamics
        self.m = 1 #Mass used in the dynamics
        self.max_torque = 10 #Inputs are clipped to +/- this value
        self.max_speed = 5 #Speeds are clipped to +/- this value
        self.state = np.array([0,0])
        self.last_input = None #Last (clipped) input passed to step()
        self.g = 9.81
        self.dt = 0.1 #Integration time step
        self.angle_std = 3*2*np.pi/360 #Angle noise std: 3 degrees, in radians
        self.speed_std = 0.1 #Speed noise std

    @staticmethod
    def _wrap_angle(angle):
        """Fold an angle back into [-pi, pi] with a single 2*pi correction.

        Only one turn is corrected, which is enough as long as the angle
        overshoots the interval by less than 2*pi.
        """
        if angle < -np.pi:
            angle = angle + 2*np.pi
        if angle > np.pi:
            angle = angle - 2*np.pi
        return angle

    def random_init(self):
        """Draw a uniformly random state: angle in [-pi, pi), speed in [-max_speed, max_speed)."""
        angle = np.random.uniform(-np.pi, np.pi)
        speed = np.random.uniform(-self.max_speed, self.max_speed)
        self.state = np.array([angle, speed])

    def step(self, u):
        """Advance the simulation by one time step ``dt`` under input ``u``.

        Args:
            u: control input; clipped to [-max_torque, max_torque] before use.

        Side effects:
            Stores the clipped input in ``last_input`` and replaces ``state``
            with a new array (the previous array object is not modified).
        """
        u = np.clip(u, -self.max_torque, self.max_torque) #Clip the input as safety
        self.last_input = u

        angle = self.state[0]
        speed = self.state[1]

        #Dynamics for the acceleration
        accel = u/(self.m*self.l*self.l) + self.g*np.sin(angle)/self.l

        #New speed, drawn first so the random stream order is speed noise then angle noise
        new_speed = speed + accel*self.dt + np.random.normal(0,self.speed_std)
        #Clip the speed as safety (in practice the bounds were chosen to ensure this very rarely happens)
        new_speed = np.clip(new_speed, -self.max_speed, self.max_speed)

        #New angle is integrated with the speed from before this step
        new_angle = angle + speed*self.dt + np.random.normal(0,self.angle_std)
        new_angle = self._wrap_angle(new_angle) #Angle is modulo 2\pi

        self.state = np.array([new_angle, new_speed])

    def set_state(self, angle, speed):
        """Explicitly set the state, applying the same speed clip and angle wrap as step()."""
        speed = np.clip(speed, -self.max_speed, self.max_speed)
        angle = self._wrap_angle(angle)
        self.state = np.array([angle, speed])


In [None]:
class QLearning():
    """Tabular Q-learning agent controlling a Pendulum.

    The Q-table ``values`` has shape (50, 50, 20): 50 angle bins of width
    2*pi/50 starting at -pi, 50 speed bins of width 0.2 starting at -5, and
    20 input bins of width 0.25 starting at -2.5 (the bins whose centres are
    listed in ``u_axis``).
    """

    def __init__(self):
        """Create a zero Q-table, the hyper-parameters and a pendulum placed at angle pi."""
        self.values = np.zeros((50,50,20))
        self.learning_rate = 0.5 #High learning rate for fast learning
        self.discount = 0.99
        #Probability of taking the greedy action in training_step; a uniformly
        #random input from u_axis is used when the uniform draw exceeds it.
        self.epsilon = 0.9
        self.P = Pendulum()
        self.P.set_state(np.pi, 0)

    def _state_index(self, state):
        """Return the (angle bin, speed bin) grid coordinates of a continuous state."""
        return discretize(state, 2, [-np.pi, -5], [2*np.pi/50, 0.2])

    def _greedy_input(self, state):
        """Return the input from u_axis with the highest Q-value in the given state."""
        return u_axis[np.argmax(self.values[self._state_index(state)])]

    def _apply(self, u):
        """Simulate one step under input u, update the Q-table, and return the reward.

        The reward is that of the state observed before the step.
        """
        state = self.P.state #Pendulum.step replaces the array, so this keeps the old state
        self.P.step(u) #Simulation step
        rwd = self.reward(state)
        self.Qval_update(rwd, state, self.P.state, u)
        return rwd

    def reward(self, state):
        """Reward of a state: ``-angle**2 - 0.1*speed**2``."""
        return(-state[0]*state[0] - state[1]*state[1]*0.1)

    def Qval_update(self, r, s, snew, a):
        """Classical Q-value update.

        Args:
            r: reward received.
            s: state in which the action was taken.
            snew: state reached afterwards.
            a: input applied (continuous value, discretized here).
        """
        stateInd = self._state_index(s)
        aInd = discretize([a], 1, [-2.5], [0.25])
        newStateInd = self._state_index(snew)
        cell = (stateInd[0], stateInd[1], aInd[0])
        target = r + self.discount*self.values[newStateInd].max()
        self.values[cell] = (1-self.learning_rate)*self.values[cell] + self.learning_rate*target

    def training_step(self):
        """One training step: pick an input, simulate, update the Q-table.

        Returns:
            The reward used for the update.
        """
        expl = np.random.rand() #Greedy or random selection
        if expl>self.epsilon:
            u = np.random.choice(u_axis)
        else:
            u = self._greedy_input(self.P.state)
        return(self._apply(u))

    def exploitation_step(self):
        """Like a training step, but always greedy.

        The Q-table is still updated with the observed transition.

        Returns:
            The input that was applied.
        """
        u = self._greedy_input(self.P.state)
        self._apply(u)
        return(u)

    def episode(self, n_steps=500):
        """Run one training episode starting from angle pi and zero speed.

        Args:
            n_steps: number of training steps in the episode (default 500).

        Returns:
            The sum of the rewards collected over the whole episode.
        """
        self.P.set_state(np.pi, 0)
        tot = 0
        for _ in range(n_steps):
            #Accumulate inside the loop so every step's reward is counted
            tot = tot + self.training_step()
        return(tot)

In [None]:
#Fresh agent: it is trained incrementally in the following cells, and a copy
#of its Q-table is stored at each checkpoint
Q = QLearning()
for episode_idx in range(20): #20 episodes of 500 steps = 10k steps
    Q.episode()
#First checkpoint: separate agent holding a copy of the table after 20 episodes
Q20 = QLearning()
Q20.values = Q.values.copy()

In [None]:
#180 further episodes on the same agent (200 in total, i.e. 100k steps)
for episode_idx in range(180):
    Q.episode()
#Checkpoint: separate agent holding a copy of the current Q-table
Q200 = QLearning()
Q200.values = Q.values.copy()

In [None]:
#1800 further episodes on the same agent (2000 in total, i.e. 1M steps)
for episode_idx in range(1800):
    Q.episode()
#Checkpoint: separate agent holding a copy of the current Q-table
Q2000 = QLearning()
Q2000.values = Q.values.copy()

In [None]:
#18000 further episodes on the same agent (20k in total, i.e. 10M steps)
for episode_idx in range(18000):
    Q.episode()
#Checkpoint: separate agent holding a copy of the current Q-table
Q20k = QLearning()
Q20k.values = Q.values.copy()

In [None]:
#80000 further episodes on the same agent (100k in total, i.e. 50M steps)
for episode_idx in range(80000):
    Q.episode()
#Checkpoint: separate agent holding a copy of the current Q-table
Q100k = QLearning()
Q100k.values = Q.values.copy()

In [None]:
def getMeans(QL, nSims=50, nSteps=300):
    """Simulate the greedy policy repeatedly and average the reward per time step.

    Each simulation resets the pendulum to angle pi and zero speed, then runs
    ``nSteps - 1`` exploitation steps. The reward logged at each step is that
    of the state observed just before the step, using the formula
    ``-angle**2 - 0.1*speed**2``.

    Args:
        QL: agent exposing ``P.set_state``, ``P.state`` and
            ``exploitation_step()``. In this notebook ``exploitation_step``
            also updates the agent's Q-table, so evaluating an agent changes it.
        nSims: number of independent simulations (default 50).
        nSteps: simulation length; ``nSteps - 1`` rewards are recorded
            (default 300, i.e. 299 points).

    Returns:
        ``(means, stds)``: two lists of length ``nSteps - 1`` with the mean
        and standard deviation of the reward across simulations.
    """
    horizon = nSteps - 1
    rewards = np.zeros((nSims, horizon))

    for sim in range(nSims):
        QL.P.set_state(np.pi, 0) #(Re)initialize the pendulum
        for t in range(horizon):
            state = QL.P.state #State before the step
            rewards[sim, t] = -state[0]*state[0] - state[1]*state[1]*0.1 #Log reward
            QL.exploitation_step()

    means = [np.mean(rewards[:, t]) for t in range(horizon)]
    stds = [np.std(rewards[:, t]) for t in range(horizon)]
    return(means, stds)



def getMeans2(QL, nSims=50, nSteps=300):
    """Simulate the greedy policy repeatedly and average the angle and the input.

    Each simulation resets the pendulum to angle pi and zero speed, then runs
    ``nSteps - 1`` exploitation steps. At each step the angle observed just
    before the step and the input returned by the step are recorded.

    Args:
        QL: agent exposing ``P.set_state``, ``P.state`` and
            ``exploitation_step()`` (which returns the applied input). In this
            notebook ``exploitation_step`` also updates the agent's Q-table.
        nSims: number of independent simulations (default 50).
        nSteps: simulation length; ``nSteps - 1`` points are recorded
            (default 300, i.e. 299 points).

    Returns:
        ``(means, stds, umeans, ustds)``: four lists of length ``nSteps - 1``
        with the mean and standard deviation across simulations of the angle
        and of the input.
    """
    horizon = nSteps - 1
    angles = np.zeros((nSims, horizon))
    inputs = np.zeros((nSims, horizon))

    for sim in range(nSims):
        QL.P.set_state(np.pi, 0) #(Re)initialize the pendulum
        for t in range(horizon):
            angles[sim, t] = QL.P.state[0] #Angle before the step
            inputs[sim, t] = QL.exploitation_step() #Step and log the input

    #NOTE(review): angles are averaged as plain numbers; runs that sit near
    #+pi and -pi (the same physical position) will average towards 0.
    means = [np.mean(angles[:, t]) for t in range(horizon)]
    stds = [np.std(angles[:, t]) for t in range(horizon)]
    umeans = [np.mean(inputs[:, t]) for t in range(horizon)]
    ustds = [np.std(inputs[:, t]) for t in range(horizon)]
    return(means, stds, umeans, ustds)

In [None]:
###
#This cell generates the data for the 'average reward' bar plot:
#for every checkpoint, the mean reward and the mean standard deviation over
#the time steps from index 200 onwards of the averaged trajectories.
#
#Only the checkpoints created above are listed. A 'Q500k' agent used to be
#referenced here, but no cell in this notebook defines it and its entry was
#never plotted (the bar chart only uses the first five results).

QLs = [Q20, Q200, Q2000, Q20k, Q100k]
res = [0]*len(QLs) #Mean reward per checkpoint
res2 = [0]*len(QLs) #Mean standard deviation per checkpoint
for i in range(len(QLs)):
    means, stds = getMeans(QLs[i])
    res[i] = np.mean(means[200:])
    res2[i] = np.mean(stds[200:])

In [None]:
#Bar chart of the mean reward per checkpoint, with the mean standard deviation as error bar
fig, ax = plt.subplots()
ax.bar(range(5), res[:5], yerr=res2[:5], align='center', alpha=0.5, ecolor='black', capsize=10)
ax.set_ylabel('Mean reward')
ax.set_xlabel('Training episodes')
#Pin one tick per bar before labelling, so labels cannot shift when the
#automatic tick positions change
ax.set_xticks(range(5))
ax.set_xticklabels(['20', '200', '2k', '20k', '100k'])
ax.yaxis.grid(True)

plt.savefig('bar_plot_with_error_bars.png')
plt.show()

In [None]:
###
#Average closed-loop angle for the Q2000 checkpoint.
#The evaluation runs on a copy, because exploitation steps keep updating the Q-table.

Q2000bis = QLearning()
Q2000bis.values = Q2000.values.copy()

means, stds, umeans, ustds = getMeans2(Q2000bis)

from matplotlib import pyplot as plt
from pylab import rcParams
plt.rcParams.update({'font.size': 18})

x = np.arange(299)/10 #Step index scaled by 1/10 (Pendulum.dt is 0.1)
y = np.asarray(means)
ci = np.asarray(stds) #Band half-width: one standard deviation

fig, ax = plt.subplots()
ax.plot(x, y)
ax.fill_between(x, y - ci, y + ci, color='b', alpha=.1)
ax.set(xlabel='Time', ylabel='Angular position')
plt.savefig('QL_angle_ci.png', bbox_inches='tight')

In [None]:
#Average control input for the trajectories computed in the previous cell
x = np.arange(299)/10 #Step index scaled by 1/10 (Pendulum.dt is 0.1)
y = np.asarray(umeans)
ci = np.asarray(ustds) #Band half-width: one standard deviation

fig, ax = plt.subplots()
ax.plot(x, y)
ax.fill_between(x, y - ci, y + ci, color='b', alpha=.1)
ax.set(xlabel='Time', ylabel='Control input')
plt.savefig('QL_input_ci.png', bbox_inches='tight')

In [None]:
#Average closed-loop angle for the Q100k checkpoint.
#The evaluation runs on a copy, because exploitation steps keep updating the Q-table.
Q100kbis = QLearning()
Q100kbis.values = Q100k.values.copy()

means, stds, umeans, ustds = getMeans2(Q100kbis)

from matplotlib import pyplot as plt
from pylab import rcParams
plt.rcParams.update({'font.size': 18})

x = np.arange(299)/10 #Step index scaled by 1/10 (Pendulum.dt is 0.1)
y = np.asarray(means)
ci = np.asarray(stds) #Band half-width: one standard deviation

fig, ax = plt.subplots()
ax.plot(x, y)
ax.fill_between(x, y - ci, y + ci, color='b', alpha=.1)
ax.set(xlabel='Time', ylabel='Angular position')
plt.savefig('QL_angle_ci_100k.png', bbox_inches='tight')

In [None]:
#Average control input for the trajectories computed in the previous cell
x = np.arange(299)/10 #Step index scaled by 1/10 (Pendulum.dt is 0.1)
y = np.asarray(umeans)
ci = np.asarray(ustds) #Band half-width: one standard deviation

fig, ax = plt.subplots()
ax.plot(x, y)
ax.fill_between(x, y - ci, y + ci, color='b', alpha=.1)
ax.set(xlabel='Time', ylabel='Control input')
plt.savefig('QL_input_ci_100k.png', bbox_inches='tight')