# Introduction

In this notebook, we implement a method for finding routes between two points of a city that minimizes the duration of the route and the amount of rain to which a vehicle is exposed.

This notebook consists of 3 auxiliary classes (`Agora`, `Grafo` and `Radar`), 3 auxiliary functions (`convert_coordinates`, `oracle` and `is_flooded`), the main class (`Experiment`), and an example.  We describe each one of those below.

## Libraries and packages

To begin, we will install and import all the libraries and packages that will be used throughout the notebook.


In [None]:
# Installing all the requirements

!pip install attrs
!pip install certifi
!pip install charset-normalizer
!pip install click
!pip install click-plugins
!pip install cligj
!pip install Fiona
!pip install geographiclib
!pip install geopandas
!pip install geopy
!pip install idna
!pip install importlib-metadata
!pip install networkx
!pip install numpy
!pip install osmnx
!pip install packaging
!pip install pandas
!pip install pyproj
!pip install python-dateutil
!pip install pytz
!pip install requests
!pip install scipy
!pip install shapely
!pip install six
!pip install tzdata
!pip install urllib3
!pip install zipp


In [None]:
# Importing all the libraries and packages that will be used

import copy
import networkx as nx
import numpy as np
import osmnx as ox
import pandas as pd
import pyproj
import requests
import shutil
import os

from geopy import distance
from shapely.geometry import Point


# Auxiliary classes and functions

In this section, we will define 3 auxiliary classes and 3 auxiliary functions.  They will all be used in the main class (`Experiment`) and in the example (below).

## Agora

The `Agora` class represents a time-like object. It provides methods to initialize a specific date and time, step forward in time, and retrieve flooding points at the current time. To create an instance of the `Agora` class, one needs to provide year, month, day, hour (24-hour clock format), minute, and second (optional) information. Once initialized, you can access and manipulate the date and time values.

In [None]:
class Agora:

    def __init__ (self, year, month, day, hour, minute, second=0):
        """
        Initializes a time-like object with the specified date and time.

        :param year (int): Year.
        :param month (int): Month.
        :param day (int): Day.
        :param hour (int): Hour.
        :param minute (int): Minute.
        :param second (int): Second (default: 0).
        """

        self.year = year
        self.month = month
        self.day = day
        self.hour = hour
        self.minute = minute
        self.second = second

    def __repr__( self):
        """
        Returns a string representation of the time in the format
        'YYYYMMDDHHMM'.
        """

        return f"{self.year:04d}{self.month:02d}{self.day:02d}{self.hour:02d}{self.minute:02d}"

    def step (self, integer):
        """
        Adds a specified number of seconds ('integer') to the current time.

        :param integer (int): Number of seconds to add.
        """

        total_seconds = self.second + integer

        # Update the seconds, minutes, and hours accordingly
        self.second = total_seconds % 60
        total_minutes = (total_seconds - self.second) // 60 + self.minute

        self.minute = total_minutes % 60
        total_hours = (total_minutes - self.minute) // 60 + self.hour

        self.hour = total_hours % 24

        # Update the day if necessary
        self.day += (total_hours - self.hour) // 24

    def get_flooding_points (self, url='https://github.com/liviatomas/floods_saopaulo/raw/main/1_input/e_Floods_2019.xlsx'):
        """
        Returns a list of flooding points at the current time.

        :param url (str): URL of the Excel file containing flooding data
            (default: 'https://github.com/liviatomas/floods_saopaulo/raw/main/1_input/e_Floods_2019.xlsx')

        :return flooding_points (list): List of flooding points. Each point is
            represented as a tuple (x, y) of coordinates.
        """

        # Formatting the date and time
        date = f"{self.year:04d}-{self.month:02d}-{self.day:02d}"
        start_time = f"{self.hour:02d}:{self.minute:02d}:{self.second:02d}"

        # Downloading the Excel file containing flooding data
        flood_df = pd.read_excel(url)

        # Filtering the data based on the current time
        my_floods = flood_df.loc[(flood_df['DATE'] == date) & (flood_df['START_T'] < start_time) & (flood_df['END_T'] > start_time)]

        # Extracting the coordinates
        flooding_points = my_floods[['X', 'Y']].values.tolist()

        # Converting coordinates
        flooding_points = [convert_coordinates(x, y) for x, y in flooding_points]

        return flooding_points


