In [1]:
%matplotlib notebook
import numpy as np
import matplotlib.pyplot as plt
import matplotlib

import pendulum
import tqdm

# Goal of the project
The goal of this project is to learn a policy for an inverted pendulum model to make it do a swing-up motion. Beyond the task of inverting a pendulum, the goal is to also gain an understanding on how value iteration and Q-learning work, their limitations and advantages.

To make the problem interesting, the inverted pendulum has a limit on the maximum torque it can apply, therefore it is necessary for the pendulum to do a few "back and forth" motions to be able to reach the inverted position ($\theta=\pi$) from the standing still non-inverted position ($\theta=0$). 

<img src='pendulum.png' width="120">

In the following, we will write $x = \begin{pmatrix} \theta \\ \dot{\theta} \end{pmatrix}$ as the vector of states of the system. We will also work with time-discretized dynamics, and refer to $x_n$ as the state at time $t = n \Delta t$ (assuming discretization time $\Delta t$)

We want to minimize the following discounted cost function
$$\sum_{i=0}^{\infty} \alpha^i g(x_i, u_i)$$ where 
$$g(x_i, u_i) = (\theta-\pi)^2 + 0.01 \cdot \dot{\theta}_i^2 + 0.0001 \cdot u_i^2 \qquad \textrm{and} \qquad\alpha=0.99$$
This cost mostly penalizes deviations from the inverted position but also encourages small velocities and control.


In [2]:
# we can create a robot
robot = pendulum.Pendulum()

# assume we set theta and dtheta = 0 and u = -5, we can get the next state using
x = np.array([0,0])
u = -5
x_next = robot.next_state(x, u)

## Part 2 - Tabular Q-learning
Now we want to implement the Q-learning algorithm (with a table and discretized states / actions). The difference with value iteration is that we will not assume that we can access the next state given any (x,u). We will need to run a realistic episode where we always start the pendulum at the position $[0,0]$.
1. Write a function ```q_learning(robot)``` that implements the tabular Q-learning algorithm (use episodes of 100 timesteps and an epsilon greedy policy with $\epsilon=0.1$). The function should return the Q-table as a 50x50x3 numpy array (assume that only three controls are possible as in Part 1).

2. How can you compute the optimal policy from the Q function? And the optimal value function? Write a function that does this using as an input the Q-table computed by ```q_learning(robot)```.

3. How many epsilodes (approximately) does it take for Q-learning to learn how to invert the pendulum when $u \in \{-5,0,5\}$? (use a learning rate of 0.1)

4. How does the estimates of the value function and policy compare to the ones computed with the Value Iteration algorithm? (plot these functions) Do you see any qualitative differences in terms of the computed policy compared to the optimal solution computed with value iteration? (e.g. do they achieve the same cost? did Q-learning find the optimal value function?  why?)

5. How is learning affected when changing $\epsilon$ and the learning rate?

6. Can you learn how to invert the pendulum with the sparse cost function define in Part 1.4? Is it easier or harder? Why?

In [43]:
discretized_theta = np.linspace(0, 2*np.pi, 50, endpoint=False)

discretized_thetadot = np.linspace(-6, 6, 50)
def cost(x,u):
    theta = x[0]
    theta_dot = x[1]
    res = (theta-np.pi)**2+0.01*theta_dot**2+0.0001*u**2
    return res

In [47]:
def cost(x,u):
    theta = x[0]
    theta_dot = x[1]
    res = (theta-np.pi)**2+0.01*theta_dot**2+0.0001*u**2
    return res

class Q_solver:
    def __init__(self,robot, costfn = cost, actions=None, max_iters=5000, sparse_loss=False):
        self.robot: pendulum = robot
        if actions is None:
            actions = [-5, 0, 5]
        self.action_list = np.array(actions)
        self.lr = 0.1
        self.epslion = 0.1
        self.step_in_iter = 100
        self.alpha = 0.99
        self.max_iters = max_iters
        self.sparse_loss = sparse_loss
        self.costfn = costfn
        self.discretized_theta = discretized_theta
        self.discretized_theta_dot = discretized_thetadot
        self.space_shape = [len(discretized_theta),len(discretized_thetadot)]

        self.num_states = 50*50
        self.nu = 3
        self.nq = 50
        self.make_state_transfer_table()
    def make_state_transfer_table(self):
        next_state_index = np.empty([self.num_states, self.nu], dtype=np.int32)
        for i in range(self.num_states):
            for k in range(self.nu):
                x_next = robot.next_state(self.get_states(i), self.action_list[k])
                next_state_index[i, k] = self.get_index(x_next)

        self.state_transfer_table = next_state_index #[250 3 2]


    def get_index(self,x):
        ind_q = np.argmin((x[0] - self.discretized_theta) ** 2)
        ind_v = np.argmin((x[1] - self.discretized_theta_dot) ** 2)
        return ind_q + ind_v * self.nq

    def get_states(self, index):
        iv, ix = np.divmod(index, self.nq)
        return np.array([self.discretized_theta[ix], self.discretized_theta_dot[iv]])


    def iterate(self):
        q = np.zeros([self.num_states, self.nu])
        q_Last = np.zeros([self.num_states, self.nu])
        for i in tqdm.tqdm(range(self.max_iters)):  #

            # choose initial state x0
            x_0 = np.array([0, 0])
            x_index = self.get_index(x_0)
            for j in range(self.step_in_iter):

                if np.random.uniform(0, 1) > self.epslion:
                    u_index = np.argmin(q[x_index, :])
                else:
                    u_index = np.random.randint(0, self.nu - 1)
                # observe x_t+1
                next_index = self.state_transfer_table[x_index, u_index]
                # compute g(x_t,u(x_t))
                x = self.get_states(x_index)
                u = self.action_list[u_index]
                # compute TDerror
                TDerror = self.costfn(x, u) + self.alpha * min(q[next_index, :]) - q[
                    x_index, u_index]
                q[x_index, u_index] = q[x_index, u_index] + self.lr * TDerror
                x_index = next_index

            # we update the current Q function if there is any change otherwise we are done
            if ((q_Last - q) ** 2 < 1e-5).all():
                break
            else:
                q_Last = q.copy()

        policy = np.zeros(self.space_shape)
        value_function = np.zeros(self.space_shape)
        for k in range(self.num_states):
            iv, ix = np.divmod(k, self.nq)
            policy[ix,iv] = self.action_list[np.argmin(q[k, :])]
            value_function[ix,iv] = min(q[k, :])
        return value_function,policy
