In [None]:
# default_exp env

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
#hide
# ensures that the core library is reloaded automatically whenever it is modified
%load_ext autoreload
%autoreload 2

# Environment

The idea of this environment is to control a car and make it stop after a defined distance of 1000 m. The agent has 4 actions it can use. These are

* accelerate, which increases the car's velocity by 1 m/s until a max velocity of 35 m/s is reached
* brake, which slows the car down by 1 m/s until it stands still
* keep velocity
* declutch, which slows the car down by 0.1 m/s

There are the following rewards

* overshoot the distance: -1000 points
* stop in the last meter of the distance: +100 points
* move towards the end: 0 points
* standing still: -1 point

The environment delivers the following observations

* distance travelled in meters (float)
* distance till the end is reached in meters (float)
* current velocity in m/s (float)

The agent can execute an action at every second.

To move the car, energy is used. 
* 1 energy unit is used to move the car for one second at 10 m/s
* during acceleration, additional energy of 5% of the energy needed at the end velocity is used, so if the car increases its speed from 9 m/s to 10 m/s, 1.05 energy units are consumed
* brake and declutch don't use energy

There are a few options that can be configured
* penalty for using energy: -1 point per energy unit used
* stochastic environment: a random deviation of +/- 20% is applied when accelerating, braking or declutching


In [None]:
#export
import gym
from gym.spaces import Tuple, Discrete, Box
import numpy as np
import random

class CarEnv(gym.Env):
    """
    One-dimensional driving task: bring a car to a standstill within the
    last meter of a track of ``distance`` meters.

    Actions:
    0: accelerate (+1 m/s, capped at ``maxspeed``)
    1: brake (-1 m/s, not below 0)
    2: keep velocity
    3: declutch (-0.1 m/s, not below 0)

    Observation space (float32 array of length 3)
    1: distance travelled in meters
    2: distance to go in meters
    3: current velocity in m/s

    Rewards:
    - standing still     -1
    - goal reached      100
    - overshoot       -1000
    - closer to target    0
    - Option: not reaching goal within time -1000
    - Option: per used 1 Unit of Energy -1

    Options:
    - mode_energy_penalty: penalty for using energy
    - mode_random: stochastic environment -> acceleration and deceleration
      have a uniform random part of +/- 20%
    - mode_limit_steps: limit the number of possible steps. If the target is
      not reached after ``max_timesteps`` steps, the episode ends with the
      same reward as an overshoot (-1000).

    ``step`` follows the classic gym convention and returns the 4-tuple
    ``(observation, reward, done, info)``. On every terminal transition the
    observation is an all-zero array.
    """
    def __init__(self, mode_energy_penalty:bool = False, mode_random:bool = False, mode_limit_steps:bool = False):
        super(CarEnv, self).__init__()

        self.mode_energy_penalty = mode_energy_penalty
        self.mode_random = mode_random
        self.mode_limit_steps = mode_limit_steps

        # static configuration of the track and the car
        self.distance: float = 1000.0
        self.maxspeed: float = 35.0 # max speed in m/s
        self.velocityenergy_unit: float = 10.0 # 1 second at the speed of velocityenergy_unit uses 1 energy unit
        self.accelerationenergy_factor: float = 0.05 # acceleration uses an extra amount of energy
        self.max_timesteps = int(self.distance)

        # mutable episode state (re-initialised by reset())
        self.currentposition: float = 0.0
        self.currentvelocity: float = 0.0
        self.usedenergy: float = 0.0
        self.is_done: bool = False

        # bounds of the observation array: [travelled, remaining, velocity]
        low = np.array([0.0,
                        0.0,
                        0.0],
                       dtype=np.float32
        )

        high = np.array([self.distance,
                         self.distance,
                         self.maxspeed],
                         dtype=np.float32
        )

        self.observation_space = Box(low, high, dtype=np.float32)

        self.action_space = Discrete(n=4)

        self.step_index = 0


    def reset(self):
        """Start a new episode and return the initial observation."""
        self.currentposition = 0.0
        self.currentvelocity = 0.0
        self.usedenergy = 0.0
        self.is_done = False
        self.step_index = 0

        return self._calculate_state()

    def _calculate_state(self):
        """Build the observation [travelled, remaining, velocity] as float32."""
        return np.array([self.currentposition,
                         self.distance - self.currentposition,
                         self.currentvelocity
                        ], dtype=np.float32)


    def _set_new_velocity(self, acceleration:float):
        """Apply ``acceleration`` (m/s per step) and clamp to [0, maxspeed]."""

        # in random mode, acceleration and deceleration have a uniform random part
        if self.mode_random:
            acceleration += acceleration * random.uniform(-0.2, +0.2)

        self.currentvelocity = max(0.0, min(self.maxspeed, self.currentvelocity + acceleration))


    def step(self, action):
        """
        Advance the simulation by one second.

        Returns the tuple ``(observation, reward, done, info)``; ``info`` is
        always an empty dict. Calling ``step`` on a finished episode is a
        no-op that returns a zero observation and zero reward.
        """
        zero_state = np.array([0.0, 0.0, 0.0], dtype=np.float32)
        if self.is_done:
            # keep the same 4-tuple shape as every other return path so that
            # callers can always unpack (state, reward, done, info)
            return zero_state, 0, self.is_done, {}

        err_msg = "%r (%s) invalid" % (action, type(action))
        assert self.action_space.contains(action), err_msg

        # braking and declutching don't need energy, so 0.0 is the default
        energy_for_step:float = 0.0
        reward = 0

        if action==0:
            # accelerate: energy for the new velocity plus the acceleration surcharge
            self._set_new_velocity(1.0)
            energy_for_step = (self.currentvelocity / self.velocityenergy_unit) * (1 + self.accelerationenergy_factor)
        elif action==1:
            # brake
            self._set_new_velocity(-1.0)
        elif action==2:
            # keep velocity
            energy_for_step = (self.currentvelocity / self.velocityenergy_unit)
        elif action==3:
            # declutch
            self._set_new_velocity(-0.1)

        if self.mode_random:
            # NOTE(review): this second perturbation is applied for every
            # action and is not clamped, so the velocity can exceed maxspeed
            # (and the bounds of observation_space) - confirm this is intended.
            self.currentvelocity += self.currentvelocity * random.uniform(-0.2, +0.2)

        self.currentposition += self.currentvelocity
        self.step_index      += 1
        self.usedenergy      += energy_for_step

        standing_still = abs(self.currentvelocity - 0.0)<0.00001

        goal_reached = bool(self.currentposition > self.distance -1
                           and self.currentposition <= self.distance
                           and standing_still)

        overshoot = self.currentposition > self.distance

        # must read the instance attribute; the bare name does not exist
        timeup = self.mode_limit_steps and (self.step_index >= self.max_timesteps)

        # the goal is checked first so that reaching it on the very last
        # permitted step counts as success rather than as a timeout
        if goal_reached:
            self.is_done = True
            reward = 100
            return zero_state, reward, self.is_done, {}

        if overshoot or timeup:
            reward = -1000
            self.is_done = True
            return zero_state, reward, self.is_done, {}

        if standing_still:
            reward = -1
        if self.mode_energy_penalty:
            # -1 point per energy unit used in THIS step; subtracting the
            # running total would penalise the same unit again on every step
            reward -= energy_for_step

        return self._calculate_state(), reward, self.is_done, {}

