In [1]:
from CartPole import CartPole, remap_angle
import numpy as np
import matplotlib
import pandas as pd
import seaborn as sns
matplotlib.use('TkAgg') 
import scipy

import matplotlib.pyplot as plt
import jax.numpy as jnp
import jax
jax.config.update("jax_enable_x64", True)

import pickle

In [None]:
# important helper functions for all tasks
def get_std(X):
    return np.std(X, axis=0)

def convert_dict_to_array(data):
    # zip all the values in the dictionary together and convert it to a numpy array
    # suppose the keys are not known beforehand
    keys = list(data.keys())
    values = [data[key] for key in keys]
    return np.array(list(zip(*values)))



"\ndef convert_dict_to_array(data):\n    # zip all the values in the dictionary together\n    return np.array(list(zip(data['cart_location'], data['cart_velocity'], data['pole_angle'], data['pole_velocity'])))\n"

---
## Task 1.1

We are assuming the system is a markov state. This can be shown in this case from the equations of motion. 

We are also assuming that the system is time invariant. The system dynamics can be started at any time instant and will still behave the right way

Maybe use a heatmap to show the initial conditions that lead to flip over

In [3]:
def rollout(initial_state, initial_force, num_steps, visual=True):
    """
    Simulate the CartPole environment for a given number of steps.
    
    Args:
        initial_state (tuple): The initial state of the environment.
        it should be a tuple of the form (cart_location, cart_velocity, 
                                        pole_angle, pole_velocity).

        initial_force (float): The initial force applied to the cart.
        num_steps (int): The number of steps to simulate.
    
    Returns:
        data: A dictionary containing the cart location, cart velocity, 
              pole angle and pole angular velocity at each step.
    """
    env = CartPole(visual=visual)
    env.reset()

    data = {'cart_location': [],
            'cart_velocity': [],
            'pole_angle': [],
            'pole_velocity': []
        }
    
    # Set the initial state
    env.setState(initial_state)

    # Perform the action for the specified number of steps
    for step in range(num_steps + 1):
        # Store the current state
        data['cart_location'].append(env.cart_location)
        data['cart_velocity'].append(env.cart_velocity)
        data['pole_angle'].append(env.pole_angle)
        data['pole_velocity'].append(env.pole_velocity)

        # Perform the action
        env.performAction(initial_force)

        # remap the angle to be between -pi and pi
        env.remap_angle()
    
    # close the plot
    if visual:
        env.close_plot()
        plt.close()
        
    return data

    

In [None]:
# Plot functions

# plot y vs x
def plot_x_y(x, y, xlabel, ylabel):
    plt.figure()
    ax = plt.gca()
    plt.xlim(min(x) - 0.1, max(x) + 0.1)
    plt.ylim(min(y) - 0.1, max(y) + 0.1)
    ax.plot(x, y, 'r-')
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(f'{ylabel} vs {xlabel}')
    plt.grid()
    plt.show()

# plot y vs time
def plot_y(y, ylabel):
    plt.figure()
    ax = plt.gca()
    plt.xlim(0, len(y) + 2)
    plt.ylim(min(y) - 0.1, max(y) + 0.1)
    ax.plot(np.arange(0, len(y)),y, 'r-')
    plt.ylabel(ylabel)
    plt.title(f'{ylabel} vs Iterations')
    plt.xlabel('Iterations')
    plt.grid()
    plt.show()


# plot multiple y vs time
def plot_data_time(data, verdict):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 5))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            ax[i, j].plot(np.arange(0, len(data[title])), data[title], 'r-')
            ax[i, j].set_title(title + ' vs Iterations')
            ax[i, j].set_xlabel('Iterations')
            ax[i, j].set_ylabel(title)
            ax[i, j].grid()
    # center the title on top of the figure
    fig.suptitle(f"Initial Carriage velocity: {data['cart_velocity'][0]:.2f} m/s \nPole angular velocity: {data['pole_velocity'][0]:.2f} rad/s \n{verdict}")
    fig.tight_layout(rect=[0, 0.03, 1, 1])  # Adjust the rect to make space for the title
    plt.show()

# plot multiple y vs x
def plot_state(x_data, y_data):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 5))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            ax[i, j].plot(x_data[title], y_data[title], 'r-')
            ax[i, j].set_title("change in " + title + ' vs ' + title)
            ax[i, j].set_xlabel(title)
            ax[i, j].set_ylabel(title)
            ax[i, j].set_xlim(min(-1, min(x_data[title]) - 0.1), max(1, max(x_data[title]) + 0.1))
            ax[i, j].set_ylim(min(-1, min(y_data[title]) - 0.1), max(1, max(y_data[title]) + 0.1))
            ax[i, j].grid()
    # center the title on top of the figure
    fig.suptitle("Change in state vs State")
    fig.tight_layout(rect=[0, 0.05, 0.95, 1])  # Adjust the rect to make space for the title
    plt.show()

# plot multiple y vs x
def plot_pair_plot(data, verdict):
    fig, ax = plt.subplots(nrows=3, ncols=2, figsize=(15, 12))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(3):
        title_x = titles[i]
        for j in range(2):
            if i + j + 1 >= len(titles):
                title_y = titles[0]
                title_x = titles[-1]
            else:
                title_y = titles[i + j + 1]
            ax[i, j].plot(data[title_x], data[title_y], 'r-')
            ax[i, j].set_title(title_y + ' vs ' + title_x)
            ax[i, j].set_xlabel(title_x)
            ax[i, j].set_ylabel(title_y)
            ax[i, j].grid()
    # center the title on top of the figure
    fig.suptitle(f"Initial Carriage velocity: {data['cart_velocity'][0]:.2f} m/s \nPole angular velocity: {data['pole_velocity'][0]:.2f} rad/s \n{verdict}")
    fig.tight_layout(rect=[0, 0.05, 0.95, 1])  # Adjust the rect to make space for the title
    plt.show()

# plot multiple y vs x
def plot_phase_plot(data, verdict):
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15, 4))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for j in range(2):
        title_x = titles[j * 2]
        title_y = titles[j * 2 + 1]
        ax[j].plot(data[title_x], data[title_y], 'r-')
        ax[j].set_title(title_y + ' vs ' + title_x)
        ax[j].set_xlabel(title_x)
        ax[j].set_ylabel(title_y)
        ax[j].grid()
    # center the title on top of the figure
    fig.suptitle(f"Initial Carriage velocity: {data['cart_velocity'][0]:.2f} m/s \nPole angular velocity: {data['pole_velocity'][0]:.2f} rad/s \n{verdict}")
    fig.tight_layout(rect=[0, 0.05, 0.95, 1])  # Adjust the rect to make space for the title
    plt.show()

In [None]:
# different experiments
# Experiment 1: Rollout with initial state (0, 0, pi, 5) and initial force 0
data = rollout(initial_state=(0, 0, np.pi, 5), initial_force=0, num_steps=100)
plot_data_time(data, "Simple osscillation of pole")
plot_phase_plot(data, "Simple osscillation of pole")

In [None]:
# Experiment 2: Rollout with initial state (0, 0, pi, 14) and initial force 0
data = rollout(initial_state=(0, 0, np.pi, 13.85), initial_force=0, num_steps=100)
plot_data_time(data, "Complete rotation")
plot_phase_plot(data, "Complete rotation")

In [None]:
# Experiment 3: Rollout with initial state (0, 1, pi, 8) and initial force 0
data = rollout(initial_state=(0, 1, np.pi, 8), initial_force=0, num_steps=100)
plot_data_time(data, "Oscillation of pole and cart")
plot_phase_plot(data, "Oscillation of pole and cart")

In [None]:
# Experiment 4: Rollout with initial state (0, 10, pi, 14) and initial force 0
data = rollout(initial_state=(0, 10, np.pi, 14), initial_force=0, num_steps=100)
plot_data_time(data, "complete rotation")
plot_phase_plot(data, "complete rotation")

In [None]:

def count_revolution(initial_state, initial_force, num_steps):
    """
    Simulate the CartPole environment for a given number of steps.
    
    Args:
        initial_state (tuple): The initial state of the environment.
        it should be a tuple of the form (cart_location, cart_velocity, 
                                        pole_angle, pole_velocity).

        initial_force (float): The initial force applied to the cart.
        num_steps (int): The number of steps to simulate.
    
    Returns:
        revolution: The number of revolutions made by the pole.
    """
    env = CartPole(visual=False)
    env.reset()
    
    # Set the initial state
    env.setState(initial_state)

    num_revolutions = 0
    new_revolution = True

    # Perform the action for the specified number of steps
    for step in range(num_steps + 1):

        # Perform the action
        env.performAction(initial_force)

        # remap the angle to be between -pi and pi
        if abs(env.pole_angle) < 0.3:
            if new_revolution:
                # count the number of revolutions
                num_revolutions += 1

            new_revolution = False
        else:
            new_revolution = True

        env.remap_angle()
    
    if num_revolutions > 1:
        num_revolutions -= 1

    return num_revolutions

