In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

In [3]:
def discounted_cumulative_sums(x, discount):
    # Discounted cumulative sums of vectors for computing rewards-to-go and advantage estimates
    return scipy.signal.lfilter([1], [1, float(-discount)], x[::-1], axis=0)[::-1]


class Buffer:
    # Buffer for storing trajectories
    def __init__(self, observation_dimensions, size, gamma=0.99, lam=0.95):
        # Buffer initialization
        self.observation_buffer = np.zeros(
            (size, observation_dimensions), dtype=np.float32
        )
        self.action_buffer = np.zeros(size, dtype=np.int32)
        self.advantage_buffer = np.zeros(size, dtype=np.float32)
        self.reward_buffer = np.zeros(size, dtype=np.float32)
        self.return_buffer = np.zeros(size, dtype=np.float32)
        self.value_buffer = np.zeros(size, dtype=np.float32)
        self.logprobability_buffer = np.zeros(size, dtype=np.float32)
        self.gamma, self.lam = gamma, lam
        self.pointer, self.trajectory_start_index = 0, 0

    def store(self, observation, action, reward, value, logprobability):
        # Append one step of agent-environment interaction
        self.observation_buffer[self.pointer] = observation
        self.action_buffer[self.pointer] = action
        self.reward_buffer[self.pointer] = reward
        self.value_buffer[self.pointer] = value
        self.logprobability_buffer[self.pointer] = logprobability
        self.pointer += 1

    def finish_trajectory(self, last_value=0):
        # Finish the trajectory by computing advantage estimates and rewards-to-go
        path_slice = slice(self.trajectory_start_index, self.pointer)
        rewards = np.append(self.reward_buffer[path_slice], last_value)
        values = np.append(self.value_buffer[path_slice], last_value)

        deltas = rewards[:-1] + self.gamma * values[1:] - values[:-1]

        self.advantage_buffer[path_slice] = discounted_cumulative_sums(
            deltas, self.gamma * self.lam
        )
        self.return_buffer[path_slice] = discounted_cumulative_sums(
            rewards, self.gamma
        )[:-1]

        self.trajectory_start_index = self.pointer

    def get(self):
        # Get all data of the buffer and normalize the advantages
        self.pointer, self.trajectory_start_index = 0, 0
        advantage_mean, advantage_std = (
            np.mean(self.advantage_buffer),
            np.std(self.advantage_buffer),
        )
        self.advantage_buffer = (self.advantage_buffer - advantage_mean) / advantage_std
        return (
            self.observation_buffer,
            self.action_buffer,
            self.advantage_buffer,
            self.return_buffer,
            self.logprobability_buffer,
        )


def mlp(x, sizes, activation=tf.nn.relu, output_activation=None):
    # Build a feedforward neural network
    for size in sizes[:-1]:
        x = layers.Dense(units=size, activation=activation)(x)
    return layers.Dense(units=sizes[-1], activation=output_activation)(x)


def logprobabilities(logits, a):
    # Compute the log-probabilities of taking actions a by using the logits (i.e. the output of the actor)
    logprobabilities_all = tf.nn.log_softmax(logits)
    logprobability = tf.reduce_sum(
        tf.one_hot(a, num_actions) * logprobabilities_all, axis=1
    )
    return logprobability


# Sample action from actor
@tf.function
def sample_action(observation):
    logits = actor(observation)
    action = tf.squeeze(tf.random.categorical(logits, 1), axis=1)
    return logits, action


# Train the policy by maxizing the PPO-Clip objective
@tf.function
def train_policy(
    observation_buffer, action_buffer, logprobability_buffer, advantage_buffer
):

    with tf.GradientTape() as tape:  # Record operations for automatic differentiation.
        ratio = tf.exp(
            logprobabilities(actor(observation_buffer), action_buffer)
            - logprobability_buffer
        )
        min_advantage = tf.where(
            advantage_buffer > 0,
            (1 + clip_ratio) * advantage_buffer,
            (1 - clip_ratio) * advantage_buffer,
        )

        policy_loss = -tf.reduce_mean(
            tf.minimum(ratio * advantage_buffer, min_advantage)
        )
    policy_grads = tape.gradient(policy_loss, actor.trainable_variables)
    policy_optimizer.apply_gradients(zip(policy_grads, actor.trainable_variables))

    kl = tf.reduce_mean(
        logprobability_buffer
        - logprobabilities(actor(observation_buffer), action_buffer)
    )
    kl = tf.reduce_sum(kl)
    return kl


