In [1]:
from matplotlib import pyplot as plt
from matplotlib import colors

import numpy as np
import random

import time
from IPython.display import clear_output

# Grid dimensions (rows, columns) shared by the environment grid and the per-agent move grids
grid_rows, grid_columns = 5, 18

# Rest-area counters: cars_ra is the number of cars currently in the rest area,
# cars_ra_enter is the running total of cars that have entered it (see move_agent)
cars_ra, cars_ra_enter = 0, 0

# Row index (pos_y) is counted from top to bottom: 0, 1, 2, 3, ...

# Environment functions
#------------------------------------------------------------------------------------------------------------------------
def create_grid_base(rows, cols):
    """Return a new grid of ``rows`` x ``cols`` cells, every cell set to 0.

    The grid is a list of row lists; each row is its own list object, so
    writing to one row never changes another.
    """
    grid_base = []
    for _ in range(rows):
        grid_base.append([0] * cols)
    return grid_base

def create_grid_lane(grid_env, pos_y, start, length):
    """Mark a horizontal lane segment on the environment grid.

    Sets ``length`` consecutive cells of row ``pos_y``, beginning at column
    ``start``, to 1. The grid is modified in place and also returned.
    """
    for col in range(start, start + length):
        grid_env[pos_y][col] = 1
    return grid_env

def create_rest_area(grid_env, pos_y, pos_x):
    """Mark the cell at row ``pos_y``, column ``pos_x`` as a rest area (value 2).

    The grid is modified in place and also returned.
    """
    target_row = grid_env[pos_y]
    target_row[pos_x] = 2
    return grid_env
#------------------------------------------------------------------------------------------------------------------------

# Agent functions
#------------------------------------------------------------------------------------------------------------------------
def create_grid_move(pos_y, pos_x):
    """Build the move grids for a car located at (pos_y, pos_x).

    Returns a list of four ``grid_rows`` x ``grid_columns`` grids, each
    holding at most one cell set to 1:

    * index 0 - the current cell (pos_y, pos_x)
    * index 1 - the "turn left" target (pos_y - 1, pos_x + 1)
    * index 2 - the straight-ahead target (pos_y, pos_x + 1)
    * index 3 - the "turn right" target (pos_y + 1, pos_x + 1)

    The cells are written in that order. As soon as one of them lies past
    the grid edge, writing stops and that grid and all following grids stay
    all-zero (for example every target grid when the car is in the last
    column).
    """
    grid_move_0 = create_grid_base(grid_rows, grid_columns) # current position
    grid_move_l = create_grid_base(grid_rows, grid_columns) # turn left
    grid_move_s = create_grid_base(grid_rows, grid_columns) # straight ahead
    grid_move_r = create_grid_base(grid_rows, grid_columns) # turn right

    try:
        grid_move_0[pos_y][pos_x] = 1
        # NOTE(review): for pos_y == 0 the index -1 wraps to the last row
        # (normal Python list indexing) instead of being treated as off-grid.
        grid_move_l[pos_y-1][pos_x+1] = 1
        grid_move_s[pos_y+0][pos_x+1] = 1
        grid_move_r[pos_y+1][pos_x+1] = 1
    except IndexError:
        # A target is outside the grid (agent at the edge / destination):
        # leave the remaining grids empty. Only IndexError is swallowed so
        # that unrelated errors are no longer hidden.
        pass

    return [grid_move_0, grid_move_l, grid_move_s, grid_move_r]

def prop_agent(grid_env, pos_y, pos_x, drowse, status):
    """Assemble the record describing one agent.

    The record is the list ``[move_grids, drowse, beneath, status]`` where

    * ``move_grids`` is the result of ``create_grid_move(pos_y, pos_x)``
      (current position plus the candidate next positions),
    * ``drowse`` is the value passed in,
    * ``beneath`` is the ``grid_env`` value at the agent's cell, or -1 when
      ``status`` is "Arrived",
    * ``status`` is the value passed in.
    """
    move_grids = create_grid_move(pos_y, pos_x)
    # Arrived agents are no longer on the grid, so no cell value is looked up for them.
    beneath = -1 if status == "Arrived" else grid_env[pos_y][pos_x]
    return [move_grids, drowse, beneath, status]

