In [1]:
import pyrosm
import numpy as np
import time as tm
from graph_tool.all import Graph
from geopy.exc import GeocoderServiceError
from geopy.geocoders import Nominatim

class OSMGraph(Graph):
    """
    Road network of Santiago built from OpenStreetMap data (pyrosm + graph-tool),
    with helpers to look up vertices by id, by coordinates or by street address.
    """

    def __init__(self, OSM_PATH='.'):
        """
        Downloads the OSM data and builds the road graph.

        Parameters:
            OSM_PATH (str): The directory where the OSM file will be saved.
        """
        # Initialise the graph-tool base class so the instance itself is a valid
        # (empty) Graph; the road network is kept separately in `self.graph`.
        super().__init__()
        # Maps every OSM node id to its (lat, lon) pair
        self.node_coords = {}
        self.graph = self.create_osm_graph(OSM_PATH)

    def download_osm_file(self, OSM_PATH):
        """
        Downloads the latest OSM file for Santiago.

        Parameters:
            OSM_PATH (str): The directory where the OSM file will be saved.

        Returns:
            str: The path to the downloaded OSM file.
        """
        fp = pyrosm.get_data(
            "Santiago",
            update=True,
            directory=OSM_PATH
        )

        return fp

    def create_osm_graph(self, OSM_PATH):
        """
        Creates a graph-tool's graph using the downloaded OSM data for Santiago.

        Parameters:
            OSM_PATH (str): The directory where the OSM file will be saved.

        Returns:
            graph: osm data converted to a graph. Vertices carry the properties
            "lon", "lat", "node_id" and "graph_id"; edges carry "u", "v", "length"
            and "weight".
        """
        # Download latest OSM data
        fp = self.download_osm_file(OSM_PATH)

        osm = pyrosm.OSM(fp)

        nodes, edges = osm.get_network(nodes=True)

        graph = Graph()

        # Create vertex properties for lon and lat
        lon_prop = graph.new_vertex_property("float")
        lat_prop = graph.new_vertex_property("float")

        # Create properties for the ids
        # Every OSM node has its unique id, different from the one given in the graph
        node_id_prop = graph.new_vertex_property("long")
        graph_id_prop = graph.new_vertex_property("long")

        # Create edge properties
        u_prop = graph.new_edge_property("long")
        v_prop = graph.new_edge_property("long")
        length_prop = graph.new_edge_property("double")
        weight_prop = graph.new_edge_property("double")

        # OSM node id -> vertex of `graph`
        vertex_map = {}

        print("GETTING OSM NODES...")
        for index, row in nodes.iterrows():
            lon = row['lon']
            lat = row['lat']
            node_id = row['id']
            graph_id = index
            self.node_coords[node_id] = (lat, lon)

            vertex = graph.add_vertex()
            vertex_map[node_id] = vertex

            # Assigning node properties
            lon_prop[vertex] = lon
            lat_prop[vertex] = lat
            node_id_prop[vertex] = node_id
            graph_id_prop[vertex] = graph_id

        # Assign the properties to the graph
        graph.vertex_properties["lon"] = lon_prop
        graph.vertex_properties["lat"] = lat_prop
        graph.vertex_properties["node_id"] = node_id_prop
        graph.vertex_properties["graph_id"] = graph_id_prop

        print("DONE")
        print("GETTING OSM EDGES...")

        for index, row in edges.iterrows():
            source_node = row['u']
            target_node = row['v']

            # Skip very short edges (length below 2) and edges with empty endpoints
            if row["length"] < 2 or source_node == "" or target_node == "":
                continue

            if source_node not in vertex_map or target_node not in vertex_map:
                print(f"Skipping edge with missing nodes: {source_node} -> {target_node}")
                continue  # Skip edges with missing nodes

            source_vertex = vertex_map[source_node]
            target_vertex = vertex_map[target_node]

            if not graph.vertex(source_vertex) or not graph.vertex(target_vertex):
                print(f"Skipping edge with non-existent vertices: {source_vertex} -> {target_vertex}")
                continue  # Skip edges with non-existent vertices

            # The weight is the Euclidean norm of the (lat, lon) difference, i.e. it is
            # expressed in degrees; the OSM length is stored separately in "length".
            source_coords = self.node_coords[source_node]
            target_coords = self.node_coords[target_node]
            distance = np.linalg.norm(np.array(source_coords) - np.array(target_coords))

            e = graph.add_edge(source_vertex, target_vertex)
            u_prop[e] = source_node
            v_prop[e] = target_node
            length_prop[e] = row["length"]
            weight_prop[e] = distance

        graph.edge_properties["u"] = u_prop
        graph.edge_properties["v"] = v_prop
        graph.edge_properties["length"] = length_prop
        graph.edge_properties["weight"] = weight_prop

        print("OSM DATA HAS BEEN SUCCESSFULLY RECEIVED")
        return graph

    def get_nodes_and_edges(self):
        """
        Returns a tuple containing two lists: one with the nodes and another with the edges.
        """
        nodes = list(self.graph.vertices())
        edges = list(self.graph.edges())
        return nodes, edges

    def print_graph(self):
        """
        Prints the vertices and edges of the graph.
        """
        lon_prop = self.graph.vertex_properties['lon']
        lat_prop = self.graph.vertex_properties['lat']

        print("Vertices:")
        for vertex in self.graph.vertices():
            print(f"Vertex ID: {int(vertex)}, lon: {lon_prop[vertex]}, lat: {lat_prop[vertex]}")

        print("\nEdges:")
        for edge in self.graph.edges():
            source = int(edge.source())
            target = int(edge.target())
            print(f"Edge: {source} -> {target}")

    def find_node_by_coordinates(self, lon, lat):
        """
        Finds a node in the graph based on its coordinates (lon, lat).

        The comparison is an exact floating point match; use find_nearest_node
        for approximate coordinates.

        Parameters:
            lon (float): the longitude of the node.
            lat (float): the latitude of the node.

        Returns:
            vertex: the vertex in the graph with the specified coordinates, or None if not found.
        """
        lon_prop = self.graph.vertex_properties["lon"]
        lat_prop = self.graph.vertex_properties["lat"]
        for vertex in self.graph.vertices():
            if lon_prop[vertex] == lon and lat_prop[vertex] == lat:
                return vertex
        return None

    def find_node_by_id(self, node_id):
        """
        Finds a node in the graph based on its id.

        Parameters:
            node_id (long): the OSM id of the node.

        Returns:
            vertex: the vertex in the graph with the specified id, or None if not found.
        """
        node_id_prop = self.graph.vertex_properties["node_id"]
        for vertex in self.graph.vertices():
            if node_id_prop[vertex] == node_id:
                return vertex
        return None

    def find_nearest_node(self, latitude, longitude):
        """
        Finds the nearest node in the graph to a given set of coordinates.

        Parameters:
            latitude (float): the latitude of the coordinates.
            longitude (float): the longitude of the coordinates.

        Returns:
            vertex: the vertex in the graph closest to the given coordinates,
            or None if the graph has no vertices.
        """
        query_point = np.array([longitude, latitude])

        # Obtains vertex properties: 'lon' and 'lat'
        lon_prop = self.graph.vertex_properties['lon']
        lat_prop = self.graph.vertex_properties['lat']

        # Calculates the euclidean distances (in degrees) between the node's
        # coordinates and the consulted address's coordinates
        distances = np.linalg.norm(np.vstack((lon_prop.a, lat_prop.a)).T - query_point, axis=1)

        # np.argmin raises on an empty array, so an empty graph has no nearest node
        if distances.size == 0:
            return None

        # Finds the nearest node's index
        nearest_node_index = int(np.argmin(distances))
        nearest_node = self.graph.vertex(nearest_node_index)

        return nearest_node

    def address_locator(self, address, max_retries=15, retry_delay=5):
        """
        Finds the given address in the OSM graph.

        Geocoding service errors are retried up to `max_retries` times, waiting
        `retry_delay` seconds between attempts.

        Parameters:
            address (str): The address to be located.
            max_retries (int): Maximum number of retries after a geocoding service error.
            retry_delay (float): Seconds to wait before each retry.

        Returns:
            vertex: The vertex of the graph nearest to the address, or None when the
            address could not be found or the geocoding service kept failing.
        """
        geolocator = Nominatim(user_agent="ayatori")

        # The counter lives outside the loop; resetting it on every failure
        # would make the retry limit unreachable.
        failed_attempts = 0
        while True:
            try:
                location = geolocator.geocode(address)
                break
            except GeocoderServiceError:
                if failed_attempts < max_retries:
                    print(f"Geocoding service error. Retrying in {retry_delay} seconds...")
                    tm.sleep(retry_delay)
                    failed_attempts += 1
                else:
                    msg = "Error: Too many retries. Geocoding service may be down. Please try again later."
                    print(msg)
                    return None

        if location is not None:
            lat, lon = location.latitude, location.longitude
            nearest = self.find_nearest_node(lat, lon)
            return nearest

        msg = "Error: Address couldn't be found."
        print(msg)
        return None


In [24]:
import pygtfs
import os
import pandas as pd
from math import *
from datetime import datetime, date, time, timedelta
from graph_tool.all import Graph

