# Prerequisites

In [None]:
!pip install torchdyn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Definitions
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rc
rc('animation', html='jshtml')

from matplotlib.animation import FuncAnimation

def animate_body(trajectory, resolution_Hz=15, duration=None, fig_size=8):
    """Build a looping animation of a point moving along ``trajectory``.

    Args:
        trajectory: Array-like with one row per frame. Only the first two
            columns are used as (x, y). A 1-D sequence or a single-column
            array is interpreted as x-coordinates with y fixed at 0.
        resolution_Hz: Frames per second; the delay between frames is
            ``1000 / resolution_Hz`` milliseconds.
        duration: Animation length in seconds. When ``None`` there is one
            frame per trajectory row. Frame indices wrap around the
            trajectory when more frames than rows are requested.
        fig_size: Side length of the square figure (matplotlib ``figsize``).

    Returns:
        The ``FuncAnimation`` object.
    """
    # Interactive mode is switched off while the figure is built and back on
    # before returning (presumably so only the animation is rendered in the
    # notebook, not the bare figure as well).
    plt.ioff()
    trajectory = np.array(trajectory)
    # Promote 1-D input to a single column *before* selecting columns;
    # slicing first would silently keep only the first two samples.
    if trajectory.ndim == 1:
        trajectory = trajectory.reshape(-1, 1)
    trajectory = trajectory[:, :2]
    if trajectory.shape[1] == 1:
        # Only x was supplied: pad with y = 0.
        trajectory = np.column_stack((trajectory[:, 0], np.zeros(trajectory.shape[0])))
    if duration is None:
        frames = range(trajectory.shape[0])
    else:
        frames = range(int(duration * resolution_Hz))
    fig, ax = plt.subplots(figsize=(fig_size, fig_size))
    # set the axes limits
    ax.axis([-2, 2, -2, 2])
    ax.set_aspect("equal")
    plt.grid()
    # Large fixed disc at the origin (presumably the central body the point
    # moves around -- the marker size is tuned by eye, not derived from data).
    ax.plot(0, 0, markersize=215, marker="o")
    # The moving point; its coordinates are overwritten on every frame.
    point, = ax.plot(0, 1, marker="o")

    # Updating function, to be repeatedly called by the animation
    def update(t):
        # obtain point coordinates (wrapping around the end of the trajectory)
        x, y = trajectory[int(t) % trajectory.shape[0]]
        # set point's coordinates
        point.set_data([x], [y])
        return point,

    ani = FuncAnimation(fig, update, interval=1000/resolution_Hz, blit=True, repeat=True,
                        frames=frames)
    plt.ion()
    return ani

def running_median(x, N):
    """Median of the strictly positive entries in each length-``N`` window of ``x``.

    ``x`` is a NumPy array; the result has ``len(x) - N + 1`` entries (none
    when ``x`` is shorter than ``N``). A window without any positive entry
    yields ``nan``.
    """
    medians = []
    for start in range(len(x) - N + 1):
        window = x[start:start + N]
        # Non-positive samples are ignored when taking the median.
        medians.append(np.median(window[window > 0]))
    return np.array(medians)

In [None]:
from scipy.linalg import norm
import torch

class ParametrizedDiscreteTimeSystem:
    """Discrete-time system rolled out under a feedback controller.

    The system is described by a transition function
    ``f(state, control) -> (next_state, reward)``, a default initial state
    and a discount factor applied to the accumulated reward.
    """

    def __init__(self,
                 state_transition_function,  # f(., .)
                 initial_state,              # x_0
                 discount
                 ):
        self.__state_transition_function = state_transition_function
        self.__initial_state = initial_state
        self.__discount = discount

    @staticmethod
    def _to_numpy(items):
        """Stack a list of tensors (or array-likes) into one NumPy array."""
        return np.array([
            item.detach().cpu().numpy() if isinstance(item, torch.Tensor) else np.asarray(item)
            for item in items
        ])

    def run_with_feedback_controller(self,
                                     feedback_controller,
                                     initial_state=None,
                                     steps=100):
        """Simulate the closed loop for at most ``steps`` transitions.

        Args:
            feedback_controller: Callable mapping the current state to a
                control input.
            initial_state: State to start from; the state given to the
                constructor is used when this is ``None``.
            steps: Maximum number of transitions to simulate.

        Returns:
            ``(trajectory, actions, total_reward)`` where ``trajectory`` and
            ``actions`` are NumPy arrays of the visited states and applied
            controls, and ``total_reward`` is the discounted reward sum. If
            the transition raises ``OverflowError`` the episode stops early
            and the data collected so far is returned.
        """
        # Compare against None explicitly: the truth value of a tensor with
        # more than one element is ambiguous and raises.
        if initial_state is None:
            initial_state = self.__initial_state
        trajectory = [initial_state]
        actions = []
        total_reward = 0
        accumulated_discount = 1
        for _ in range(steps):
            current_state = trajectory[-1]
            control_input = feedback_controller(current_state)
            actions.append(control_input)
            try:
                next_state, reward = self.__state_transition_function(current_state, control_input)
                total_reward += reward * accumulated_discount
                accumulated_discount *= self.__discount
            except OverflowError:
                print("The trajectory blew up. Ending the episode prematurely.")
                break
            trajectory.append(next_state)
        # Both exits share one conversion so the early exit also returns
        # detached, CPU-resident arrays.
        return self._to_numpy(trajectory), self._to_numpy(actions), total_reward