# Train the value function by regression on mean-squared error
@tf.function
def train_value_function(observation_buffer, return_buffer):
    with tf.GradientTape() as tape:  # Record operations for automatic differentiation.
        value_loss = tf.reduce_mean((return_buffer - critic(observation_buffer)) ** 2)
    value_grads = tape.gradient(value_loss, critic.trainable_variables)
    value_optimizer.apply_gradients(zip(value_grads, critic.trainable_variables))


In [4]:
def plot_euler_angles(t, roll, pitch, yaw, filename=None, bbox=(60,100,-1,1), inset=True):
    fig, ax = plt.subplots(figsize=(16,8))
    ax.set_title("Euler Angles")
    
    ax.plot(t, roll, label = 'roll', color = 'red')
    ax.plot(t, pitch, label = 'pitch', color = 'green')
    ax.plot(t, yaw, label = 'yaw', color = 'blue')
    
    if inset:
        ylims = ax.get_ylim()
        above = abs(ylims[1]) > abs(ylims[0])
        axins = ax.inset_axes([0.3, 0.55 if above else 0.05 , 0.5, 0.25])
        x1, x2, y1, y2 = bbox
        axins.set_xlim(x1, x2)
        axins.set_ylim(y1, y2)
        axins.plot(t, roll, label = 'roll', color = 'red')
        axins.plot(t, pitch, label = 'pitch', color = 'green')
        axins.plot(t, yaw, label = 'yaw', color = 'blue')
        axins.set_xticklabels([])
        axins.grid()
        ax.indicate_inset_zoom(axins, edgecolor="black")

    ax.set_ylabel(r'angles, [deg]')
    ax.set_xlabel(r't, [s]')
    ax.grid(True)
    ax.legend()
    fig.show()
    if filename is not None:
        fig.savefig(f'{filename}')

def plot_phase_diagram(roll, pitch, yaw, omega_, filename=None):
    fig2 = plt.figure(figsize=(8,8))
    ax2 = fig2.add_subplot(1,1,1)
    ax2.set_title("Phase diagram")
    ax2.plot(roll, omega_[0], label = 'roll', color = 'red')
    ax2.plot(pitch, omega_[1], label = 'pitch', color = 'green')
    ax2.plot(yaw, omega_[2], label = 'yaw', color = 'blue')
    ax2.plot([0],[0], 'o', markersize=8 , label='Destination', color = 'magenta')
    ax2.set_xlabel(r'$\alpha$')
    ax2.set_ylabel(r'$\omega$')
    ax2.legend()
    ax2.grid()
    fig2.show()
    if filename is not None:
        fig2.savefig(f'{filename}')

In [6]:
def normalize(obj):

    return obj / np.linalg.norm(obj)


def cross_product(a, b):

    def check_dimensions(vec, string):

        if vec.ndim != 1:
            raise Exception("The {} input is not a vector".format(string))
        if len(vec) != 3:
            raise Exception("Wrong number of coordinates in the {0} vector: {1}, should be 3".format(string, len(vec)))

    check_dimensions(a, 'first')
    check_dimensions(b, 'second')

    return np.array([a[1]*b[2]-a[2]*b[1], a[2]*b[0]-a[0]*b[2], a[0]*b[1] - a[1]*b[0]])

def quat_product(q1, q2):

    def check_dimensions(q, string):

        if q.ndim != 1:
            raise Exception("The {} input is not a quaternion".format(string))
        if len(q) != 4:
            raise Exception("Wrong number of coordinates in the {0} quaternion: {1}, should be 4".format(string, len(q)))

    check_dimensions(q1, 'first')
    check_dimensions(q2, 'second')

    q = np.zeros(4)
    q[0] = q1[0] * q2[0] - q1[1:].dot(q2[1:])
    q[1:] = q1[0] * q2[1:] + q2[0] * q1[1:] + cross_product(q1[1:], q2[1:])

    return q

