In [32]:
import gym
from gym import Env
from gym.spaces import MultiDiscrete, Box

from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecFrameStack, DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_checker import check_env

import numpy as np
import os


In [33]:
from __future__ import annotations
from typing import List, Dict
import math
import numpy as np
 
class Planet:
    def __init__(self, name: str, position: np.array, trajectory: List):
        self.name: str = name
        self.trajectory: List = trajectory
        self.planets: List[Planet] = []
        self.position: np.ndarray = position
        self.ideal_postion: np.ndarray = None
        self.next_position: np.ndarray = None
        self.distance_to_other_planets: List[np.ndarray] = []
        self.current_step = 0
        self.deviation: np.float32 = 0
        self.distance_to_ideal_position: np.float32 = 0
        self.distance_to_nearest_planet: np.float32 = 0
    
    def set_other_planets(self, other_planets: List) -> List[Planet]:
        self.planets = other_planets
        
    def get_ideal_position(self):
        ideal_position = self.trajectory[self.current_step]
        self.ideal_postion = ideal_position

    def get_direction(self):
        raise NotImplementedError

    def get_distance_to_other_planets(self) -> Dict[str, np.float32]:
        dist_dict = {}
        for planet in self.planets:
            if planet.name != self.name:
                distance = math.dist(self.position, planet.position)
                dist_dict[planet.name] = distance
        return dist_dict

    def get_next_position(self, action) -> np.ndarray:
        deviation = self.get_deviation(action)
        if self.deviation < 10:
            self.deviation += deviation

        if self.current_step==119:
            self.current_step = 118
        next_y: np.float32 = self.trajectory[self.current_step+1][1] + self.deviation
        next_position = np.array([self.current_step+1, next_y], dtype=np.float32)
        return next_position

    def update_current_step(self):
        self.current_step = self.current_step + 1
    
    def get_deviation(self, action: int) -> np.float32:
        if action==1:
            deviation = -0.1
        elif action==2:
            deviation = 0.1
        else:
            deviation = 0
        return deviation

    def get_distance_to_ideal_position(self) -> np.float32:
        self.get_ideal_position()
        distance: np.float32 = math.dist(self.position, self.ideal_postion)
        return distance

    def update_position(self, action):
        self.position = self.get_next_position(action)
    


### Creating trajectories

In [34]:
def get_point(x: float, h: float, w: float) -> np.array:
    root_number = 1-(x**2/w**2)
    if root_number < 0:
        root_number*-1
        y = h*np.sqrt(root_number)
        return np.array([x, -y])
    else:
        y = h*np.sqrt(root_number)
        return np.array([x, y]) 

In [35]:
h_1 = 5
w_1 = 30
trajectory_1 = []
for i in range(-30,31):
    trajectory_1.append(get_point(x=i, h=h_1, w=w_1))

trajectory_earth = trajectory_1.copy()
for el in trajectory_1:
    if el[0]==30 or el[0]==-30:
        continue
    else:
        trajectory_earth.append(np.array([-el[0], -el[1]]))
    

h_2 = 10
w_2 = 15
trajectory_2 = []
for i in range(-30,31
               ):
    trajectory_2.append(get_point(x=i/2, h=h_2, w=w_2))

trajectory_venus = trajectory_2.copy()
for el in trajectory_2:
    if el[0]==15 or el[0]==-15:
        continue
    else:
        trajectory_venus.append(np.array([-el[0], -el[1]]))




### Adding the planets

In [36]:
earth = Planet(name="earth", position=np.array([-30, 0]), trajectory=trajectory_earth)
venus = Planet(name="venus", position=np.array([-15, 0]), trajectory=trajectory_venus)

In [37]:
planets = [earth, venus]

earth.set_other_planets(planets)
venus.set_other_planets(planets)


In [38]:
earth.get_distance_to_other_planets()

{'venus': 15.0}

In [39]:
venus.position

array([-15,   0])

### Setting up test env

The observation space is a box of of dimension (2, 4) - two rows and four columns.<br>
The rows represent planets, and the columns represent:<br>

0. planet position, x-coordinate
1. planet position, y-coordinate
2. distance to nearest planet
3. distance to ideal position

The state of the environment is a (2, 4) np.array matching the observation space described above