def generate_heatmap(num_steps, initial_force = 0):
    cart_velocity = np.linspace(-10, 10, num_steps)
    pole_velocity= np.linspace(-15, 15, num_steps)

    initial_state = [0, 0, np.pi, 0]

    revolution = [[0 for i in range(num_steps)] for j in range(num_steps)]

    for i, cv in enumerate(cart_velocity):
        for j, pv in enumerate(pole_velocity):
            initial_state[1] = cv
            initial_state[3] = pv

            num_revolutions = count_revolution(initial_state, initial_force, num_steps)

            revolution[i][j] = num_revolutions
    
    revolution = np.array(revolution)
    df = pd.DataFrame(revolution, index = cart_velocity, columns=pole_velocity)
    fig, ax = plt.subplots(figsize=(12, 8))
    sns.heatmap(df, annot=False, fmt=".1f", cmap="coolwarm")

    # Set axis labels
    ax.set_xlabel("initial pole velocity", fontsize=14)
    ax.set_ylabel("initial cart velocity", fontsize=14)
    ax.set_title("Revolutions", fontsize=16)

    # Set ticks at regular intervals (e.g., every 5 steps)
    step = 5  # Show every 5th tick label

    xticks = np.arange(0, num_steps, step)
    yticks = np.arange(0, num_steps, step)

    ax.set_xticks(xticks + 0.5)
    ax.set_xticklabels([f"{pole_velocity[i]:.2f}" for i in xticks], rotation=45, fontsize=12)

    ax.set_yticks(yticks + 0.5)
    ax.set_yticklabels([f"{cart_velocity[i]:.2f}" for i in yticks], rotation=0, fontsize=12)


    # Show the plot
    plt.show()


generate_heatmap(num_steps=50)          

    

In [None]:
def generate_heatmap_zoomed(num_steps, initial_force = 0):
    cart_velocity = np.linspace(8, 10, num_steps)
    pole_velocity= np.linspace(13, 15, num_steps)

    initial_state = [0, 0, np.pi, 0]

    revolution = [[0 for i in range(num_steps)] for j in range(num_steps)]

    for i, cv in enumerate(cart_velocity):
        for j, pv in enumerate(pole_velocity):
            initial_state[1] = cv
            initial_state[3] = pv

            num_revolutions = count_revolution(initial_state, initial_force, num_steps)

            revolution[i][j] = num_revolutions
    
    revolution = np.array(revolution)
    df = pd.DataFrame(revolution, index = cart_velocity, columns=pole_velocity)
    fig, ax = plt.subplots(figsize=(12, 8))
    sns.heatmap(df, annot=False, fmt=".1f", cmap="coolwarm")

    # Set axis labels
    ax.set_xlabel("initial pole velocity", fontsize=14)
    ax.set_ylabel("initial cart velocity", fontsize=14)
    ax.set_title("Revolutions", fontsize=16)

    # Set ticks at regular intervals (e.g., every 5 steps)
    step = 5  # Show every 5th tick label

    xticks = np.arange(0, num_steps, step)
    yticks = np.arange(0, num_steps, step)

    ax.set_xticks(xticks + 0.5)
    ax.set_xticklabels([f"{pole_velocity[i]:.2f}" for i in xticks], rotation=45, fontsize=12)

    ax.set_yticks(yticks + 0.5)
    ax.set_yticklabels([f"{cart_velocity[i]:.2f}" for i in yticks], rotation=0, fontsize=12)


    # Show the plot
    plt.show()


generate_heatmap_zoomed(num_steps=50)  

---
## Task 1.2

the delta time can be reduced to make the relationship a lot more linear. Infact, infinitesimal delta time will be perfectly linear. However, this will result in the control being very slow to respons. Also, if everything is linear, there will not be any need for non-linear control. 

In [None]:

def sweep(num_steps, initial_force):
    """
    Perform a sweep of the CartPole environment for a given number of steps.
    
    Args:
        num_steps (int): The number of steps to simulate.
        initial_force (float): The initial force applied to the cart.
    
    Returns:
        None: plots relation between current state and state after performing 
        action
    """
    state_sweeps = {
        'cart_location': np.linspace(-10, 10, num_steps),
        'cart_velocity': np.linspace(-10, 10, num_steps),
        'pole_angle': np.linspace(-np.pi, np.pi, num_steps),
        'pole_velocity': np.linspace(-15, 15, num_steps)
    }

    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']

    env = CartPole(visual=False)

    # this makes angle a lot more linear
    # env.setSimParams(sim_steps=5, delta_time=0.02)

    state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10), 
             np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]
    
    y_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    for i in range(len(state)):
        title = titles[i]
        for j in state_sweeps[title]:
            state[i] = j
            env.reset()
            env.setState(state)
            env.performAction(initial_force)
            new_state = env.getState()
            y_data[title].append(new_state[i])
        
        plot_x_y(state_sweeps[title], y_data[title], title, title + ' after action')

        # reset state to random values
        state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10), 
             np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]

sweep(num_steps=100, initial_force=0)

In [None]:
def difference_sweep(num_steps, initial_force):
    """
    Perform a sweep of the CartPole environment for a given number of steps.
    
    Args:
        num_steps (int): The number of steps to simulate.
        initial_force (float): The initial force applied to the cart.
    
    Returns:
        None: plots relation between current state and state after performing 
        action
    """
    state_sweeps = {
        'cart_location': np.linspace(-10, 10, num_steps),
        'cart_velocity': np.linspace(-10, 10, num_steps),
        'pole_angle': np.linspace(-np.pi, np.pi, num_steps),
        'pole_velocity': np.linspace(-15, 15, num_steps)
    }

    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']

    env = CartPole(visual=False)

    # this makes angle a lot more linear
    # env.setSimParams(sim_steps=5, delta_time=0.02)

    state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10), 
             np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]
    
    y_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    for i in range(len(state)):
        title = titles[i]
        for j in state_sweeps[title]:
            state[i] = j
            env.reset()
            env.setState(state)
            env.performAction(initial_force)
            new_state = env.getState()
            y_data[title].append(new_state[i] - state[i])
        
        # plot_x_y(state_sweeps[title], y_data[title], title, "change in " + title)

        # reset state to random values
        state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10), 
             np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]
        
    plot_state(state_sweeps, y_data)


difference_sweep(num_steps=100, initial_force=0)

In [None]:
def difference_sweep_2d(num_steps, initial_force):
    """
    Perform a sweep of the CartPole environment for a given number of steps.
    
    Args:
        num_steps (int): The number of steps to simulate.
        initial_force (float): The initial force applied to the cart.
    
    Returns:
        None: plots relation between current state and state after performing 
        action
    """
    state_sweeps = {
        'cart_location': np.random.uniform(-10, 10, num_steps),
        'cart_velocity': np.random.uniform(-10, 10, num_steps),
        'pole_angle': np.random.uniform(-np.pi, np.pi, num_steps),
        'pole_velocity': np.random.uniform(-15, 15, num_steps)
    }

    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']

    env = CartPole(visual=False)

    state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10), 
             np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]


    for i in range(len(state)):
        title_i = titles[i]
        for j in range(i+1,len(state)):
            title_j = titles[j]
            
            z_data_x = []
            z_data_y = []
            for k in range(len(state_sweeps[title_i])):
                state[i] = state_sweeps[title_i][k]
                state[j] = state_sweeps[title_j][k]
                env.reset()
                env.setState(state)
                env.performAction(initial_force)
                new_state = env.getState()
                z_data_x.append(new_state[i] - state[i])
                z_data_y.append(new_state[j] - state[j])

            plt.figure()
            tcf1 = plt.tricontourf(state_sweeps[title_i], state_sweeps[title_j], z_data_x, levels = 10)
            plt.title(f'Contours of change in {title_i}\n')
            plt.xlabel(title_i)
            plt.ylabel(title_j)
            plt.colorbar(tcf1)
            plt.show()

            plt.figure()
            tcf2 = plt.tricontourf(state_sweeps[title_i], state_sweeps[title_j], z_data_y, levels = 10)
            plt.title(f'Contours of change in {title_j}\n')
            plt.xlabel(title_i)
            plt.ylabel(title_j)
            plt.colorbar(tcf2)
            plt.show()

            # reset state to random values
            state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10), 
                np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]

difference_sweep_2d(num_steps=10000, initial_force=0)

---
## Task 1.3

In [None]:
def generate_data_rollout(num_steps, initial_force):
    """ 
    Generate data from Cartpole environment for training a model.
    Args:
        num_steps (int): The number of steps to simulate.
        initial_force (float): The initial force applied to the cart.
    Returns:
        X (np.ndarray): The input data, a 2D array of shape (num_steps, 4).
        Y (np.ndarray): The output data, a 2D array of shape (num_steps, 4).
    """
    # initial_state = (np.random.uniform(-10, 10), np.random.uniform(-10, 10), np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15))
    initial_state = [0, 0, np.pi, 5]
    # [[0, 0, np.pi, 5], [0, 0, np.pi, 14], [0, 1, np.pi, 8], [0, 10, np.pi, 14], [0, -2, 3, 0]]
    data = rollout(initial_state=initial_state, initial_force=initial_force, num_steps=num_steps, visual=False)

    X = convert_dict_to_array(data)
    Y = np.array([X[i+1] - X[i] for i in range(X.shape[0] - 1)])

    # remove the last element of X
    X = X[:-1]
    
    # print("shape of X:", X.shape, "\nshape of Y:", Y.shape)
    return X, Y

def generate_data_random(num_steps, initial_force):
    env = CartPole(visual=False)
    env.reset()
    x_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    y_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }
    for i in range(num_steps):
        initial_state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10),
                         np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]
        env.reset()
        env.setState(initial_state)
        env.performAction(initial_force)

        # remap the angle to be between -pi and pi
        # env.remap_angle()
        
        next_state = env.getState()
    
        x_data['cart_location'].append(initial_state[0])
        x_data['cart_velocity'].append(initial_state[1])
        x_data['pole_angle'].append(initial_state[2])
        x_data['pole_velocity'].append(initial_state[3])

        y_data['cart_location'].append(next_state[0] - initial_state[0])
        y_data['cart_velocity'].append(next_state[1] - initial_state[1])
        y_data['pole_angle'].append(next_state[2] - initial_state[2])
        y_data['pole_velocity'].append(next_state[3] - initial_state[3])

    X = convert_dict_to_array(x_data)
    Y = convert_dict_to_array(y_data)
    
    # print("shape of X:", X.shape, "\nshape of Y:", Y.shape)
    return X, Y

