In [11]:
# Model design
import agentpy as ap
import networkx as nx 

# Visualization
import matplotlib.pyplot as plt 
import osmnx as ox 
from pyrosm import OSM
import networkx as nx
from shapely.geometry import Point
import numpy as np
import copy

In [18]:

class Pedestrian(ap.Agent):

    def setup(self):
        # Init random number generator
        rng = self.model.random
        
        # Initialize attributes and variables
        self.walking_speed = rng.random() + 1
        self.metric_path = [] 
        self.timer = 0
        
        # TODO: Improve bb for random org/dest selection to include all of the filmed area 
        # OSM data is in WGS84 so typically we need to use lat/lon coordinates when searching for the closest node
        # Choose random origin within boundaries of Quakenbrueck
        self.orig_y = 52.677063 + rng.random() * 0.002921
        self.orig_x = 7.949311 + rng.random() * 0.010128
        
        # Choose random destination within boundaries of Quakenbrueck
        self.dest_y = 52.677063 + rng.random() * 0.002921
        self.dest_x = 7.949311 + rng.random() * 0.010128
        
        # Find the closest nodes in the network for origin and destination
        self.orig_node_id, self.dist_to_orig = ox.distance.nearest_nodes(self.model.G, self.orig_x, self.orig_y, return_dist=True)
        self.dest_node_id, self.dist_to_dest = ox.distance.nearest_nodes(self.model.G, self.dest_x, self.dest_y, return_dist=True)
        
        # Set current location to the origin node
        self.location = self.model.nodes.loc[[self.orig_node_id]]
        
        # Compute shortest path to destination
        self.agent_compute_path()
        

    def agent_compute_path(self):
        """ Calculate the shortest path from the current location to the destination. """
        self.metric_path = nx.dijkstra_path(self.model.G, source=self.location.id.values[0], target=self.dest_node_id, weight='length')
            
    def increase_timer(self):
        """ Increases the agents inner timer. """
        self.timer += 1        
    
    def get_next_position(self, walking_speed, duration):
        # if path is shorter than 2, destination is reached, return False
        if len(self.metric_path) < 2:
            return 
        # else compute position in next timestep
        else:
            # calculate the distance the agent passes by during the current timestep, to prevent agent from walking further than the next node
            walking_distance = walking_speed * duration
            # set crs to current 
            orig_crs = 'epsg:4326'
            current_node = self.location.set_crs(orig_crs).to_crs('EPSG:32643')
            next_node = self.model.nodes.loc[[self.metric_path[1]]].set_crs(orig_crs).to_crs('EPSG:32643')
            # get coordinates of current location and next node 
            current_node_x = current_node.at[current_node.id.values[0],'geometry'].x
            current_node_y = current_node.at[current_node.id.values[0],'geometry'].y
            next_node_x = next_node.at[next_node.id.values[0],'geometry'].x
            next_node_y = next_node.at[next_node.id.values[0],'geometry'].y 
            # create vector and calculate length
            vector = [next_node_x - current_node_x, next_node_y - current_node_y]
            norm = np.linalg.norm(vector)
            # check if pedestrian would walk past next node or if length of vector is 0, in that case:
            if norm < walking_distance or norm == 0:
                # reset crs
                next_node = next_node.to_crs(orig_crs)
                # set current node to location of next node
                self.metric_path.pop(0)
                self.location = next_node
                return
            # if next node is not reached go on with calculation of next position
            else:
                # compute normalized vector
                normalized_vector = vector/norm
                # create deep copy of current node and reset crs of orig pointer
                new_current_node = copy.deepcopy(current_node)
                current_node = current_node.to_crs(orig_crs)
                # calculate new position and set point geometry, finally transform to orig crs
                new_lon = current_node_x +  walking_speed * duration * normalized_vector[0]
                new_lat = current_node_y + walking_speed * duration * normalized_vector[1]
                new_current_node.loc[:, 'geometry'] = Point(new_lon, new_lat)
                new_current_node = new_current_node.set_crs(epsg=32643).to_crs(orig_crs)
                # set current location to new position 
                self.location = new_current_node
                return
    
class MyModel(ap.Model):

    def setup(self):
        # load quakenbrueck data
        osm = OSM("./quakenbrueck.osm.pbf")

        # Parse roads that can be walked by pedestrian
        self.roads = osm.get_network(network_type="walking")
        # get nodes and edges
        self.nodes, self.edges = osm.get_network(network_type="walking", nodes=True)
        
        # Specify "id" as the index for nodes
        self.nodes = self.nodes.set_index("id", drop=False)
        self.nodes = self.nodes.rename_axis([None])
        # Convert edge and nodes attribute information into a dictionary format
        self.edge_attributes = self.edges.to_dict(orient="index")
        self.node_attributes = self.nodes.to_dict(orient="index")
        # create a routable graph with osmnx
        self.G = osm.to_graph(self.nodes, self.edges, graph_type="networkx")
        # plot the graph (TODO: does not work currently!)
        ox.plot_graph_folium(self.G)

        # Create a list of agents 
        self.agents = ap.AgentList(self, self.p.agents, Pedestrian)
        # Store list of inital paths into global list  
        self.routes = self.agents.metric_path
        
        # opt. visualize blank street network
        if self.p.viz:
            self.ax = self.edges.plot(figsize=(10,10), color="gray", lw=1.0)


    def step(self):
        """ Call a method for every agent. """
        # Calculate next position for all agents 
        self.agents.get_next_position(walking_speed=1.5, duration = 5)

    def update(self):
        """ Record a dynamic variable. """
        # self.agents.record('walking_speed')
        self.agents.record('metric_path')
        self.model.record('G')
        # store all the agents current location in list 
        self.positions = self.agents.location
        
        # optionally visualize current position for all agents on (static) map  
        if self.p.viz:
            self.ax = self.edges.plot(figsize=(10,10), color="gray", lw=1.0)
            self.ax = self.positions.plot(ax=self.ax, color="red", markersize=5)
            # try to visualize all routes (TODO: does not work as of now!)
            fig, ax = plt.subplots()
            for x in self.routes:
                fig, ax = ox.plot_graph_route(self.G, x, ax=ax)

    def end(self):
        """ Report an evaluation measure. """

# specify some parameters
parameters = {
    'agents': 10,
    'steps': 5,
    'comment': False,
    'viz': False
}

# Run the model!
model = MyModel(parameters)
results = model.run()        

Completed: 5 steps
Run time: 0:00:08.704188
Simulation finished


In [15]:
import IPython


In [21]:
def animation_plot(m, ax1):
    ax1.set_title("Pedestrian Movement")
    # Plot network with agents
    m.edges.plot(ax=ax1, figsize=(40,40), color="gray", lw=1.0)
    m.positions.plot(ax=ax1, color="red", markersize=5)

# create empty plot to start
fig = plt.figure(figsize=(20,20))
ax = fig.add_subplot()

# specify parameters
parameters = {
    'agents': 50,
    'steps': 20,
    'comment': False,
    'viz': False
}
# run model and display animatiun
animation = ap.animate(MyModel(parameters), fig, ax, animation_plot)
IPython.display.HTML(animation.to_jshtml()) 

<Figure size 432x288 with 0 Axes>