In [1]:
import numpy as np
import gym

In [2]:
def kmh_to_ms(val):
    """Convert a speed from kilometres per hour to metres per second.

    Args:
        val: Speed in km/h. Anything that supports division by a float
            works (plain numbers, numpy arrays, ...).

    Returns:
        ``val`` divided by 3.6, i.e. the same speed expressed in m/s.
    """
    kmh_per_ms = 3.6  # 1 m/s == 3.6 km/h
    return val / kmh_per_ms

In [3]:
# Limits shared by both ships; the speed limits are given in km/h and
# converted to m/s by kmh_to_ms.
ship = dict(
    min_speed=kmh_to_ms(6),
    max_speed=kmh_to_ms(25),
    max_accel=0.5,
    max_jerk=0.05,
)

# Notebook-level defaults: vector length and simulation step size.
dim, timestep = 2, 1



In [18]:
class ShipVector:
    """Mutable container for the kinematic state of a ship vector.

    Attributes:
        pos: Position array, stored as passed in (not copied).
        speed: Speed array, stored as passed in (not copied).
        accel: Last applied acceleration; starts as zeros with the same
            shape as ``pos``.
        specs: Dictionary of limits for this ship, stored as passed in.
    """

    def __init__(self, pos, speed, specs):
        self.specs = specs
        self.pos, self.speed = pos, speed
        # No acceleration has been applied yet, so start from rest (in accel terms).
        self.accel = np.zeros(pos.shape)