## Grafo

The `Grafo` class represents a road network graph based on geographical data obtained from OpenStreetMap. Its methods allow one to find the nearest node from geographic coordinates, crop the graph based on the fastest path between two nodes, and find an edge that links two specific nodes.  In order to initialize an instance of the `Grafo` class, one needs latitude and longitude boundary information of the area that one wants to study (for instance, a city or a neighborhood).

In [None]:
class Grafo:

    def __init__ (self, lat0, latoo, lon0, lonoo):
        """
        Initializes a graph object with latitude and longitude boundaries.
        Downloads and processes the graph data from OpenStreetMap within the
        specified boundaries.

        :param lat0 (float): Minimum latitude boundary.
        :param latoo (float): Maximum latitude boundary.
        :param lon0 (float): Minimum longitude boundary.
        :param lonoo (float): Maximum longitude boundary.
        """

        self.lat0 = lat0
        self.latoo = latoo
        self.lon0 = lon0
        self.lonoo = lonoo

        # Download and process the graph data within the specified boundaries
        self.graph = ox.graph_from_bbox(lat0, latoo, lon0, lonoo, network_type='drive')
        self.graph = ox.add_edge_speeds(self.graph)
        self.graph = ox.add_edge_travel_times(self.graph)
        self.graph = ox.distance.add_edge_lengths(self.graph)

    def get_nearest_node (self, lat, lon):
        """
        Finds the nearest node in the graph to the given latitude and
        longitude coordinates.

        :param lat (float): Latitude coordinate.
        :param lon (float): Longitude coordinate.

        :return node (int): Nearest node ID.
        """

        # Project the graph to enable us to use the nearest_nodes method
        graph_proj = ox.project_graph(self.graph)

        # Project the coordinates of the given point
        coords = [(lon, lat)]
        point = ox.projection.project_geometry(Point(coords))[0]
        x, y = point.x, point.y

        # Find the nearest node
        node = ox.distance.nearest_nodes(graph_proj, x, y, return_dist=False)

        return node

    def crop (self, initial_point, final_point):
        """
        Crops the graph based on the fastest path between two given nodes.

        :param initial_point (int): ID of the initial node.
        :param final_point (int): ID of the final node.

        :return cropped_graph (networkx.MultiDiGraph): Cropped graph
            containing only the nodes and edges around the fastest path.
        """

        # Find the fastest path between the initial and final nodes
        gmaps_path = nx.astar_path(self.graph, initial_point, final_point, weight='travel_time')

        # Retrieve latitude and longitude coordinates of the nodes along the
        # fastest path (aka Google Maps route)
        lats = [self.graph.nodes[node]['y'] for node in gmaps_path]
        lons = [self.graph.nodes[node]['x'] for node in gmaps_path]

        # Determine a tight bounding box around the path
        lat_min = min(lats)
        lat_max = max(lats)
        lon_min = min(lons)
        lon_max = max(lons)

        # Adjust the bounding box to ensure it contains the graph properly and
        # falls within the initial boundaries
        bbox_lat_min = max(self.lat0, lat_min - (lat_max - lat_min))
        bbox_lat_max = min(self.latoo, lat_max + (lat_max - lat_min))
        bbox_lon_min = max(self.lon0, lon_min - (lon_max - lon_min))
        bbox_lon_max = min(self.lonoo, lon_max + (lon_max - lon_min))

        # Crop the graph based on the adjusted bounding box
        self.graph = ox.truncate.truncate_graph_bbox(self.graph, bbox_lat_max, bbox_lat_min, bbox_lon_max, bbox_lon_min)

        return self.graph

    def reset (self):
        """
        Resets the graph to its original state, downloading and processing
        the graph data again.

        :return graph (networkx.MultiDiGraph): Reset graph.
        """

        self.graph = ox.graph_from_bbox(lat0, latoo, lon0, lonoo, network_type='drive')
        self.graph = ox.add_edge_speeds(self.graph)
        self.graph = ox.add_edge_travel_times(self.graph)
        self.graph = ox.distance.add_edge_lengths(self.graph)

        return self.graph

    def find_edge_by_nodes (self, node1, node2):
        """
        Finds an edge in the graph that links node1 to node2.

        :param node1 (int): Starting node ID.
        :param node2 (int): Ending node ID.

        :return None or edges[0] (None or tuple): Edge that links node1 to
            node2.
        """

        # Gather all the edges of the graph that start at node1 and end at node2,
        # or start at node2 and end at node1
        e1 = [e for e in self.graph.edges if e[0] == node1 and e[1] == node2]
        e2 = [e for e in self.graph.edges if e[0] == node2 and e[1] == node1]
        edges = [*e1, *e2]

        # If no edges were found, then print a message and return None.
        if len(edges) == 0:
            print(f"There are no edges linking {node1} to {node2} in graph.")
            return None

        # Otherwise, return the first edge that was found.
        else:
            return edges[0]


