In [321]:
# Model design
import agentpy as ap
import numpy as np

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import IPython

from pysistence import make_list #pip install pysistence

In [322]:
def normalizacion(v):
    norm = np.linalg.norm(v)
    if norm == 0:
        return v
    return v / norm

In [323]:
class Car(ap.Agent):
    """ An agent with a position and velocity in a continuous space,
    who follows Craig Reynolds three rules of flocking behavior;
    plus a fourth rule to avoid the edges of the simulation space. """

    def setup(self):

        self.velocity =  [1.0, 0.000] 
        self.acceleration = [0 , 0]          
        self.close_car = False

    def setup_pos(self, space):

        self.space = space
        self.neighbors = space.neighbors
        self.pos = space.positions[self]
        
    def update_velocity(self):

        pos = self.pos
        ndim = self.p.ndim

        nbs = self.neighbors(self, distance=self.p.outer_radius)
        nbs_len = 0
        nbs_pos_array = np.array(nbs.pos)
        for nbs_check in nbs_pos_array:
            if (nbs_check[0] -  pos[0]  > 0 and nbs_check[0] -  pos[0]  < 10 ) and (nbs_check[1] -  pos[1]  > -1 and nbs_check[1] -  pos[1]  < 1 ) :
                nbs_len += 1
        nbs_vec_array = np.array(nbs.velocity)
        if nbs_len > 0:
            if self.velocity[0] > 0.0:
                v1 = np.array([-1, 0.])
            else:
                v1 = np.zeros(ndim)
        else:
            if self.velocity[0] < 1.0:
                v1 = np.array([1, 0.])
            else:
                v1 = np.zeros(ndim)

        
    
        self.velocity += v1
        self.velocity = normalizacion(self.velocity)

    def update_position(self):

        self.space.move_by(self, self.velocity)
        

In [324]:
[1.0, 0.000][1]

0.0

In [325]:
class CarIncor(ap.Agent):
    """ An agent with a position and velocity in a continuous space,
    who follows Craig Reynolds three rules of flocking behavior;
    plus a fourth rule to avoid the edges of the simulation space. """

    def setup(self):

        self.velocity =  [10.0, 1.0] 
        self.acceleration = [0 , 0]          
        self.close_car = False

    def setup_pos(self, space):

        self.space = space
        self.neighbors = space.neighbors
        self.pos = space.positions[self]
        
    def update_velocity(self):

        pos = self.pos
        ndim = self.p.ndim

        nbs = self.neighbors(self, distance=self.p.outer_radius)
        nbs_len = 0
        nbs_pos_array = np.array(nbs.pos)
        for nbs_check in nbs_pos_array:
            if (nbs_check[0] -  pos[0]  > 0 and nbs_check[0] -  pos[0]  < 10 ) and (nbs_check[1] -  pos[1]  > -1 and nbs_check[1] -  pos[1]  < 1 ) :
                nbs_len += 1
        nbs_vec_array = np.array(nbs.velocity)
        v1 = np.zeros(ndim)
        if pos[1] >= 5.0:
            self.velocity = [1, 0.] # Cambiar 10 por velocidad a la cual se tiene que disminuir
            pos[1] = 5.0
            if nbs_len > 0:
                if self.velocity[0] >= 0.0:
                    v1 = np.array([-1.0, 0.])
                else:
                    if self.velocity[0] <= 1.0:
                        v1 = np.array([1.0, 0.])
        else:
            if nbs_len > 0:
                if self.velocity[0] >= 0.0:
                    self.velocity = np.array( [0.0, 0.0] )
            else:
                if self.velocity[0] <= 10.0:
                    self.velocity = np.array( [10.0, 1.0] )

        
    
        self.velocity += v1
        self.velocity = normalizacion(self.velocity)


        

        self.velocity = normalizacion(self.velocity)

    def update_position(self):

        self.space.move_by(self, self.velocity)
        if self.pos[1]>=5.0:
            self.pos[1]=5.0
        