In [None]:
# Smoke test of the animation helper: 20 frames alternating between (1, 1) and (-1, 1).
animate_body(np.array([[1.0, 1.0], [-1.0, 1.0]]*10))

# Driving a dynamic body

$$
\begin{aligned}
&\begin{cases}
 \frac{\partial}{\partial t}x(t) = \begin{cases}v(t), \text{ if } 2.4 \geq \lVert x(t)\rVert_2 \geq 1 \\ \left(0 \atop 0\right), \text{ otherwise}\end{cases}\\
 \frac{\partial}{\partial t}v(t) =  u(t)\end{cases}\\
& x_0 = \left(0 \atop 1.5 \right) \\
& v_0 = \left(0 \atop 0 \right) \\
& u(t) = \rho_\theta\big(x(\lfloor t \rfloor)\big), \\
& r(x(t), v(t), u(t)) := -\lVert x(t) - \left(0\atop -1.5\right) \rVert_2^2, \ \gamma = 0.99
\end{aligned}
$$


## Discretization

$$
\begin{aligned}
& \left(x_{t + 1} \atop v_{t + 1}\right) = f(x_t, v_t, u_t)\\
& x_0 = \left(0 \atop 1.5 \right) \\
& v_0 = \left(0 \atop 0 \right) \\
& u_t = \rho_\theta(x_t), \\
& r_t := -\int_t^{t + 1}\gamma^{\tau - t}\lVert x(\tau ) - \left(0\atop -1.5\right) \rVert_2^2\text{d} \tau, \ \gamma = 0.99
\end{aligned}
$$

Here $f$ maps the state and the control held at time $t$ to the state one step later: $f\colon \big(x(t), v(t), u(t)\big) \longmapsto \big(x(t + 1), v(t + 1)\big)$.


