# SIT796 Task 3.1D
Brenton Adey
222165064

In [2]:
import numpy as np
from numpy import pi, sin, cos

import random

from graphviz import Digraph

# Define the Dynamics of Acrobot
See Brockman et al. (2016)

In [3]:
# Define constants of for Acrobot Components
dt = 0.2
LINK_LENGTH_1 = 1.0  # [m]
LINK_LENGTH_2 = 1.0  # [m]
LINK_MASS_1 = 1.0  #: [kg] mass of link 1
LINK_MASS_2 = 1.0  #: [kg] mass of link 2
LINK_COM_POS_1 = 0.5  #: [m] position of the center of mass of link 1
LINK_COM_POS_2 = 0.5  #: [m] position of the center of mass of link 2
LINK_MOI = 1.0  #: moments of inertia for both links
MAX_VEL_1 = 4 * pi
MAX_VEL_2 = 9 * pi

In [4]:
def dsdt(state_augemented):
    m1 = LINK_MASS_1
    m2 = LINK_MASS_2
    l1 = LINK_LENGTH_1
    lc1 = LINK_COM_POS_1
    lc2 = LINK_COM_POS_2
    I1 = LINK_MOI
    I2 = LINK_MOI
    g = 9.8
    a = state_augemented[-1]
    s = state_augemented[:-1]
    theta1 = s[0]
    theta2 = s[1]
    dtheta1 = s[2]
    dtheta2 = s[3]
    d1 = (
        m1 * lc1**2
        + m2 * (l1**2 + lc2**2 + 2 * l1 * lc2 * cos(theta2))
        + I1
        + I2
    )
    d2 = m2 * (lc2**2 + l1 * lc2 * cos(theta2)) + I2
    phi2 = m2 * lc2 * g * cos(theta1 + theta2 - pi / 2.0)
    phi1 = (
        -m2 * l1 * lc2 * dtheta2**2 * sin(theta2)
        - 2 * m2 * l1 * lc2 * dtheta2 * dtheta1 * sin(theta2)
        + (m1 * lc1 + m2 * l1) * g * cos(theta1 - pi / 2)
        + phi2
    )
    
    ddtheta2 = (a + d2 / d1 * phi1 - phi2) / (m2 * lc2**2 + I2 - d2**2 / d1)
    ddtheta1 = -(d2 * ddtheta2 + phi1) / d1
    
    return dtheta1, dtheta2, ddtheta1, ddtheta2, 0.0

def rk4(derivs, y0, t):
    """
    Integrate 1-D or N-D system of ODEs using 4-th order Runge-Kutta.
    Example for 2D system:
        >>> def derivs(x):
        ...     d1 =  x[0] + 2*x[1]
        ...     d2 =  -3*x[0] + 4*x[1]
        ...     return d1, d2
        >>> dt = 0.0005
        >>> t = np.arange(0.0, 2.0, dt)
        >>> y0 = (1,2)
        >>> yout = rk4(derivs, y0, t)
    Args:
        derivs: the derivative of the system and has the signature ``dy = derivs(yi)``
        y0: initial state vector
        t: sample times
    Returns:
        yout: Runge-Kutta approximation of the ODE
    """

    try:
        Ny = len(y0)
    except TypeError:
        yout = np.zeros((len(t),), np.float_)
    else:
        yout = np.zeros((len(t), Ny), np.float_)

    yout[0] = y0

    for i in np.arange(len(t) - 1):

        this = t[i]
        dt = t[i + 1] - this
        dt2 = dt / 2.0
        y0 = yout[i]

        k1 = np.asarray(derivs(y0))
        k2 = np.asarray(derivs(y0 + dt2 * k1))
        k3 = np.asarray(derivs(y0 + dt2 * k2))
        k4 = np.asarray(derivs(y0 + dt * k3))
        yout[i + 1] = y0 + dt / 6.0 * (k1 + 2 * k2 + 2 * k3 + k4)
    # We only care about the final timestep and we cleave off action value which will be zero
    return yout[-1][:4]

