# Import Libraries

In [1]:
import numpy as np
import cv2
import random
import time
import math
import matplotlib.pyplot as plt

In [2]:
import gym
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy
from gym import spaces

# Helper Functions

In [25]:
#functionss
def collision_landing_pad(landing_pad_position, score):
    terminate = True
    return terminate

def out_of_frame(rocket_position):
    if rocket_position[0]>=1920 or rocket_position[0]<0 or rocket_position[1]>=1080 or rocket_position[1]<0 :
        return 1
    else:
        return 0
    
def ode(t, y, *args):
    alpha = args[-5] # associated to left engine
    beta = args[-4] # associated to right engine
    T_l = args[-3] # thrust of left engine
    T_c = args[-2] # thrust of center engine, no gimbal capabilities
    T_r = args[-1] # thrust of right engine
    g = args[0]
    m = args[1]
    Theta = args[2]
    a = args[3]
    h = args[4]
    
    phi = y[2]
    
    vel_x = y[3]
    vel_y = y[4]
    vel_phi = y[5]
    
    pos_x_dot = vel_x
    pos_y_dot = vel_y
    phi_dot = vel_phi
    
    vel_x_dot = 1/m * (-np.sin(beta+phi)*T_r -np.sin(alpha+phi)*T_l - np.sin(phi)*T_c)
    vel_y_dot = -g + 1/m * (np.cos(beta+phi)*T_r + np.cos(alpha+phi)*T_l + np.cos(phi)*T_c)
    
    dr = (a - h*np.tan(beta))*np.cos(beta)
    dl = (a - h*np.tan(-alpha))*np.cos(-alpha)
    
    vel_phi_dot = 1/Theta * (T_r * dr - T_l * dl)
    
    return np.array([pos_x_dot, pos_y_dot, phi_dot, vel_x_dot, vel_y_dot, vel_phi_dot])


def ode_linear(t, y, *args):
    # linearisation around
        # alpha = 0
        # beta = 0
    
    # not "fully" linear since control input (T_r, T_c, T_l) is multiplied with TVC deflection angle (alpha, beta)
    
    alpha = args[-5] # deflection angle associated to left engine
    beta = args[-4] # deflection angle associated to right engine
    T_l = args[-3] # thrust of left engine
    T_c = args[-2] # thrust of center engine, no gimbal capabilities
    T_r = args[-1] # thrust of right engine
    g = args[0]
    m = args[1]
    Theta = args[2]
    a = args[3]
    h = args[4]
    
    phi = y[2]
    
    vel_x = y[3]
    vel_y = y[4]
    vel_phi = y[5]
    
    pos_x_dot = vel_x
    pos_y_dot = vel_y
    phi_dot = vel_phi
    
    vel_x_dot = 1/m * (-(beta+phi)*T_r -(alpha+phi)*T_l -phi*T_c)
    vel_y_dot = -g + 1/m * (T_c + T_r + T_l)
    
    dr = a - h*beta
    dl = a - h*(-alpha)
    
    vel_phi_dot = 1/Theta * (T_r * dr - T_l * dl)
    
    return np.array([pos_x_dot, pos_y_dot, phi_dot, vel_x_dot, vel_y_dot, vel_phi_dot])

def rk4_e(f, y, h, t, *args):
    # runge kutte 4th order explicit
    tk_05 = t + 0.5*h
    yk_025 = y + 0.5 * h * f(t, y, *args)
    yk_05 = y + 0.5 * h * f(tk_05, yk_025, *args)
    yk_075 = y + h * f(tk_05, yk_05, *args)
    
    return y + h/6 * (f(t, y, *args) + 2 * f(tk_05, yk_025, *args) + 2 * f(tk_05, yk_05, *args) + f(t+h, yk_075, *args))