**Useful resources**:
* [torchdyn](https://torchdyn.readthedocs.io/en/latest/tutorials/quickstart.html)

In [None]:
from scipy.integrate import quad, solve_ivp
from torchdyn.core import NeuralODE


# Prefer the GPU; fall back to the CPU below if CUDA cannot be used.
device = torch.device("cuda")

# Initial state laid out as (x_1, x_2, v_1, v_2): position (0, 1.5), zero velocity.
satellite_initial_state = torch.tensor([0.0, 1.5, 0.0, 0.0])


try:
    satellite_initial_state = satellite_initial_state.to(device)
except Exception:  # not a bare except: KeyboardInterrupt/SystemExit must still propagate
    print("Failed to initialize a CUDA device. Using CPU.")
    device = torch.device("cpu")
    satellite_initial_state = satellite_initial_state.to(device)
# Discount factor (gamma) shared by the dynamics and the rollout.
satellite_discount = 0.99


class SatelliteDynamics(torch.nn.Module):
    """Time derivative of the augmented satellite state.

    The input is a ``(batch, 8)`` tensor whose columns are
    ``[position (2), speed (2), accumulated reward (1), action (2), time (1)]``;
    the output holds the derivative of each column in the same layout.

    NOTE(review): the speed derivative implemented here is ``action - speed``,
    while the accompanying markdown states dv/dt = u(t) -- confirm which one
    is intended.

    Args:
        discount: Discount factor used to weight the running reward. When
            ``None`` the module-level ``satellite_discount`` is read on every
            call.
    """

    INNER_RADIUS = 1.0     # the position is frozen closer than this to the origin ...
    OUTER_RADIUS = 2.4     # ... and farther away than this
    TARGET = (0.0, -1.5)   # the reward penalises squared distance to this point

    def __init__(self, discount=None):
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.discount = discount

    def forward(self, state_reward_action_time):
        """Return d/dt of ``[position, speed, reward, action, time]``."""
        position = state_reward_action_time[:, 0:2]
        speed = state_reward_action_time[:, 2:4]
        action = state_reward_action_time[:, 5:7]
        time = state_reward_action_time[:, 7:]
        discount = satellite_discount if self.discount is None else self.discount

        # The position only moves while it lies inside the ring
        # INNER_RADIUS <= |x| <= OUTER_RADIUS.
        radius_squared = (position * position).sum(dim=1, keepdim=True)
        inside_ring = (radius_squared >= self.INNER_RADIUS ** 2) & (radius_squared <= self.OUTER_RADIUS ** 2)
        position_rate = speed * inside_ring.to(speed.dtype)

        speed_rate = action - speed

        # Running reward: discounted negative squared distance to the target.
        # The target is created on the input's device/dtype so the module
        # does not depend on a global device.
        target = torch.tensor(self.TARGET, dtype=position.dtype, device=position.device)
        direction = position - target
        distance_squared = (direction * direction).sum(dim=1, keepdim=True)
        reward_rate = -(discount ** time) * distance_squared

        action_rate = torch.zeros_like(action)  # the action is held constant
        time_rate = torch.ones_like(time)       # the clock advances at unit rate
        return torch.cat((position_rate, speed_rate, reward_rate, action_rate, time_rate), dim=1)


class SatelliteTransition(torch.nn.Module):
    """One discrete transition of the satellite, obtained by integrating ``SatelliteDynamics``."""

    def __init__(self):
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.ode = NeuralODE(SatelliteDynamics(), solver='dopri5').to(device)

    def forward(self, state_action):
        """Map a batch of ``[position, speed, action]`` rows to ``[position, speed, reward]``.

        Args:
            state_action: ``(batch, 6)`` tensor with columns
                ``[position (2), speed (2), action (2)]``.

        Returns:
            ``(batch, 5)`` tensor: the state at the end of the integration
            interval followed by the reward accumulated over it.
        """
        # Augment with a zero reward accumulator (column 4) and a zero clock
        # (column 7) to obtain the 8-column layout the dynamics expect. Built
        # from the input itself, so it follows the input's device and dtype.
        zero_column = state_action.new_zeros((state_action.shape[0], 1))
        state_reward_action_time = torch.cat(
            (state_action[:, 0:4], zero_column, state_action[:, 4:6], zero_column), dim=1)
        # NOTE(review): the integration interval is [0, 4] although the
        # markdown describes a unit time step -- confirm the intended horizon.
        solution = self.ode(state_reward_action_time, torch.linspace(0, 4, 2))[1]
        final = solution[-1]
        return final[:, :5]


# A single transition model instance serves every environment step.
satellite_transition = SatelliteTransition()


def satellite_transition_function(state, control):
    """Advance one un-batched state under ``control``; return ``(next_state, reward)``."""
    # The transition model works on batches, so add a leading batch axis of 1.
    batched_input = torch.cat((state.unsqueeze(0), control.unsqueeze(0)), dim=1)
    next_state_and_reward = satellite_transition(batched_input.to(device))[0]
    next_state = next_state_and_reward[:-1]
    reward = next_state_and_reward[-1]
    return next_state, reward


satellite_system = ParametrizedDiscreteTimeSystem(
    satellite_transition_function,
    satellite_initial_state,
    satellite_discount,
)
    

# Problem 2: Optimizing the total cost using Actor-Critic

In [None]:
from tqdm import tqdm


torch.manual_seed(0)


class Scale(torch.nn.Module):
    """Layer that multiplies its input by a fixed factor."""

    def __init__(self, factor):
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.factor = factor

    def forward(self, input):
        """Return ``input * factor``."""
        return input * self.factor


actor_layer_size= ## YOUR CODE HERE
actor = torch.nn.Sequential(
    ## YOUR CODE HERE
).to(device)

critic_layer_size=  ## YOUR CODE HERE
critic = torch.nn.Sequential(
    ## YOUR CODE HERE
).to(device)


class CriticTD(torch.nn.Module):
    """Temporal-difference objective for the critic (exercise template).

    Holds the actor, the critic, a transition model and an MSE loss.
    ``parameters()`` exposes only the critic's parameters, so an optimizer
    built from this module leaves the actor untouched.
    """

    def __init__(self, actor, critic):
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.critic = critic
        self.actor = actor
        self.transition = SatelliteTransition()
        self.loss = torch.nn.MSELoss()

    def forward(self, state):
        """Return the temporal-difference loss for a batch of states.

        ``critic_epoch`` passes a ``(batch_size, 4)`` tensor and calls
        ``backward()`` on the result, so a scalar tensor must be assigned
        to ``temporal_difference_loss``.
        """
        ## YOUR CODE HERE
        return temporal_difference_loss

    def parameters(self, recurse=True):
        """Yield the critic's parameters only (same signature as ``Module.parameters``)."""
        return self.critic.parameters(recurse)

class ActorImprovedValue(torch.nn.Module):
    """Objective used to improve the actor (exercise template).

    Holds the actor, the critic and a transition model. ``parameters()``
    exposes only the actor's parameters, so an optimizer built from this
    module leaves the critic untouched.
    """

    def __init__(self, actor, critic):
        # Zero-argument super(): super(self.__class__, self) recurses forever
        # as soon as this class is subclassed.
        super().__init__()
        self.critic = critic
        self.actor = actor
        self.transition = SatelliteTransition()

    def forward(self, state):
        """Return the actor objective for a batch of states.

        ``actor_epoch`` passes a ``(batch_size, 4)`` tensor, calls
        ``backward()`` on the returned value and then steps the optimizer,
        so a scalar tensor must be returned.
        """
        ## YOUR CODE HERE

    def parameters(self, recurse=True):
        """Yield the actor's parameters only (same signature as ``Module.parameters``)."""
        return self.actor.parameters(recurse)



critic_temporal_difference = CriticTD(actor, critic)
actor_improved_value = ActorImprovedValue(actor, critic)


optimizer_critic = ## YOUR CODE HERE
optimizer_actor = ## YOUR CODE HERE


def critic_epoch(optimizer, model, iterations=4500, batch_size=1):
    """Run ``iterations`` optimisation steps on the critic objective.

    Each step draws ``batch_size`` states uniformly from ``[-2.5, 2.5)^4``,
    evaluates ``model`` on them to obtain a loss, back-propagates and steps
    ``optimizer``.

    Returns:
        The list of per-iteration loss values (0-d NumPy arrays).
    """
    losses = []
    for _ in tqdm(range(iterations), "Critic epoch"):
        X = (torch.rand((batch_size, 4)) * 5 - 2.5).to(device)
        optimizer.zero_grad()
        loss = model(X)
        loss.backward()
        optimizer.step()
        losses.append(loss.detach().cpu().numpy())
    # Summary compares the first ten with the last ten iterations; the slice
    # is [-10:] (not [-10:-1]) so the final iteration is included.
    print("Critic mean loss:", np.mean(losses), "[%f --> %f]" % (np.mean(losses[0:10]), np.mean(losses[-10:])))
    return losses


def actor_epoch(optimizer, model, iterations=100, batch_size=1000):
    """Run ``iterations`` optimisation steps on the actor objective.

    Each step draws ``batch_size`` states uniformly from ``[-2.5, 2.5)^4``,
    evaluates ``model`` on them, back-propagates through the returned value
    and steps ``optimizer``.

    Returns:
        The list of per-iteration objective values (0-d NumPy arrays).
    """
    values = []
    for _ in tqdm(range(iterations), "Actor epoch"):
        X = (torch.rand((batch_size, 4)) * 5 - 2.5).to(device)
        optimizer.zero_grad()
        improved_value = model(X)
        improved_value.backward()
        optimizer.step()
        values.append(improved_value.detach().cpu().numpy())
    # Summary compares the first ten with the last ten iterations; the slice
    # is [-10:] (not [-10:-1]) so the final iteration is included.
    print("Actor mean value:", np.mean(values), "[%f --> %f]" % (np.mean(values[0:10]), np.mean(values[-10:])))
    return values

epochs= ## YOUR CODE HERE
# Alternate critic and actor updates for `epochs` rounds.
for _ in tqdm(range(epochs), "Actor-Critic learning"):
    # Despite the name, `values` holds the critic losses of this round.
    values = np.array(critic_epoch(optimizer_critic, critic_temporal_difference))
    # Running median over windows of 100 iterations, to smooth the loss curve.
    plt.plot(running_median(values, 100))
    plt.show()
    actor_epoch(optimizer_actor, actor_improved_value)

# Roll out the actor for 100 steps; no gradients are needed for evaluation.
with torch.no_grad():
    trajectory, actions, total_reward = satellite_system.run_with_feedback_controller(actor, steps=100)
print("Total reward: %f" % total_reward)
print("First action:", actions[0])
# Animate every 4th state of the rollout.
animate_body(trajectory[::4])
