In [None]:
# Problem Statement:
# Pick a double integrator. Implement a PD controller for it.
# Use imitation learning (DAgger) to learn the PD controller.
# Use RL to improve on the imitation learning when there are disturbances.

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib
# Global plot styling: serif (Computer Modern) fonts at 16 pt, LaTeX text
# rendering, and a default figure size of 7.08 x 3.54 inches.
font = dict(family='serif', serif='Computer Modern Roman', size=16)
matplotlib.rc('font', **font)
matplotlib.rcParams.update({
    'text.usetex': True,  # every label/title string is typeset by LaTeX
    'figure.figsize': [2*3.54, 3.54],
})

from systems import DoubleIntegrator, DoubleIntegratorWithPerturbations

In [None]:
dt = 0.01   # integration time step handed to both systems
N = 300     # number of simulation steps per rollout

# Systems
system = DoubleIntegrator(dt)
system_perturbed = DoubleIntegratorWithPerturbations(dt)

# Start both systems from the origin.
system.state = np.zeros(2)
system_perturbed.state = np.zeros(2)
state_list = []
state_perturb_list = []

# Open-loop check: apply the same constant action to both systems.
for i in range(N):
    action = np.array([0.1])
    system.step(action)
    system_perturbed.step(action)
    # Store snapshots rather than references: if step() updates `state`
    # in place, appending the attribute itself would make every list entry
    # alias the final state.
    state_list.append(np.array(system.state))
    state_perturb_list.append(np.array(system_perturbed.state))

fig, ax = plt.subplots(2, 1, figsize=(4, 4), sharex=True)
state_list = np.array(state_list)
state_perturb_list = np.array(state_perturb_list)
# Labels are raw strings with TeX math mode because text.usetex is enabled.
# The nominal-system curves are intentionally left disabled:
# ax[0].plot(state_list[:, 0], '-', label=r'$x$')
ax[0].plot(state_perturb_list[:, 0], '--', label='x with perturbations')
# ax[1].plot(state_list[:, 1], '-', label=r'$\dot{x}$')
ax[1].plot(state_perturb_list[:, 1], '--', label=r'$\dot{x}$ with perturbations')
# Draw the legends so the labels above are actually shown.
ax[0].legend(fontsize='xx-small')
ax[1].legend(fontsize='xx-small')


In [None]:
# Expert Policies
class ExpertPolicyIntegral:
    '''
    PD state feedback plus an integral term on the position error.

    The controller is stateful: every call accumulates the position error
    into `integral_term`. Call `reset()` before starting a new rollout.

    Args:
        dt : float - time step used to integrate the position error
        integral_limit : float or None - optional anti-windup bound; when
            given, `integral_term` is clamped to [-integral_limit,
            integral_limit] after every update. None (default) disables
            clamping.
    Raises:
        ValueError if integral_limit is negative.
    '''
    def __init__(self, dt, integral_limit=None):
        if integral_limit is not None and integral_limit < 0:
            raise ValueError('integral_limit must be non-negative')
        self.integral_term = 0
        self.dt = dt
        # Row vector of [position, velocity] feedback gains.
        self.K = np.array([40.0 , 30.0]).reshape(1, 2)
        self.K_I = 10.0
        self.integral_limit = integral_limit

    def reset(self):
        '''Clear the accumulated integral term.'''
        self.integral_term = 0

    def __call__(self, state, state_des):
        '''
        Args:
            state : np.array (2,) - current [x, x_dot]
            state_des : np.array (2,) - desired [x_d, x_dot_d]
        Returns:
            u : np.array (1,) - control action
        '''
        self.integral_term += - self.K_I * (state[0] - state_des[0]) * self.dt
        # Anti-windup: keep the integrator inside the configured bound.
        if self.integral_limit is not None:
            self.integral_term = max(-self.integral_limit,
                                     min(self.integral_limit, self.integral_term))
        u = (-self.K @ (state - state_des))  + self.integral_term
        return u
    
class ExpertPolicyPD:
    '''
    Linear PD state-feedback expert: u = -K (state - state_des).

    Args:
        dt : float - time step; stored on the instance but not used by the
            control law itself
    '''
    def __init__(self, dt):
        # Row vector of [position, velocity] feedback gains.
        self.K = np.array([[40.0, 30.0]])
        self.dt = dt

    def __call__(self, state, state_des):
        '''
        Args:
            state : np.array (2,) - current [x, x_dot]
            state_des : np.array (2,) - desired [x_d, x_dot_d]
        Returns:
            np.array (1,) - control action
        '''
        tracking_error = state - state_des
        return -self.K @ tracking_error