def scaling(pos_x, pos_y, width, height, uncovered=0.1):
    # scale flight path to frame

    # uncovered is part of frame that is not used to visualize flight path
    used_width = int(width * (1 - 2*uncovered))
    used_height = int(height * (1 - 2*uncovered))
    
    max_pos_x = max(pos_x)
    min_pos_x = min(pos_x)
    max_pos_y = max(pos_y)
    min_pos_y = min(pos_y)
    
    dist_x = max_pos_x - min_pos_x
    dist_y = max_pos_y - min_pos_y
    
    if dist_x>=dist_y:
        max_dist = dist_x + 1
    else:
        max_dist = dist_y + 1
    
    scaled_x = used_width/(max_dist)*pos_x
    scaled_x = scaled_x - (min(scaled_x) - int(uncovered*width))
    
    scaled_y = used_height/(max_dist)*pos_y
    scaled_y = scaled_y - (min(scaled_y) - int(uncovered*width))
    scaled_y = height - scaled_y
                           
    return scaled_x.astype(np.int32), scaled_y.astype(np.int32)

def rotation(phi, points):    
    x_rotate = points[:,0] * np.cos(phi) - points[:,1] * np.sin(phi)
    y_rotate = points[:,0] * np.sin(phi) + points[:,1] * np.cos(phi)
    
    return np.array([x_rotate, y_rotate]).T.astype(np.int32)

# Custom Environment

In [26]:
g = 1.625                   # m/s^2
m = 1000                    # mass of the spacecraft in kg
Theta = 1000                # moment of inertia of the spacecraft
b = 2                       # distance of thrust vector from center of gravity --> check the handout for the sketch
a = 1                       # distance of thrust vector from center of gravity --> check the handout for the sketch

h = 1/30   # time step size
t0 = 0                  # initial time
tn = 10                 # end time
linear = False           # using linear (True) or nonlinear equations of motion (False)

x0 = 250                # initial x position
y0 = 350                # initial y position
phi0 = math.radians(-15) # initial rotation angle of spacecraft (mathematically positive defined)
velx0 = -25             # initial x direction velocity (u)
vely0 = -25             # initial y direction velocity (v)
velphi0 = 0             # initial z direction velocity (w)

alphaBetaRate = math.radians(22.5)
thrust_rate = 200

time = np.linspace(t0, tn, int((tn-t0)/h)+1)

states = np.zeros((6, len(time)))

# data coming from 2D_flight_dynamic_propagation.py
pos_x = states[0,:-1]#.astype(np.int32)
pos_y = states[1,:-1]#.astype(np.int32)

phi = -states[2,:-1] # negative sign, because rotation matrix is defined incorrelty with regard to mathematical positive rotation

vel_x = states[3,:-1]
vel_y = states[4,:-1]

alpha = 0
beta = 0

max_thrust = 1500

T_l = 0
T_r = 0
T_c = 1625 * 1.2