def drowse_rn_init():
    """Draw a random initial drowsiness value, a float in the range [0, 6).

    The module-level ``random`` generator is re-seeded with
    ``random.seed(None)`` before the draw, so any seed set earlier by the
    caller no longer applies afterwards.
    """
    random.seed(None)
    return random.random() * 6

def pos_find(grid_move): # for agent position and grid movement
    """Return ``[pos_y, pos_x]`` of the first cell equal to 1 in ``grid_move``.

    The grid is scanned row by row from the top, and each row from left to
    right; the scan stops at the first match. (Previously the ``break`` only
    left the inner loop, so with several matching rows the last one won.)

    Raises:
        ValueError: if no cell equals 1. This replaces the accidental
            UnboundLocalError the old code produced for an empty grid.
    """
    for pos_y, row_move in enumerate(grid_move):
        for pos_x, val in enumerate(row_move):
            if val == 1:
                return [pos_y, pos_x]
    raise ValueError("pos_find: grid contains no cell equal to 1")

def agents_rn_init(grid_env, n): # agent random spawn, initial
    """Create ``n`` driving agents on randomly chosen lane cells.

    The module-level ``random`` generator is seeded with 42 before the
    positions are sampled. Candidate cells are all cells of ``grid_env``
    whose value is 1; ``n`` distinct ones are picked with ``random.sample``.
    Each agent gets its drowsiness from ``drowse_rn_init()`` and the status
    "Drive".

    Raises:
        ValueError: if ``n`` exceeds the number of candidate cells.
    """
    random.seed(42)

    # Collect every (row, column) whose cell is a lane cell
    eligible = []
    for r, row in enumerate(grid_env):
        for c, v in enumerate(row):
            if v == 1:
                eligible.append((r, c))

    if n > len(eligible):
        raise ValueError(
            f"Only {len(eligible)} eligible spots, but you asked for {n} agents."
        )

    spots = random.sample(eligible, n)
    return [
        prop_agent(grid_env, pos_y, pos_x, drowse_rn_init(), "Drive")
        for pos_y, pos_x in spots
    ]

def agents_rn_spawn(grid_env, grid_agents, agents):
    """Possibly spawn one new driving agent in column 0.

    A cell in column 0 is a candidate when ``grid_env`` holds 1 there and
    ``grid_agents`` (the agent-position grid; the parameter shadows the
    function of the same name) holds 0 there. A random number is drawn; if
    it is at least 0.5 and there is a candidate, one candidate is picked at
    random and a new "Drive" agent with a random drowsiness is appended to
    ``agents``.

    The module-level ``random`` generator is re-seeded with
    ``random.seed(None)`` first. ``agents`` is modified in place and returned.
    """
    random.seed(None)

    entry_col = 0 # column or pos_x 0
    eligible = []
    for r, row in enumerate(grid_env):
        if row[entry_col] == 1 and grid_agents[r][entry_col] == 0:
            eligible.append((r, entry_col))

    rn_spawn = random.random()
    if rn_spawn < 0.5 or not eligible:
        return agents

    pos_y, pos_x = random.sample(eligible, 1)[0]
    agents.append(prop_agent(grid_env, pos_y, pos_x, drowse_rn_init(), "Drive"))
    return agents

def grid_agents(agents): # to show the position of the agents
    """Render the agents' current positions onto a fresh grid.

    Returns a ``grid_rows`` x ``grid_columns`` grid of zeros in which the
    current cell of every "Drive" agent is set to 1 and the current cell of
    every "Crashed" agent is set to 4. Agents with any other status are not
    drawn. Agents are drawn in list order, so a later agent overwrites an
    earlier one on the same cell.

    Drawing is best effort: an agent whose position cannot be determined is
    skipped. Only ``Exception`` subclasses are swallowed, so
    KeyboardInterrupt / SystemExit still propagate.
    """
    grid_agent = create_grid_base(grid_rows, grid_columns)
    for agent in agents:
        try:
            if agent[3] == "Drive":
                marker = 1
            elif agent[3] == "Crashed":
                marker = 4
            else:
                continue
            pos_curr = pos_find(agent[0][0])
            grid_agent[pos_curr[0]][pos_curr[1]] = marker
        except Exception:
            # Malformed agent or no current position: leave it off the grid
            pass
    return grid_agent
#------------------------------------------------------------------------------------------------------------------------