def rotate_vec_with_quat(q, vec):

    def check_dimensions(obj, is_quat):

        if obj.ndim != 1:
            raise Exception("Not a {}".format('quaternion' * is_quat + 'vector' * (1 - is_quat)))
        if len(obj) != (3 + 1 * is_quat):
            raise Exception("Wrong number of coordinates in the {0}: {1}, should be {2}"
                            .format('quaternion' * is_quat + 'vector' * (1 - is_quat), len(obj), 3 + 1 * is_quat))

    check_dimensions(q, True)
    check_dimensions(vec, False)

    q = quat_conjugate(q)

    qxvec = cross_product(q[1:], vec)

    return q[1:].dot(vec) * q[1:] + q[0]**2. * vec + 2. * q[0] * qxvec + cross_product(q[1:], qxvec)

def quat2rpy(q0, q1, q2, q3):

    roll = np.arctan2(2. * (q0 * q1 + q2 * q3), 1. - 2. * (q1**2 + q2**2))
    pitch = np.arcsin(2. * (q0 * q2 - q1 * q3))
    yaw = np.arctan2(2. * (q0 * q3 + q1 * q2), 1. - 2. * (q2**2 + q3**2))

    return [roll, pitch, yaw]

def quat2rpy_deg(q0, q1, q2, q3):

    roll = np.arctan2(2. * (q0 * q1 + q2 * q3), 1. - 2. * (q1**2 + q2**2))*180/np.pi
    pitch = np.arcsin(2. * (q0 * q2 - q1 * q3))*180/np.pi
    yaw = np.arctan2(2. * (q0 * q3 + q1 * q2), 1. - 2. * (q2**2 + q3**2))*180/np.pi

    return [roll, pitch, yaw]

def quat_conjugate(q):

    q_new = np.copy(q)
    q_new[1:] = q_new[1:] * -1.

    return q_new

In [7]:
def train(epochs, obs_shape, start=None):
# Iterate over the number of epochs
  print('starting training with', epochs, 'epochs')
  buffer = Buffer(observation_dimensions, steps_per_epoch)
  env = TorqueDynamics(0.1, np.array([1, 0, 0, 0]), obs_shape)
  # Initialize the observation, episode return and episode length
  observation, episode_return, episode_length = env.reset(start), 0, 0

  returns = []

  for epoch in range(epochs):
      # Initialize the sum of the returns, lengths and number of episodes for each epoch
      sum_return = 0
      sum_length = 0
      num_episodes = 0

      # Iterate over the steps of each epoch
      for t in range(steps_per_epoch):
          if render:
              env.render()

          # Get the logits, action, and take one step in the environment
          observation = observation.reshape(1, -1)
          logits, action = sample_action(observation)
          #print(type(action[0].numpy()))
          observation_new, reward, done, info = env.step(action[0].numpy())
          episode_return += reward
          episode_length += 1

          # Get the value and log-probability of the action
          value_t = critic(observation)
          logprobability_t = logprobabilities(logits, action)

          # Store obs, act, rew, v_t, logp_pi_t
          buffer.store(observation, action, reward, value_t, logprobability_t)

          # Update the observation
          observation = observation_new

          # Finish trajectory if reached to a terminal state
          terminal = done
          if terminal or (t == steps_per_epoch - 1):
              last_value = 0 if done else critic(observation.reshape(1, -1))
              buffer.finish_trajectory(last_value)
              sum_return += episode_return
              sum_length += episode_length
              num_episodes += 1
              observation, episode_return, episode_length = env.reset(start), 0, 0

      # Get values from the buffer
      (
          observation_buffer,
          action_buffer,
          advantage_buffer,
          return_buffer,
          logprobability_buffer,
      ) = buffer.get()

      # Update the policy and implement early stopping using KL divergence
      for _ in range(train_policy_iterations):
          kl = train_policy(
              observation_buffer, action_buffer, logprobability_buffer, advantage_buffer
          )
          if kl > 1.5 * target_kl:
              # Early Stopping
              break

      # Update the value function
      for _ in range(train_value_iterations):
          train_value_function(observation_buffer, return_buffer)

      # Print mean return and length for each epoch
      print(
          f" Epoch: {epoch + 1}. Mean Return: {sum_return / num_episodes}. Mean Length: {sum_length / num_episodes}"
      )
      returns.append(sum_return / num_episodes)
  return returns