In [326]:
class MergeCar(ap.Agent):
    pass

In [327]:
def parse_positions(positions: list) -> list[tuple]:
    """Return paramater positions as a list of tuples so we can use add_agents method."""
    pass    


def initialize_starting_positions(car_amount) -> list[tuple]:
    """Return a list of tuples as starting position for cars."""
    pass

In [328]:
class TrafficFlowModel(ap.Model):
    

    def setup(self):
        """ Initializes the agents and network of the model. """


        if self.p.cars_pos:
            # Initialize model with existing car positions
            self.space = ap.Space(self, shape=[self.p.sizeX,self.p.sizeY])
            self.agents = ap.AgentList(self, self.p.population, Car)
            self.agentsIncor = ap.AgentList(self, self.p.populationIncor, CarIncor)
            self.space.add_agents(self.agents,self.p.cars_pos, random=False)
            self.space.add_agents(self.agentsIncor,self.p.cars_posIncor, random=False)
            self.agents.setup_pos(self.space)
            self.agentsIncor.setup_pos(self.space)


        elif self.p.car_amount:
            # Initialize new model with amount of car parameter
            # Set up agents (cars)

            self.space = ap.Space(self, shape=[self.p.sizeX,self.p.sizeY])
            self.agents = ap.AgentList(self, self.p.population, Car)
            self.agentsIncor = ap.AgentList(self, self.p.populationIncor, CarIncor)
            self.space.add_agents(self.agents,self.p.cars_pos, random=False)
            self.space.add_agents(self.agents,self.p.cars_posIncor, random=False)
            self.agents.setup_pos(self.space)
            self.agentsIncor.setup_pos(self.space)

        else:
            # Stop simulation and return error
            pass

    def step(self):

        # Update velocity and position for cars in every step

        self.agents.update_velocity()  # Adjust direction
        self.agentsIncor.update_velocity() 
        self.agents.update_position()  # Move into new direction
        self.agentsIncor.update_position() 


        # Check if any of the cars have a car nearby. If so, update agent.close_car to True

        # Update VSL value depending on density of cars in one specific area in grid.



    def calculate_VSL(self):
        # Calculate VSL inside a specific range
        pass

In [329]:
def animation_plot_single(m, ax):
    ndim = m.p.ndim
    ax.set_title(f"Filas de vehiculos en movimiento {ndim}D t={m.t}")
    pos = m.space.positions.values()
    pos = np.array(list(pos)).T  # Transform
    ax.scatter(*pos, s=1, c='black')
    ax.set_xlim(0, m.p.sizeX)
    ax.set_ylim(0, m.p.sizeY)
    if ndim == 3:
        ax.set_zlim(0, m.p.size)
    ax.set_axis_off()

def animation_plot(m, p):
    projection = '3d' if p['ndim'] == 3 else None
    fig = plt.figure(figsize=(10,4))
    ax = fig.add_subplot(111, projection=projection)
    animation = ap.animate(m(p), fig, ax, animation_plot_single)
    return IPython.display.HTML(animation.to_jshtml(fps=20))

In [330]:
parameters2D = {
    'sizeX': 100,
    'sizeY': 10,
    'seed': 123,
    'steps': 100,
    'ndim': 2,
    'population': 5,
    'populationIncor': 2,
    'inner_radius': 6,
    'outer_radius': 6,
    'cars_pos':[np.array([1, 5.]),np.array([6, 5.]),np.array([12, 5.]),np.array([18, 5.]),np.array([24, 5.])],
    'cars_posIncor':[np.array([20, 2.]),np.array([29, 3.])]

}

In [331]:
animation_plot(TrafficFlowModel, parameters2D)

[40.89578099  4.0895781 ]
1
[40.89578099  4.0895781 ]
1
[40.89578099  4.0895781 ]
1
[40.89578099  4.0895781 ]
1