## Radar

The `Radar` class represents a radar that provides information on precipitation. It allows one to retrieve rainfall data from a text file and calculate the maximum rainfall along a specific edge in a given graph.

In order to initialize an instance of the `Radar` class, one needs the coordinates (latitude and longitude) of a reference point, the latitude- and longitude-deltas of the grid spacing of the data provided by a radar. Please note that the `Radar` class requires the `Agora` class for some of its functionalities.

In [None]:
class Radar:

    def __init__ (self, lat0, lon0, dlat, dlon):
        """
        Initializes a radar object with latitude and longitude information.
        This class represents the radar that provides the information on
        precipitation.

        :param lat0 (float): Minimum latitude boundary.
        :param lon0 (float): Minimum longitude boundary.
        :param dlat (float): Spacing in the latitude grid.
        :param dlon (float): Spacing in the longitude grid.
        """

        self.lat0 = lat0
        self.lon0 = lon0
        self.dlat = dlat
        self.dlon = dlon

        self.source = "https://raw.githubusercontent.com/RPvMM-2023-S1/Rain-and-flood-informed-vehicle-routing-problem/main/radar_data/R13537439_"

    def rain_by_time (self, agora):
        """
        Reads rainfall data (mm) from a text file and returns it as a matrix.

        :param agora (Agora): An instance of the Agora class that represents
            the time.

        :return A (list or None): Matrix of rainfall data. Each element
            represents the rainfall in millimeters at a specific location. If
            the data is not available, returns None.
        """

        filename = self.source + repr(agora) + ".txt"
        radar_id = self.source.split("/")[-1]
        local_filename = os.path.join('radar_cache', radar_id + repr(agora) + ".txt")

        # Check if the 'radar_cache' folder exists and create it if necessary
        if not os.path.isdir('radar_cache'):
            os.mkdir('radar_cache')

        # Check if the local file already exists
        if os.path.isfile(local_filename):
            # If the file exists, open it and read the data
            with open(local_filename, 'r') as file:
                    data = file.readlines()
        else:
            try:
                # If the file doesn't exist locally, try to download it
                response = requests.get(filename)

                # Check if the download was successful (status code 200)
                if response.status_code == 200:
                    # Extract the text data from the response and split it
                    # into lines
                    data = response.text.splitlines()

                    # Save the downloaded data to the local file
                    with open(local_filename, 'w') as file:
                            file.write('\n'.join(data))
                else:
                    # If the file doesn't exist on the server, print an error
                    # message
                    print(f"File {filename} does not exist.")
                    data = None

            except FileNotFoundError:
                # If there was an error with file handling, print an error
                # message
                print(f"File {filename} does not exist.")
                data = None

        # Data processing to create matrix A
        if data:
            # Create a matrix A by splitting each line and converting entries
            # to integers.  If the entry is "-99", it is replaced with 0.
            A = [[0 if entry == "-99" else int(entry) for entry in line.split()] for line in data]
        else:
            A = None

        # Print the resulting matrix A
        return A

    def rain_at_edge (self, graph, edge, agora):
        """
        Reads the rain data from a matrix and returns the maximum rainfall in
        a given edge of a given graph.

        :param graph (networkx.Graph): Graph object.
        :param edge (tuple): Edge in the graph.
        :param agora (Agora): An instance of the Agora class that represents
            the time.

        :return (int): Maximum rainfall on the edge.
        """

        # Gather the coordinates of several points along the edge
        if 'geometry' in graph.edges[edge]:
            lonlats = list(graph.edges[edge]['geometry'].coords)
        else:
            lonlats = []

        initial = edge[0]
        lon, lat = graph.nodes[initial]['x'], graph.nodes[initial]['y']
        lonlats.append((lon, lat))

        final = edge[1]
        lon, lat = graph.nodes[final]['x'], graph.nodes[final]['y']
        lonlats.append((lon, lat))

        # List the rows and columns of the points of the edge
        rowcols = [(int((lat - self.lat0) / self.dlat), int((lon - self.lon0) / self.dlon)) for lon, lat in lonlats]

        # List the mms of rain in the points of the edge
        matrix = self.rain_by_time(agora)
        rainfall = [matrix[row][col] for row, col in rowcols]

        return max(rainfall)