In [68]:
class SpacecraftEnv(gym.Env):
    """Custom Environment that follows gym interface."""

    metadata = {"render.modes": ["human"]}

    def __init__(self, gravity = -9.81,
        mass: float = 1000.0,
        moment_inertia: float = 1000.0,
        thrust_rate:float = 200,
        alphaBetaRate = 0,
        a= 1,
        b= 2,
        linear: bool = False,
        h = h):
        
        super(SpacecraftEnv, self).__init__()
        
        #rocket initial data
        self.gravity = gravity
        self.mass = mass
        self.moment_inertia = moment_inertia
        self.rocket_position = [250, 350] #vec2 position
        self.rocket_velocity = [-25, -25] #vec2 velocity
        self.rocket_accel = [0, 0]
        
        self.rocket_angle = 0 #float angle
        self.rocket_angular_vel = 0 #float angular velocity
        self.rocket_angular_accel = 0
        self.thrust_rate = thrust_rate
        
        #landing pad initial data
        self.pad_position = [0, 0] #vec2 position
        self.pad_width = 5 #float width
        self.pad_height = 2 #float height
        
        #rocket side engines
        self.alpha = 0
        self.beta = 0
        self.alphaBetaRate = alphaBetaRate
        self.thrust_dynamic = 200
        
        self.thrust_left = 0
        self.thrust_right = 0
        self.thrust_center = mass * gravity * 1.2
        
        self.a = a
        self.b = b
        self.h = h

        # position history
        self.pos_history = []
        
        self.linear = linear
        
        # thrust engines (3)
        # control alpha beta
        self.action_space = spaces.Box(-1, 1, (5,), dtype='int8')
        
        low = np.array(
            [
                # these are bounds for position
                # realistically the environment should have ended
                # long before we reach more than 50% outside
                -1.5,
                -1.5,
                # velocity bounds is 5x rated speed
                -5.0,
                -5.0,
                -math.pi,
                -5.0,
                -2.0,
                -2.0,
            ]
        ).astype(np.float32)
        high = np.array(
            [
                # these are bounds for position
                # realistically the environment should have ended
                # long before we reach more than 50% outside
                1.5,
                1.5,
                # velocity bounds is 5x rated speed
                5.0,
                5.0,
                math.pi,
                5.0,
                2.0,
                2.0,
            ]
        ).astype(np.float32)
        
        self.observation_space = spaces.Box(low, high)

    def step(self, action):
        #CALCULATION
        # update via newton and runga kutta
        args = [self.gravity, self.mass, self.moment_inertia, self.a, self.b, self.alpha, self.beta, self.thrust_left, self.thrust_center, self.thrust_right]
        coor_sets = np.array([self.rocket_position[0], self.rocket_position[1], self.rocket_angle, self.rocket_velocity[0], self.rocket_velocity[1], self.rocket_angular_vel])
        t_rk = 0 #not so important, only for runga kutta formalism

        if self.linear:
            coor_sets = rk4_e(ode_linear, coor_sets, self.h, t_rk, args)
        else:
            coor_sets = rk4_e(ode, coor_sets, self.h, t_rk, *args)

        self.rocket_position[0] = coor_sets[0]
        self.rocket_position[1] = coor_sets[1]
        self.pos_history.append((self.rocket_position[0],self.rocket_position[1]))
        self.rocket_angle = coor_sets[2]
        self.rocket_velocity[0] = coor_sets[3]
        self.rocket_velocity[1] = coor_sets[4]
        self.rocket_angular_vel = coor_sets[5]
        #print(self.rocket_position)
        #print(self.rocket_velocity)
        
        self.thrust_center += self.thrust_rate * action[0] * self.h
        self.thrust_right += self.thrust_rate * action[1] * self.h
        self.thrust_left += self.thrust_rate * action[2] * self.h
        
        self.alpha += self.alphaBetaRate * action[3] * self.h
        self.beta += self.alphaBetaRate * action[4] * self.h
                
        #RENDERING
        #cv2.imshow('Spacecraft Env', self.img)
        #cv2.waitKey(1)
        #self.img = np.zeros((500, 500, 3), dtype='uint8')
        #Display Rocket
        #Display Landing Pad
        
        #REWARD and TERMINATION
        reward = 0
        ### 1
        distance_to_landing_pad = np.linalg.norm(np.array(self.rocket_position) - np.array(self.pad_position))
        reward -= distance_to_landing_pad * 10
        
        ### 2
        epsilon = 0.005
        y_com = self.b * np.cos(self.rocket_angle) + abs(self.a * np.sin(self.rocket_angle))
        if (abs(y_com - self.rocket_position[1]) <= epsilon):
            self.terminated = True
            print("#2: " + str(self.rocket_position[0]) + " "+ str(self.rocket_position[1]))
            if (self.rocket_velocity[1] <= 1):
                
                reward += 10000 * (1.001 - self.rocket_velocity[1])    
            else:
                reward -= 10000

        ### 3
        if (self.rocket_angle < math.radians(-30) or self.rocket_angle > math.radians(30)):
            reward -= (abs(self.rocket_angle) - math.radians(30))
            
        ### 4
        if (self.rocket_position[1] < self.pad_position[1]):
            reward -= 25
            
        ### 5
        if (out_of_frame(self.rocket_position)):
            print("#5: " + str(self.rocket_position[0]) + " " + str(self.rocket_position[1]))
            reward -= 10000
            self.terminated = True
            
        info = {}    
        return observation, reward, self.terminated, info

    def reset(self):
        self.img = np.zeros((500, 500, 3), dtype='uint8')

        #reset rocket data
        self.rocket_position = [250, 350] #vec2 position
        self.rocket_velocity = [-25, -25] #vec2 velocity
        self.rocket_accel = [0, 0]
        
        self.rocket_angle = 0 #float angle
        self.rocket_angular_vel = 0 #float angular velocity
        self.rocket_angular_accel = 0
        self.is_touching_pad = False #bool is_touching_pad
        
        #general data
        self.terminated = False
        info = {}
        self.reward = 0
        
        rocket_position_x = self.rocket_position[0]
        rocket_position_y = self.rocket_position[1]
        rocket_velocity_x = self.rocket_velocity[0]
        rocket_velocity_y = self.rocket_velocity[1]
        rocket_accel_x = self.rocket_accel[0]
        rocket_accel_y = self.rocket_accel[1]

        rocket_angle = self.rocket_angle
        angular_vel = self.rocket_angular_vel
        angular_accel = self.rocket_angular_accel
                
        #thrust_left = self.thrust_left
        #thrust_right = self.thrust_right
        #thrust_center = self.thrust_center
        
        alpha = self.alpha
        beta = self.beta

        observation = [rocket_position_x, rocket_position_y, rocket_velocity_x, rocket_velocity_y, rocket_angle, angular_vel, alpha, beta]
        
        return observation

    def render(self, mode="human"):
        return

    def close(self):
        cv2.destroyAllWindows()