class ShipVectorEnv(gym.Env):
    """Vectorised ship-following environment.

    An "own" ship (controlled by the agent) follows an "other" ship that
    performs a random walk. Every quantity (position, speed, acceleration,
    reward, death flag) is a length-``dim`` vector whose entries are simulated
    element-wise and can be reset individually (see ``reset_indices``).

    The observation for each entry is
    ``[distance to other ship, own speed, own acceleration]``.

    NOTE(review): the declared Box spaces are built from single-entry bounds
    (one action value, three observation values), while ``step`` and
    ``encode_state`` work with ``dim`` actions and a ``(dim, 3)`` observation.
    Confirm whether the spaces are meant to describe one entry only.
    """

    def __init__(self, specs, dim=1):
        """Create the environment and spawn both ships.

        Args:
            specs: Dict with the keys ``min_speed``, ``max_speed``,
                ``max_accel`` and ``max_jerk``; used for both ships.
            dim: Number of entries simulated side by side.
        """
        self.action_space = gym.spaces.Box(
            low = -np.array([specs["max_accel"]]),
            high = np.array([specs["max_accel"]]),
            dtype = np.float64
        )

        self.observation_space = gym.spaces.Box(
            high = np.array([2000, specs["max_speed"], specs["max_accel"]]),
            low = np.array([0, 0, -specs["max_accel"]]),
            dtype = np.float64
        )

        # NOTE(review): kept as None for compatibility with existing callers;
        # gym presumably expects a dict here - confirm before changing.
        self.metadata = None

        self.specs = specs
        self.dim = dim
        self.timestep = 1

        self._spawn_ships()

        self.history = []

    def _spawn_ships(self):
        """Place the own ship at 0 and the other ship 100-1000 ahead, both at 1.2 * min_speed."""
        start_speed = self.specs["min_speed"] * 1.2
        self.own_ship = ShipVector(np.zeros(self.dim), np.repeat(start_speed, self.dim), self.specs)
        self.other_ship = ShipVector(np.random.uniform(100, 1000, self.dim), np.repeat(start_speed, self.dim), self.specs)

    def move_ship(self, specs, pos, speed, last_accel, accel):
        """Advance one ship by one timestep under acceleration, jerk and top-speed limits.

        Args:
            specs: Limits dict (``max_accel``, ``max_jerk``, ``max_speed``).
            pos: Current positions. Updated IN PLACE and also returned.
            speed: Current speeds (not modified).
            last_accel: Acceleration applied in the previous step.
            accel: Requested acceleration (array-like); never modified.

        Returns:
            Tuple ``(pos, new_speed, applied_accel)``. ``applied_accel`` is the
            acceleration that was actually realised after all limits.
        """
        dt = self.timestep

        # Limit acceleration. np.clip returns a fresh array, so the caller's
        # action is never mutated; asarray also accepts plain lists.
        accel = np.clip(np.asarray(accel, dtype=np.float64), -specs["max_accel"], specs["max_accel"])

        # Limit acceleration changes (jerk) relative to the previous step.
        max_change = specs["max_jerk"] * dt
        accel = np.clip(accel, last_accel - max_change, last_accel + max_change)

        # Apply acceleration change. Uses the instance timestep; this
        # previously read a notebook-level global named `timestep`.
        new_speed = speed + accel * dt

        # Limit top speed and record the acceleration that was really achieved.
        too_fast = new_speed > specs["max_speed"]
        accel[too_fast] = (specs["max_speed"] - speed[too_fast]) / dt
        new_speed[too_fast] = specs["max_speed"]

        pos += new_speed * dt
        return pos, new_speed, accel

    def calc_other(self):
        """Move the other ship one step using its default random-walk strategy.

        Accelerations are drawn from a normal distribution with standard
        deviation ``max_accel``. Entries slower than
        ``min(max_speed / 3, 1.5 * min_speed)`` request ``max_accel`` instead.
        """
        other = self.other_ship
        too_slow = other.speed < min(other.specs["max_speed"]/3, other.specs["min_speed"]*1.5)
        actions = np.random.normal(0, other.specs["max_accel"], self.dim)
        actions[too_slow] = other.specs["max_accel"]

        other.pos, other.speed, other.accel = self.move_ship(
            other.specs, other.pos, other.speed, other.accel, actions)

    def encode_state(self):
        """Return the ``(dim, 3)`` observation: distance, own speed, own acceleration per entry."""
        dist = self.other_ship.pos - self.own_ship.pos
        return np.stack([dist, self.own_ship.speed, self.own_ship.accel], axis=1)

    def calc_reward(self):
        """Compute the per-entry reward for the current state.

        Later rules overwrite earlier ones:
        ``-0.0001 * distance`` in general, ``-0.5`` when the distance is
        below 200, and ``-10`` for entries flagged by ``calc_death``.
        """
        dist = self.other_ship.pos - self.own_ship.pos

        rewards = np.zeros(self.dim)

        # Low penalty proportional to distance
        rewards -= dist * 0.0001

        # High penalty for lower than safety distance
        rewards[dist < 200] = -0.5

        # Critical penalty for death
        rewards[self.calc_death()] = -10

        return rewards

    def calc_death(self):
        """Return a boolean mask of entries where the own ship is below ``min_speed`` or has reached the other ship."""
        return np.logical_or(self.own_ship.speed < self.own_ship.specs["min_speed"], self.own_ship.pos >= self.other_ship.pos)

    def reset(self):
        """Respawn both ships, clear the history and return the initial observation."""
        self._spawn_ships()

        self.history = []

        return self.encode_state()

    def reset_indices(self, indices):
        """Respawn only the entries selected by the boolean mask ``indices``.

        The history is left untouched.
        """
        start_speed = self.specs["min_speed"]*1.2

        self.own_ship.pos[indices] = 0
        self.own_ship.speed[indices] = start_speed
        self.own_ship.accel[indices] = 0

        self.other_ship.pos[indices] = np.random.uniform(100, 1000, np.count_nonzero(indices))
        self.other_ship.speed[indices] = start_speed
        self.other_ship.accel[indices] = 0

    def step(self, action):
        """Apply ``action`` to the own ship, move the other ship and evaluate the result.

        Args:
            action: Requested acceleration per entry (array-like of length ``dim``).

        Returns:
            Tuple ``(observation, rewards, deaths, None)``. Rewards and death
            flags describe the state before dead entries are respawned; the
            observation is encoded after the respawn.
        """
        own = self.own_ship
        own.pos, own.speed, own.accel = self.move_ship(own.specs, own.pos, own.speed, own.accel, action)
        self.calc_other()

        # Snapshot of both ships' state, stacked into a (6, dim) array.
        self.history.append(np.copy((self.own_ship.pos, self.own_ship.speed, self.own_ship.accel, self.other_ship.pos, self.other_ship.speed, self.other_ship.accel)))

        deaths = self.calc_death()
        rewards = self.calc_reward()

        if np.any(deaths):
            self.reset_indices(deaths)

        return self.encode_state(), rewards, deaths, None

    def render(self):
        """Placeholder: stacks the recorded history but does not draw or return it yet."""
        data = np.stack(self.history)
        
    