def generate_data_random2(num_steps, initial_force = 0):
    np.random.seed(4)
    
    # set the random seed and create the sobol sequence generator
    sobol_engine = scipy.stats.qmc.Sobol( d=4, seed=4 )

    # get 512 initial states spaced in the recommended ranges
    X = (sobol_engine.random_base2( m=int(np.log2(num_steps)) ) - 0.5 ) * 2 * np.array( [ 10, 20, np.pi, 20 ] )

    env = CartPole(visual=False)
    env.reset()
    Y = []

    for i, initial_state in enumerate(X):
        env.reset()
        env.setState(initial_state)
        env.performAction(initial_force)
        # env.remap_angle()
        next_state = env.getState()

        Y.append(next_state - initial_state)

    Y = np.array(Y)  

    return X, Y

In [None]:
X, Y = generate_data_random(num_steps = 500, initial_force = 0)

In [None]:
# calculate average condition number

sum_k = 0

for i in range(20):   
    X, Y = generate_data_random(num_steps = 500, initial_force = 0)
    condition_number = np.linalg.cond((X.T @ X))
    sum_k += condition_number

average_condition_number = sum_k/20
print(average_condition_number)

In [20]:
def linear_regression(X, Y, intercept=False):
    """
    Perform linear regression on the given data.
    
    Args:
        X (numpy.ndarray): The input data.
        Y (numpy.ndarray): The output data.
    
    Returns:
        None: plots the linear regression results.
    """
    # Add a column of ones to X for the intercept term
    if intercept:
        X = np.hstack((np.ones((X.shape[0], 1)), X))
    
    # Calculate the weights using the normal equation
    # W = np.linalg.inv(X.T @ X) @ X.T @ Y
    W = np.linalg.lstsq(X, Y)[0]
    
    # Make predictions
    Y_pred = X @ W

    return W, Y_pred

W, Y_pred = linear_regression(X, Y, intercept=False)

In [None]:
def difference_sweep_predicted(num_steps, initial_force, W):
    
    state_sweeps = {
        'cart_location': np.linspace(-10, 10, num_steps),
        'cart_velocity': np.linspace(-10, 10, num_steps),
        'pole_angle': np.linspace(-np.pi, np.pi, num_steps),
        'pole_velocity': np.linspace(-15, 15, num_steps)
    }

    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']

    env = CartPole(visual=False)

    state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10), 
             np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]
    
    x_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    y_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    y_data_pred = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    actual_next_state_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    pred_next_state_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    for i in range(len(state)):
        title = titles[i]
        for j in state_sweeps[title]:
            state[i] = j
            x_data[title].append(np.array(state))

            env.reset()
            env.setState(state)
            env.performAction(initial_force)
            env.remap_angle()
            
            new_state = env.getState()
            y_data[title].append(np.array(new_state) - np.array(state))
            actual_next_state_data[title].append(np.array(new_state))

            pred_step = np.array(state) @ W
            y_data_pred[title].append(pred_step)
            pred_next_state = np.array(state) + pred_step
            pred_next_state[2] = remap_angle(pred_next_state[2])
            pred_next_state_data[title].append(pred_next_state)

        # reset state to random values
        state = np.array([np.random.uniform(-10, 10), np.random.uniform(-10, 10), 
             np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)])
    
    for title in titles:
        # combine state_sweeps as columns of X
        x_data[title] = np.array(x_data[title])
        
        # combine y_data as columns of Y
        y_data[title] = np.array(y_data[title])
        
        # combine y_data_pred as columns of Y_pred
        y_data_pred[title] = np.array(y_data_pred[title])

        # combine actual_next_state_data as columns of Y
        actual_next_state_data[title] = np.array(actual_next_state_data[title])

        # combine pred_next_state_data as columns of Y_pred
        pred_next_state_data[title] = np.array(pred_next_state_data[title])
        
    return x_data, y_data, y_data_pred, actual_next_state_data, pred_next_state_data

x_sweep, y_sweep, y_sweep_pred, actual_next_state_sweep, pred_next_state_sweep = difference_sweep_predicted(num_steps=100, initial_force=0, W=W)

In [None]:
def plot_linear_regression_sweep_1(x_data, y_data, y_data_pred, actual_next_state_sweep, pred_next_state_sweep, step):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 5))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            
            X = x_data[title]

            if not step:
                Y = actual_next_state_sweep[title]
                Y_pred = pred_next_state_sweep[title]
            
            else:
                Y = y_data[title]
                Y_pred = y_data_pred[title]
                
            ax[i, j].plot(X[:,i * 2 + j], Y[:,i * 2 + j] ,'r-', label = "actual")
            ax[i, j].plot(X[:,i * 2 + j], Y_pred[:,i * 2 + j] ,'b--', label = "pred")
            ax[i, j].set_title(title)
            ax[i, j].set_xlabel("current state")

            if not step:
                ax[i, j].set_ylabel("next state")
            else:
                ax[i, j].set_ylabel("next step")
                ax[i, j].set_xlim(min(X[:,i * 2 + j]) - 0.1, max(X[:,i * 2 + j]) + 0.1)
                ax[i, j].set_ylim(min([-1, Y_pred[:,i * 2 + j].min() - 0.1, Y[:,i * 2 + j].min() - 0.1]), max([1, Y_pred[:,i * 2 + j].max() + 0.1, Y[:,i * 2 + j].max() + 0.1, Y[:, i * 2 + j].max() + 0.1]))

            ax[i, j].grid()
            ax[i, j].legend()
    # center the title on top of the figure
    fig.suptitle("Forecast sweep")
    fig.tight_layout(rect=[0, 0.03, 1, 1])  # Adjust the rect to make space for the title
    plt.show()

def plot_linear_regression_sweep_2(y_data, y_data_pred, actual_next_state_data, pred_next_state_data, step):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 5))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            
            if not step:
                Y = actual_next_state_data[title]
                Y_pred = pred_next_state_data[title]

            else:
                Y = y_data[title]
                Y_pred = y_data_pred[title]

                
            ax[i, j].plot(Y[:,i * 2 + j], Y_pred[:,i * 2 + j] ,'r-', label = "predicted")
            ax[i, j].plot(Y[:,i * 2 + j], Y[:,i * 2 + j] ,'b--', label = "Y = X")
            ax[i, j].set_title(title)

            if not step:
                ax[i, j].set_xlabel("actual state")
                ax[i, j].set_ylabel("pred state")
            else:
                ax[i, j].set_xlabel("actual step")
                ax[i, j].set_ylabel("pred step")
                ax[i, j].set_xlim(min(X[:,i * 2 + j]) - 0.1, max(X[:,i * 2 + j]) + 0.1)
                ax[i, j].set_ylim(min(-1, Y_pred[:,i * 2 + j].min() - 0.1), max(1, Y_pred[:,i * 2 + j].max() + 0.1))
            ax[i, j].legend()
            ax[i, j].grid()
    # center the title on top of the figure
    fig.suptitle("Forecast sweep")
    fig.tight_layout(rect=[0, 0.03, 1, 1])  # Adjust the rect to make space for the title
    plt.show()

plot_linear_regression_sweep_1(x_sweep, y_sweep, y_sweep_pred, actual_next_state_sweep, pred_next_state_sweep, step = True)
plot_linear_regression_sweep_2(y_sweep, y_sweep_pred, actual_next_state_sweep, pred_next_state_sweep, step = False)

---
## Task 1.4

In [None]:
def forecast(initial_state, num_steps, W):
    """
    Forecast the future state of the CartPole environment using the learned model.

    Args:
        initial_state (list): The initial state of the environment.
        num_steps (int): The number of steps to forecast.
        W (numpy.ndarray): The learned model weights.

    Returns:
        X_forecast (dict): A dictionary containing the forecasted states.
    """

    # initialise the forecasted state
    X_forecast = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }

    state = np.array(initial_state)

    # perform the action for the specified number of steps
    for i in range(num_steps):
        X_forecast['cart_location'].append(state[0])
        X_forecast['cart_velocity'].append(state[1])
        X_forecast['pole_angle'].append(state[2])
        X_forecast['pole_velocity'].append(state[3])

        # forecast the next state
        state = state + (state @ W)

        # remap the angle to be between -pi and pi
        state[2] = remap_angle(state[2])
        
    return X_forecast   

In [None]:
print(W)

In [None]:
def plot_data_vs_forecast_time(data, forecast, graph_title):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 5))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            ax[i, j].plot(np.arange(0, len(data[title])), data[title], 'r-', label='actual')
            ax[i, j].plot(np.arange(0, len(forecast[title])), forecast[title], 'b-', label='forecast')
            ax[i, j].set_title(title)
            ax[i, j].set_xlabel('Iterations')
            ax[i, j].set_ylabel(title)
            ax[i, j].grid()
            ax[i, j].legend()
    # center the title on top of the figure
    fig.suptitle(graph_title)
    fig.tight_layout(rect=[0, 0.03, 1, 1])  # Adjust the rect to make space for the title
    plt.show()

# Example initial states for testing
initial_states = [[0, 0, np.pi, 5], [0, 0, np.pi, 14], [0, 1, np.pi, 8], [0, 10, np.pi, 14], [0, -2, 3, 0]]

# set the initial force to 0
initial_force = 0