# Agent movement functions
#------------------------------------------------------------------------------------------------------------------------
def agent_brake(agent, agents):
    """Unfinished braking rule; currently has no effect and returns None.

    For every entry of ``agents`` it checks whether this agent's
    straight-ahead grid equals the other agent's left or right grid while the
    other agent has status "Drive" and sits on a cell of value 1. Nothing is
    done when the check succeeds.
    """
    own_moves = agent[0]
    ag_i_next_l, ag_i_next_s, ag_i_next_r = own_moves[1], own_moves[2], own_moves[3]

    for other in agents:
        other_moves = other[0]
        ag_j_next_l, ag_j_next_s, ag_j_next_r = other_moves[1], other_moves[2], other_moves[3]

        crosses_path = ag_i_next_s == ag_j_next_l or ag_i_next_s == ag_j_next_r
        if crosses_path and other[3] == "Drive" and other[2] == 1:
            # TODO: braking behaviour is not implemented yet
            pass
            
def crash_check(agents):
    """Flag agents that share a position on a lane cell as "Crashed".

    Two different list entries collide when their move-grid lists
    (``agent[0]``) are equal and both sit on a cell of value 1
    (``agent[2] == 1``). Agents already "Arrived" or "Crashed" are ignored.
    Statuses are updated in place; the same list is returned.
    """
    finished = ("Arrived", "Crashed")
    for i, car_a in enumerate(agents):
        if car_a[3] in finished:
            continue
        for j, car_b in enumerate(agents):
            if car_b[3] in finished:
                continue
            same_spot = i != j and car_a[0] == car_b[0]
            if same_spot and car_a[2] == 1 and car_b[2] == 1:
                car_a[3] = "Crashed"
                car_b[3] = "Crashed"
    return agents

def ra_in(agents): # Agents in rest area
    """Return the agents whose cell value (``agent[2]``) is 2, i.e. the rest area.

    The result is a new list holding the same agent objects, in their
    original order.
    """
    return list(filter(lambda item: item[2] == 2, agents))

def ra_select(ra_agents): # From ra_in
    """Pick one rest-area agent that is a candidate to continue driving.

    Candidates are the agents whose drowsiness (``agent[1]``) is below 3.
    One of them is chosen with ``random.choice``; if there is no candidate
    an empty list is returned instead.
    """
    candidates = []
    for item in ra_agents:
        if item[1] < 3:
            candidates.append(item)

    if not candidates:
        return []
    return random.choice(candidates)

def ra_out_rn(agent, agents, ra_agent_cont):
    """Decide whether ``agent`` leaves the rest area this step (1) or not (0).

    Only the agent equal to ``ra_agent_cont`` may leave. It stays (0) when
    any other agent's straight-ahead grid equals this agent's right-turn
    grid, i.e. both would move to the same cell. Otherwise a random number is
    drawn and the agent leaves when it is at least 0.5.
    """
    if agent != ra_agent_cont:
        return 0

    for other in agents:
        # Skip string placeholders and the agent itself
        if isinstance(other, str) or agent == other:
            continue
        # A car next to the rest area is heading for the same cell
        if other[0][2] == agent[0][3]:
            return 0

    return 1 if random.random() >= 0.5 else 0

def direction_agent(agent, grid_env):
    """Unfinished direction rule; looks up the candidate moves and returns None.

    Reads the agent's status and drowsiness and resolves the left, straight
    and right target positions with ``pos_find``. The values are not used
    yet, and ``grid_env`` is not read.
    """
    status, drowse = agent[3], agent[1]

    # Each position is [y, x]; resolved in the order left, straight, right
    pos_next_l, pos_next_s, pos_next_r = (pos_find(agent[0][k]) for k in (1, 2, 3))

    # TODO: direction choice is not implemented yet
    return None