In [19]:
# Two entries simulated side by side, both using the shared ship limits.
env = ShipVectorEnv(specs=ship, dim=2)

In [20]:
# Respawn both ships; each displayed row is [distance, own speed, own accel].
env.reset()

array([[335.77763251,   2.        ,   0.        ],
       [815.40263391,   2.        ,   0.        ]])

In [21]:

# Roll the environment forward 100 steps with a constant acceleration request per entry.
fixed_action = [0.1, 0.3]
for _step in range(100):
    env.step(fixed_action)

In [22]:
# Stack the per-step snapshots recorded by env.step into a single array
# (one leading index per recorded step).
x = np.stack(env.history)

In [41]:
class ShipNormalizer(gym.Env):
    """Wrapper that rescales actions and observations of another environment.

    Actions are expected in ``[-1, 1]`` and are mapped linearly onto the
    wrapped action space's ``[low, high]``. Observations are centred on the
    midpoint of the wrapped observation space, divided by its full range
    (``high - low``) and clipped to ``[-clip, clip]``.

    Note that ``act_std`` and ``obs_std`` hold the full range of the wrapped
    spaces, not a statistical standard deviation.
    """

    def __init__(self, env):
        self.env = env
        self.metadata = env.metadata

        # Symmetric bound applied to normalised observations.
        self.clip = 3

        wrapped_act = env.action_space
        wrapped_obs = env.observation_space

        unit_act = np.ones(wrapped_act.shape)
        self.action_space = gym.spaces.Box(
            low=-unit_act,
            high=unit_act,
            dtype=wrapped_act.dtype,
        )

        obs_bound = np.ones(wrapped_obs.shape) * self.clip
        self.observation_space = gym.spaces.Box(
            low=-obs_bound,
            high=obs_bound,
            dtype=wrapped_obs.dtype,
        )

        # Midpoint and full range of the wrapped action space.
        self.act_mean = (wrapped_act.high + wrapped_act.low) / 2
        self.act_std = wrapped_act.high - wrapped_act.low

        # Midpoint and full range of the wrapped observation space.
        self.obs_mean = (wrapped_obs.high + wrapped_obs.low) / 2
        self.obs_std = wrapped_obs.high - wrapped_obs.low

    def norm_obs(self, obs):
        """Centre and scale a raw observation, then clip it to ``[-clip, clip]``."""
        scaled = (obs - self.obs_mean) / self.obs_std
        return np.clip(scaled, -self.clip, self.clip)

    def norm_act(self, act):
        """Map a normalised action (``-1`` -> low, ``1`` -> high) to the wrapped action range."""
        offset = act / 2 * self.act_std
        return offset + self.act_mean

    def reset(self):
        """Reset the wrapped environment and return its normalised observation."""
        raw_obs = self.env.reset()
        return self.norm_obs(raw_obs)

    def step(self, action):
        """Step the wrapped environment with a normalised action.

        Returns the wrapped environment's step tuple with only the
        observation normalised; reward, done flags and info pass through.
        """
        raw_action = self.norm_act(np.array(action))
        obs, reward, done, info = self.env.step(raw_action)
        return self.norm_obs(obs), reward, done, info
    
    

In [42]:
# Wrap the raw environment so actions and observations are rescaled.
env2 = ShipNormalizer(env=env)


In [43]:
# Reset through the wrapper; the displayed observation is the normalised one.
env2.reset()

array([[-0.04912994, -0.212     ,  0.        ],
       [-0.39184138, -0.212     ,  0.        ]])

In [44]:
# -1 is the lowest normalised action; the wrapper maps it to the wrapped
# action space's lower bound before stepping the underlying environment.
env2.step([-1,-1])

(array([[-0.04907994, -0.2192    , -0.05      ],
        [-0.39179138, -0.2192    , -0.05      ]]),
 array([-0.09018401, -0.02164172]),
 array([False, False]),
 None)

In [35]:
# Inspect the action scale used by the wrapper (high - low of the wrapped action space).
env2.act_std

array([1.])