solver = Q_solver(robot,cost,max_iters=50000)
value,policy = solver.iterate()



 14%|█▍        | 7241/50000 [00:15<01:30, 471.66it/s]


In [50]:
policy

array([[ 0., -5., -5., ...,  5.,  5.,  5.],
       [-5., -5., -5., ...,  5.,  5.,  5.],
       [ 0.,  0., -5., ...,  0.,  5.,  5.],
       ...,
       [-5., -5.,  0., ...,  5.,  5.,  0.],
       [-5., -5., -5., ...,  5.,  0.,  5.],
       [-5., -5., -5., ...,  5.,  5.,  5.]])

In [51]:
# here is some code to plot results, assuming a policy and a value function are given
# this can be used to answer questions in both Part 1 and 2
# we plot the value function
plt.figure(figsize=[6,6])
plt.imshow(value.T, extent=[0., 2*np.pi, -6, 6], aspect='auto')
plt.xlabel('Pendulum Angle')
plt.ylabel('Velocity')
plt.title('Value Function')

# we plot the policy
plt.figure(figsize=[6,6])
plt.imshow(policy.T, extent=[0., 2*np.pi, -6, 6], aspect='auto')
plt.xlabel('Pendulum Angle')
plt.ylabel('Velocity')
plt.title('Policy')

# now we simulate the dynamics for 100 time steps
x0 = np.array([0.,0.])

def controller(x):
    theta = np.linspace(0, 2*np.pi, 50, endpoint=False)
    dtheta = np.linspace(-6, 6, 50)
    
    th_index = np.argmin(np.abs(theta - x[0]))
    dth_index = np.argmin(np.abs(dtheta - x[1]))
    return policy[th_index, dth_index]



<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [52]:
x, u = robot.simulate(x0, controller, 30)

# and plot the results
time = np.linspace(0.,20., len(x[0,:]))
plt.figure()
plt.subplot(3,1,1)
plt.plot(time,x[0,:])
plt.ylabel('angle')
plt.subplot(3,1,2)
plt.plot(time,x[1,:])
plt.ylabel('velocity')
plt.subplot(3,1,3)
plt.plot(time[:-1],u)
plt.ylabel('control')



<IPython.core.display.Javascript object>

Text(0, 0.5, 'control')

In [53]:
# and show an animation
robot.animate_robot(x, robot.delta_t)

### 2.6
sparse loss

In [54]:
discretized_theta = np.linspace(0, 2*np.pi, 50, endpoint=False)

discretized_thetadot = np.linspace(-6, 6, 50)


In [66]:
def cost(x,u):
    theta = x[0]
    theta_dot = x[1]
    res = (theta-np.pi)**2+0.01*theta_dot**2+0.0001*u**2
    return res

In [78]:
def cost(x,u):
    theta = x[0]
    theta_dot = x[1]
    res = 0 if abs(theta-np.pi)<0.2  else 1
#     res = (theta-np.pi)**2
    return res