# set the number of steps to forecast
num_steps = 100

# Forecast the future state of the CartPole environment using the learned model.
for initial_state in initial_states:
    # obtain the forecasted state
    X_forecast = forecast(initial_state, num_steps, W)

    # obtain the actual state
    X_actual = rollout(initial_state, initial_force, num_steps, visual=False)

    # plot with graph title as initial state formatted to 2 decimal places
    graph_title = f"Initial state: {initial_state[0]:.2f}, {initial_state[1]:.2f}, {initial_state[2]:.2f}, {initial_state[3]:.2f}"
    plot_data_vs_forecast_time(X_actual, X_forecast, graph_title=graph_title)

---
## Task 2.1

Large sigma_j --> the basis function is not sensitive to that dimension as the 
sum of squares contributes nothing to basis function

lambda is a regulariser --> helps reduce overfitting
need trial and error to get the correct value of lambda

These are actually gaussian processes (number of basis functions goes to infinity)

more trial and error to choose sigmas for basis functions. rule of thumb width: standard deviation of data * scalar, where scalar can also be tuned

import jax.numpy as jnp

jit will compile code so the second time the code is run, the compiled code is run instead of redoing compilation (use @jit decorator)

Jax allows for GPU using device (check if there is mac)

Be ready for JAX to gove errors

using both sine and coside allows you to reconstruct the angle and should give really good fit

use as much data as possible, training will take a couple of minutes so use JIT

do not use for loops with JAX. use JAX scan to replace for loops (not needed for week 2, useful for week 3)

In [8]:
def get_variance(X):
    # Calculate the covariance matrix
    return np.var(X, axis=0)

def kernel(X, X_prime, sigma):
    """
    Compute the Gaussian kernel between two sets of data points.
    Args:
        X (numpy.ndarray): The first set of data points. (N, D)
        X_prime (numpy.ndarray): The second set of data points. (M, D)
        sigma (float): The bandwidth parameter for the Gaussian kernel. (D,)
    """
    X_e = np.expand_dims(X, axis=1)  # (N, 1, D)
    X_prime_e = np.expand_dims(X_prime, axis=0)  # (1, M, D)
    diff = X_e - X_prime_e  # (N, M, D)

    # remap angle to be between -pi and pi
    diff[..., 2] = np.sin(diff[..., 2]/2)
    scaled_squared_diff = (diff ** 2)/(2 * sigma ** 2) # (N, M, D)

    K = np.exp(-np.sum(scaled_squared_diff, axis=-1))  # (N, M)
    return K


def tikhonov_solve(K, regularisation_matrix, Y,lamb):
    """
    Solve the Tikhonov regularization problem.
    
    Args:
        K (numpy.ndarray): The kernel matrix. (N, M)
        regularisation_matrix (numpy.ndarray): The regularization matrix. (M, M)
        Y (numpy.ndarray): The output data. (N, D)
        lamb (float): The regularization parameter.
    
    Returns:
        numpy.ndarray: The weights of the model.
    """
    Y_solve = (K.T) @ Y  # (N, D)

    regularisation_term = lamb * regularisation_matrix # (M, M)

    X_solve = ((K.T) @ K) + regularisation_term # (M, M)

    alpha = np.linalg.lstsq(X_solve, Y_solve, rcond=None)[0]  # (M, D)
    
    return alpha

def train_nonlinear_models(X, Y, M, lamb, sigma, kernel_fn):
  

    # choose M random points from X
    # indices = np.random.choice(X.shape[0], M, replace=False)
    # X_prime = X[indices]
    X_prime = X[:M] # (M, D)

    # Create the kernel matrix
    K = kernel_fn(X, X_prime, sigma) # (N, M)

    # Create the regularization matrix
    regularisation_matrix = kernel_fn(X_prime, X_prime, sigma) # (M, M)

    # solve the Tikhonov regularization problem
    alpha = tikhonov_solve(K, regularisation_matrix, Y, lamb)

    return alpha, X_prime, K

def plot_actual_pred_iterations(X_actual, X_forecast, graph_title):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 6))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            ax[i, j].plot(np.arange(0, len(X_actual[:, i * 2 + j])), X_actual[:, i * 2 + j], 'r-', label='actual')
            ax[i, j].plot(np.arange(0, len(X_forecast[:, i * 2 + j])), X_forecast[:, i * 2 + j], 'b--', label='forecast')
            # ax[i, j].set_title(title)
            ax[i, j].set_xlabel('Iterations')
            ax[i, j].set_ylabel(title)
            ax[i, j].grid()
            ax[i, j].legend()
    
    # center the title on top of the figure
    fig.suptitle(graph_title)
    fig.tight_layout(rect=[0, 0.03, 1, 1])  # Adjust the rect to make space for the title
    plt.rcParams.update({'font.size': 16})
    plt.show()

def plot_actual_pred_time(X_actual, X_forecast, graph_title):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 6))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            ax[i, j].plot(0.1 * np.arange(0, len(X_actual[:, i * 2 + j])), X_actual[:, i * 2 + j], 'r-', label='actual')
            ax[i, j].plot(0.1 * np.arange(0, len(X_forecast[:, i * 2 + j])), X_forecast[:, i * 2 + j], 'b--', label='forecast')
            # ax[i, j].set_title(title)
            ax[i, j].set_xlabel('time')
            ax[i, j].set_ylabel(title)
            ax[i, j].grid()
            ax[i, j].legend()
    
    # center the title on top of the figure
    fig.suptitle(graph_title)
    fig.tight_layout(rect=[0, 0.03, 1, 1])  # Adjust the rect to make space for the title
    plt.rcParams.update({'font.size': 16})
    plt.show()

def plot_fit(Y_actual, Y_pred, graph_title):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 8))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            # make scatter plot dots smaller
            # colour the scatter plot dots dark blue
            ax[i, j].scatter(Y_actual[:, i * 2 + j], Y_pred[:, i * 2 + j], label='pred', s=2, color='darkblue')
            ax[i, j].plot(Y_actual[:, i * 2 + j], Y_actual[:, i * 2 + j], 'r--', label='Y = X')
            # ax[i, j].set_title(title)
            ax[i, j].set_xlabel('actual change in state')
            ax[i, j].set_ylabel(title)
            ax[i, j].grid()
            ax[i, j].legend()

    # center the title on top of the figure
    fig.suptitle(graph_title)
    fig.tight_layout(rect=[0, 0.03, 1, 1])  # Adjust the rect to make space for the title
    plt.rcParams.update({'font.size': 15})
    plt.show()
    plt.close()

def forecast_nonlinear(initial_state, initial_force, num_steps, alpha, sigma, X_prime, kernel_fn):
    
    # obtain the actual state
    X_actual = convert_dict_to_array(rollout(initial_state, initial_force, num_steps, visual=False))

    current_state = np.array(initial_state)
    X_forecast = [current_state.copy()]
    for i in range(num_steps):
        # calculate the kernel for the current state
        K = kernel_fn(np.expand_dims(current_state, axis=0), X_prime, sigma)

        Y_pred = K @ alpha
        current_state = (current_state + Y_pred).flatten()

        # remap the angle to be between -pi and pi PURELY FOR PLOTTING
        # The original state is still used for the forecast
        remapped_state = current_state.copy()
        remapped_state[2] = remap_angle(remapped_state[2])
        X_forecast.append(remapped_state)

    X_forecast = np.array(X_forecast)
    
    plot_actual_pred_iterations(X_actual, X_forecast, graph_title=f"Forecast for initial state: {initial_state[0]:.2f}, {initial_state[1]:.2f}, {initial_state[2]:.2f}, {initial_state[3]:.2f}")
    plot_actual_pred_time(X_actual, X_forecast, graph_title=f"Forecast for initial state: {initial_state[0]:.2f}, {initial_state[1]:.2f}, {initial_state[2]:.2f}, {initial_state[3]:.2f}")


# ----------------------------------------------------------------------------
# NOT USED
def plot_fit2(X_actual, Y_actual, Y_pred, graph_title):
    plt.figure(figsize=(10, 6))
    plt.scatter(X_actual[:, 0], Y_actual[:, 0],label='actual')
    plt.scatter(X_actual[:, 0], Y_pred[:, 0], label='pred')
    plt.show()

def plot_fit3(Y_actual, Y_pred, graph_title):
    plt.figure(figsize=(10, 6))
    plt.scatter(Y_actual[:, -1], Y_pred[:, -1], label='pred')
    plt.plot(Y_actual[:, -1], Y_actual[:, -1], 'b--', label='y = x')
    plt.title(graph_title)
    plt.xlabel("actual change in state")
    plt.ylabel("pred change in state")
    plt.show()

def plot_actual_pred2(X_actual, X_forecast, graph_title):
    fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(15, 5))
    titles = ['cart_location', 'cart_velocity', 'pole_angle', 'pole_velocity']
    for i in range(2):
        for j in range(2):
            title = titles[i * 2 + j]
            ax[i, j].plot(X_actual[:, i * 2 + j], X_forecast[:, i * 2 + j], 'r-', label='forecast')
            ax[i, j].plot(X_actual[:, i * 2 + j], X_actual[:, i * 2 + j], 'b--', label='Y = X')
            ax[i, j].set_title(title)
            ax[i, j].set_xlabel('actual state')
            ax[i, j].set_ylabel(title)
            ax[i, j].grid()
            ax[i, j].legend()
    
    # center the title on top of the figure
    fig.suptitle(graph_title)
    fig.tight_layout(rect=[0, 0.03, 1, 1])  # Adjust the rect to make space for the title
    plt.show()
    plt.close()