# Model

In [69]:
env = SpacecraftEnv(
        gravity = g,
        mass = m,
        alphaBetaRate = alphaBetaRate,
        a = a,
        b = b,
        linear = False,
        h = h)

In [70]:
episodes = 5
for episode in range(1, episodes + 1):
    observation = env.reset()
    done = False
    total_reward = 0
    
    while not done:
        random_action = env.action_space.sample()
        observation, reward, done, info = env.step(random_action)
        total_reward += reward
        
    print("episode {} with score: {}".format(episode, total_reward))
    
env.close()

#5: -0.5281426460208177 119.81333708889173
episode 1 with score: -792767.362724547
#5: -0.9358815728847385 114.33211637311106
episode 2 with score: -752921.5759073265
#5: -0.6228180460914712 115.81685717418556
episode 3 with score: -769083.3411965335
#5: -0.5441268718599853 101.96391683440122
episode 4 with score: -801334.5036756843
#5: -0.2001153954561361 106.95018395532963
episode 5 with score: -791927.5708342829


In [47]:
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=int(5000), progress_bar=True)
model.save("PPO-Spaceraft_1")

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


Output()

----------------------------------
| rollout/           |           |
|    ep_len_mean     | 338       |
|    ep_rew_mean     | -3.84e+05 |
| time/              |           |
|    fps             | 89        |
|    iterations      | 1         |
|    time_elapsed    | 23        |
|    total_timesteps | 2048      |
----------------------------------


------------------------------------------
| rollout/                |              |
|    ep_len_mean          | 336          |
|    ep_rew_mean          | -4.83e+05    |
| time/                   |              |
|    fps                  | 83           |
|    iterations           | 2            |
|    time_elapsed         | 49           |
|    total_timesteps      | 4096         |
| train/                  |              |
|    approx_kl            | 0.0018600852 |
|    clip_fraction        | 4.88e-05     |
|    clip_range           | 0.2          |
|    entropy_loss         | -7.1         |
|    explained_variance   | 0            |
|    learning_rate        | 0.0003       |
|    loss                 | 2.34e+08     |
|    n_updates            | 10           |
|    policy_gradient_loss | -0.00179     |
|    std                  | 1            |
|    value_loss           | 4.96e+08     |
------------------------------------------