In [None]:
def test_init():
    """After reset the car is at the start line, at rest, with the full track ahead."""
    car = CarEnv()
    travelled, remaining, velocity = car.reset()

    assert travelled == 0.0
    assert remaining == car.distance
    assert velocity == 0.0

In [None]:
def test_accelerate():
    """One accelerate step from rest moves the car 1 m at 1 m/s without reward."""
    car = CarEnv()
    car.reset()

    observation, reward, done, _ = car.step(0)
    travelled, remaining, velocity = observation

    assert reward == 0
    assert not done
    assert travelled == 1.0
    assert remaining == 999.0
    assert velocity == 1.0

In [None]:
def test_break():
    """Braking right after one acceleration stops the car and yields the standstill penalty."""
    car = CarEnv()
    car.reset()
    car.step(0)  # get the car rolling at 1 m/s first

    observation, reward, done, _ = car.step(1)
    travelled, remaining, velocity = observation

    assert reward == -1
    assert not done
    assert travelled == 1.0
    assert remaining == 999.0
    assert velocity == 0.0

In [None]:
def test_keep_velocity():
    """Keeping the velocity after one acceleration advances the car by another meter."""
    car = CarEnv()
    car.reset()
    car.step(0)  # get the car rolling at 1 m/s first

    observation, reward, done, _ = car.step(2)
    travelled, remaining, velocity = observation

    assert reward == 0
    assert not done
    assert travelled == 2.0
    assert remaining == 998.0
    assert velocity == 1.0

In [None]:
def test_declutch():
    """Declutching after one acceleration reduces the speed to 0.9 m/s and rolls 0.9 m further."""
    car = CarEnv()
    car.reset()
    car.step(0)  # get the car rolling at 1 m/s first

    observation, reward, done, _ = car.step(3)

    assert reward == 0
    assert not done
    # observations are float32, so compare against the targets with a tolerance
    for actual, expected in zip(observation, (1.9, 998.1, 0.9)):
        assert abs(actual - expected)<0.001

In [None]:
# Run all smoke tests of this notebook in the order they were defined.
for run_check in (test_init, test_accelerate, test_break, test_keep_velocity, test_declutch):
    run_check()