In [79]:
class DQ_solver:
    def __init__(self, robot, costfn=cost, actions=None, max_iters=400, sparse_loss=False):
        self.robot: pendulum = robot
        if actions is None:
            actions = [-5, 0, 5]
        self.action_list = np.array(actions)
        self.lr = 0.1
        self.epslion = 0.1
        self.step_in_iter = 100
        self.gamma = 0.9
        self.max_iters = max_iters
        self.sparse_loss = sparse_loss
        self.costfn = costfn
        self.discretized_theta = discretized_theta
        self.discretized_theta_dot = discretized_thetadot
        self.space_shape = [len(discretized_theta), len(discretized_thetadot)]

        self.num_states = 50 * 50
        self.nu = 3
        self.nq = 50
        self.make_state_transfer_table()

    def make_state_transfer_table(self):
        next_state_index = np.empty([self.num_states, self.nu], dtype=np.int32)
        for i in range(self.num_states):
            for k in range(self.nu):
                x_next = robot.next_state(self.get_states(i), self.action_list[k])
                next_state_index[i, k] = self.get_index(x_next)

        self.state_transfer_table = next_state_index  # [250 3 2]

    def get_index(self, x):
        ind_q = np.argmin((x[0] - self.discretized_theta) ** 2)
        ind_v = np.argmin((x[1] - self.discretized_theta_dot) ** 2)
        return ind_q + ind_v * self.nq

    def get_states(self, index):
        iv, ix = np.divmod(index, self.nq)
        return np.array([self.discretized_theta[ix], self.discretized_theta_dot[iv]])

    def iterate(self):
        qA = np.zeros([self.num_states, self.nu])
        qB = np.zeros([self.num_states, self.nu])
        qA_Last = np.zeros([self.num_states, self.nu])
        qB_Last = np.zeros([self.num_states, self.nu])
        for i in tqdm.tqdm(range(self.max_iters)):  #

            # choose initial state x0
            x_0 = np.array([0, 0])
            x_index = self.get_index(x_0)
            for j in range(self.step_in_iter):

                if np.random.uniform(0, 1) > self.epslion:
                    score1 = qA[x_index, :]
                    score2 = qB[x_index, :]
                    u_index = np.argmin(score1 + score2)
                else:
                    u_index = np.random.randint(0, self.nu - 1)
                # observe x_t+1
                next_index = self.state_transfer_table[x_index, u_index]
                # compute g(x_t,u(x_t))
                x = self.get_states(x_index)
                u = self.action_list[u_index]
                # compute TDerror
                #     TDerror = self.costfn(x, u) + self.alpha * min(q[next_index, :]) - q[
                #         x_index, u_index]
                #     q[x_index, u_index] = q[x_index, u_index] + self.lr * TDerror
                if np.random.uniform(0, 1) > 0.5:
                    # updataA
                    action_from_A = np.argmin(qA[next_index, :])
                    TDerrorA = self.costfn(x, u) + self.gamma * qB[next_index, action_from_A] - qA[x_index, u_index]
                    qA[x_index, u_index] = qA[x_index, u_index] + self.lr * TDerrorA
                else:
                    # updataB
                    action_from_B = np.argmin(qB[next_index, :])
                    TDerrorB = self.costfn(x, u) + self.gamma * qA[next_index, action_from_B] - qB[x_index, u_index]
                    qB[x_index, u_index] = qB[x_index, u_index] + self.lr * TDerrorB

                x_index = next_index


            if ((qA_Last - qA) ** 2 < 1e-4).all() and ((qB_Last - qB) ** 2 < 1e-2).all():
                break
            else:
                qA_Last = qA.copy()
                qB_Last = qB.copy()
        qA = qA+qB
        policy = np.zeros(self.space_shape)
        value_function = np.zeros(self.space_shape)
        for k in range(self.num_states):
            iv, ix = np.divmod(k, self.nq)
            policy[ix, iv] = self.action_list[np.argmin(qA[k, :])]
            value_function[ix, iv] = min(qA[k, :])
        return value_function, policy


solver = DQ_solver(robot, cost,max_iters=40000)
value, policy = solver.iterate()

 43%|████▎     | 17014/40000 [00:33<00:45, 507.15it/s]


In [83]:
# here is some code to plot results, assuming a policy and a value function are given
# this can be used to answer questions in both Part 1 and 2
# we plot the value function
plt.figure(figsize=[6,6])
plt.imshow(value.T, extent=[0., 2*np.pi, -6, 6], aspect='auto')
plt.xlabel('Pendulum Angle')
plt.ylabel('Velocity')
plt.title('Value Function')

# we plot the policy
plt.figure(figsize=[6,6])
plt.imshow(policy.T, extent=[0., 2*np.pi, -6, 6], aspect='auto')
plt.xlabel('Pendulum Angle')
plt.ylabel('Velocity')
plt.title('Policy')

# now we simulate the dynamics for 100 time steps
x0 = np.array([0.,0.])

def controller(x):
    theta = np.linspace(0, 2*np.pi, 50, endpoint=False)
    dtheta = np.linspace(-6, 6, 50)
    
    th_index = np.argmin(np.abs(theta - x[0]))
    dth_index = np.argmin(np.abs(dtheta - x[1]))
    return policy[th_index, dth_index]




<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [81]:
x, u = robot.simulate(x0, controller, 20)

# and plot the results
time = np.linspace(0.,20., len(x[0,:]))
plt.figure()
plt.subplot(3,1,1)
plt.plot(time,x[0,:])
plt.ylabel('angle')
plt.subplot(3,1,2)
plt.plot(time,x[1,:])
plt.ylabel('velocity')
plt.subplot(3,1,3)
plt.plot(time[:-1],u)
plt.ylabel('control')


<IPython.core.display.Javascript object>

Text(0, 0.5, 'control')

In [82]:
# and show an animation
robot.animate_robot(x, robot.delta_t)