class GTFSData:
    def __init__(self, GTFS_PATH='gtfs.zip'):
        self.scheduler = self.create_scheduler(GTFS_PATH)
        self.graphs = {}
        self.route_stops = {}
        self.special_dates = []
        self.stops = set()
        self.graphs, self.route_stops, self.special_dates = self.get_gtfs_data()
        self.stops = self.get_stop_ids()
        
    def create_scheduler(self, GTFS_PATH):
        # Create a new schedule object using a GTFS file
        scheduler = pygtfs.Schedule(":memory:")
        pygtfs.append_feed(scheduler, GTFS_PATH)
        return scheduler
        
    def get_gtfs_data(self):
        """
        Reads the GTFS data from a file and creates a directed graph with its info, using the 'pygtfs' library. This gives
        the transit feed data of Santiago's public transport, including "Red Metropolitana de Movilidad" (previously known
        as Transantiago), "Metro de Santiago", "EFE Trenes de Chile", and "Buses de Acercamiento Aeropuerto".

        Returns:
            graphs: GTFS data converted to a dictionary of graphs, one per route.
            route_stops: Dictionary containing the stops for each route.
            special_dates: List of special calendar dates.
        """
        sched = self.scheduler

        # Get special calendar dates
        for cal_date in sched.service_exceptions: # Calendar_dates is renamed in pygtfs
            self.special_dates.append(cal_date.date.strftime("%d/%m/%Y"))

        stop_id_map = {} # To assign unique ids to every stop
        stop_coords = {}

        for route in sched.routes:
            graph = Graph(directed=True)
            stop_ids = set()
            trips = [trip for trip in sched.trips if trip.route_id == route.route_id]

            # Create a new vertex property for node_id
            node_id_prop = graph.new_vertex_property("string")
            
            # Create edge properties
            u_prop = graph.new_edge_property("object")
            v_prop = graph.new_edge_property("object")
            weight_prop = graph.new_edge_property("int")
            graph.edge_properties["weight"] = weight_prop
            graph.edge_properties["u"] = u_prop
            graph.edge_properties["v"] = v_prop
            
            added_edges = set() # To keep track of the edges that have already been added

            for trip in trips:
                stop_times = trip.stop_times
                orientation = trip.trip_id.split("-")[1]

                for i in range(len(stop_times)):
                    stop_id = stop_times[i].stop_id
                    sequence = stop_times[i].stop_sequence

                    if stop_id not in stop_id_map:
                        vertex = graph.add_vertex()
                        stop_id_map[stop_id] = vertex
                    else:
                        vertex = stop_id_map[stop_id]

                    stop_ids.add(vertex)

                    # Assign the node_id property to the vertex
                    node_id_prop[vertex] = stop_id

                    if i < len(stop_times) - 1:
                        next_stop_id = stop_times[i + 1].stop_id

                        if next_stop_id not in stop_id_map:
                            next_vertex = graph.add_vertex()
                            stop_id_map[next_stop_id] = next_vertex
                        else:
                            next_vertex = stop_id_map[next_stop_id]

                        edge = (vertex, next_vertex)
                        if edge not in added_edges: # Check if the edge has already been added
                            e = graph.add_edge(*edge)
                            graph.edge_properties["weight"][e] = 1
                            graph.edge_properties["u"][e] = node_id_prop[vertex]
                            graph.edge_properties["v"][e] = node_id_prop[next_vertex]
                            added_edges.add(edge) # Add the edge to the set of added edges

                        if route.route_id not in stop_coords:
                            stop_coords[route.route_id] = {}

                        if stop_id not in stop_coords[route.route_id]:
                            stop = sched.stops_by_id(stop_id)[0]
                            stop_coords[route.route_id][stop_id] = (stop.stop_lon, stop.stop_lat)

                            if route.route_id not in self.route_stops:
                                self.route_stops[route.route_id] = {}

                            self.route_stops[route.route_id][stop_id] = {
                                "route_id": route.route_id,
                                "stop_id": stop_id,
                                "coordinates": stop_coords[route.route_id][stop_id],
                                "orientation": "round" if orientation == "I" else "return",
                                "sequence": sequence,
                                "arrival_times": []
                            }

                    arrival_time = (datetime.min + stop_times[i].arrival_time).time()

                    if stop_id in self.route_stops[route.route_id]:
                        self.route_stops[route.route_id][stop_id]["arrival_times"].append(arrival_time)

            # Assign the node_id property to the graph
            graph.vertex_properties["node_id"] = node_id_prop

            self.graphs[route.route_id] = graph

            stops_by_direction = {"round_trip": [], "return_trip": []}
            for trip in trips:
                stop_times = trip.stop_times
                stops = [stop_times[i].stop_id for i in range(len(stop_times))]

                if trip.direction_id == 0:
                    stops_by_direction["round_trip"].extend(stops)
                else:
                    stops_by_direction["return_trip"].extend(stops)

            round_trip_stops = set(stops_by_direction["round_trip"])
            return_trip_stops = set(stops_by_direction["return_trip"])

            for stop_id in round_trip_stops:
                if stop_id in stop_coords[route.route_id]:
                    if stop_id in self.route_stops[route.route_id]:
                        self.route_stops[route.route_id][stop_id]["orientation"] = "round"
                    else:
                        self.route_stops[route.route_id][stop_id] = {
                            "route_id": route.route_id,
                            "stop_id": stop_id,
                            "coordinates": stop_coords[route.route_id][stop_id],
                            "orientation": "round",
                            "sequence": sequence,
                            "arrival_times": []
                        }

            for stop_id in return_trip_stops:
                if stop_id in stop_coords[route.route_id]:
                    if stop_id in self.route_stops[route.route_id]:
                        self.route_stops[route.route_id][stop_id]["orientation"] = "return"
                    else:
                        self.route_stops[route.route_id][stop_id] = {
                            "route_id": route.route_id,
                            "stop_id": stop_id,
                            "coordinates": stop_coords[route.route_id][stop_id],
                            "orientation": "return",
                            "sequence": sequence,
                            "arrival_times": []
                        }

        for route_id, graph in self.graphs.items():
            weight_prop = graph.new_edge_property("int")

            for e in graph.edges():
                weight_prop[e] = 1
            
            graph.edge_properties["weight"] = weight_prop

            data_dir = "gtfs_routes"
            if not os.path.exists(data_dir):
                os.makedirs(data_dir)

            graph.save(f"{data_dir}/{route_id}.gt")

        print("GTFS DATA RECEIVED SUCCESSFULLY")

        return self.graphs, self.route_stops, self.special_dates

    def get_stop_ids(self):
        stop_set = set()
        for route_id, stops in self.route_stops.items():
            for stop_id in stops:
                stop_set.add(stop_id)
        return stop_set

    def get_route_graph(self, route_id):
        """
        Given a route_id, returns the vertices and edges for the corresponding graph.

        Parameters:
        route_id (str): The ID of the route.

        Returns:
        tuple: A tuple containing the vertices and edges of the graph. The vertices are a list of node IDs, and the edges are a list of tuples containing the source and target node IDs.
        """
        if route_id not in self.graphs:
            print(f"Route {route_id} does not exist.")
            return None

        graph = self.graphs[route_id]
        vertices = []
        for v in graph.vertices():
            node_id = graph.vertex_properties["node_id"][v]
            if node_id != '' and node_id is not None:
                vertices.append(node_id)

        edges = []
        for e in graph.edges():
            u = graph.edge_properties["u"][e]
            v = graph.edge_properties["v"][e]
            if u is not None and v is not None:
                edges.append((u, v))

        return vertices, edges

    def get_route_graph_vertices(self, route_id):
        """
        Given a route_id, returns the vertices for the corresponding graph.

        Parameters:
        route_id (str): The ID of the route.

        Returns:
        list: A list containing the vertices of the graph. The vertices are a list of node IDs.
        """
        if route_id not in self.graphs:
            print(f"Route {route_id} does not exist.")
            return None

        graph = self.graphs[route_id]
        vertices = [graph.vertex_properties["node_id"][v] for v in graph.vertices()]

        return vertices

    def get_route_graph_edges(self, route_id):
        """
        Given a route_id, returns the edges for the corresponding graph.

        Parameters:
        route_id (str): The ID of the route.

        Returns:
        list: A list containing the edges of the graph.
        """
        if route_id not in self.graphs:
            print(f"Route {route_id} does not exist.")
            return None

        graph = self.graphs[route_id]
        edges = [(graph.edge_properties["u"][e], graph.edge_properties["v"][e]) for e in graph.edges()]

        return edges
    
    def map_route_stops(self, route_list, stops_flag, orientation_flag):
        """
        Create a map showing the stops visited on the round trip for the specified routes.

        Parameters:
        route_list (list): A list of route IDs.
        stops_flag (bool): A flag indicating whether to display the stops on the map.

        Returns:
        folium.Map: A map object showing the stops and routes.
        """
        # Map the stops visited on the round trip
        map = folium.Map(location=[-33.45, -70.65], zoom_start=12)
        
        # List of valid colors
        map_colors= ['red', 'orange', 'darkred', 'blue', 'lightblue', 'green', 'purple', 'lightred', 'beige',
                     'darkblue', 'darkgreen', 'cadetblue', 'darkpurple', 'white', 'pink', 'lightgreen',
                     'gray', 'black', 'lightgray']

        color_id = 0
        for route_id in route_list:
            # Get the stops for the specified route
            stops = self.route_stops.get(route_id, {})

            # Filter the stops that are visited on the round trip
            if orientation_flag:
                trip_stops = [stop_info for stop_info in stops.values() if stop_info["orientation"] == "round"]
            else:
                trip_stops = [stop_info for stop_info in stops.values() if stop_info["orientation"] == "return"]

            # Sort the stops by their sequence number in the trip
            trip_stops = sorted(trip_stops, key=lambda x: x['sequence'])

            folium.PolyLine(locations=[[stop_info["coordinates"][1], stop_info["coordinates"][0]] for stop_info in trip_stops],
                            color=map_colors[color_id], weight=4).add_to(map)
            
            if stops_flag:
                for stop_info in trip_stops:
                    folium.Marker(location=[stop_info["coordinates"][1], stop_info["coordinates"][0]], popup=stop_info["stop_id"],
                                   icon=folium.Icon(color='lightgray', icon='minus')).add_to(map)


            color_id+=1

        return map

    def get_route_coordinates(self, route_id):
        round_trip_stops = []
        return_trip_stops = []
        for stop_info in self.route_stops[route_id].values():
            if stop_info["orientation"] == "round":
                round_trip_stops.append(stop_info)
            elif stop_info["orientation"] == "return":
                return_trip_stops.append(stop_info)

        round_trip_stops.sort(key=lambda stop: stop["sequence"])
        return_trip_stops.sort(key=lambda stop: stop["sequence"])

        round_trip_coords = [(stop_info["coordinates"][1], stop_info["coordinates"][0]) for stop_info in round_trip_stops]
        return_trip_coords = [(stop_info["coordinates"][1], stop_info["coordinates"][0]) for stop_info in return_trip_stops]

        return round_trip_coords, return_trip_coords

    def haversine(self, lon1, lat1, lon2, lat2):
        """
        Calculate the great circle distance between two points on the earth (specified in decimal degrees).

        Parameters:
        lon1 (float): Longitude of the first point in decimal degrees.
        lat1 (float): Latitude of the first point in decimal degrees.
        lon2 (float): Longitude of the second point in decimal degrees.
        lat2 (float): Latitude of the second point in decimal degrees.

        Returns:
        float: The distance between the two points in kilometers.
        """
        R = 6372.8  # Earth radius in kilometers
        dLat = radians(lat2 - lat1)
        dLon = radians(lon2 - lon1)
        lat1 = radians(lat1)
        lat2 = radians(lat2)
        a = sin(dLat / 2)**2 + cos(lat1) * cos(lat2) * sin(dLon / 2)**2
        c = 2 * asin(sqrt(a))
        return R * c

    def get_stop_coords(self, stop_id):
        """
        Given a stop ID, returns the coordinates of the stop with the given ID.
        If the stop ID is not found, returns None.

        Parameters:
        stop_id (int): The ID of the stop to get the coordinates for.

        Returns:
        tuple: A tuple of two floats representing the longitude and latitude of the stop with the given ID.
        None: If the stop ID is not found.
        """
        for route_id, stops in self.route_stops.items():
            for stop_info in stops.values():
                if stop_info["stop_id"] == stop_id:
                    return stop_info["coordinates"]
        return None

    def get_near_stop_ids(self, coords, margin):
        """
        Given a tuple of coordinates and a margin, returns a list of stop IDs
        that are within the specified margin of the given coordinates, along with their orientations.

        Parameters:
        coords (tuple): A tuple of two floats representing the longitude and latitude of the coordinates to search around.
        margin (float): The maximum distance (in kilometers) from the given coordinates to include stops in the result.

        Returns:
        tuple: A tuple of two lists. The first list contains the stop IDs that are within the specified margin of the given coordinates.
        The second list contains tuples of stop IDs and their orientations.
        """
        stop_ids = []
        orientations = []
        for route_id, stops in self.route_stops.items():
            for stop_info in stops.values():
                stop_coords = stop_info["coordinates"]
                distance = self.haversine(coords[1], coords[0], stop_coords[1], stop_coords[0])
                if distance <= margin:
                    orientation = stop_info["orientation"]
                    stop_id = stop_info["stop_id"]
                    if stop_id not in stop_ids:
                        stop_ids.append(stop_id)
                        orientations.append((stop_id, orientation))
        return stop_ids, orientations

    def get_route_stop_ids(self, route_id):
        """
        Given a route ID, returns a list of stop IDs for the stops on the given route.

        Parameters:
        route_id (int): The ID of the route to get the stops for.

        Returns:
        list: A list of stop IDs for the stops on the given route.
        """
        stops = self.route_stops.get(route_id, {})
        return stops.keys()

    def route_stop_matcher(self, route_id, stop_id):
        """
        Given a route ID, and a stop ID, returns True if the stop ID is on the given route,
        and False otherwise.

        Parameters:
        route_id (int): The ID of the route to check.
        stop_id (int): The ID of the stop to check.

        Returns:
        bool: True if the stop ID is on the given route, False otherwise.
        """
        stop_list = self.get_route_stop_ids(route_id)
        return (stop_id in stop_list)

    def is_route_near_coordinates(self, route_id, coordinates, margin):
        """
        Given a route ID, a tuple of coordinates, and a margin, returns True if the route
        has a stop within the specified margin of the given coordinates, and False otherwise.

        Parameters:
        route_id (int): The ID of the route to check.
        coordinates (tuple): A tuple of two floats representing the longitude and latitude of the coordinates to search around.
        margin (float): The maximum distance (in kilometers) from the given coordinates to include stops in the result.

        Returns:
        bool: True if the route has a stop within the specified margin of the given coordinates, False otherwise.
        """
        for stop_info in self.route_stops[route_id].values():
            stop_coords = stop_info["coordinates"]
            distance = self.haversine(coordinates[1], coordinates[0], stop_coords[1], stop_coords[0])
            if distance <= margin:
                return route_id
        return False

    def get_bus_orientation(self, route_id, stop_id):
        """
        Checks and confirms the bus orientation, while visiting a stop, in the GTFS data files.

        Parameters:
        route_id (str): The route or service's ID to check.
        stop_id (str): The visited stop ID.

        Returns:
        str or list: The bus orientation(s) associated with the route_id and stop_id. None if nothing is found.
        """
        stop_times = pd.read_csv("stop_times.txt")
        filtered_stop_times = stop_times[(stop_times["trip_id"].str.startswith(route_id)) & (stop_times["stop_id"] == stop_id)]

        orientations = []
        for trip_id in filtered_stop_times["trip_id"]:
            orientation = trip_id.split("-")[1]
            if orientation == "I" and "round" not in orientations:
                orientations.append("round")
            elif orientation == "R" and "return" not in orientations:
                orientations.append("return")

        if len(orientations) == 0:
            return None
        elif len(set(orientations)) == 1:
            return orientations[0]
        else:
            return orientations

    def connection_finder(self, stop_id_1, stop_id_2):
        """
        Finds all routes that have stops at both given stop IDs.

        Parameters:
        stop_id_1 (str): The ID of the first stop to check.
        stop_id_2 (str): The ID of the second stop to check.

        Returns:
        list: A list of route IDs that have stops at both given stop IDs.
        """
        connected_routes = []
        for route_id, stops in self.route_stops.items():
            stop_ids = [stop_info["stop_id"] for stop_info in stops.values()]

            if stop_id_1 in stop_ids and stop_id_2 in stop_ids:
                connected_routes.append(route_id)
        return connected_routes

    def get_routes_at_stop(self, stop_id):
        """
        Finds all routes that have a stop at the given stop ID.

        Parameters:
        stop_id (str): The ID of the stop to check.

        Returns:
        list: A list of route IDs that have a stop at the given stop ID.
        """
        routes = [route_id for route_id in self.route_stops.keys() if stop_id in self.get_route_stop_ids(route_id) and self.connection_finder(stop_id, stop_id)]
        return routes

    def is_24_hour_service(self, route_id):
        """
        Determines if the given route has a 24-hour service.

        Parameters:
        route_id (str): A string representing the ID of the route.

        Returns:
        bool: True if the route has a 24-hour service, False otherwise.
        """
        # Read the frequencies for the route
        frequencies = pd.read_csv("frequencies.txt")
        route_str = str(route_id) + "-"
        route_frequencies = frequencies[frequencies["trip_id"].str.startswith(route_str)]

        # Check if any frequency has a start time of "00:00:00" and an end time of "24:00:00"
        has_start_time = False
        has_end_time = False
        for _, row in route_frequencies.iterrows():
            start_time = row["start_time"]
            end_time = row["end_time"]
            if start_time == "00:00:00":
                has_start_time = True
            if end_time == "24:00:00":
                has_end_time = True

        return has_start_time and has_end_time

    def check_night_routes(self, valid_services, is_nighttime):
        """
        Filters the given list of route IDs to only include night routes if is_nighttime is True.

        Parameters:
        valid_services (list): A list of route IDs to filter.
        is_nighttime (bool): True if it is nighttime, False otherwise.

        Returns:
        list: A list of route IDs that are night routes if is_nighttime is True, or all route IDs otherwise.
        """
        if is_nighttime:
            #nighttime_routes = [route_id for route_id in valid_services if route_id.endswith("N")]
            nighttime_routes = [route_id for route_id in valid_services if route_id.endswith("N") or self.is_24_hour_service(route_id)]
            if nighttime_routes:
                return nighttime_routes
            else:
                return None
        else:
            daytime_routes = [route_id for route_id in valid_services if not route_id.endswith("N")]
            if daytime_routes:
                return daytime_routes
            else:
                return None

    def is_nighttime(self, source_hour):
        """
        Determines if the given hour is during the nighttime.

        Parameters:
        source_hour (datetime.time): The hour to check.

        Returns:
        bool: True if the hour is during the nighttime, False otherwise.
        """
        start_time = time(0, 0, 0)
        end_time = time(5, 30, 0)
        if start_time <= source_hour <= end_time:
            return True
        else:
            return False

    def is_holiday(self, date_string):
        """
        Tells whether a date counts as a holiday.

        A date is a holiday when it is listed in self.special_dates or when it
        falls on a Saturday or a Sunday.

        Parameters:
        date_string (str): A string representing the date in the format "dd/mm/yyyy".

        Returns:
        bool: True if the date is a holiday, False otherwise.
        """
        # Local holidays are matched on the raw string, before any parsing takes place
        if date_string in self.special_dates:
            return True

        # weekday() numbers Saturday as 5 and Sunday as 6
        weekday_number = datetime.strptime(date_string, "%d/%m/%Y").weekday()
        return weekday_number in (5, 6)

    def is_rush_hour(self, source_hour):
        """
        Tells whether an hour falls inside one of the rush hour windows.

        The windows are 05:30:00-09:00:00 and 17:30:00-21:00:00, with both ends included.

        Parameters:
        source_hour (datetime.time): The hour to check.

        Returns:
        bool: True if the hour is inside a rush hour window, False otherwise.
        """
        rush_windows = (
            (time(5, 30, 0), time(9, 0, 0)),    # morning
            (time(17, 30, 0), time(21, 0, 0)),  # evening
        )
        return any(opening <= source_hour <= closing for opening, closing in rush_windows)

    def check_express_routes(self, valid_services, is_rush_hour):
        """
        Removes the express routes (IDs ending with "e") outside of rush hour.

        Parameters:
        valid_services (list): A list of route IDs to filter.
        is_rush_hour (bool): True if it is rush hour, False otherwise.

        Returns:
        list: The given valid_services object, untouched, if is_rush_hour is True; otherwise a new
        list with every route ID that does not end with "e".
        """
        if is_rush_hour:
            # Nothing is filtered during rush hour
            return valid_services
        return list(filter(lambda route_id: not route_id.endswith("e"), valid_services))

    def get_trip_day_suffix(self, date):
        """
        Maps a date to the day suffix used in the trip IDs.

        Parameters:
        date (str): A string representing the date in the format "dd/mm/yyyy".

        Returns
        str: "L" for Monday to Friday, "S" for Saturday and "D" for Sunday.
        """
        # Indexed by weekday(): 0 is Monday and 6 is Sunday
        suffix_by_weekday = ("L", "L", "L", "L", "L", "S", "D")
        weekday_number = datetime.strptime(date, "%d/%m/%Y").weekday()
        return suffix_by_weekday[weekday_number]

    def get_arrival_times(self, route_id, stop_id, source_date):
        """
        Returns the arrival times for a given route and stop.

        The schedule is built from the files "frequencies.txt" and "stop_times.txt", which are
        read from the current working directory.

        Parameters:
        route_id (str): A string representing the ID of the route.
        stop_id (str): A string representing the ID of the stop.
        source_date (str): A string representing the date of the travel, in the format "dd/mm/yyyy".

        Returns:
        tuple or None: A tuple containing a string representing the bus orientation ("round" or "return")
        and a list of datetime objects representing the arrival times (only their time of day is
        meaningful). The tuple is ("", []) when no trip ID in frequencies.txt starts with route_id.
        None is returned when the route has frequencies but the stop is not found in its round ("-I-")
        nor in its return ("-R-") trips for the day type of source_date.
        """
        # Read the frequencies.txt file and keep the rows of the given route
        frequencies = pd.read_csv("frequencies.txt")
        route_frequencies = frequencies[frequencies["trip_id"].str.startswith(route_id)]

        # Get the day suffix
        day_suffix = self.get_trip_day_suffix(source_date)

        stop_route_times = []
        bus_orientation = ""
        if route_frequencies.empty:
            # Nothing to expand, so stop_times.txt is not needed
            return bus_orientation, stop_route_times

        # The stop lookup does not depend on the frequency row: stop_times.txt is read and
        # filtered once here instead of twice per row. Boolean masks are used so that the
        # IDs are compared as data and never interpolated into a query expression.
        stop_times = pd.read_csv("stop_times.txt")
        at_stop = stop_times[stop_times["stop_id"] == str(stop_id)]
        round_stop_times = at_stop[at_stop["trip_id"].str.startswith(f"{route_id}-I-{day_suffix}", na=False)]
        return_stop_times = at_stop[at_stop["trip_id"].str.startswith(f"{route_id}-R-{day_suffix}", na=False)]

        # The round trip takes precedence when the stop appears in both orientations
        if len(round_stop_times) > 0:
            bus_orientation = "round"
            base_stop_time = pd.Timestamp(round_stop_times.iloc[0]["arrival_time"])
        elif len(return_stop_times) > 0:
            bus_orientation = "return"
            base_stop_time = pd.Timestamp(return_stop_times.iloc[0]["arrival_time"])
        else:
            return None

        # Get the arrival times for the stop for each frequency row
        seen_times = set()  # constant-time duplicate check; the list keeps the order
        for _, row in route_frequencies.iterrows():
            start_time = pd.Timestamp(row["start_time"])
            # "24:00:00" cannot be parsed as a time of day, so it is clamped to the last second
            if row["end_time"] == "24:00:00":
                end_time = pd.Timestamp("23:59:59")
            else:
                end_time = pd.Timestamp(row["end_time"])
            headway_secs = row["headway_secs"]
            stop_time = base_stop_time
            for freq_time in pd.date_range(start_time, end_time, freq=f"{headway_secs}s"):
                # Seconds elapsed since midnight for this departure (sub-second part dropped)
                offset = timedelta(hours=freq_time.hour, minutes=freq_time.minute, seconds=freq_time.second)
                stop_route_time = datetime.combine(datetime.min, stop_time.time()) + offset
                if stop_route_time not in seen_times:
                    seen_times.add(stop_route_time)
                    stop_route_times.append(stop_route_time)
                # NOTE(review): the stop time advances by one headway while freq_time advances
                # by one headway too, so consecutive results are two headways apart. This is
                # kept as it was; confirm whether it is intended.
                stop_time += pd.Timedelta(seconds=headway_secs)

        return bus_orientation, stop_route_times


    def get_time_until_next_bus(self, arrival_times, source_hour, source_date):
        """
        Returns the time until the next buses (three at most).

        Only the arrivals whose time of day is equal to or later than source_hour are considered.

        Parameters:
        arrival_times (list): A list of datetime objects representing the arrival times of the buses.
        source_hour (datetime.time): The source hour to compare with the arrival times.
        source_date: Not used by the computation; kept so existing callers keep working.

        Returns:
        list or None: A list of (minutes, seconds) tuples, one per upcoming bus, in ascending order
        of arrival. None if no arrival is left at or after source_hour.
        """
        upcoming = sorted(a_time for a_time in arrival_times if a_time.time() >= source_hour)
        if not upcoming:
            return None

        # The previous "no buses remaining" branch tested a list against None and could never
        # run; the empty case is fully handled by the early return above.
        time_until_next_buses = []
        for next_arrival_time in upcoming[:3]:
            next_bus = datetime.combine(next_arrival_time.date(), next_arrival_time.time())
            # Both operands share the same date, so only the time of day contributes
            waiting_secs = (next_bus - datetime.combine(next_bus.date(), source_hour)).total_seconds()
            minutes, seconds = divmod(waiting_secs, 60)
            time_until_next_buses.append((int(minutes), int(seconds)))

        return time_until_next_buses

    def timedelta_to_hhmm(self, td):
        """
        Formats a timedelta object as "HH:MM".

        Seconds are dropped, and the hours are not wrapped at 24.

        Parameters:
        td (timedelta): The timedelta object to be converted.

        Returns:
        str: A formatted string with the hours and minutes, both zero-padded to two digits.
        """
        whole_seconds = int(td.total_seconds())
        hours, leftover = divmod(whole_seconds, 3600)
        return f"{hours:02d}:{leftover // 60:02d}"

    def timedelta_separator(self, td):
        """
        Splits a timedelta object into whole minutes and remaining seconds.

        Parameters:
        td (timedelta): A timedelta object representing a duration of time.

        Returns:
        tuple: A tuple (minutes, seconds) of integers. The minutes are not wrapped at 60 and
        fractions of a second are truncated.
        """
        whole_minutes, leftover_seconds = divmod(td.total_seconds(), 60)
        return int(whole_minutes), int(leftover_seconds)

    def get_travel_time(self, trip_id, stop_ids):
        """
        Returns the travel time between two stops for a given trip.

        The times are taken from the file "stop_times.txt", read from the current working
        directory. The first two rows (in file order) whose trip ID starts with trip_id and whose
        stop is one of stop_ids are used.

        Parameters:
        trip_id (str): A string representing the ID of the trip (or a prefix of it).
        stop_ids (list): A list of two strings representing the IDs of the stops.

        Returns:
        timedelta or None: A timedelta object representing the travel time (second matching row
        minus first matching row), or None if fewer than two rows match.
        """
        stop_times = pd.read_csv("stop_times.txt")
        # Boolean masks instead of a string-built query: the IDs are compared as data, so
        # quotes or other special characters in them cannot break the expression
        on_trip = stop_times["trip_id"].str.startswith(trip_id, na=False)
        at_stops = stop_times["stop_id"].isin(stop_ids)
        matching = stop_times[on_trip & at_stops]
        if len(matching) < 2:
            return None

        def to_offset(clock_text):
            # GTFS times may be 24:00:00 or later for trips running past midnight, which
            # strptime("%H:%M:%S") rejects, so the three fields are parsed by hand
            hours, minutes, seconds = (int(part) for part in clock_text.split(":"))
            return timedelta(hours=hours, minutes=minutes, seconds=seconds)

        first_arrival, second_arrival = matching["arrival_time"].iloc[:2]
        return to_offset(second_arrival) - to_offset(first_arrival)

    def get_trip_sequence(self, route_id, stop_id):
        """
        Looks up, in self.route_stops, the sequence number of a stop within a route.

        Parameters:
        route_id (str): The route or service's ID.
        stop_id (str): The stop's ID.

        Returns:
        The value stored under the "sequence" key for that stop in that route.
        """
        stops_of_route = self.route_stops[route_id]
        stop_entry = stops_of_route[stop_id]
        return stop_entry["sequence"]

    def walking_travel_time(self, stop_coords, location_coords, speed):
        """
        Estimates how long it takes to walk between a location and a stop at a given speed.

        The distance comes from self.haversine, called with the two components of stop_coords
        followed by the two components of location_coords.

        Parameters:
        stop_coords (tuple): A tuple with the stop's coordinates.
        location_coords (tuple):  A tuple with the location's coordinates.
        speed (float): The walking speed value, in distance units per hour.

        Returns.
        float: The travel time in seconds, rounded to two decimals.
        """
        stop_first, stop_second = stop_coords[0], stop_coords[1]
        location_first, location_second = location_coords[0], location_coords[1]
        distance = self.haversine(stop_first, stop_second, location_first, location_second)
        hours_walking = distance / speed
        # 3600 converts the hours into seconds
        return round(hours_walking * 3600, 2)

    def parse_metro_stations(self, stops_file):
        """
        Parses the Metro Stations data, creating a dictionary with their names.

        A row is treated as a Metro station when its first field (the stop ID) is made of digits
        only. The station name is read from the third field. Rows with fewer than three fields
        (blank lines, for instance) are skipped.

        Parameters:
        stops_file (str): The path of the GTFS file with the stop data (stops.txt).

        Returns:
        dict: A dictionary that maps each station's stop ID to its name.
        """
        import csv  # standard library; imported here so the block brings its own dependency

        subway_stops = {}
        # The csv module understands quoted fields, so a name such as "Baquedano, L1" stays in
        # one piece. newline='' is what the csv documentation asks for, and utf-8-sig reads
        # UTF-8 with or without a byte-order mark.
        with open(stops_file, 'r', newline='', encoding='utf-8-sig') as f:
            for row in csv.reader(f):
                if len(row) < 3:
                    continue
                stop_id, stop_name = row[0].strip(), row[2]
                if stop_id.isdigit():
                    subway_stops[stop_id] = stop_name
        return subway_stops

    def is_metro_station(self, stop_id, route_dict):
        """
        Checks if a stop is a Metro station.

        Parameters:
        stop_id (str): The stop's ID to be checked.
        route_dict (dict): The dictionary with the Metro stations names, keyed by stop ID.

        Returns:
        str or None: The name stored in route_dict for the stop if it is a Metro station, or None if
        the ID is not numeric or is not present in route_dict.
        """
        try:
            # Only numeric IDs can belong to Metro stations
            int(stop_id)
        except (TypeError, ValueError):
            return None
        # A numeric ID that is not in the dictionary used to raise KeyError; it is reported as
        # "not a station" instead, as the documented contract states
        return route_dict.get(stop_id)
        
    def get_stops_with_long_times(self):
        """
        Finds the stops where two consecutive arrivals of a same route are more than 15 minutes apart.

        The arrivals are compared in the order in which self.scheduler.stop_times yields them. Once
        a stop is reported for one route it is not examined again for the following routes.

        Returns:
        list: The IDs of the stops found, without repetitions, in the order in which they were found.
        """
        max_gap = timedelta(minutes=15)

        # Index the arrivals by (route, stop) in a single pass over the stop times, instead of
        # scanning all of them again for every route/stop pair
        arrivals_by_route_stop = {}
        for stop_time in self.scheduler.stop_times:
            if stop_time.arrival_time is None:
                # An entry without arrival time cannot be compared, so it is left out
                continue
            key = (stop_time.trip.route_id, stop_time.stop_id)
            arrivals_by_route_stop.setdefault(key, []).append(stop_time.arrival_time)

        stop_with_long_times = []
        checked_stops = set()
        for route_id, stops in self.route_stops.items():
            for stop_id in stops:
                if stop_id in checked_stops:
                    continue
                arrivals = arrivals_by_route_stop.get((route_id, stop_id), [])
                for prev_arrival_time, arrival_time in zip(arrivals, arrivals[1:]):
                    if arrival_time - prev_arrival_time > max_gap:
                        stop_with_long_times.append(stop_id)
                        checked_stops.add(stop_id)
                        break
        return stop_with_long_times