def wrap(x, m, M):
    """Wraps ``x`` so m <= x <= M; but unlike ``bound()`` which
    truncates, ``wrap()`` wraps x around the coordinate system defined by m,M.\n
    For example, m = -180, M = 180 (degrees), x = 360 --> returns 0.
    Args:
        x: a scalar
        m: minimum possible value in range
        M: maximum possible value in range
    Returns:
        x: a scalar, wrapped
    """
    diff = M - m
    while x > M:
        x = x - diff
    while x < m:
        x = x + diff
    return x

def bound(x, m, M=None):
    """Either have m as scalar, so bound(x,m,M) which returns m <= x <= M *OR*
    have m as length 2 vector, bound(x,m, <IGNORED>) returns m[0] <= x <= m[1].
    Args:
        x: scalar
        m: The lower bound
        M: The upper bound
    Returns:
        x: scalar, bound between min (m) and Max (M)
    """
    if M is None:
        M = m[1]
        m = m[0]
    # bound x between min (m) and Max (M)
    return min(max(x, m), M)

In [5]:
def apply_torque(state, torque):
    state_augemented = np.append(state,torque)
    new_state = rk4(dsdt, state_augemented, [0,dt])

    new_state[0] = wrap(new_state[0], -pi, pi)
    new_state[1] = wrap(new_state[1], -pi, pi)
    new_state[2] = bound(new_state[2], -MAX_VEL_1, MAX_VEL_1)
    new_state[3] = bound(new_state[3], -MAX_VEL_2, MAX_VEL_2)
    
    return new_state

In [6]:
theta1 = random.uniform(-0.1,0.1)
theta2 = random.uniform(-0.1,0.1)
dtheta1 = random.uniform(-0.1,0.1)
dtheta2 = random.uniform(-0.1,0.1)

init_state = np.array([theta1,theta2,dtheta1,dtheta2])

new_state = apply_torque(init_state, 1)

# Discretise Acrobot State Space

In [7]:
# Δθ = 20˚ = π/9rad
# Δdθ = 0.5rad/s
# Number of states = 19*19*51*114 = 2,098,854

def discretise(state, D_angle=(pi/9), D_velocity=0.5):
    state[0] = round(state[0]/(D_angle))*(D_angle)
    state[1] = round(state[1]/(D_angle))*(D_angle)

    state[2] = round(state[2]/(D_velocity))*(D_velocity)
    state[3] = round(state[3]/(D_velocity))*(D_velocity)

    return state

In [8]:
# Example discretise function
discretise([4.5,0.5,5.523,-12.008])

[4.537856055185257, 0.3490658503988659, 5.5, -12.0]

In [9]:
# Define step function (apply torque then discretise)
def step(state, torque):
    return discretise(apply_torque(state, torque))

In [10]:
# Test Function to ensure dynamics can still operate
state = [0,0,0,0]
for _ in range(0,1000):
    state = step(state,random.choice([-1,0,1]))
    print(state)