## Extra auxiliary functions

These auxiliary functions can be used in conjunction with the `Agora`, `Grafo` and `Radar` classes to perform various operations, such as coordinate conversion, prediction of nodes in a path, and determination of flooded edges in a graph.

**convert_coordinates**

This function converts coordinates from the EPSG 4326 system (latitude and longitude) to the usual UTM 23S system.  It is necesary in the method `get_flooding_points` of the class `Agora`.

In [None]:
def convert_coordinates (longitude, latitude):
    """
    Converts coordinates from EPSG 4326 system to UTM 23S system.

    :param longitude (float): Longitude coordinate.
    :param latitude (float): Latitude coordinate.

    :return (tuple): Converted coordinates in UTM 23S system.
    """

    utm23s = pyproj.CRS.from_string('+proj=utm +zone=23 +south +ellps=WGS84 +datum=WGS84 +units=m +no_defs')
    epsg4326 = pyproj.CRS.from_epsg(4326)
    transformer = pyproj.Transformer.from_crs(utm23s, epsg4326)

    return transformer.transform(longitude, latitude)



**oracle**

The `oracle` function predicts before which node in a path a vehicle would be after a given time. In fact, it returns a tuple `(next_node, crossed_edges)` where `next_node` is the next node after the given time, and `crossed_edges` is a list of edges crossed on the way.  It will be used in the method `experiment` of the class `Experiment`.

In [None]:
def oracle (grafo, path, initial_point, tyme):
    """
    Predicts the node and crossed edges after a given time in a given path.

    :param grafo (Grafo): An instance of the Grafo class in which the path is
        contained.
    :param path (list): A list of edges that representa a path in the grafo.
    :param initial_point (int): ID of the initial point in the path.
    :param tyme (int): Time in seconds.

    :return next_node, crossed_edges (tuple): Next node after the given
        time and a list of edges crossed on the way.
    """

    # Find the initial point within the path
    initial_indices = [i for i in range(len(path)) if path[i] == initial_point]

    # If the initial point is not in the path, return the initial point and
    # an empty list:
    if len(initial_indices) == 0:
        print(f"The node {initial_point} is not in {path}.")
        return initial_point, []

    # Otherwise, if the initial point is in the path, that is, if
    # initial_indices != []:
    else:
        n = initial_indices[0]
        next_node = initial_point
        crossed_edges = []

        # While there is: time left to cross the path and points in the path
        # to be crossed
        while tyme > 0 and n < len(path)-1:
            current_node = path[n]
            next_node = path[n+1]

            # Find the edge in the graph
            e = grafo.find_edge_by_nodes(current_node, next_node)
            crossed_edges.append(e)

            # Update the time left
            tyme -= grafo.graph.edges[e]['travel_time']

            # Update the index for the next step
            n += 1

        return next_node, crossed_edges