In [6]:
N, M = 4096, 1024
lamb = 1e-4
# lamb = 1.61541790e-03
# generate training data
X, Y = generate_data_random(num_steps=N, initial_force=0)

# Get the standard deviation of X
sigma = get_std(X)
# sigma = np.array([8.08589689e+00, 8.20378912e+00, 1.00000000e+00, 7.01860938e+00]) # from jax
# sigma = jnp.array([7.205328,   7.8390459, 1.0,         7.25186536]) # from jax

# train model
alpha, X_prime, K = train_nonlinear_models(X, Y, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel)

# predict using training set
Y_pred = K @ alpha

# this plot does not give much information
# plot_fit(X, Y, Y_pred, graph_title="Fit of the model")

plot_fit(Y, Y_pred, graph_title="Change in state")

# Example initial states for testing
initial_states = [[0, -2, np.pi, 4],[0, 0, np.pi, 5], [0, 0, np.pi, 14], [0, 1, np.pi, 8], [0, 10, np.pi, 14], [0, -2, 3, 0]]
# initial_states = [[0, -2, 3, 0]]

for initial_state in initial_states:
    forecast_nonlinear(initial_state, initial_force=0, num_steps=100, alpha=alpha, sigma=sigma, X_prime=X_prime, kernel_fn=kernel)

In [6]:
def plot_M(N, X, Y, lamb, sigma, kernel_fn):
    """
    Train nonlinear models with varying M and plot the results.
    """
    mse = []
    Ms = [10 * 2**i for i in range(0, int(np.log2(N/10))+1)] + [N]  # M values to test
    for M in Ms:
        alpha, X_prime, K = train_nonlinear_models(X, Y, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel_fn)
        Y_pred = K @ alpha
        mse.append(np.mean((Y - Y_pred) ** 2))
    
    plt.figure(figsize=(10, 6))
    # increase font size of all text in the plot
    plt.rcParams.update({'font.size': 16})
    plt.plot(Ms, mse, label='MSE')
    # mark the points with dots
    plt.scatter(Ms, mse)
    # label the final point
    
    plt.title("Mean Squared Error vs M")
    plt.xlabel("M")
    plt.ylabel("Mean Squared Error")
    plt.xscale("log")
    plt.legend()
    plt.grid()
    plt.show()

    print("Best M:", Ms[np.argmin(mse)], "with MSE:", np.min(mse))
    

plot_M(N, X, Y, lamb, sigma, kernel_fn=kernel)


Best M: 2560 with MSE: 0.012096831019302229


---
## Task 2.2

In [18]:
jax.config.update("jax_enable_x64", True)

def kernel_j(X, X_prime, sigma):
    """
    Compute the Gaussian kernel between two sets of data points.
    Args:
        X (numpy.ndarray): The first set of data points. (N, D)
        X_prime (numpy.ndarray): The second set of data points. (M, D)
        sigma (float): The bandwidth parameter for the Gaussian kernel. (D,)
    """
    X_e = jnp.expand_dims(X, axis=1)  # (N, 1, D)
    X_prime_e = jnp.expand_dims(X_prime, axis=0)  # (1, M, D)
    diff = X_e - X_prime_e  # (N, M, D)

    # remapped diff has the same first 2 dmensions and the last dimension as diff, the 3rd dimension is diff_angle
    diff = diff.at[..., 2].set(jnp.sin(diff[..., 2]/2))
    scaled_squared_diff = (diff ** 2)/(2 * sigma ** 2) # (N, M, D)

    K = jnp.exp(-jnp.sum(scaled_squared_diff, axis=-1))  # (N, M)
    return K

def tikhonov_solve_j(K, regularisation_matrix, Y,lamb):
    """
    Solve the Tikhonov regularization problem.
    
    Args:
        K (numpy.ndarray): The kernel matrix. (N, M)
        regularisation_matrix (numpy.ndarray): The regularization matrix. (M, M)
        Y (numpy.ndarray): The output data. (N, D)
        lamb (float): The regularization parameter.
    
    Returns:
        numpy.ndarray: The weights of the model.
    """
    Y_solve = (K.T) @ Y  # (N, D)

    regularisation_term = lamb * regularisation_matrix # (M, M)
    X_solve = ((K.T) @ K) + regularisation_term # (M, M)

    alpha = jnp.linalg.lstsq(X_solve, Y_solve, rcond=None, numpy_resid=True)[0]  # (M, D)
    return alpha

def train_nonlinear_models_j(X, Y, M, lamb, sigma, kernel_fn):
    # choose M points from X
    X_prime = X[:M] # (M, D)

    # Create the kernel matrix
    K = kernel_fn(X, X_prime, sigma) # (N, M)

    # Create the regularization matrix
    regularisation_matrix = kernel_fn(X_prime, X_prime, sigma) # (M, M)

    alpha = tikhonov_solve_j(K, regularisation_matrix, Y, lamb)
    return alpha, X_prime, K


In [None]:
N_train, N_test, M = 4096, 2048, 1024

X, Y = generate_data_random(num_steps=N_train+N_test, initial_force=0)
X = jnp.array(X)
Y = jnp.array(Y)
X_prime = X[:M]

X_train = X[:N_train]
Y_train = Y[:N_train]

X_val = X[-N_test:]
Y_val = Y[-N_test:]

def loss(parameters):
    lamb = parameters[0]
    sigma = parameters[1:]
    # train model
    alpha, X_prime, _= train_nonlinear_models_j(X_train, Y_train, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel_j)

    # predict using validation set
    K_val = kernel_j(X_val, X_prime, sigma)
    Y_pred = K_val @ alpha

    mse = jnp.mean((Y_val - Y_pred) ** 2)
    return mse

# create a function that calculates the gradient of the loss function using jax.grad
grad_loss = jax.grad(loss)

initial_lamb = 1E-4
# initial_sigma = get_std(X)
initial_sigma = jnp.array([6, 6, 2, 6])
initial_hyperparameters = jnp.array([initial_lamb, initial_sigma[0], initial_sigma[1], initial_sigma[2], initial_sigma[3]])

losses = [loss(initial_hyperparameters)]

def callback(intermediate_result):
    print(intermediate_result)
    losses.append(intermediate_result.fun)


bounds = [(1E-6, 1E-1)] + [(1, 10)] * 4  # bounds for lamb and sigma
res = scipy.optimize.minimize(loss, x0=initial_hyperparameters, method='L-BFGS-B', jac=grad_loss, bounds=bounds, callback=callback)

 fun: 0.022308191245793026
   x: [ 1.000e-06  6.011e+00  6.007e+00  1.979e+00  6.004e+00]
 fun: 0.015922112619939876
   x: [ 1.000e-06  6.134e+00  6.084e+00  1.739e+00  6.044e+00]
 fun: 0.010033445540062441
   x: [ 1.000e-06  6.342e+00  6.213e+00  1.336e+00  6.113e+00]
 fun: 0.005987545640896081
   x: [ 1.841e-04  6.521e+00  6.325e+00  1.000e+00  6.172e+00]
 fun: 0.0059873933477450824
   x: [ 1.832e-04  6.521e+00  6.325e+00  1.000e+00  6.172e+00]
 fun: 0.005987385719985881
   x: [ 1.832e-04  6.521e+00  6.325e+00  1.000e+00  6.172e+00]
 fun: 0.005984323656559595
   x: [ 1.825e-04  6.523e+00  6.326e+00  1.000e+00  6.172e+00]
 fun: 0.005984321023147712
   x: [ 1.825e-04  6.523e+00  6.326e+00  1.000e+00  6.172e+00]


In [11]:
print("optimal lambda:", res.x[0])
print("optimal sigma:", res.x[1:])
print("initial loss", losses[0])
print("Final loss:", res.fun)

def plot_loss(losses):
    plt.figure(figsize=(10, 6))
    plt.rcParams.update({'font.size': 16})
    plt.plot(losses, label='Loss')
    plt.xlabel('Iteration')
    plt.ylabel('Loss')
    plt.title('Loss over iterations')
    plt.grid()
    plt.legend()
    plt.show()

plot_loss(losses)


optimal lambda: 0.00018251511362460504
optimal sigma: [6.52276213 6.32598421 1.         6.17232726]
initial loss 0.022824242293971647
Final loss: 0.005984321023147712


In [None]:
N, M = 4096, 1024
lamb = res.x[0]

# generate training data
X, Y = generate_data_random(num_steps=N, initial_force=0)

# Get the standard deviation of X
sigma = res.x[1:]

# train model
alpha, X_prime, K = train_nonlinear_models(X, Y, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel)

# predict using training set
Y_pred = K @ alpha

plot_fit(Y, Y_pred, graph_title="Change in state")

# Example initial states for testing
initial_states = [[0, -2, np.pi, 4],[0, 0, np.pi, 5], [0, 0, np.pi, 14], [0, 1, np.pi, 8], [0, 10, np.pi, 14], [0, -2, 3, 0]]

for initial_state in initial_states:
    forecast_nonlinear(initial_state, initial_force=0, num_steps=100, alpha=alpha, sigma=sigma, X_prime=X_prime, kernel_fn=kernel)

ValueError: operands could not be broadcast together with shapes (4096,1024,4) (5,) 

---
## Task 2.3

### Use sin and cos