def move_agent(agent, agents, grid_env, ra_agent_cont):
    """Advance one agent by one simulation step and return its record.

    Agents whose status is not "Drive" are returned unchanged. For a driving
    agent, the first matching rule applies:

    1. On a rest-area cell (``agent[2] == 2``): while drowsiness is >= 3 the
       agent rests (drowsiness - 1). Below 3, ``ra_out_rn`` decides whether
       it leaves; if so it moves to its right-turn cell and ``cars_ra`` is
       decremented, otherwise it keeps resting (drowsiness - 1).
    2. The left-turn cell is a rest area, drowsiness is >= 5 and a random
       draw is >= 0.5: the agent enters the rest area; ``cars_ra`` and
       ``cars_ra_enter`` are incremented.
    3. The straight-ahead cell is a lane cell (value 1): the agent moves
       there and its drowsiness grows by 0.25.

    If none applies the agent stays where it is.

    Any error raised while working out the move (for example a target
    position that cannot be resolved because it is off the grid) marks the
    agent as "Arrived". Only ``Exception`` subclasses are handled this way, so
    KeyboardInterrupt / SystemExit are no longer swallowed.

    Side effects: updates the module-level counters ``cars_ra`` and
    ``cars_ra_enter``; may modify ``agent`` in place.
    """
    global cars_ra, cars_ra_enter
    status = agent[3]
    drowse = agent[1]

    try:
        if status == "Drive":
            agent_beneath = agent[2]
            # All four lookups run up front: a failing lookup is what ends
            # the trip (see the except branch), so none may be removed.
            pos_curr = pos_find(agent[0][0])
            pos_next_l = pos_find(agent[0][1]) #[y, x]
            pos_next_s = pos_find(agent[0][2]) #[y, x]
            pos_next_r = pos_find(agent[0][3]) #[y, x]

            # Drawn on every step, even if the agent cannot enter the rest area
            rn_ra_enter = random.random() # in case the agent wants to enter ra
            # take a break
            if agent_beneath == 2:
                if drowse >= 3:
                    drowse = drowse - 1
                    agent[1] = drowse
                else:
                    ra_out = ra_out_rn(agent, agents, ra_agent_cont)
                    if ra_out == 1:
                        # Leave the rest area via the right-turn cell
                        agent = prop_agent(grid_env, pos_next_r[0], pos_next_r[1], drowse, "Drive")
                        cars_ra = cars_ra - 1
                    else:
                        drowse = drowse - 1
                        agent[1] = drowse
            # move forward, left, right
            elif grid_env[pos_next_l[0]][pos_next_l[1]] == 2 and drowse >= 5 and rn_ra_enter >= 0.5:
                # Enter the rest area via the left-turn cell
                agent = prop_agent(grid_env, pos_next_l[0], pos_next_l[1], drowse, "Drive")
                cars_ra = cars_ra + 1
                cars_ra_enter = cars_ra_enter + 1
            elif grid_env[pos_next_s[0]][pos_next_s[1]] == 1:
                drowse = drowse + 0.25
                agent = prop_agent(grid_env, pos_next_s[0], pos_next_s[1], drowse, "Drive")
    except Exception:
        # The move could not be computed: treat the agent as having arrived
        agent[3] = "Arrived"
    return agent

def move_agents(agents, grid_env): # move all agents for one step
    """Move every agent by one step.

    The rest-area agent allowed to leave is selected once, before any agent
    moves (``ra_select(ra_in(agents))``). Each list slot is then replaced by
    the result of ``move_agent`` in list order. ``agents`` is updated in
    place and returned.
    """
    ra_chosen = ra_select(ra_in(agents))
    for idx, car in enumerate(agents):
        agents[idx] = move_agent(car, agents, grid_env, ra_chosen)
    return agents
#------------------------------------------------------------------------------------------------------------------------

# Misc.
#------------------------------------------------------------------------------------------------------------------------
def print_grid(grid):
    """Print the grid to standard output, one row per line."""
    for line in map(str, grid):
        print(line)

def count_agent(agents):
    """Count agents by status.

    Returns the tuple ``(drive, arrived, crashed)`` with the number of agents
    whose status (``agent[3]``) is "Drive", "Arrived" and "Crashed".
    """
    statuses = [agent[3] for agent in agents]

    def tally(label):
        return sum(1 for status in statuses if status == label)

    return tally("Drive"), tally("Arrived"), tally("Crashed")

def dupe_check(agents):
    """Report whether two active agents occupy the same lane cell.

    Returns True when two different list entries, neither "Arrived" nor
    "Crashed", both on a cell of value 1 (``agent[2] == 1``), have equal
    move-grid lists (``agent[0]``). Returns False otherwise. Nothing is
    modified.
    """
    finished = ("Arrived", "Crashed")
    # Move grids of every agent that could take part in a collision
    on_lane = [car[0] for car in agents if car[3] not in finished and car[2] == 1]

    count = len(on_lane)
    return any(
        on_lane[i] == on_lane[j]
        for i in range(count)
        for j in range(i + 1, count)
    )