In [3]:
# Build the street graph. As OSMGraph.__init__ shows, this downloads the Santiago OSM extract
# and converts it into a graph, so the cell needs network access.
osm_graph = OSMGraph()

Downloaded Protobuf data 'Santiago.osm.pbf' (20.1 MB) to:
'/home/lysorek/aves/notebooks/Santiago.osm.pbf'
GETTING OSM NODES...
DONE
GETTING OSM EDGES...
OSM DATA HAS BEEN SUCCESSFULLY RECEIVED


In [6]:
# Inspect the contents of the graph.
# NOTE(review): the recorded output is a very long dump of Vertex objects; consider showing
# only a slice or the counts.
osm_graph.get_nodes_and_edges()

([<Vertex object with index '0' at 0x7fed3f7119e0>,
  <Vertex object with index '1' at 0x7fed3f711cf0>,
  <Vertex object with index '2' at 0x7fed3f711c80>,
  <Vertex object with index '3' at 0x7fed3f711c10>,
  <Vertex object with index '4' at 0x7fed3f711ba0>,
  <Vertex object with index '5' at 0x7fed3f711b30>,
  <Vertex object with index '6' at 0x7fed3f711ac0>,
  <Vertex object with index '7' at 0x7fed3f711a50>,
  <Vertex object with index '8' at 0x7fed3f711970>,
  <Vertex object with index '9' at 0x7fed3f711900>,
  <Vertex object with index '10' at 0x7fed3f711890>,
  <Vertex object with index '11' at 0x7fed3f711820>,
  <Vertex object with index '12' at 0x7fed3f7117b0>,
  <Vertex object with index '13' at 0x7fed3f711740>,
  <Vertex object with index '14' at 0x7fed3f7116d0>,
  <Vertex object with index '15' at 0x7fed3f711660>,
  <Vertex object with index '16' at 0x7fed3f7115f0>,
  <Vertex object with index '17' at 0x7fed3f711580>,
  <Vertex object with index '18' at 0x7fed3f711510>,
  <

In [8]:
# Look up the graph vertex located at the given coordinates and print it.
# NOTE(review): the arguments look like (lon, lat) — confirm against find_node_by_coordinates.
node = osm_graph.find_node_by_coordinates(-70.6367999, -33.436918)
print(node)

6


In [7]:
# Look up a vertex by identifier and print it; the recorded run printed None for the id 0.
node2 = osm_graph.find_node_by_id(0)
print(node2)

None


In [4]:
# Load the GTFS feed; the recorded output shows pygtfs reading every table of the feed.
gtfs_data = GTFSData()

Loading GTFS data for <class 'pygtfs.gtfs_entities.Agency'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Stop'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Transfer'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Route'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Fare'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.FareRule'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.ShapePoint'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Service'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.ServiceException'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Trip'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Frequency'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.StopTime'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.FeedInfo'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Translation'>:
4 records read for <class 'pygtfs.gtfs_entities.Agency'>.
..11716 records read for <class 'pygtfs.gtfs_entities.Stop'>.
40

In [7]:
# Print the absolute path that the relative name "gtfs.zip" resolves to from the current
# working directory.
import os
fname = "gtfs.zip"
print(os.path.abspath(fname))

/home/lysorek/aves/notebooks/gtfs.zip


In [3]:
# Display the graph edges obtained for the route "506" (the recorded output is a list of pairs
# of stop IDs, many of them with an empty second element).
gtfs_data.get_route_graph_edges("506")

[('PA290', ''),
 ('PI3', ''),
 ('PI231', ''),
 ('PI4', ''),
 ('PI232', ''),
 ('PI5', ''),
 ('PI233', ''),
 ('PI6', 'PI235'),
 ('PI234', ''),
 ('PI235', ''),
 ('PI236', ''),
 ('PI8', ''),
 ('PI369', ''),
 ('PI378', ''),
 ('PI379', ''),
 ('PI380', ''),
 ('PI387', ''),
 ('PI388', ''),
 ('PI389', ''),
 ('PI1467', ''),
 ('PI1519', ''),
 ('PI1520', ''),
 ('PI565', ''),
 ('PI167', ''),
 ('PI1850', ''),
 ('PI528', ''),
 ('PI529', ''),
 ('PI38', ''),
 ('PI39', ''),
 ('PI40', ''),
 ('PI41', ''),
 ('PI42', ''),
 ('PI43', ''),
 ('PI44', ''),
 ('PI217', ''),
 ('PI218', ''),
 ('PI219', ''),
 ('PI220', ''),
 ('PI221', ''),
 ('PI222', 'PI222'),
 ('PI253', ''),
 ('PI416', ''),
 ('PI417', 'PI1578'),
 ('PI1578', ''),
 ('PI418', ''),
 ('PI420', ''),
 ('PI422', 'PI161'),
 ('PI161', ''),
 ('PI423', ''),
 ('PI424', ''),
 ('PI425', ''),
 ('PI426', ''),
 ('PI165', ''),
 ('PI166', ''),
 ('PI1845', ''),
 ('PI1847', ''),
 ('PI1851', ''),
 ('PI513', ''),
 ('PI564', 'PI233'),
 ('PI802', ''),
 ('PI803', ''),
 ('PI36

In [8]:
def find_route_nodes(osm_graph, gtfs_data, route_id, desired_orientation):
    """
    Gets the graph nodes nearest to the stops that a route visits in a given orientation.

    Parameters:
    osm_graph: The object that provides find_nearest_node.
    gtfs_data: The object that provides the route_stops dictionary.
    route_id (str): The ID of the route.
    desired_orientation (str): Either "round" or "return".

    Returns:
    list or None: The result of find_nearest_node for each matching stop (an empty list if the route
    is unknown or has no stop in that orientation), or None if the orientation is not valid.
    """
    if desired_orientation not in ("round", "return"):
        # Invalid orientation
        return

    # An unknown route simply has no stops
    stops_of_route = gtfs_data.route_stops.get(route_id, {})

    # Keep the stops visited on the desired orientation
    matching_stops = list(filter(lambda info: info["orientation"] == desired_orientation, stops_of_route.values()))

    # find_nearest_node receives the second coordinate first
    return [
        osm_graph.find_nearest_node(info["coordinates"][1], info["coordinates"][0])
        for info in matching_stops
    ]

def find_nearest_stops(osm_graph, gtfs_data, address, margin):
    """
    Finds the stops located around an address.

    The address is converted to text and resolved to a graph vertex with osm_graph.address_locator;
    the (lon, lat) pair of that vertex is then handed to gtfs_data.get_near_stop_ids.

    Parameters:
    osm_graph: The object that provides the graph and address_locator.
    gtfs_data: The object that provides get_near_stop_ids.
    address (str): The address to search around.
    margin (float): The maximum distance (in kilometers) from the given address to include stops in the result.

    Returns:
    tuple: A tuple with the two values returned by get_near_stop_ids: the stop IDs within the margin,
    and the tuples of stop IDs and their orientations.
    """
    properties = osm_graph.graph.vertex_properties
    vertex = osm_graph.address_locator(str(address))
    location = (properties['lon'][vertex], properties['lat'][vertex])
    stop_ids, stop_orientations = gtfs_data.get_near_stop_ids(location, margin)
    return stop_ids, stop_orientations

def available_route_finder(osm_graph, gtfs_data, source_node_id, target_node_id, departure_time, departure_date, margin, geolocator):
    """
    Finds the services that connect a stop near the source with a stop near the target, and that
    operate at the given time and date.

    Parameters:
    osm_graph: The object that provides the graph (with the "lat" and "lon" vertex properties).
    gtfs_data: The object that provides the GTFS lookups used below.
    source_node_id: The graph vertex of the source point.
    target_node_id: The graph vertex of the target point.
    departure_time (datetime.time): The time of the travel; checked with is_nighttime and is_rush_hour.
    departure_date (str): The date of the travel; checked with is_holiday.
    margin (float): The search margin handed to find_nearest_stops.
    geolocator: The object whose reverse((lat, lon)) method is used to get the addresses.

    Returns:
    list or None: [selected_path, source, target, valid_source_stops, valid_target_stops,
    valid_services, fixed_orientation, near_source_stops, near_target_stops]. None (after printing
    an error message) if no service connects both areas or none is left after the nighttime filter.
    """
    graph = osm_graph.graph
    route_stops = gtfs_data.route_stops

    selected_path_nodes = [source_node_id, target_node_id]
    selected_path = []
    for node in selected_path_nodes:
        # Getting the coordinates
        lat, lon = graph.vertex_properties["lat"][node], graph.vertex_properties["lon"][node]
        selected_path.append((lat, lon))

    # Coordinates
    source_lat = selected_path[0][0]
    source_lon = selected_path[0][1]
    target_lat = selected_path[-1][0]
    target_lon = selected_path[-1][1]

    # Reversing the geocoding to get the full info on the addresses
    source = geolocator.reverse((source_lat,source_lon))
    target = geolocator.reverse((target_lat,target_lon))

    # Find the stops near the source and target points
    # (find_nearest_stops converts the reverse-geocoded result to text and locates it again)
    near_source_stops, source_orientations = find_nearest_stops(osm_graph, gtfs_data, source, margin)
    near_target_stops, target_orientations = find_nearest_stops(osm_graph, gtfs_data, target, margin)

    # NOTE(review): fixed_orientation is reassigned every time a service is accepted, so the value
    # returned is the one of the last accepted service, not one per service.
    fixed_orientation = None
    valid_services = set()
    for source_stop_id in near_source_stops:
        for target_stop_id in near_target_stops:
            # Finding services that connects a stop near the source with one near the target
            services = gtfs_data.connection_finder(source_stop_id, target_stop_id)
            for service in services:
                # Getting the orientations on which the services visits the stops
                # (each one may be a single value or a list of values, hence the isinstance checks)
                source_orientation = gtfs_data.get_bus_orientation(service, source_stop_id)
                target_orientation = gtfs_data.get_bus_orientation(service, target_stop_id)
                # Getting the sequence number (the ordinal value for the visited stop)
                source_sequence = int(gtfs_data.get_trip_sequence(service, source_stop_id))
                target_sequence = int(gtfs_data.get_trip_sequence(service, target_stop_id))
                if source_sequence > target_sequence:
                    # Travel not valid: the target stop is visited before the source stop
                    continue
                if isinstance(source_orientation, list) and isinstance(target_orientation, list):
                    # If both source and target orientations are lists, check if any of the values match
                    valid_orientation = any(x in target_orientation for x in source_orientation) or any(x in source_orientation for x in target_orientation)
                    if valid_orientation and service not in valid_services:
                        valid_services.add(service)
                        # First orientation shared by both stops
                        fixed_orientation = [x for x in source_orientation if x in target_orientation][0] if [x for x in source_orientation if x in target_orientation] else source_orientation[0]
                elif source_orientation == target_orientation and service not in valid_services:  # Check if both stops are visited in the same orientation
                    valid_services.add(service)
                    fixed_orientation = target_orientation
                elif isinstance(source_orientation, list) and target_orientation in source_orientation and service not in valid_services:
                    # The source stop is visited in several orientations; one of them is the target's
                    valid_services.add(service)
                    fixed_orientation = target_orientation
                elif isinstance(target_orientation, list) and source_orientation in target_orientation and service not in valid_services:
                    # The target stop is visited in several orientations; one of them is the source's
                    valid_services.add(service)
                    fixed_orientation = source_orientation

    if len(valid_services) == 0:
        print("Error: There are no available services right now to go to the desired destination.")
        print("Possible reasons: no routes that have stops near the source and target addresses.")
        print("You can try changing the search margin and try again.")
        return

    # Checking flags for time and date
    nighttime_flag = gtfs_data.is_nighttime(departure_time)
    rush_hour_flag = gtfs_data.is_rush_hour(departure_time)
    holiday_flag = gtfs_data.is_holiday(departure_date)

    if holiday_flag:
        # Rush hour modifications (like express routes) don't apply on holidays
        rush_hour_flag = 0

    # Nighttime check (returns None when no service is left)
    daily_time_services = gtfs_data.check_night_routes(valid_services, nighttime_flag)

    if daily_time_services is None:
        print("Error: There are no available services right now to go to the desired destination.")
        print("Possible reasons: Source hour is during nighttime.")
        print("Please take into account that nighttime goes between 00:00:00 and 05:30:00.")
        return

    # Rush hour check
    valid_services = gtfs_data.check_express_routes(daily_time_services, rush_hour_flag)
    # Removing duplicated services (the resulting order is not defined)
    valid_services = list(set(valid_services))

    # Filters the stops to get only the valid ones: a stop is kept if at least one valid service visits it
    valid_source_stops = [stop_id for stop_id in near_source_stops if any(route_id in valid_services for route_id in route_stops.keys() if stop_id in route_stops[route_id])]
    valid_source_stops = list(set(valid_source_stops))
    valid_target_stops = [stop_id for stop_id in near_target_stops if any(route_id in valid_services for route_id in route_stops.keys() if stop_id in route_stops[route_id])]
    valid_target_stops = list(set(valid_target_stops))

    info = [selected_path, source, target, valid_source_stops, valid_target_stops, valid_services, fixed_orientation, near_source_stops, near_target_stops]

    return info

def find_best_option(osm_graph, gtfs_data, selected_path, departure_time, departure_date, valid_source_stops, valid_target_stops, valid_services, fixed_orientation):
    """
    Picks, among the valid services and source stops, the bus that arrives first at its stop after
    the traveller has walked there from the source location.

    Parameters:
    osm_graph: The object that provides the graph.
    gtfs_data: The object that provides the GTFS lookups used below.
    selected_path (list): A list of (lat, lon) tuples; the first one is the source location.
    departure_time (datetime.time): The time at which the travel starts.
    departure_date (str): The date of the travel, handed to get_arrival_times.
    valid_source_stops (list): The IDs of the candidate stops near the source.
    valid_target_stops (list): The IDs of the candidate stops near the target.
    valid_services (list): The IDs of the candidate services.
    fixed_orientation: The orientation that a service must have at the source stop to be considered.

    Returns:
    list or None: [best_option, initial_delta_time, best_option_times, initial_source_time,
    valid_target, best_option_orientation], where best_option is the tuple
    (service, stop_id, arrival_time, waiting_time) or None if no candidate was found.
    None (after printing an error message) as soon as one candidate service has no buses left.
    """

    graph = osm_graph.graph
    route_stops = gtfs_data.route_stops

    best_option = None
    best_option_times = None
    initial_source_time = timedelta(hours=departure_time.hour, minutes=departure_time.minute, seconds=departure_time.second)
    source_time = None

    # Set of valid orientations defined by the source
    best_option_orientation = None

    # Collects (without repetitions) the services that visit any of the valid target stops
    valid_target = []
    for target_stop in valid_target_stops:
        target_routes = gtfs_data.get_routes_at_stop(target_stop)
        valid_target.extend(target_routes)
    valid_target = list(dict.fromkeys(valid_target))

    waiting_time = None
    initial_delta_time = None

    for stop_id in valid_source_stops:
        # Gets the services that visits the stop and filters the valid ones (source)
        routes_at_stop = gtfs_data.get_routes_at_stop(stop_id)
        # NOTE(review): the comprehension variable is called stop_id but iterates over service IDs;
        # it is local to the comprehension, so the stop_id of the outer loop is not modified.
        valid_stop_services = [stop_id for stop_id in valid_services if stop_id in routes_at_stop]
        for valid_service in valid_stop_services:
            # Gets the arrival times and service orientation for this valid service
            arrival_info = gtfs_data.get_arrival_times(valid_service, stop_id, departure_date)
            if arrival_info is not None and arrival_info[0] == fixed_orientation:
                orientation = arrival_info[0]
                flag = False # A flag to correct the orientation
                # NOTE(review): flag is reset on every iteration and the continue below is the last
                # statement of the loop body, so only the last target stop decides the final value
                # of flag. Confirm whether a break was intended.
                for target_stop_id in valid_target_stops:
                    flag = False
                    # Gets the services that visits the stop and filters the valid ones (target)
                    target_stop_routes = gtfs_data.get_routes_at_stop(target_stop_id)
                    if valid_service in target_stop_routes:
                        target_orientation = gtfs_data.get_bus_orientation(valid_service, target_stop_id)
                        if orientation not in target_orientation:
                            flag = True
                            continue

                if flag:
                    continue

                # The service must visit at least one of the valid target stops
                if valid_service not in valid_target:
                    continue

                arrival_times = arrival_info[1]

                # Gets the coordinates for the stop and the source location
                stop_coords = gtfs_data.get_stop_coords(stop_id)

                # Base Coordinates
                source_lat = selected_path[0][0]
                source_lon = selected_path[0][1]
                target_lat = selected_path[-1][0]
                target_lon = selected_path[-1][1]

                location_coords = (source_lon, source_lat)

                # Consider the travel time between the source location and the stop
                # The average walking speed is between 4 km/h and 6.5 km/h, so we consider it as 5 km/h
                initial_walking_time = gtfs_data.walking_travel_time(stop_coords, location_coords, 5)
                this_delta_time = timedelta(seconds=initial_walking_time)

                # Time of day at which the traveller reaches the stop, truncated to whole seconds
                # (it wraps around if the walk crosses midnight)
                initial_time = (datetime.combine(date.today(), departure_time) + this_delta_time).time().strftime("%H:%M:%S")
                initial_time = datetime.strptime(initial_time, "%H:%M:%S").time()
                source_time = timedelta(hours=initial_time.hour, minutes=initial_time.minute, seconds=initial_time.second)

                # Getting the times of the next buses arrival to the stop
                time_until_next_buses = gtfs_data.get_time_until_next_bus(arrival_times, initial_time, departure_date)

                # NOTE(review): this leaves the whole function (returning None) as soon as one
                # service has no buses left, even if other candidates remain. Confirm whether
                # skipping this service was intended instead.
                if not time_until_next_buses:
                    print("Error: There are no available services right now to go to the desired destination.")
                    print("Possible reasons: There are no buses left today. Maybe the source hour is too close to the ending time for the service.")
                    return


                # Evaluate each of the upcoming buses (three at most) as a candidate
                for i in range(len(time_until_next_buses)):
                    minutes, seconds = time_until_next_buses[i]
                    waiting_time = timedelta(minutes=minutes, seconds=seconds)
                    arrival_time = source_time + waiting_time
                    # time_string is not used afterwards
                    time_string = gtfs_data.timedelta_to_hhmm(arrival_time)

                    # target_stop_id is the last value left by the loop over valid_target_stops above;
                    # target_orientation is not used afterwards
                    target_orientation = gtfs_data.get_bus_orientation(valid_service, target_stop_id)

                    # Update the best option (the earliest arrival wins)
                    if (best_option is None or (arrival_time < best_option[2])) and orientation == fixed_orientation:
                        best_option = (valid_service, stop_id, arrival_time, waiting_time)
                        best_option_times = time_until_next_buses
                        best_option_orientation = orientation
                        # Keeps the shortest walking time among the options that became the best one
                        if initial_delta_time is None or this_delta_time < initial_delta_time:
                            initial_delta_time = this_delta_time

    best_option_info = [best_option, initial_delta_time, best_option_times, initial_source_time, valid_target, best_option_orientation]
    return best_option_info


In [25]:
from geopy.geocoders import Nominatim
from datetime import datetime, date, time, timedelta
import time as tm
import pandas as pd
import folium


# Create the two data sources used by the functions of this cell.
# NOTE(review): both objects are also created in earlier cells; creating them again repeats the
# OSM download done by OSMGraph.__init__ and, presumably, the GTFS load.
osm_graph = OSMGraph()
gtfs_data = GTFSData()

# Define the function to set the optimal zoom level for the map
def fit_bounds(points, m):
    """
    Adjusts the view of a map so that it contains a given set of points.

    The bounds are the box that goes from the smallest latitude and longitude to the largest
    latitude and longitude found among the points.

    Parameters:
    points (list): A list of points in the format [(lat1, lon1), (lat2, lon2), ...].
    m (folium.Map): A folium map object.
    """
    coordinates = pd.DataFrame(points).rename(columns={0:'Lat', 1:'Lon'})
    lat_lon = coordinates[['Lat', 'Lon']]
    south_west = lat_lon.min().tolist()
    north_east = lat_lon.max().tolist()
    m.fit_bounds([south_west, north_east])

# Lite implementation of the Connection Scan Algorithm
def connection_scan_lite(source_address, target_address, departure_time, departure_date, margin):
    """
    Plans a direct (transfer-free) public transport trip between two addresses and maps it.

    This is a "lite" version of the Connection Scan Algorithm (CSA): it maps possible routes without
    doing any transfers. The function locates both addresses, asks available_route_finder for the
    candidate stops and services, lets find_best_option pick the service and the boarding stop, selects
    the drop-off stop with the shortest positive ride time, prints the itinerary and draws it on a map.

    It relies on the module-level osm_graph and gtfs_data instances.

    Parameters:
    source_address (string): the source address of the travel.
    target_address (string): the destination address of the travel.
    departure_time (time): the time at which the travel should start.
    departure_date: the date on which the travel should be done. It is forwarded unchanged to
        available_route_finder, find_best_option and gtfs_data.get_trip_day_suffix.
    margin (float): margin of distance between the nodes and the valid stops. It lets the caller determine
        the range within which a stop is considered "near" the source or target address.

    Returns:
    folium.Map: the map of the best travel route. None is returned when an address cannot be located,
    when find_best_option yields no option, or when no drop-off stop can be selected for the chosen service.
    """
    # Getting the nodes corresponding to the addresses
    source_node = osm_graph.address_locator(source_address)
    target_node = osm_graph.address_locator(target_address)

    # Instance of the route_stops dictionary
    route_stops = gtfs_data.route_stops

    # Nothing can be planned unless both addresses were located
    if source_node is None or target_node is None:
        return None

    # Look up the graph ids stored for both nodes
    source_node_graph_id = osm_graph.graph.vertex_properties["graph_id"][source_node]
    target_node_graph_id = osm_graph.graph.vertex_properties["graph_id"][target_node]

    print("Both addresses have been found.")
    print("Processing...")

    geolocator = Nominatim(user_agent="ayatori")

    route_info = available_route_finder(osm_graph, gtfs_data, source_node_graph_id, target_node_graph_id, departure_time, departure_date, margin, geolocator)

    selected_path = route_info[0]
    source = route_info[1]
    target = route_info[2]
    valid_source_stops = route_info[3]
    valid_target_stops = route_info[4]
    valid_services = route_info[5]
    fixed_orientation = route_info[6]
    near_source_stops = route_info[7]

    # End points of the path; path entries are used as (lat, lon) pairs by folium and fit_bounds
    origin_lat, origin_lon = selected_path[0][0], selected_path[0][1]
    target_lat, target_lon = selected_path[-1][0], selected_path[-1][1]

    # Create a map that shows the correct public transport services to take from the source to the target
    m = folium.Map(location=[origin_lat, origin_lon], zoom_start=13)

    # Add markers for the source and target points
    folium.Marker(location=[origin_lat, origin_lon], popup="Origen: {}".format(source), icon=folium.Icon(color='green')).add_to(m)
    folium.Marker(location=[target_lat, target_lon], popup="Destino: {}".format(target), icon=folium.Icon(color='red')).add_to(m)

    print("")
    print("Routes have been found.")
    print("Calculating the best route and getting the arrival times for the next buses...")

    best_option_info = find_best_option(osm_graph, gtfs_data, selected_path, departure_time, departure_date, valid_source_stops, valid_target_stops, valid_services, fixed_orientation)

    # best_option is (service, boarding stop id, arrival time of the vehicle, waiting time)
    best_option = best_option_info[0]
    initial_delta_time = best_option_info[1]
    best_option_times = best_option_info[2]
    initial_source_time = best_option_info[3]
    valid_target = best_option_info[4]
    best_option_orientation = best_option_info[5]

    if best_option is None:
        print("Error: There are no available services right now to go to the desired destination.")
        print("Possible reasons: the valid routes are not available at the specified date or starting time.")
        print("Please take into account that some routes have trips only during or after nighttime, which goes between 00:00:00 and 05:30:00")
        return None

    source_stop = best_option[1]

    # Parse Metro stations's names; a Metro station is shown by name instead of by stop id
    metro_stations_dict = gtfs_data.parse_metro_stations("stops.txt")
    possible_metro_name = gtfs_data.is_metro_station(best_option[1], metro_stations_dict)
    if possible_metro_name is not None:
        source_stop = possible_metro_name

    walking_minutes, walking_seconds = gtfs_data.timedelta_separator(initial_delta_time)

    print("")
    print("To go from: {}".format(source))
    print("To: {}".format(target))
    best_arrival_time_str = gtfs_data.timedelta_to_hhmm(best_option[2])
    print("")
    if possible_metro_name is not None: # Changes the printing to adapt for the use of Metro
        print("The best option is to walk for {} minutes and {} seconds to {} Metro station, and take the line {}.".format(walking_minutes, walking_seconds, source_stop, best_option[0]))
        print("The next train arrives at {}.".format(best_arrival_time_str))
        print("The other two next trains arrives in:")
    else:
        print("The best option is to walk for {} minutes and {} seconds to stop {}, and take the route {}.".format(walking_minutes, walking_seconds, source_stop, best_option[0]))
        print("The next bus arrives at {}.".format(best_arrival_time_str))
        print("The other two next buses arrives in:")

    # Format and prints the times; the first entry is the vehicle already announced above
    for minutes, seconds in best_option_times[1:]:
        waiting_time = timedelta(minutes=minutes, seconds=seconds)
        next_arrival = initial_source_time + waiting_time
        time_string = gtfs_data.timedelta_to_hhmm(next_arrival)
        print(f"{minutes} minutes, {seconds} seconds ({time_string})")

    for stop_id in near_source_stops:
        # Only the boarding stop chosen by find_best_option can produce a marker,
        # so every other stop is skipped before any lookup is made
        if stop_id not in valid_source_stops or stop_id != best_option[1]:
            continue

        stop_coords = gtfs_data.get_stop_coords(str(stop_id))
        routes_at_stop = gtfs_data.get_routes_at_stop(stop_id)

        for service in valid_services:
            if service in routes_at_stop and service == best_option[0]:
                # Maps the best option to take the best option's service
                # (stop coordinates are swapped to build the folium location)
                folium.Marker(location=[stop_coords[1], stop_coords[0]],
                      popup="Mejor opción: subirse al recorrido {} en la parada {}.".format(best_option[0], best_option[1]),
                      icon=folium.Icon(color='cadetblue', icon='plus')).add_to(m)
                initial_distance = [(origin_lat, origin_lon), (stop_coords[1], stop_coords[0])]
                folium.PolyLine(initial_distance,color='black',dash_array='10').add_to(m)

    for service in valid_target:
        if service != best_option[0]:
            continue

        # Generates the trip id to get the approximated travel time
        direction_code = "-I-" if fixed_orientation == "round" else "-R-"
        trip_id = service + direction_code + gtfs_data.get_trip_day_suffix(departure_date)

        best_travel_time = None
        selected_stop = None
        for stop_id in valid_target_stops:
            # Calculates the travel time while taking the service
            bus_time = gtfs_data.get_travel_time(trip_id, [best_option[1], stop_id])
            target_stop_routes = gtfs_data.get_routes_at_stop(stop_id)
            target_orientation = gtfs_data.get_bus_orientation(best_option[0], stop_id)
            if service in target_stop_routes and bus_time > timedelta() and (best_travel_time is None or bus_time < best_travel_time):
                # Checking the correct orientation
                if fixed_orientation in target_orientation:
                    # Updates the selected target stop and travel time
                    best_travel_time = bus_time
                    selected_stop = stop_id

        # Without a drop-off stop there is nothing left to print or draw; the lookups
        # below would otherwise fail on a None stop id / travel time
        if selected_stop is None:
            print("")
            print("Error: No stop near the destination can be reached with the service {}.".format(best_option[0]))
            return None

        # Gets the coordinates for the target stop
        selected_stop_coords = gtfs_data.get_stop_coords(selected_stop)
        # Separates the best travel time for the printing
        ride_minutes, ride_seconds = gtfs_data.timedelta_separator(best_travel_time)

        # Gets the sequence number for the source and target stops
        seq_1 = route_stops[best_option[0]][best_option[1]]["sequence"]
        seq_2 = route_stops[best_option[0]][selected_stop]["sequence"]

        # Store the coordinates of the visited stops for their mapping
        visited_stops = []

        # Iterate over the stops of the selected route
        for route_stop_id, stop_info in route_stops[best_option[0]].items():
            # Check if the stop sequence number is between seq_1 and seq_2
            seq_number = stop_info["sequence"]
            this_orientation = gtfs_data.get_bus_orientation(best_option[0], route_stop_id)
            if best_option_orientation in this_orientation and seq_1 <= seq_number <= seq_2:
                # The stored pair is swapped before being handed to folium.
                # NOTE(review): this implies route_stops stores (lon, lat), like get_stop_coords — confirm
                first_coord = stop_info["coordinates"][0]
                second_coord = stop_info["coordinates"][1]
                visited_stops.append((seq_number, (second_coord, first_coord)))

        # Sorts the visited stops and gets their coordinates
        visited_stops_sorted = sorted(visited_stops, key=lambda x: x[0])
        visited_stops_sorted_coords = [x[1] for x in visited_stops_sorted]

        # Checks if the stop is a Metro Station; if so, its name replaces the stop id in the messages
        possible_metro_target_name = gtfs_data.is_metro_station(selected_stop, metro_stations_dict)

        if possible_metro_target_name is not None:
            selected_stop = possible_metro_target_name

        print("")
        if possible_metro_name is not None: # Changes the message
            print("You will get off the train on {} station after {} minutes and {} seconds.".format(selected_stop, ride_minutes, ride_seconds))
        else:
            print("You will get off the bus on stop {} after {} minutes and {} seconds.".format(selected_stop, ride_minutes, ride_seconds))

        # Maps the best option to get off the best option's service
        folium.Marker(location=[selected_stop_coords[1], selected_stop_coords[0]],
              popup="Mejor opción: bajarse del recorrido {} en la parada {}.".format(best_option[0], selected_stop),
              icon=folium.Icon(color='cadetblue', icon='plus')).add_to(m)
        ending_distance = [(target_lat, target_lon), (selected_stop_coords[1], selected_stop_coords[0])]
        folium.PolyLine(ending_distance,color='black',dash_array='10').add_to(m)

        # Create a polyline connecting the visited stops
        folium.PolyLine(visited_stops_sorted_coords, color='red').add_to(m)

        # Gets the coordinates for the target stop and target location
        final_stop_coords = (selected_stop_coords[1], selected_stop_coords[0])
        final_location_coords = (target_lat, target_lon)

        # Calculates the walking time between the target stop and location
        # NOTE(review): the third argument (5) is presumably the walking speed — confirm its unit in walking_travel_time
        end_walking_time = gtfs_data.walking_travel_time(final_stop_coords, final_location_coords, 5)
        end_delta_time = timedelta(seconds=end_walking_time)
        end_walk_min, end_walk_sec = gtfs_data.timedelta_separator(end_delta_time)

        # Time walking to stop + waiting the bus + riding the bus + walking to target destination
        total_time = initial_delta_time + best_option[3] + best_travel_time + end_delta_time
        total_minutes, total_seconds = gtfs_data.timedelta_separator(total_time)

        # Parses the time for the printing
        destination_time = initial_source_time + total_time
        time_string = gtfs_data.timedelta_to_hhmm(destination_time)
        print(f"After that, you need to walk for {end_walk_min} minutes and {end_walk_sec} seconds to arrive at the target spot.")
        print(f"Total travel time: {total_minutes} minutes, {total_seconds} seconds. You will arrive your destination at {time_string}.")

    # Set the optimal zoom level for the map
    fit_bounds(selected_path, m)

    return m


def algorithm_commands():
    """
    Process the inputs given by the user to run the Connection Scan Algorithm.

    Prompts for the travel date, the start time, the source address and the destination address
    (pressing Enter accepts the default announced by each prompt) and then calls connection_scan_lite
    with a margin of 0.2. A start time that does not match HH:MM:SS is asked for again instead of
    raising ValueError.

    Returns:
    The map returned by connection_scan_lite, or None (after printing a message) when no map was produced.
    """
    # System's date and time
    now = datetime.now()

    # Defaults offered to the user: today's date as text and the current time without microseconds
    today_format = date.today().strftime("%d/%m/%Y")
    used_time = now.time().replace(microsecond=0)

    # User inputs
    # Date: kept as the typed text and forwarded to connection_scan_lite as is
    source_date = input(
        "Enter the travel's date, in DD/MM/YYYY format (press Enter to use today's date) : ") or today_format
    print(source_date)

    # Time: an empty answer means "now"; anything else must parse as HH:MM:SS
    while True:
        typed_hour = input(
            "Enter the travel's start time, in HH:MM:SS format (press Enter to start now) : ").strip()
        if typed_hour == '':
            source_hour = used_time
            break
        try:
            source_hour = datetime.strptime(typed_hour, "%H:%M:%S").time()
            break
        except ValueError:
            print("Invalid time. Please use the HH:MM:SS format.")
    print(source_hour)

    # Source address (an empty answer falls back to the example; blank-only answers are asked again)
    source_example = "Beauchef 850, Santiago"
    while True:
        source_address = input(
            "Enter the starting point's address, in 'Street #No, Province' format (Ex: 'Beauchef 850, Santiago'):") or source_example
        if source_address.strip() != '':
            break

    # Destination address (same fallback rules as the source address)
    destination_example = "Campus Antumapu Universidad de Chile, Santiago"
    while True:
        target_address = input(
            "Enter the ending point's address, in 'Street #No, Province' format (Ex: 'Campus Antumapu Universidad de Chile, Santiago'):")or destination_example
        if target_address.strip() != '':
            break

    # You can change the margin as you please. Bigger numbers increase the range for near stops
    # But bigger numbers imply bigger execution times
    margin = 0.2
    best_route_map = connection_scan_lite(source_address, target_address, source_hour, source_date, margin)

    # connection_scan_lite signals every failure by returning None
    if best_route_map is None:
        print("")
        print("Something went wrong. Please try again later.")
        return None

    # Displays the results and return
    return best_route_map


Downloaded Protobuf data 'Santiago.osm.pbf' (20.13 MB) to:
'/home/lysorek/aves/notebooks/Santiago.osm.pbf'
GETTING OSM NODES...
DONE
GETTING OSM EDGES...
OSM DATA HAS BEEN SUCCESSFULLY RECEIVED
Loading GTFS data for <class 'pygtfs.gtfs_entities.Agency'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Stop'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Transfer'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Route'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Fare'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.FareRule'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.ShapePoint'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Service'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.ServiceException'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Trip'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.Frequency'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.StopTime'>:
Loading GTFS data for <class 'pygtfs.gtfs_entities.FeedIn

In [134]:
# Calls gtfs_data.map_route_stops for the routes L1, L2, L3, L4, L4A, L5 and L6.
# NOTE(review): the meaning of the two trailing positional 0 arguments is not visible here — confirm against map_route_stops.
gtfs_data.map_route_stops(["L1","L2","L3","L4","L4A","L5","L6"], 0, 0)

In [11]:
# Runs the interactive trip planner; being the last expression of the cell, the returned map (if any) is displayed as the cell output.
algorithm_commands()

Enter the travel's date, in DD/MM/YYY format (press Enter to use today's date) : 
25/09/2023
Enter the travel's start time, in HH:MM:SS format (press Enter to start now) : 10:00:00
10:00:00
Enter the starting point's address, in 'Street #No, Province' format (Ex: 'Beauchef 850, Santiago'):Punto Centro, Quilicura
Enter the ending point's address, in 'Street #No, Province' format (Ex: 'Campus Antumapu Universidad de Chile, Santiago'):Universidad de Chile, Santiago
Both addresses have been found.
Processing...

Routes have been found.
Calculating the best route and getting the arrival times for the next buses...

To go from: Mall Arauco Quilicura, Avenida Bernardo O'Higgins, Doctor Dussert, Quilicura, Provincia de Santiago, Región Metropolitana de Santiago, 8700000, Chile
To: Librería y Editorial Universitaria, 1050, Avenida Libertador Bernardo O'Higgins, Barrio París-Londres, Santiago, Provincia de Santiago, Región Metropolitana de Santiago, 8331009, Chile

The best option is to walk for

algorithm_commands()

In [139]:
from collections import defaultdict
def calculate_stop_density(osm_graph, gtfs_data):
    """
    Maps the stops that are served by exactly one route.

    The routes using each stop are collected from the "u" and "v" edge properties of every graph in
    gtfs_data.graphs. Every stop of gtfs_data.stops that ends up with exactly one route gets a purple
    bus marker; stops with any other number of routes are not drawn.

    Parameters:
    osm_graph: not used by this function; kept so existing calls keep working.
    gtfs_data: object providing `stops`, `graphs` and `get_stop_coords`.

    Returns:
    folium.Map: a map centred on [-33.45, -70.66] holding one marker per single-route stop.
    """
    # Collect, for every stop id found at either end of an edge, the routes that use it
    stop_routes = defaultdict(set)
    for route_id, graph in gtfs_data.graphs.items():
        u_prop = graph.edge_properties["u"]
        v_prop = graph.edge_properties["v"]
        for e in graph.edges():
            stop_routes[u_prop[e]].add(route_id)
            stop_routes[v_prop[e]].add(route_id)

    m = folium.Map(location=[-33.45, -70.66], zoom_start=11)

    # dict.fromkeys keeps the first-seen order and drops repeated stops, so each stop is drawn at most once
    for stop in dict.fromkeys(gtfs_data.stops):
        # .get avoids inserting empty entries into the defaultdict for stops without routes
        if len(stop_routes.get(stop, ())) != 1:
            continue

        # Coordinates are only looked up for the stops that are actually drawn;
        # they are swapped to build the folium location
        coord = gtfs_data.get_stop_coords(stop)
        folium.Marker(location=[coord[1], coord[0]], icon=folium.Icon(color="purple", icon='fa-bus')).add_to(m)

    return m

In [26]:
import csv
# Builds a map that flags every stop returned by gtfs_data.get_stops_with_long_times().
santiago_coords = (-33.4489, -70.6693)
m = folium.Map(location=santiago_coords, zoom_start=12)
stops_with_long_wait_times = gtfs_data.get_stops_with_long_times()
# Prints the whole collection returned above.
print(stops_with_long_wait_times)

# Add a marker for every stop with long wait times
for stop_id in stops_with_long_wait_times:
    # NOTE(review): `sched` is not defined in this cell or in the visible code; presumably a pygtfs
    # schedule created in another cell — confirm it exists on a fresh kernel.
    # Only the first match returned by stops_by_id is used.
    stop = sched.stops_by_id(stop_id)[0]
    coords = (stop.stop_lat, stop.stop_lon)
    folium.Marker(location=coords, icon=folium.Icon(color='white')).add_to(m)
# Last expression of the cell: displays the map.
m

KeyboardInterrupt: 