In [10]:
def kernel_expanded(X, X_prime, sigma):
    # create new X where 2 additional dimensions are added, replacing the angle with sin and cos
    # angle dimension is removed
    X_new = np.hstack((X[:,0:2], np.sin(X[:, 2:3]), np.cos(X[:, 2:3]), X[:, 3:]))  # (N, D+1)
    X_prime_new = np.hstack((X_prime[:,0:2], np.sin(X_prime[:, 2:3]), np.cos(X_prime[:, 2:3]), X_prime[:, 3:]))  # (M, D+1)

    X_e = np.expand_dims(X_new, axis=1)  # (N, 1, D+1)
    X_prime_e = np.expand_dims(X_prime_new, axis=0)  # (1, M, D+1)

    diff = X_e - X_prime_e  # (N, M, D+1)
    scaled_squared_diff = (diff ** 2)/(2 * sigma ** 2) # (N, M, D+1)

    K = np.exp(-np.sum(scaled_squared_diff, axis=-1))  # (N, M)
    return K
    

In [8]:
# before optimisation
N, M, lamb = 4096, 1024, 1E-4

# generate training data
X, Y = generate_data_random(num_steps=N, initial_force=0)

# Get the standard deviation of X
x_sigma = get_std(X)
std_sine, std_cos = (0.125)**0.5, (0.125)**0.5  # standard deviation for sine and cosine
sigma = np.array([x_sigma[0], x_sigma[1], std_sine, std_cos, x_sigma[-1]])

# train model
alpha, X_prime, K = train_nonlinear_models(X, Y, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel_expanded)

# predict using training set
Y_pred = K @ alpha

# plot_fit(X, Y, Y_pred, graph_title="Fit of the model")
plot_fit(Y, Y_pred, graph_title="Change in state")

# Example initial states for testing
initial_states = [[0, -2, np.pi, 4], [0, 0, np.pi, 5], [0, 0, np.pi, 14], [0, 1, np.pi, 8], [0, 10, np.pi, 14], [0, -2, 3, 0]]

for initial_state in initial_states:
    forecast_nonlinear(initial_state, initial_force=0, num_steps=100, alpha=alpha, sigma=sigma, X_prime=X_prime, kernel_fn=kernel_expanded)

### Perform optimisation

In [16]:
def kernel_expanded_j(X, X_prime, sigma):
    # create new X where 2 additional dimensions are added, replacing the angle with sin and cos
    # angle dimension is removed
    X_new = jnp.hstack((X[:,0:2], jnp.sin(X[:, 2:3]), jnp.cos(X[:, 2:3]), X[:, 3:]))  # (N, D+1)
    X_prime_new = jnp.hstack((X_prime[:,0:2], jnp.sin(X_prime[:, 2:3]), jnp.cos(X_prime[:, 2:3]), X_prime[:, 3:]))  # (M, D+1)

    X_e = jnp.expand_dims(X_new, axis=1)  # (N, 1, D+1)
    X_prime_e = jnp.expand_dims(X_prime_new, axis=0)  # (1, M, D+1)

    diff = X_e - X_prime_e  # (N, M, D+1)
    scaled_squared_diff = (diff ** 2)/(2 * sigma ** 2) # (N, M, D+1)

    K = jnp.exp(-jnp.sum(scaled_squared_diff, axis=-1))  # (N, M)
    return K

In [12]:
N_train, N_test, M = 4096, 2048, 1024

X, Y = generate_data_random(num_steps=N_train+N_test, initial_force=0)
X = jnp.array(X)
Y = jnp.array(Y)
X_prime = X[:M]

X_train = X[:N_train]
Y_train = Y[:N_train]

X_val = X[-N_test:]
Y_val = Y[-N_test:]

def loss(parameters):
    lamb = parameters[0]
    sigma = parameters[1:]
    # train model
    alpha, X_prime, _= train_nonlinear_models_j(X_train, Y_train, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel_expanded_j)

    # predict using validation set
    K_val = kernel_expanded_j(X_val, X_prime, sigma)
    Y_pred = K_val @ alpha

    mse = jnp.mean((Y_val - Y_pred) ** 2)
    return mse

# create a function that calculates the gradient of the loss function using jax.grad
grad_loss = jax.grad(loss)

initial_lamb = 1E-4

x_sigma = get_std(X)
# initial_sigma = jnp.array([6, 6, 0.5, 0.5, 6])
initial_sigma = jnp.array([x_sigma[0], x_sigma[1], std_sine, std_cos, x_sigma[-1]])
initial_hyperparameters = jnp.array([initial_lamb, initial_sigma[0], initial_sigma[1], initial_sigma[2], initial_sigma[3], initial_sigma[4]])

losses = [loss(initial_hyperparameters)]

def callback(intermediate_result):
    print(intermediate_result)
    losses.append(intermediate_result.fun)


bounds = [(1E-6, 1E-1)] + [(0, 10)] * 5  # bounds for lamb and sigma
res = scipy.optimize.minimize(loss, x0=initial_hyperparameters, method='L-BFGS-B', jac=grad_loss, bounds=bounds, callback=callback)

 fun: 0.005365586202948859
   x: [ 1.565e-02  5.782e+00  5.773e+00  3.688e-01  3.731e-01  8.551e+00]
 fun: 0.004686504128514557
   x: [ 1.723e-02  5.785e+00  5.777e+00  3.949e-01  4.059e-01  8.551e+00]
 fun: 0.003545364827911356
   x: [ 1.000e-06  5.797e+00  5.790e+00  4.693e-01  4.745e-01  8.551e+00]
 fun: 0.003084909066206818
   x: [ 7.703e-03  5.805e+00  5.800e+00  5.289e-01  5.345e-01  8.551e+00]
 fun: 0.002618828996883988
   x: [ 3.662e-03  5.820e+00  5.817e+00  6.289e-01  6.285e-01  8.551e+00]
 fun: 0.002379350582358112
   x: [ 1.000e-06  5.846e+00  5.847e+00  7.775e-01  7.009e-01  8.552e+00]
 fun: 0.0022607466185977747
   x: [ 2.677e-03  5.864e+00  5.868e+00  8.771e-01  7.453e-01  8.552e+00]
 fun: 0.002220152997829924
   x: [ 2.360e-03  5.874e+00  5.880e+00  9.306e-01  7.599e-01  8.553e+00]
 fun: 0.00214093512038394
   x: [ 1.405e-03  5.895e+00  5.905e+00  1.037e+00  7.784e-01  8.553e+00]
 fun: 0.0021022650966840946
   x: [ 8.618e-04  5.911e+00  5.923e+00  1.113e+00  7.869e-01  

In [13]:
print("optimal lambda:", res.x[0])
print("optimal sigma:", res.x[1:])
print("initial loss", losses[0])
print("Final loss:", res.fun)
print("number of iterations:", len(losses) - 1)

def plot_loss(losses):
    plt.figure(figsize=(10, 6))
    plt.rcParams.update({'font.size': 16})
    plt.plot(losses, label='Loss')
    plt.xlabel('Iteration')
    plt.ylabel('Loss')
    plt.title('Loss over iterations')
    plt.grid()
    plt.legend()
    plt.show()

plot_loss(losses)

optimal lambda: 0.0002869140389441694
optimal sigma: [9.2436661  9.9934784  3.2515648  0.43012361 8.13652092]
initial loss 0.005851756600993347
Final loss: 0.0008272871087123723
number of iterations: 50


In [17]:
# before optimisation
N, M, lamb = 4096, 1024, res.x[0]

# generate training data
X, Y = generate_data_random(num_steps=N, initial_force=0)

# Get the standard deviation of X
sigma = res.x[1:]

# train model
alpha, X_prime, K = train_nonlinear_models(X, Y, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel_expanded)

# predict using training set
Y_pred = K @ alpha

# plot_fit(X, Y, Y_pred, graph_title="Fit of the model")
plot_fit(Y, Y_pred, graph_title="Change in state")

# Example initial states for testing
# initial_states = [[0, -2, np.pi, 4], [0, 0, np.pi, 5], [0, 0, np.pi, 14], [0, 1, np.pi, 8], [0, 10, np.pi, 14], [0, -2, 3, 0]]
initial_states = [[0, -2, 3, 0]]
for initial_state in initial_states:
    forecast_nonlinear(initial_state, initial_force=0, num_steps=100, alpha=alpha, sigma=sigma, X_prime=X_prime, kernel_fn=kernel_expanded)

In [None]:
non_linear_model_sin = {
    'lambda': lamb,
    'sigma': sigma,
    'alpha': alpha,
    'X_prime': X_prime,
}

# Save the model
with open('non_linear_model_sin.pkl', 'wb') as f:
    pickle.dump(non_linear_model_sin, f)

### linear model

In [21]:
def expand_state(X):
    """
    Expand the state by replacing the angle with sine and cosine.
    Args:
        X (numpy.ndarray): The input state. (N, D)
    Returns:
        numpy.ndarray: The expanded state. (N, D+1)
    """
    return np.hstack((X[:,0:2], np.sin(X[:, 2:3]), np.cos(X[:, 2:3]), X[:, 3:]))  # (N, D+1)



def generate_data_random_sin(num_steps, initial_force):
    env = CartPole(visual=False)
    env.reset()
    
    x_data, y_data = [], []

    for i in range(num_steps):
        initial_state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10), np.sin(np.random.uniform(-np.pi, np.pi)), np.cos(np.random.uniform(-np.pi, np.pi)), np.random.uniform(-15, 15)]
        env.reset()
        env.setState(initial_state)
        env.performAction(initial_force)
        
        next_state = env.getState()

        x_data.append([initial_state[0], initial_state[1], np.sin(initial_state[2]), np.cos(initial_state[2]), initial_state[3]])
        y_data.append([next_state[0] - initial_state[0], next_state[1] - initial_state[1], next_state[2] - initial_state[2], next_state[3] - initial_state[3]])

    X = np.array(x_data)
    Y = np.array(y_data)
    
    print("shape of X:", X.shape, "\nshape of Y:", Y.shape)
    return X, Y


