In [1]:
import numpy as np
import pandas as pd
import math
import itertools

In [2]:
class Square_Crossroads():
    
    def __init__(self, n_time_steps, seed, dist_cars):
        
        # n_time_steps, length and width are not used yet, but will probably be used later
        self.n_time_steps = n_time_steps
        self.length = 10
        self.width = 10
        
        
        self.dist_cars = dist_cars
        
        # initial 4 cars
        self.cars = np.array([0, 1, 2, 3])
        
        # create 4 entrances
        self.state_a = [0, 5]
        self.state_b = [5, 0]
        self.state_c = [5, 10]
        self.state_d = [10, 5]
        self.exits = [self.state_a, self.state_b, self.state_c, self.state_d]

        self.set_seed(seed)
        self.reset()
                    
        
    def step(self, action_space):
        
        # not done yet
        # crash reward 
        # static reward (if velocity = 0, -5?)
        # timestamp reward (-1)
        
        # calculate the exact place we will get to using curvature = action_space[0] and acceleration = action_space[1]
        
        print(f"Taking a step in the environment with these actions: \n curvature = {action_space[0]}, acceleration = {action_space[1]}")
        
        done = False
        reward = 0
        time_reward = -1
        static_reward = 0
        reward_crash = 0
        reward_success = 0
        
        # Take a step
        
        
        # calculate crash reward
        reward_crash = self.check_crash(self.dist_cars)
        
        reward_success = self.check_success()

        reward_static = self.check_static()
        
        reward = reward_crash + static_reward + time_reward + reward_success
        return self.states, reward, done
    
    
    def set_seed(self, seed):
        np.random.seed(seed)
    
    
    def new_car(self):
        # add a new car
        
        last_car = self.cars[-1]
        new_car = last_car + 1
        
        # add new car index to list of all cars
        self.cars = np.append(self.cars, new_car)
        
        # take a random spawn exit
        spawn = np.random.choice(len(self.exits))
        
        # remove the spawn exit from the target exits
        target_indexes = list(np.arange(len(self.exits)))
        target_indexes.pop(spawn)
        
        
        # choose a random target exit (spawn exit is already excluded)
        rand_target = np.random.choice(target_indexes)
        
        # added the new car with next_index : [spawn exit, target exit]
        self.states[new_car] = [self.exits[spawn], self.exits[rand_target], 0, 0]
        
    def reset(self):
        
        # reset the whole environment
        
        print("Environment reset with param")
        
        self.states = dict.fromkeys(self.cars)
        
        for car in self.states:
            
            # remove the spawn exit from the target exits
            target_indexes = list(np.arange(len(self.exits)))
            target_indexes.pop(car)
            
            # choose a random target exit (spawn exit is already excluded)
            rand_target = np.random.choice(target_indexes)
            
            # create a dictionary of car : [spawn exit, target exit, velocity, curvature]
            self.states[car] = [self.exits[car], self.exits[rand_target], 0, 0]
    
    
    def check_crash(self, dist_cars):
        # check if a crash has occured and decrease the reward by 20 for every crash
        
        reward_crash = 0
        
        all_points = [a[0] for a in self.states.values()]
        
        for p0, p1 in itertools.combinations(all_points, 2):
            
            tmp_dist = distance(p0, p1)
            
            if tmp_dist <= dist_cars: reward_crash -=20
                
        return reward_crash
                
    def check_success(self):
        # check if a car has successfully exited and increase reward by 100 for each success
        
        reward_success = 0
        
        # list of cars that should successfully be removed
        exit = []
        
        for car, car_prop in self.states.items():
            print(self.states[car])
            if self.states[car][0] == self.states[car][1]:
                
                exit.append(car)          
                reward_success += 100
                
        for c in exit:
            del self.states[c]
            
        return reward_success
        
    def check_static(self):
        # check if velocity is 0 and decrease the reward by 10 for each car with velocity 0
        
        reward_static = 0
        
        for car_prop in self.states.values():
            
            if car_prop[2] == 0:
                reward_static += -10
                
        return reward_static
    
def distance(p0, p1):
    # calculates the distance between 2 points
    return math.sqrt((p0[0] - p1[0])**2 + (p0[1] - p1[1])**2)

In [3]:
n_times_steps = 3
seed = 10
dist_cars = 0.5

env = Square_Crossroads(n_times_steps, seed, dist_cars)

print(env.states)

Environment reset with param
{0: [[0, 5], [5, 10], 0, 0], 1: [[5, 0], [5, 10], 0, 0], 2: [[5, 10], [0, 5], 0, 0], 3: [[10, 5], [0, 5], 0, 0]}


In [4]:
curvature = np.random.uniform(-2, 4)
acceleration = np.random.uniform(-4, 16)

action_space = (curvature, acceleration)
states, reward, done = env.step(action_space)

Taking a step in the environment with these actions: 
 curvature = 0.6580896861316505, acceleration = 12.638227179424018
[[0, 5], [5, 10], 0, 0]
[[5, 0], [5, 10], 0, 0]
[[5, 10], [0, 5], 0, 0]
[[10, 5], [0, 5], 0, 0]


In [5]:
states

{0: [[0, 5], [5, 10], 0, 0],
 1: [[5, 0], [5, 10], 0, 0],
 2: [[5, 10], [0, 5], 0, 0],
 3: [[10, 5], [0, 5], 0, 0]}

In [6]:
reward

-1

In [7]:
done

False

In [8]:
env.new_car()
print(env.states)

{0: [[0, 5], [5, 10], 0, 0], 1: [[5, 0], [5, 10], 0, 0], 2: [[5, 10], [0, 5], 0, 0], 3: [[10, 5], [0, 5], 0, 0], 4: [[5, 0], [0, 5], 0, 0]}