def agent_det(n, grid=None, pos_y=3, pos_x=10, drowse=1):
    """Create ``n`` identical "Drive" agents at a fixed position.

    Args:
        n: number of agents to create.
        grid: environment grid to read the cell value from. When omitted,
            the module-level ``grid_env`` is used (the original behaviour),
            which must already exist when agents are created.
        pos_y, pos_x: cell of every agent; defaults to row 3, column 10.
        drowse: drowsiness of every agent; defaults to 1.

    Returns:
        A list of ``n`` separate agent records (empty when ``n`` is 0).
    """
    if n <= 0:
        # Nothing to build, so the global grid is not needed either
        return []
    env = grid_env if grid is None else grid
    return [prop_agent(env, pos_y, pos_x, drowse, "Drive") for _ in range(n)]

<h2>Initial state</h2>

In [2]:
# Create the base grid (all cells 0)
grid_env = create_grid_base(grid_rows, grid_columns)

# Create highway: two lanes (rows 2 and 3) spanning every column, marked with 1
grid_env = create_grid_lane(grid_env, 2, 0, grid_columns)
grid_env = create_grid_lane(grid_env, 3, 0, grid_columns)

# Create rest area: a single cell at row 1, column 15, marked with 2
grid_env = create_rest_area(grid_env, 1, 15)

# Alternative (disabled): random initial placement of 5 cars
#cars = agents_rn_init(grid_env, 5)
# Reset the rest-area counters, then place two deterministic cars (both on the same cell)
cars_ra, cars_ra_enter = 0, 0
cars = agent_det(2)

# Create grid for car position
grid_cars = grid_agents(cars)

# Print environment and agents grid
print_grid(grid_env)
print()
print_grid(grid_cars)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


<h2>Code Run</h2>

In [5]:
# Same setup as the "Initial state" cell, followed by a collision check.

# Create the base grid (all cells 0)
grid_env = create_grid_base(grid_rows, grid_columns)

# Create highway: two lanes (rows 2 and 3) spanning every column, marked with 1
grid_env = create_grid_lane(grid_env, 2, 0, grid_columns)
grid_env = create_grid_lane(grid_env, 3, 0, grid_columns)

# Create rest area: a single cell at row 1, column 15, marked with 2
grid_env = create_rest_area(grid_env, 1, 15)

# Alternative (disabled): random initial placement of 5 cars
#cars = agents_rn_init(grid_env, 5)
# Reset the rest-area counters, then place two deterministic cars (both on the same cell)
cars_ra, cars_ra_enter = 0, 0
cars = agent_det(2)

# Both cars share one lane cell, so crash_check marks them "Crashed"
cars = crash_check(cars)

# Create grid for car position (crashed cars are drawn as 4)
grid_cars = grid_agents(cars)

# Print environment and agents grid
print_grid(grid_env)
print()
print_grid(grid_cars)

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0]
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]


<h2>Archive</h2>

In [4]:
# Archived simulation loop, kept for reference only: every line below is
# commented out, so running this cell does nothing.

# # Create the base grid
# grid_env = create_grid_base(grid_rows, grid_columns)

# # Create highway
# grid_env = create_grid_lane(grid_env, 2, 0, grid_columns)
# grid_env = create_grid_lane(grid_env, 3, 0, grid_columns)

# # Create rest area
# grid_env = create_rest_area(grid_env, 1, 15)

# #cars = agents_rn_init(grid_env, 5)
# cars_ra, cars_ra_enter = 0, 0
# #cars = [prop_agent(grid_env, 2, 0, 6, "Drive")]
# cars = []
# # Create grid for car position
# grid_cars = grid_agents(cars)

# for i in range(400):
#     cars = move_agents(cars, grid_env)

#     # Spawn next car
#     cars = agents_rn_spawn(grid_env, grid_cars, cars)
#     grid_cars = grid_agents(cars)

#     # Print environment grid
#     print_grid(grid_env)
#     print()

#     # Car position update
#     print_grid(grid_cars)
#     print()
#     print("Time elapsed:", i+1)
#     print("Total agents spawned:", len(cars))
#     print(count_agent(cars))
#     print(cars_ra, cars_ra_enter)

#     # Refresh output
#     #time.sleep(0.25)
#     clear_output(wait=True)