def forecast_linear_sin(initial_state, initial_force, num_steps, W):
    
    # obtain the actual state
    X_actual = convert_dict_to_array(rollout(initial_state, initial_force, num_steps, visual=False))
    
    # current_state = expand_state(np.array([np.array(initial_state)]))[0]
    current_state = np.array(initial_state)
    current_state_e = [initial_state[0], initial_state[1], np.sin(initial_state[2]), np.cos(initial_state[2]), initial_state[3]]
    X_forecast = [np.array(initial_state)]
    for i in range(num_steps):
        # calculate the kernel for the current state
        current_state_e = np.array([current_state[0], current_state[1], np.sin(current_state[2]), np.cos(current_state[2]), current_state[3]])
        Y_pred = current_state_e @ W

        current_state = (current_state + Y_pred).flatten()
        # remap the angle to be between -pi and pi PURELY FOR PLOTTING
        # The original state is still used for the forecast
        remapped_state = current_state.copy()
        remapped_state[2] = remap_angle(remapped_state[2])
        X_forecast.append(remapped_state)

    X_forecast = np.array(X_forecast)
    
    plot_actual_pred_iterations(X_actual, X_forecast, graph_title=f"Forecast for initial state: {initial_state[0]:.2f}, {initial_state[1]:.2f}, {initial_state[2]:.2f}, {initial_state[3]:.2f}")
    return X_actual, X_forecast


X, Y = generate_data_random_sin(num_steps=4096, initial_force=0)
W, Y_pred = linear_regression(X, Y, intercept=False)
print("shape of W:", W.shape)

# initial_states = [[0, -2, np.pi, 4], [0, 0, np.pi, 5], [0, 0, np.pi, 14], [0, 1, np.pi, 8], [0, 10, np.pi, 14], [0, -2, 3, 0]]
initial_states = [[0, -2, 3, 0]]
for initial_state in initial_states:
    forecast_linear_sin(initial_state, initial_force=0, num_steps=100, W=W)

shape of X: (4096, 5) 
shape of Y: (4096, 4)
shape of W: (5, 4)


---
# Task 3.1


In [4]:
def generate_data_random_force(num_steps):
    env = CartPole(visual=False)
    env.reset()
    x_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': [],
        'force': []
    }

    y_data = {
        'cart_location': [],
        'cart_velocity': [],
        'pole_angle': [],
        'pole_velocity': []
    }
    for i in range(num_steps):
        initial_state = [np.random.uniform(-10, 10), np.random.uniform(-10, 10),
                         np.random.uniform(-np.pi, np.pi), np.random.uniform(-15, 15)]
        initial_force = np.random.uniform(-env.max_force, env.max_force)
        env.reset()
        env.setState(initial_state)
        env.performAction(initial_force)

        # remap the angle to be between -pi and pi
        # env.remap_angle()
        
        next_state = env.getState()
    
        x_data['cart_location'].append(initial_state[0])
        x_data['cart_velocity'].append(initial_state[1])
        x_data['pole_angle'].append(initial_state[2])
        x_data['pole_velocity'].append(initial_state[3])
        x_data['force'].append(initial_force)

        y_data['cart_location'].append(next_state[0] - initial_state[0])
        y_data['cart_velocity'].append(next_state[1] - initial_state[1])
        y_data['pole_angle'].append(next_state[2] - initial_state[2])
        y_data['pole_velocity'].append(next_state[3] - initial_state[3])

    X = convert_dict_to_array(x_data)
    Y = convert_dict_to_array(y_data)
    
    # print("shape of X:", X.shape, "\nshape of Y:", Y.shape)
    return X, Y

def forecast_nonlinear_force(initial_state, num_steps, alpha, sigma, X_prime, kernel_fn):
    x_state = initial_state[:-1]  # exclude the force from the state
    initial_force = initial_state[-1]  # force is the last element of the state
    # obtain the actual state
    X_actual = convert_dict_to_array(rollout(x_state, initial_force, num_steps, visual=False))

    current_state = np.array(initial_state)
    current_x_state = np.array(x_state)
    X_forecast = [current_x_state.copy()]
    for i in range(num_steps):
        # calculate the kernel for the current state
        K = kernel_fn(np.expand_dims(current_state, axis=0), X_prime, sigma)

        Y_pred = K @ alpha
        current_x_state = (current_state[:-1] + Y_pred).flatten()
        current_state = np.concatenate([current_x_state, current_state[-1:]]) # keep the force unchanged

        # remap the angle to be between -pi and pi PURELY FOR PLOTTING
        # The original state is still used for the forecast
        remapped_state = current_x_state.copy()
        remapped_state[2] = remap_angle(remapped_state[2])
        X_forecast.append(remapped_state)

    X_forecast = np.array(X_forecast)
    
    plot_actual_pred_iterations(X_actual, X_forecast, graph_title=f"Forecast for initial state: {initial_state[0]:.2f}, {initial_state[1]:.2f}, {initial_state[2]:.2f}, {initial_state[3]:.2f} with force {initial_state[4]:.2f}")
    plot_actual_pred_time(X_actual, X_forecast, graph_title=f"Forecast for initial state: {initial_state[0]:.2f}, {initial_state[1]:.2f}, {initial_state[2]:.2f}, {initial_state[3]:.2f} with force {initial_state[4]:.2f}")

In [None]:
N_train, N_test, M = 4096, 2048, 1024

X, Y = generate_data_random_force(num_steps=N_train+N_test)
X = jnp.array(X)
Y = jnp.array(Y)
X_prime = X[:M]

X_train = X[:N_train]
Y_train = Y[:N_train]

X_val = X[-N_test:]
Y_val = Y[-N_test:]

def loss(parameters):
    lamb = parameters[0]
    sigma = parameters[1:]
    # train model
    alpha, X_prime, _= train_nonlinear_models_j(X_train, Y_train, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel_expanded_j)

    # predict using validation set
    K_val = kernel_expanded_j(X_val, X_prime, sigma)
    Y_pred = K_val @ alpha

    mse = jnp.mean((Y_val - Y_pred) ** 2)
    return mse

# create a function that calculates the gradient of the loss function using jax.grad
grad_loss = jax.grad(loss)

initial_lamb = 1E-4
max_force = CartPole().max_force
std_force = max_force / (3**0.5)  # standard deviation for force
x_sigma = get_std(X)
# initial_sigma = jnp.array([6, 6, 0.5, 0.5, 6])
std_sine, std_cos = (0.125)**0.5, (0.125)**0.5  # standard deviation for sine and cosine
initial_sigma = jnp.array([x_sigma[0], x_sigma[1], std_sine, std_cos, x_sigma[-1], std_force])
initial_hyperparameters = jnp.array([initial_lamb] + initial_sigma.tolist())

losses = [loss(initial_hyperparameters)]

def callback(intermediate_result):
    print(intermediate_result)
    losses.append(intermediate_result.fun)



bounds = [(1E-6, 1E-1)] + [(0, 10)] * 5 + [(-max_force, max_force)]  # bounds for lamb and sigma
res = scipy.optimize.minimize(loss, x0=initial_hyperparameters, method='L-BFGS-B', jac=grad_loss, bounds=bounds, callback=callback)

NameError: name 'train_nonlinear_models_j' is not defined

In [51]:
print("optimal lambda:", res.x[0])
print("optimal sigma:", res.x[1:])
print("initial loss", losses[0])
print("Final loss:", res.fun)
print("number of iterations:", len(losses) - 1)

def plot_loss(losses):
    plt.figure(figsize=(10, 6))
    plt.rcParams.update({'font.size': 16})
    plt.plot(losses, label='Loss')
    plt.xlabel('Iteration')
    plt.ylabel('Loss')
    plt.title('Loss over iterations')
    plt.grid()
    plt.legend()
    plt.show()

plot_loss(losses)

optimal lambda: 0.00454346639487155
optimal sigma: [10.         10.          0.9916      0.60632823  7.00497217 20.        ]
initial loss 0.09131529865756136
Final loss: 0.005314008134870308
number of iterations: 109


In [52]:
print(res)

  message: CONVERGENCE: NORM OF PROJECTED GRADIENT <= PGTOL
  success: True
   status: 0
      fun: 0.005314008134870308
        x: [ 4.543e-03  1.000e+01  1.000e+01  9.916e-01  6.063e-01
             7.005e+00  2.000e+01]
      nit: 109
      jac: [ 2.044e-06 -5.932e-04 -7.181e-04  1.711e-07 -1.299e-07
            -4.979e-09 -1.972e-04]
     nfev: 136
     njev: 136
 hess_inv: <7x7 LbfgsInvHessProduct with dtype=float64>


In [27]:
# N, M, lamb = 4096, 1024, res.x[0]
N, M, lamb = 4096, 1024, 4.543e-03 # mine
# N, M, lamb = 4096, 1024, 4.918e-04 # andrew

# generate training data
X, Y = generate_data_random_force(num_steps=N)

# Get the standard deviation of X
# sigma = res.x[1:]
sigma = np.array([1.000e+01,  1.000e+01,  9.916e-01,  6.063e-01, 7.005e+00,  2.000e+01]) # mine
# sigma = np.array([15.41,  1.413e+01,  5.24,  0.97, 7.356,  13.52])    # andrew

# train model
alpha, X_prime, K = train_nonlinear_models(X, Y, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel_expanded)

# predict using training set
Y_pred = K @ alpha

# plot_fit(X, Y, Y_pred, graph_title="Fit of the model")
plot_fit(Y, Y_pred, graph_title="Change in state")