class ExpertPolicyNonlinearPD:
    '''
    PD state-feedback expert with an extra position-dependent term:
    u = -K (state - state_des) + (10 sin(x / 10))^2.

    Args:
        dt : float - time step; stored on the instance but not used by the
            control law itself
    '''
    def __init__(self, dt):
        # Row vector of [position, velocity] feedback gains.
        self.K = np.array([[40.0, 30.0]])
        self.dt = dt

    def __call__(self, state, state_des):
        '''
        Args:
            state : np.array (2,) - current [x, x_dot]
            state_des : np.array (2,) - desired [x_d, x_dot_d]
        Returns:
            np.array (1,) - control action
        '''
        pd_term = -self.K @ (state - state_des)
        # Nonlinear contribution depends on the current position only.
        nonlinear_term = (10*np.sin(state[0]/10))**2
        return pd_term + nonlinear_term
        
# Expert controllers used in the cells below; both share the global time step.
expert_policy = ExpertPolicyPD(dt=dt)
expert_policy_nonlinear = ExpertPolicyNonlinearPD(dt=dt)

# Test the expert controller

In [None]:
def desired_trajectory(N, dt):
    '''
    Sample a random cosine reference trajectory.

    The position reference is cos(i * frequency) for step index
    i = 0..N-1, with frequency drawn uniformly from [0.01, 0.05) (per step,
    not per second). The velocity reference is the forward difference of
    the position divided by dt; the last sample repeats the previous one.
    For N < 2 no difference can be formed and the velocity is left at zero.

    Args:
        N : int - length of the trajectory
        dt : float - time step
    Returns:
        x_vec_d [x_d, x_dot_d] : np.array (N x 2) - desired trajectory
    '''
    frequency = np.random.uniform(0.01, 0.05)
    print('Frequency', frequency)

    x_d = np.cos(np.arange(N) * frequency)
    x_dot_d = np.zeros(N)

    # Guarding on N avoids the IndexError that x_dot_d[-2] raises for N < 2.
    if N > 1:
        x_dot_d[:-1] = np.diff(x_d) / dt
        x_dot_d[-1] = x_dot_d[-2]

    x_vec_d = np.column_stack([x_d, x_dot_d])
    return x_vec_d


# Closed-loop test: the PD expert tracks one sampled reference on the
# perturbed system.
x_vec_d = desired_trajectory(N, dt)
state_list = []
des_traj_list = []

# Start the system exactly on the first reference sample.
x0 = x_vec_d[0].copy()
system_perturbed.state = x0.copy()

for i in range(N-1):
    state_des = x_vec_d[i].copy()
    action = expert_policy(system_perturbed.state, state_des)
    system_perturbed.step(action)
    # Store a snapshot: if step() updates `state` in place, appending the
    # attribute itself would make every list entry alias the final state.
    state_list.append(np.array(system_perturbed.state))
    des_traj_list.append(state_des)

state_list = np.array(state_list)
des_traj_list = np.array(des_traj_list)

fig, ax = plt.subplots(2, 1, figsize=(2, 2), sharex=True)
ax[0].set_title('Tracking Performance')
# text.usetex is enabled, so labels containing '_' or '\dot' must be raw
# strings in TeX math mode; otherwise LaTeX fails when the legend is drawn.
ax[0].plot(state_list[:, 0], '-', label=r'$x$')
ax[0].plot(des_traj_list[:, 0], '--', label=r'$x_d$')
ax[0].legend(fontsize='xx-small')

ax[1].plot(state_list[:, 1], '-', label=r'$\dot{x}$')
ax[1].plot(des_traj_list[:, 1], '--', label=r'$\dot{x}_d$')
ax[1].legend(fontsize='xx-small')


In [None]:


# Multiple traj regulation:
def multiple_traj_reg(sys, sys_name, policy, nb_traj=1):
    '''
    Roll out `policy` on `sys` along freshly sampled reference trajectories
    and plot actual vs. desired position (top) and velocity (bottom).

    Relies on the notebook-level `N`, `dt` and `desired_trajectory`.

    Args:
        sys : system object exposing reset(), step(action) and a `state`
            attribute (note: the name shadows the stdlib `sys` module
            inside this function)
        sys_name : str - title of the position subplot
        policy : callable (state, state_des) -> action. A result that is
            not an np.ndarray is converted with .detach().numpy()
            (presumably a torch tensor - confirm against NNPolicy.predict).
        nb_traj : int - number of reference trajectories to sample and
            overlay in the same figure (default 1)
    '''
    fig, ax = plt.subplots(2, 1, figsize=(2, 2), sharex=True)
    for _ in range(nb_traj):
        sys.reset()
        state_list = []
        x_vec_d = desired_trajectory(N, dt)
        # Start exactly on the first reference sample.
        sys.state = np.array([x_vec_d[0, 0], x_vec_d[0, 1]])

        # Separate index name: the original reused `i` for both loops.
        for k in range(N):
            state_des = np.array([ x_vec_d[k, 0], x_vec_d[k, 1] ])
            action = policy(sys.state, state_des)
            if not isinstance(action, np.ndarray):
                action = action.detach().numpy()
            sys.step(action)
            # Store a snapshot: if step() updates `state` in place,
            # appending the attribute itself would alias every entry.
            state_list.append(np.array(sys.state))

        state_list = np.array(state_list)
        # Raw strings in TeX math mode: text.usetex is enabled, and
        # '\d' is an invalid escape sequence in a normal string literal.
        ax[0].plot(state_list[:, 0], '-', label=r'$x$')
        ax[0].plot(x_vec_d[:, 0], '--', label=r'$x_d$')
        ax[1].plot(state_list[:, 1], '-', label=r'$\dot{x}$')
        ax[1].plot(x_vec_d[:, 1], '--', label=r'$\dot{x}_d$')
    ax[0].set_title(sys_name, fontsize='xx-small')

# PD expert on the nominal system.
multiple_traj_reg(
    sys=system,
    sys_name='Double Integrator',
    policy=expert_policy,
)
# PD expert on the perturbed system.
multiple_traj_reg(
    sys=system_perturbed,
    sys_name='Double Integrator with Perturbations',
    policy=expert_policy,
)
# Nonlinear PD expert on the perturbed system.
multiple_traj_reg(
    sys=system_perturbed,
    sys_name='Double Integrator with Perturbations',
    policy=expert_policy_nonlinear,
)
    
    


# Train DAgger

In [None]:
from nnpolicy import NNPolicy
from DAgger import DAgger


In [None]:
# The network input is the concatenation [state, desired state] (2 + 2 values).
input_size = 4
# (in, out) sizes of consecutive layers: input_size -> 24 -> 48 -> 24 -> 1.
net_arch = list(zip((input_size, 24, 48, 24), (24, 48, 24, 1))) # NN policy
policy = NNPolicy(net_arch)

In [None]:
epochs = 40

# NOTE(review): the arguments are positional and their meaning is defined by
# DAgger.__init__, which is not visible here - confirm the hedged notes below.
dagger_trainer = DAgger(
    system,                     # system the policy is trained on
    expert_policy,              # PD expert being imitated
    policy,                     # NN policy being trained
    desired_trajectory,         # reference-trajectory generator
    np.linspace(1, 0, epochs),  # one value per epoch, from 1 down to 0
    300,                        # equals N above - presumably the rollout length; confirm
    None,
    1,
    input_size,
)

In [None]:
# Run DAgger training; `epochs` matches the length of the schedule passed to
# the DAgger constructor above.
dagger_trainer.train_dagger(epochs)

In [None]:
# Evaluate

def policy_NN(x, x_des):
    '''
    Adapter that gives the trained network the (state, state_des) call
    signature of the expert policies.

    Args:
        x : np.array - current state
        x_des : np.array - desired state
    Returns:
        whatever policy.predict returns for the concatenated [x, x_des] input
    '''
    stacked_input = np.concatenate((x, x_des))
    prediction = policy.predict(stacked_input)
    return prediction

# Learned policy on the nominal system.
multiple_traj_reg(
    sys=system,
    sys_name='Double Integrator',
    policy=policy_NN,
)
# Learned policy on the perturbed system.
multiple_traj_reg(
    sys=system_perturbed,
    sys_name='Double Integrator with Perturbations; Policy Dagger',
    policy=policy_NN,
)


In [None]:
# Same evaluation again: each call samples a new random reference trajectory.
multiple_traj_reg(
    sys=system_perturbed,
    sys_name='Double Integrator with Perturbations; Policy Dagger',
    policy=policy_NN,
)