**is_flooded**

The `is_flooded` function checks if a given edge of a graph is within 200 meters of a reported flooded point. Then, it returns `True` if the edge is near a flooded point and `False` otherwise.  It will be necessary in one of the weight functions that will be tested below.

In [None]:
def is_flooded (graph, edge, flooded_points):
    """
    Checks if a given edge of a graph is within 200 meters of a reported
    flooded point.

    :param graph (networkx.Graph): Graph object.
    :param edge (tuple): Edge in the graph.
    :param flooded_points (list): List of flooded points.

    :return tf (bool): True if edge is near a flooded point, False otherwise.
    """

    # Gather the coordinates of several points along the edge
    if 'geometry' in graph.edges[edge]:
        lonlats = list(graph.edges[edge]['geometry'].coords)
    else:
        lonlats = []

    initial = edge[0]
    lon, lat = graph.nodes[initial]['x'], graph.nodes[initial]['y']
    lonlats.append((lon, lat))

    final = edge[1]
    lon, lat = graph.nodes[final]['x'], graph.nodes[final]['y']
    lonlats.append((lon, lat))

    # List of (latitudes, longitudes) of the points of the edge
    latlons = [(y, x) for x, y in lonlats]

    # Determine whether any of the points of the edge are closer to 200 meters
    # of any of the flooded points (tf = True) or not (tf = False)
    tf = False
    for p in latlons:
        for q in flooded_points:
            if distance.great_circle(p, q).meters < 200:
                tf = True
                break

    return tf


# Experiment

The `Experiment` class is the main class of this notebook.  It represents an experiment that calculates the optimal path minimizing a certain weight-function. In order to initialize it, one must provide the origin, destination, time (an instance of the `Agora` class), road network (an instance of the `Grafo` class), and information on precipitation (an instance of the `Radar` class).  Its methods can be used to perform experiments on the road network: calculate optimal paths (`experiment`) and analyze the results (`analysis`).


In [None]:
class Experiment:

    def __init__ (self, origin, destination, agora, grafo, radar):
        """
        Initializes an experiment object with the given parameters.

        :param origin (tuple): The coordinates of the start location.
        :param destination (tuple): The coordinates of the end location.
        :param agora: An instace of the Agora class that represents the
            initial time (time of departure of the vehicle).
        :param grafo: An instace of the Grafo class that represents the road
            network.
        :param radar: An instance of the Radar class that represents the
            radar that will provide information on precipitation.
        """

        self.origin = grafo.get_nearest_node(*origin)
        self.destination = grafo.get_nearest_node(*destination)
        self.agora = agora
        self.grafo = grafo
        self.radar = radar

    def experiment (self, weight):
        """
        Calculates the path that minimizes a certain function (weight).

        :param weight (function): The weight function used to calculate the
            weights of edges in the graph.

        :return edge_rains (dict): A dictionary containing the edges of the
            optimal path and their corresponding rainfall values (at the time
            of travel).
        """

        # Parameters
        start = copy.deepcopy(self.origin)
        time = copy.deepcopy(self.agora)
        graph = self.grafo.crop(self.origin, self.destination)

        # Dicionary where we will save infomation about the path
        edge_rains = {}

        # Main loop
        while start != self.destination:

            # Update: list of fooded points, matrix with rain info and weigths
            flooding_points = time.get_flooding_points()

            if self.radar.rain_by_time(time):
                A = self.radar.rain_by_time(time)
                rains = {e: self.radar.rain_at_edge(graph, e, time) for e in graph.edges}
                weights = {e: weight(graph, e, flooding_points, rains) for e in graph.edges}
                nx.set_edge_attributes(graph, weights, 'weight')
            else:
                rains = {e: 0 for e in graph.edges}

            # Calculate the path with the least rain
            shortest_path = nx.astar_path(graph, start, self.destination, weight='weight')

            # Add 10 minutes to the current time for the next iteration
            time.step(600)

            # Predict where the start of the next iteration will be
            start, subpath = oracle(self.grafo, shortest_path, shortest_path[0], 600)

            # Save the information of the edges that will be crossed
            for e in subpath:
                edge_rains[e] = rains[e]

        return edge_rains

    def analysis (self, dictionary):
        """
        Prints the results of the output of an experiment.

        :param dictionary (dict): A dictionary containing the edges of a path
            and their corresponding rainfall values (output of an experiment).

        :return (tuple): A tuple containing the total length, total time, and
            rainfall per second along the path.
        """

        # Lists the edges of the path
        edge_path = list(dictionary.keys())

        # Prints the total length of the path
        edge_lengths = [self.grafo.graph.edges[e]['length'] for e in edge_path]
        total_length = sum(edge_lengths) / 1000
        print(f"The total distance traveled was {total_length:.3f} km.")

        # Prints the total duration of the path
        edge_times = [self.grafo.graph.edges[e]['travel_time'] for e in edge_path]
        total_time = sum(edge_times)
        print(f"The travel time was {total_time // 60:.0f} minutes and {total_time % 60:.0f} seconds.")

        # Prints the amount of rain along the path
        edge_rain_per_sec = [dictionary[e] * self.grafo.graph.edges[e]['travel_time'] for e in edge_path]
        rain_per_sec = sum(edge_rain_per_sec)
        print(f"The rainfall along the path was: {rain_per_sec:.03f} mm.")

        return total_length, total_time, rain_per_sec