# Example initial states for testing
initial_states = [[0, -2, np.pi, 4, 8], [0, 0, np.pi, 5, 11], [0, 0, np.pi, 14, 14], [0, 1, np.pi, 8, 17], [0, 10, np.pi, 14, 20], [0, -2, 3, 0, 23]]
# initial_states = [[0, 0, np.pi, 0, 15]]
for initial_state in initial_states:
    forecast_nonlinear_force(initial_state, num_steps=100, alpha=alpha, sigma=sigma, X_prime=X_prime, kernel_fn=kernel_expanded)


In [28]:
def loss(parameters):
    lamb = parameters[0]
    sigma = parameters[1:]
    # train model
    alpha, X_prime, _= train_nonlinear_models_j(X_train, Y_train, M=M, lamb=lamb, sigma=sigma, kernel_fn=kernel_expanded_j)

    # predict using validation set
    K_val = kernel_expanded_j(X_val, X_prime, sigma)
    Y_pred = K_val @ alpha

    mse = jnp.mean((Y_val - Y_pred) ** 2)
    return mse

# initial_lamb = 4.918e-04 
# initial_sigma = np.array([15.41,  1.413e+01,  5.24,  0.97, 7.356,  13.52])

initial_lamb = 4.543e-03 
initial_sigma = np.array([1.000e+01,  1.000e+01,  9.916e-01,  6.063e-01, 7.005e+00,  2.000e+01])

initial_hyperparameters = jnp.array([initial_lamb] + initial_sigma.tolist())
loss(initial_hyperparameters)

Array(0.00594937, dtype=float64)

---
## Task 3.2


### Objective

In order to optimize a policy, we need to define, mathematically, what we want to achieve. That is captured in an _objective function_ (also called _loss function_), a measure of how close we are to the desired behaviour. In the present case, we want the pole to be upright, so we could use the _loss function_

$$
l(X) = -\cos\theta
$$

But it is better, and more general, to define a _target state_, $X_0$, which we want the system to achieve, and use a loss function that increases when the distance of the state from the target is larger. The following loss function achieves this, 

$$
l(X) = 1- e^{-|X-X_0|^2/2\sigma_l^2}
$$
where $\sigma_l$ is a scaling factor (you could introduce a separate one for each component of the state variable). The target state for the cartpole system is $X_0 = [0,0,0,0]$. The advantage of this form is that for large departures from the target, the loss is independent of the state. This expresses the notion that if the pole is far away from being upright and stationary, we do not much care what it is doing. The above loss functions are for a given state. We wish to keep the pole upright continuously, so the total loss of a trajectory should be a time integral (sum, in practice) of the pointwise loss of the state over some interval,

$$
L = \sum_{i=1}^N l(X_i)
$$

The `CartPole` class contains a function to evaluate the above pointwise loss $l(X)$. 

We start with defining a linear policy, 

$$
p(X) = \bf{p} \cdot X
$$

with unknown coefficent vector ${\bf p}$. 

#### Task 3.2

Write code to evaluate the loss function for the trajectory of a rollout - use a short time horizon, but enough to capture 1-2 oscillation periods. Before any optimisation, _visualise_ the loss function as you vary some parameters in ${\bf p}$, using similar 1D and 2D scans that you did for Task 1, i.e. varying just one or two elements of  ${\bf p}$ and plotting the loss as a function of those elements. 

Given a fixed initial state, optimise the unknowns in the policy to minimise the loss function under the evolution given by the true model dynamics. You can do this by using gradients given by jax and replacing calls to numpy in Cartpole.py with calls to jax.numpy. To speed up your code, you will need to use jax.scan instead of the for loop and jax.git. Use the `scipy.optimize` package for optimization as before. Find elements of ${\bf p}$ that are able to stabilize the pole when started just slightly displaced from the upright unstable position. Then try for the case where the pole is in its downward position. For these experiments, you will need to tune the $\sigma_l$ parameters in the loss to obtain good results. Which values of $\sigma_l$ did you choose? You will also have to change the max_force varible in Cartpole.py. Which value did you choose? Now try to start from the downward stable position, what happens in this case?

Plot the time evolution of the variables under the policy as predicted by the dynamics to demonstrate that the pole is kept upright

In [18]:
def loss_policy(state, target, sigma):
    delta = (state - target)/sigma
    exponent = 0.5 * delta.T @ delta
    return 1 - np.exp(-exponent)

def loss_policy2(state, target, sigma):
    diff = state - target  
    scaled_squared_diff = (diff ** 2)/(2 * sigma ** 2) 
    K = 1 - np.exp(-np.sum(scaled_squared_diff, axis=-1)) 
    return K
    

def loss_rollout_linear(initial_state, sigma, num_steps, P, target):
    env = CartPole(visual=False)
    env.reset()    
    # Set the initial state
    env.setState(initial_state)

    loss_cum = loss_policy(initial_state, target, sigma)
    # loss_cumulative = [loss_cum]

    # Perform the action for the specified number of steps
    for step in range(num_steps):
        prev_state = env.getState()

        # Perform the action
        force = P @ prev_state
        env.performAction(force)

        # remap the angle to be between -pi and pi
        env.remap_angle()

        curr_state = env.getState()
        loss_cum += loss_policy(curr_state, target, sigma)
        # loss_cumulative.append(loss_cum)

    return loss_cum

initial_state = np.array([1, 1, 1, 1])
sigma = np.array([10, 10, 10, 10])
num_steps = 100
target = np.array([0, 0, 0, 0])
P = np.array([1, 1, 1, 1])  # Example policy matrix
loss_cum = loss_rollout_linear(initial_state=initial_state, sigma=sigma, num_steps=num_steps, P=P, target=target)

print("Cumulative loss:", loss_cum)

Cumulative loss: 94.39433652422977


In [19]:
def sweep_loss_policy(initial_state, sigma, num_steps, target):
    # create a 2 by 2 grid of plots
    fig, ax = plt.subplots(2, 2, figsize=(12, 8))
    fig.suptitle('Cumulative Loss vs Policy Parameters')

    P = np.array([1, 1, 1, 1])  # Example policy matrix
    P_sweep = np.linspace(-10, 10, 50)  # Sweep range for P
    
    loss_list = {i: [] for i in range(len(P))}
    for i in range(len(P)):
        P_temp = P.copy()  # Create a temporary copy of P
        for j in P_sweep:
            P_temp[i] = j
            loss_cum = loss_rollout_linear(initial_state=initial_state, sigma=sigma, num_steps=num_steps, P=P_temp, target=target)
            loss_list[i].append(loss_cum)

        ax[i // 2, i % 2].plot(P_sweep, loss_list[i], label='Loss', color='b', linestyle=':', marker='o')
        ax[i // 2, i % 2].set_xlabel(f'Policy Parameter {i}')
        ax[i // 2, i % 2].set_ylabel('Cumulative Loss')
        ax[i // 2, i % 2].grid()

    plt.tight_layout()
    plt.show()

initial_state = np.array([0, 0, 0.1, 0])
sigma = np.array([10, 10, 10, 10])
num_steps = 20
target = np.array([0, 0, 0, 0])
sweep_loss_policy(initial_state, sigma, num_steps, target)

In [15]:
initial_state = jnp.array([0, 0, 0.1, 0])
sigma = jnp.array([10, 10, 10, 10])
# sigma = jnp.array([5.8, 5.8, 1.5, 8.5])  # Example sigma values
num_steps = 100
target = jnp.array([0, 0, 0, 0])

def loss_policy_j(state, target, sigma):
    delta = (state - target)/sigma
    exponent = 0.5 * delta.T @ delta
    return 1 - jnp.exp(-exponent)

def loss_rollout_linear_j(p):

    env = CartPole()
    env.reset_j()    
    env.setState(initial_state)

    def step_fn(loss_cum, _):
        prev_state = env.getState_j()
        force = p @ prev_state
        env.performAction_j(force)
        env.remap_angle_j()
        curr_state = env.getState_j()
        loss = loss_policy_j(curr_state, target, sigma)
        return loss_cum + loss, None

    loss_cum = loss_policy_j(initial_state, target, sigma)
    loss_cum, _ = jax.lax.scan(step_fn, loss_cum, None, length=num_steps)

    return loss_cum

grad_loss_linear = jax.grad(loss_rollout_linear_j)

losses = []

def callback(intermediate_result):
    print(intermediate_result)
    losses.append(intermediate_result.fun)

initial_p = jnp.array([1, 1, 1, 1])  # Example initial policy parameters
res = scipy.optimize.minimize(loss_rollout_linear_j, x0=initial_p, method='L-BFGS-B', jac=grad_loss_linear, callback=callback, bounds=[(-10, 10)] * len(initial_p))

 fun: 0.10960139669843616
   x: [ 1.000e+00  1.000e+00  1.024e+00  1.000e+00]
 fun: 0.009584332668480355
   x: [ 1.000e+00  1.000e+00  9.446e+00  1.000e+00]
 fun: 0.009575960247856896
   x: [ 1.000e+00  1.000e+00  9.369e+00  1.000e+00]


In [16]:
print(res)

  message: CONVERGENCE: NORM OF PROJECTED GRADIENT <= PGTOL
  success: True
   status: 0
      fun: 0.009575960247856896
        x: [ 1.000e+00  1.000e+00  9.369e+00  1.000e+00]
      nit: 3
      jac: [ 0.000e+00  0.000e+00 -1.087e-06  0.000e+00]
     nfev: 4
     njev: 4
 hess_inv: <4x4 LbfgsInvHessProduct with dtype=float64>