[0. 0. 0. 0.]
[0.  0.  0.  0.5]
[0.  0.  0.  0.5]
[0.  0.  0.  0.5]
[0. 0. 0. 1.]
[0.         0.34906585 0.         1.        ]
[0.         0.34906585 0.         0.5       ]
[0.         0.34906585 0.         0.        ]
[ 0.          0.34906585  0.         -0.5       ]
[ 0.          0.34906585  0.         -0.5       ]
[ 0.          0.34906585  0.         -0.5       ]
[ 0.          0.34906585  0.         -0.5       ]
[ 0.          0.34906585  0.         -0.5       ]
[ 0.          0.34906585  0.         -0.5       ]
[ 0.          0.34906585  0.         -1.        ]
[ 0.   0.   0.  -1.5]
[ 0.         -0.34906585  0.         -1.        ]
[ 0.         -0.34906585  0.          0.        ]
[ 0.         -0.34906585  0.          0.5       ]
[ 0.         -0.34906585  0.          1.        ]
[0.  0.  0.  1.5]
[0.         0.34906585 0.         1.        ]
[0.         0.34906585 0.         0.5       ]
[0.         0.34906585 0.         0.        ]
[ 0.          0.34906585  0.         -0.5       ]
[ 

# Generate the Discrete State Space

In [11]:
# Generate variable distributions. Note +0.001 perturbation to include final value
theta1_dist = np.around(np.arange(-pi,pi+0.001,pi/9)/(pi/9))*pi/9
theta2_dist = np.around(np.arange(-pi,pi+0.001,pi/9)/(pi/9))*pi/9
dtheta1_dist = np.around(np.arange(-MAX_VEL_1,MAX_VEL_1,0.5)/0.5)*0.5
dtheta2_dist = np.around(np.arange(-MAX_VEL_2,MAX_VEL_2,0.5)/0.5)*0.5

In [12]:
# Generate all possible combinations of states with the variables
# This code creates the multi-dimensional co-ordinate space, and then combines the co-ordinates to a single 1-D array of 6-D arrays
# Numpy is used for this for efficiency, at the sacrifice of readability
state_space = np.array(np.meshgrid(theta1_dist,theta2_dist,dtheta1_dist,dtheta2_dist)).T.reshape(-1,4)

# Generate Transition Probabilities

Goal is to get a dictionary of the form:
```python
{
    's0': {
        'a0': {'s0': 0.5, 's2': 0.5},
        'a1': {'s2': 1}
    },
    's1': {
        'a0': {'s0': 0.7, 's1': 0.1, 's2': 0.2},
        'a1': {'s1': 0.95, 's2': 0.05}
    },
    's2': {
        'a0': {'s0': 0.4, 's2': 0.6},
        'a1': {'s0': 0.3, 's1': 0.3, 's2': 0.4}
    }
}
```
Where sX is an index for a state and aX is an index for an action.
Note that there will only ever be one available possible future state per state/action pair, so all probabilities will be 1 unless that state is a terminating state, where the state will not have transition probabilities

In [13]:
# Define action space
action_space = {"a0":-1,"a1":0,"a2":1}

In [14]:
# Apply each action to every possible state
apply_a0 = np.array([step(state,action_space["a0"]) for state in state_space])
apply_a1 = np.array([step(state,action_space["a1"]) for state in state_space])
apply_a2 = np.array([step(state,action_space["a2"]) for state in state_space])

In [15]:
def transform_states_to_posint(states):
    states_transformed = np.empty(states.shape)
    # Transform arrays to ensure all non-negative integer values
    states_transformed[:,0] = (states[:,0]*(180/pi))
    states_transformed[:,1] = (states[:,1]*(180/pi))
    states_transformed[:,2] = (states[:,2]*(2))
    states_transformed[:,3] = (states[:,3]*(2))

    states_transformed[:,0] = states_transformed[:,0] + abs(min(states_transformed[:,0]))
    states_transformed[:,1] = states_transformed[:,1] + abs(min(states_transformed[:,1]))
    states_transformed[:,2] = states_transformed[:,2] + abs(min(states_transformed[:,2]))
    states_transformed[:,3] = states_transformed[:,3] + abs(min(states_transformed[:,3]))

    states_transformed = states_transformed.astype(int)

    return states_transformed

In [16]:
# Now perform index matching along the rows. Due to the size of the data, ravel_multi_index is used to create smaller linear-index unique representations of each 4D state
# See https://stackoverflow.com/questions/38674027/find-the-row-indexes-of-several-values-in-a-numpy-array for more details
def multi_dim_search(X, search_values):
    transformed_X = transform_states_to_posint(X)
    transformed_search_values = transform_states_to_posint(search_values)

    dims = transformed_X.max(0)+2
    X1D = np.ravel_multi_index(transformed_X.T,dims)
    searched_valuesID = np.ravel_multi_index(transformed_search_values.T,dims)
    sidx = X1D.argsort()
    out = sidx[np.searchsorted(X1D,searched_valuesID,sorter=sidx)]

    return out

In [17]:
apply_a0_idx_map = multi_dim_search(state_space, apply_a0)
apply_a1_idx_map = multi_dim_search(state_space, apply_a1)
apply_a2_idx_map = multi_dim_search(state_space, apply_a2)

In [18]:
# Define termination criteria
def reward_criteria(theta1, theta2):
   return -cos(theta1) - cos(theta2 + theta1) > 1.0

In [19]:
termination_states = [ reward_criteria(state[0],state[1]) for state in state_space ]

In [20]:
transition_probs = {
    "s"+str(idx) : {
        "a0" : {"s"+str(apply_a0_idx_map[idx]): 1},
        "a1" : {"s"+str(apply_a1_idx_map[idx]): 1},
        "a2" : {"s"+str(apply_a2_idx_map[idx]): 1}
    }
    for idx, _ in enumerate(state_space) if not(termination_states[idx])
}

# Generate Rewards

Goal is to get a dictionary of the form:
```python
{
    's1': -1,
    's2': -1
}
```
Where sX is an index. No reward indicates that the state is terminal

In [21]:
rewards = {
    "s"+str(idx) : -1
    for idx, _ in enumerate(state_space) if not(termination_states[idx])
}

# Plot Graph of Model

In [45]:
def get_possible_actions(transition_probs, state):
    """ return a tuple of possible actions in a given state """
    return tuple(transition_probs.get(state, {}).keys())

def get_next_states(transition_probs, state, action):
    """ return a dictionary of {next_state1 : P(next_state1 | state, action), next_state2: ...} """
    assert action in get_possible_actions(transition_probs, state), "cannot do action %s from state %s" % (action, state)
    return transition_probs[state][action]

def get_transition_prob(transition_probs, state, action, next_state):
    """ return P(next_state | state, action) """
    return get_next_states(transition_probs, state, action).get(next_state, 0.0)

def plot_graph(transition_probs, graph_size='10,10', s_node_size='1,5',
               a_node_size='0,5', rankdir='TB', n_nodes=100):
    """
    Function for pretty drawing MDP graph with graphviz library.
    Requirements:
    graphviz : https://www.graphviz.org/
    for ubuntu users: sudo apt-get install graphviz
    python library for graphviz
    for pip users: pip install graphviz
    :param mdp:
    :param graph_size: size of graph plot
    :param s_node_size: size of state nodes
    :param a_node_size: size of action nodes
    :param rankdir: order for drawing
    :return: dot object
    """
    s_node_attrs = {'shape': 'doublecircle',
                    'color': '#85ff75',
                    'style': 'filled',
                    'width': str(s_node_size),
                    'height': str(s_node_size),
                    'fontname': 'Arial',
                    'fontsize': '24'}

    a_node_attrs = {'shape': 'circle',
                    'color': 'lightpink',
                    'style': 'filled',
                    'width': str(a_node_size),
                    'height': str(a_node_size),
                    'fontname': 'Arial',
                    'fontsize': '20'}

    s_a_edge_attrs = {'style': 'bold',
                      'color': 'red',
                      'ratio': 'auto'}

    a_s_edge_attrs = {'style': 'dashed',
                      'color': 'blue',
                      'ratio': 'auto',
                      'fontname': 'Arial',
                      'fontsize': '16'}

    graph = Digraph(name='MDP')
    graph.attr(rankdir=rankdir, size=graph_size, dpi='1000')
    
    # Only plot the first n nodes
    n=0
    for state_node in transition_probs:
        graph.node(state_node, **s_node_attrs)

        for posible_action in get_possible_actions(transition_probs, state_node):
            action_node = state_node + "-" + posible_action
            graph.node(action_node,
                    label=str(posible_action),
                    **a_node_attrs)
            graph.edge(state_node, state_node + "-" +
                    posible_action, **s_a_edge_attrs)

            for posible_next_state in get_next_states(transition_probs, state_node, posible_action):
                probability = get_transition_prob(
                    transition_probs, state_node, posible_action, posible_next_state)
                    
                label_a_s_edge = 'p = ' + str(probability)

                graph.edge(action_node, posible_next_state,
                        label=label_a_s_edge, **a_s_edge_attrs)
        n += 1
        if n>n_nodes:
            break
    return graph

In [46]:
graph = plot_graph(transition_probs=transition_probs, graph_size='10,10', n_nodes=500)

In [47]:
# Render Graph with neato engine 
graph.render(engine='neato', outfile='graph.png')

'graph.png'

# Implement Policy Iteration

In [109]:
# Set policy iteration parameters
max_policy_eval = 1000  # Maximum number of policy evaluations
max_policy_iter = 1000  # Maximum number of policy iterations
states = [ 's'+str(state) for state, _ in enumerate(state_space) ]
actions = action_space.keys()
pi = {s: 'a0' for s in states}
V = {s: 0 for s in states}

gamma = 0.9  # discount factor
delta = 1  # Error tolerance

In [111]:
# Find state where Acrobot begins stationary
initial_state = "s"+str(np.where((state_space == [ 0,  0, 0, 0]).all(axis=1))[0][0])

# Number of steps to play
n_steps = 1000

# Try policy before optimisation
total_reward, average_reward = run_policy(transition_probs, rewards, pi, n_steps, initial_state)
print(f"Average reward over {n_steps} steps is {average_reward}")
print(f"Total reward over {n_steps} steps is {total_reward}")

Average reward over 1000 episodes is -1.0
Total reward over 1000 episodes is -1000


In [112]:
num_policy_eval = 0
num_policy_iter = 0

for i in range(max_policy_iter):
    # Initial assumption: policy is stable
    optimal_policy_found = True

    # Policy evaluation
    # Compute value for each state under current policy
    for j in range(max_policy_eval):
        max_diff = 0  # Initialize max difference
        for s in states:

            # Compute state value
            val = rewards.get(s,0)  # Get direct reward
            for s_next, s_next_prob in transition_probs.get(s,{}).get(pi[s],{}).items():
                val += s_next_prob * (
                        gamma * V[s_next]
                )  # Add discounted downstream values

            # Update maximum difference
            max_diff = max(max_diff, abs(val - V[s]))

            V[s] = val  # Update value
        
        num_policy_eval += 1
        # If diff smaller than threshold delta for all states, algorithm terminates
        if max_diff < delta:
            break

    # Policy improvement
    # With updated state values, improve policy if needed
    for s in states:

        val_max = V[s]
        for a in actions:
            val = rewards.get(s,0)  # Get direct reward
            for s_next, s_next_prob in transition_probs.get(s,{}).get(a,{}).items():
                val_a = val + s_next_prob * (
                        gamma * V[s_next]
                )  # Add discounted downstream values

            # Update policy if (i) action improves value and (ii) action different from current policy
            if val > val_max and pi[s] != a:
                pi[s] = a
                val_max = val
                optimal_policy_found = False
    num_policy_iter += 1

    # If policy did not change, algorithm terminates
    if optimal_policy_found:
        break

print(f"Number of Policy Evaluation steps: {num_policy_eval}")
print(f"Number of Policy Iteration steps: {num_policy_iter}")

Number of Policy Evaluation steps: 96
Number of Policy Iteration steps: 17


In [113]:
# Get summary of policy with action count
action_count = {"a0":0,"a1":0,"a2":0}
for action in pi.values():
    action_count[action] += 1

action_count

{'a0': 1672878, 'a1': 212922, 'a2': 213054}

In [114]:
# Find state where Acrobot begins stationary
initial_state = "s"+str(np.where((state_space == [ 0,  0, 0, 0]).all(axis=1))[0][0])

# Number of episodes to play
n_episodes = 1000

# Try new optimised policy
total_reward, average_reward = run_policy(transition_probs, rewards, pi, n_episodes, initial_state)
print(f"Average reward over {n_episodes} episodes is {average_reward}")
print(f"Total reward over {n_episodes} episodes is {total_reward}")

Average reward over 1000 episodes is -0.036
Total reward over 1000 episodes is -36