-------------------------------------------
| rollout/                |               |
|    ep_len_mean          | 330           |
|    ep_rew_mean          | -7.11e+05     |
| time/                   |               |
|    fps                  | 83            |
|    iterations           | 3             |
|    time_elapsed         | 73            |
|    total_timesteps      | 6144          |
| train/                  |               |
|    approx_kl            | 0.00088501105 |
|    clip_fraction        | 0             |
|    clip_range           | 0.2           |
|    entropy_loss         | -7.1          |
|    explained_variance   | 5.96e-08      |
|    learning_rate        | 0.0003        |
|    loss                 | 6.68e+08      |
|    n_updates            | 20            |
|    policy_gradient_loss | -0.00139      |
|    std                  | 1             |
|    value_loss           | 1.28e+09      |
-------------------------------------------


# Visualization

In [None]:
FPS = 1/h      
safe_name= 'test.mp4'
font = cv2.FONT_HERSHEY_SIMPLEX
fontScale = 0.5
color = (255, 255, 255)
thickness = 1

n_timesteps = np.shape(pos_x)[0]

#resolution
image_height = 1080
image_width = 1920
scaled_x, scaled_y = scaling(pos_x, pos_y, image_width, image_height)

flight_path = np.array([scaled_x, scaled_y]).T
scaling = 10

lander_polygon = np.array([[-scaling, 2*scaling], [scaling, 2*scaling], [scaling, -2*scaling], [0, -3*scaling], [-scaling, -2*scaling]])
right_fin_polygon = np.array([[1.3*scaling, 2*scaling], [2*scaling, 3*scaling], [2*scaling, 1*scaling], [1.3*scaling, 0], [1.3*scaling, 2*scaling]])
left_fin_polygon = np.copy(right_fin_polygon)
left_fin_polygon[:,0] = left_fin_polygon[:,0] * -1