# Example

Now, we will show an example of the use of the classes and functions defined above.

We begin by fixing the boundary coordinates of the region that we want to study and the grid spacing of the radar.  In this case, it is a region within the metropolitan area of Sao Paulo and the grid spacing is given by Radar Sao Roque:

In [None]:
# Boudary coordinates and grid spacing

lat0 = -23.7748
latoo = -23.2959
lon0 = -46.6807
lonoo = -46.3841
dlat = -0.0090014
dlon = 0.009957


Using the information above, we can instantiate a `Grafo` class and a `Radar` class:

In [None]:
# Instances of the Grafo and Radar classes

sao_paulo = Grafo(lat0, latoo, lon0, lonoo)
sao_roque = Radar(lat0, lon0, dlat, dlon)


Next, we choose the origin, destination and initial time for our route:

In [None]:
# Origin, destination and initial time information of the route

origin = (-23.536718, -46.589832)    # Belenzinho, Sao Paulo 03015-000
destination = (-23.531962, -46.653599)    # Theatro São Pedro, 01152-000
agora = Agora(2019, 1, 4, 16, 40)    # 4:40pm on January 4th, 2019


Using the information above, we can instantiate an `Experiment`:

In [None]:
# Instance of the Experiment class

example = Experiment(origin, destination, agora, sao_paulo, sao_roque)


To finish the setup, we only need to define the weight function that our experiment should minimize:

In [None]:
# Here are a few weight functions that can be used:

## Only minimizes the total duration of the route
def time_no_rain (graph, edge, flooded_points, rains):
    return graph.edges[edge]['travel_time']

## Only minimizes the total length of the route
def length_no_rain (graph, edge, flooded_points, rains):
    return graph.edges[edge]['length']

## Minimizes the time, taking into account rain and flood
def proposed (graph, edge, flooded_points, rains):
    if is_flooded(graph, edge, flooded_points):
        return float('inf')
    else:
        return graph.edges[edge]['travel_time'] * (1 + rains[edge]/100)


Now that we have all these ingredients, we can run an `experiment` and its `analysis`:

In [None]:
for f in [time_no_rain, length_no_rain, proposed]:
    print(f.__name__)

    # Run the experiment
    edges_rains = example.experiment(f)

    # Print the results
    example.analysis(edges_rains)


time_no_rain
The total distance traveled was 7.827 km.
The travel time was 10 minutes and 30 seconds.
The rainfall along the path was: 44.200 mm.
length_no_rain
The total distance traveled was 7.724 km.
The travel time was 11 minutes and 7 seconds.
The rainfall along the path was: 15.700 mm.
proposed
The total distance traveled was 8.039 km.
The travel time was 10 minutes and 52 seconds.
The rainfall along the path was: 0.000 mm.