In [40]:
class spaceEnv(Env):
    metadata = {"render_modes": ["human", "rgb_array"], "render_fps": 1}
    
    def __init__(self, planets: List[Planet]) -> None:
        self.action_space = MultiDiscrete([3, 3])
        self.observation_space = Box(low=-100.0, high=100.0, shape=(2, 4), dtype=np.float32)
        #set starting state
        self.state: np.ndarray = np.array(
                        [
                    [-15.0, 0.0, 15.0, 0.0],
                    [-30.0, 0.0 , 15.0, 0.0]          
                ], dtype=np.float32
            )
        self.planets = planets
        self.current_step = 0
        self.window = None
        self.clock = None
    
    def step(self, action):
        self.state = self.calculate_state(action)
        obs: np.ndarray = self.state
        reward = self.calculate_reward()

        info = {
            "venus": self.planets[0].position,
            "earth": self.planets[1].position
        }

        self.current_step+=1
        self.update_planet_steps()
        done = self.is_done()
        return obs, reward, done, info

    def render(self):
        pass
    
    def reset(self):
        self.state: np.ndarray = np.array(
                        [
                    [-15.0, 0.0, 15.0, 0.0],
                    [-30.0, 0.0 , 15.0, 0.0]          
                ],
                dtype=np.float32
            )
        self.current_step = 0
        return self.state

    def calculate_reward(self):
        reward = 0
        for planet in self.planets:
            if np.abs(planet.distance_to_nearest_planet) > 5:
                reward+=0
            else:
                reward-=planet.distance_to_nearest_planet
            if np.abs(planet.distance_to_ideal_position) > 0:
                reward-=np.abs(planet.distance_to_ideal_position)
            if np.array_equal(planet.position, planet.ideal_postion):
                reward+=1
            if np.abs(planet.distance_to_nearest_planet) < 1:
                reward-=100
                
        return reward

    def calculate_state(self, action) -> np.array:
        positions = []
        for action, planet in enumerate(self.planets):
            step = self.current_step + 1
            x = planet.trajectory[step][0]
            y = planet.get_next_position(action)[1]
            planet.update_position(action)
            positions.append([x, y])

        distances = []
        for planet in self.planets:
            distance_to_nearest_planet: np.float32 = min(planet.get_distance_to_other_planets().values())      
            planet.distance_to_nearest_planet = distance_to_nearest_planet
            distance_to_ideal_position = planet.get_distance_to_ideal_position()
            distances.append([
                distance_to_nearest_planet,
                distance_to_ideal_position
            ])
        
        state_list = []
        for i in range(len(self.planets)):
            row = [positions[i][0], positions[i][1], distances[i][0], distances[i][1]]
            state_list.append(row)
        
        return np.array(state_list, dtype=np.float32)
    
    def update_planet_steps(self):
        for planet in self.planets:
            if planet.current_step < 119:
                planet.current_step = self.current_step
    
    def is_done(self):
        if self.current_step==119:
            done = True
        else:
            done = False
        return done
        
    

### Test env

In [41]:
env = spaceEnv(planets=[venus, earth])

In [42]:
check_env(env, warn=True)



In [43]:
env.action_space.sample()

array([1, 2], dtype=int64)

In [44]:
episodes = 5
for episode in range(1, episodes+1):
    state = env.reset()
    done = False
    score = 0 
    
    while not done:
        #env.render()
        action = env.action_space.sample()
        n_state, reward, done, info = env.step(action)
        score+=reward
    print('Episode:{} Score:{} info{}'.format(episode, score, info))
env.close()

Episode:1 Score:-18.749009013175964 info{'venus': array([119.      ,  -2.560382], dtype=float32), 'earth': array([119.      , -27.280191], dtype=float32)}
Episode:2 Score:0 info{'venus': array([119.      ,  -2.560382], dtype=float32), 'earth': array([119.      , -51.080193], dtype=float32)}
Episode:3 Score:0 info{'venus': array([119.      ,  -2.560382], dtype=float32), 'earth': array([119.     , -74.88019], dtype=float32)}
Episode:4 Score:0 info{'venus': array([119.      ,  -2.560382], dtype=float32), 'earth': array([119.     , -98.68019], dtype=float32)}
Episode:5 Score:0 info{'venus': array([119.      ,  -2.560382], dtype=float32), 'earth': array([ 119.      , -122.480194], dtype=float32)}


In [20]:
earth.trajectory[2][1]

1.7950549357115018

In [66]:
env.observation_space.sample()

array([[-16.251116, -15.976686,  16.97138 ,  24.954184],
       [ 26.708305,  11.090343,  29.69334 , -16.34849 ]], dtype=float32)