video = cv2.VideoWriter(safe_name,cv2.VideoWriter_fourcc('m','p','4','v'), FPS, (image_width,image_height))
for i in range(n_timesteps-1):
    # image = 255 * np.ones((image_height,image_width,3), np.uint8)
    # replace with cv2.imread(imageFile)
    image = cv2.imread('background.png')

    # image = cv2.polylines(image, [flight_path.reshape(-1,1,2)], False, (255,255,255), 1, cv2.LINE_AA)
    
    image = cv2.fillPoly(image, [(rotation(phi[i], lander_polygon)+flight_path[i,:]).reshape(-1,1,2)], (208,193,155))
    image = cv2.fillPoly(image, [(rotation(phi[i], right_fin_polygon)+flight_path[i,:]).reshape(-1,1,2)], (49,44,203))
    image = cv2.fillPoly(image, [(rotation(phi[i], left_fin_polygon)+flight_path[i,:]).reshape(-1,1,2)], (49,44,203))

    image = cv2.fillPoly(image, [(rotation(phi[i], left_fin_polygon)+flight_path[i,:]).reshape(-1,1,2)], (49,44,203))

    exhaust_polygon_left = np.array([[-9, 2*scaling], [-7, (2 + 3*T_l/max_thrust)*scaling], [-5, 2*scaling]])
    exhaust_polygon_right = np.array([[9, 2*scaling], [7, (2 + 3*T_r/max_thrust)*scaling], [5, 2*scaling]])
    exhaust_polygon_center = np.array([[2, 2*scaling], [0, (2 + 3*T_c/max_thrust)*scaling], [-2, 2*scaling]])
    
    image = cv2.fillPoly(image, [(rotation(phi[i] + alpha, exhaust_polygon_left)+flight_path[i,:]).reshape(-1,1,2)], (39,188,248))
    image = cv2.fillPoly(image, [(rotation(phi[i], exhaust_polygon_center)+flight_path[i,:]).reshape(-1,1,2)], (39,188,248))
    image = cv2.fillPoly(image, [(rotation(phi[i] + beta, exhaust_polygon_right)+flight_path[i,:]).reshape(-1,1,2)], (39,188,248))
    
    
    # image = cv2.polylines(image, [np.array([np.array([224-35, 888 + 3*scaling]), np.array([224+35, 888 + 3*scaling])]).reshape(-1,1,2)], isClosed = False, color=(0,0.0), thickness = 5)
    image = cv2.polylines(image, [np.array([flight_path[-1,:] + np.array([35, 3*scaling]), flight_path[-1,:] + np.array([-35, 3*scaling])]).reshape(-1,1,2)], isClosed = False, color=(0,0.0), thickness = 5)


    image = cv2.putText(image, 'Horizontal Position: %i [m]' % int(env.pos_history[i][0]), (50, 50), font, fontScale, color, thickness, cv2.LINE_AA)
    image = cv2.putText(image, 'Vertical Position: %i [m]' % int(env.pos_history[i][1]), (50, 70), font, fontScale, color, thickness, cv2.LINE_AA)
    image = cv2.putText(image, 'Horizontal Velocity: %i [m/s]' % int(vel_x[i]), (50, 90), font, fontScale, color, thickness, cv2.LINE_AA)
    image = cv2.putText(image, 'Vertical Velocity: %i [m/s]' % int(vel_y[i]), (50, 110), font, fontScale, color, thickness, cv2.LINE_AA)
    image = cv2.putText(image, 'Left Engine Thrust: %i [%%]' % int(T_l/max_thrust*100), (50, 130), font, fontScale, color, thickness, cv2.LINE_AA)
    image = cv2.putText(image, 'Center Engine Thrust: %i [%%]' % int(T_c/max_thrust*100), (50, 150), font, fontScale, color, thickness, cv2.LINE_AA)
    image = cv2.putText(image, 'Right Engine Thrust: %i [%%]' % int(T_r/max_thrust*100), (50, 170), font, fontScale, color, thickness, cv2.LINE_AA)
    image = cv2.putText(image, 'Left Engine Gimbal Angle: %i [Degrees]' % int(-alpha*180/np.pi), (50, 190), font, fontScale, color, thickness, cv2.LINE_AA)
    image = cv2.putText(image, 'Right Engine Gimbal Angle: %i [Degrees]' % int(-beta*180/np.pi), (50, 210), font, fontScale, color, thickness, cv2.LINE_AA)

    video.write(image)
video.release()

# Plotting

In [None]:
plt.figure(figsize=(24,5))
plt.subplot(141)
plt.plot(time[:-1], states[0,:-1], label='Position x')
plt.plot(time[:-1], states[1,:-1], label='Position y')
plt.xlabel('Time [s]')
plt.ylabel('Position [m]')
plt.legend()
plt.grid()

plt.subplot(142)
plt.plot(time[:-1], states[2,:-1]*180/np.pi, label='Rotation phi')
plt.xlabel('Time [s]')
plt.ylabel('Angle [°]')
plt.legend()
plt.grid()

plt.subplot(143)
plt.plot(time[:-1], states[3,:-1], label='Velocity x')
plt.plot(time[:-1], states[4,:-1], label='Velocity y')
plt.xlabel('Time [s]')
plt.ylabel('Velocity [m/s]')
plt.legend()
plt.grid()

plt.subplot(144)
plt.plot(time[:-1], states[5,:-1]*180/np.pi, label='Velocity phi')
plt.xlabel('Time [s]')
plt.ylabel('Velocity [°/s]')
plt.legend()
plt.grid()

plt.show()