In [76]:
import networkx as nx
import json
import math
import heapq # priority queue
import locale
from datetime import datetime, timedelta

In [77]:
class Data:
    def __init__(self):
        self.data = self.import_data()
        self.graph = self.create_graph(self.data)

    @staticmethod
    def import_data():
        with open('../data/stations.json', encoding='utf-8') as file:
            data = json.load(file)
        return data

    @staticmethod
    def create_graph(data):
        g = nx.Graph()
        for s1 in data['stations']:
            g.add_node(s1['name'], coordinates=s1['coordinates'], line=s1['line'])
            for s2, distance in s1['connected_to'].items():
                 g.add_edge(s1['name'], s2, weight=distance)
        return g

    def get_data(self):
        return self.data
    
    def get_graph(self):
        return self.graph

In [None]:
class Methods:
    LINE_COLORS = {
        0: "#E6E6E6",
        1: "#d35590",
        3: "#9e9a3a",
        7: "#df8600",
        9: "#8d5544",
        12: "#b89d4e"
    }

    LINE_INTERVALS = {
        1: 4,
        3: 3,
        7: 4,
        9: 3,
        12: 2}

    def __init__(self, data_class):
        self.data = data_class.get_data()

    def get_lines(self, station):
        for s in self.data["stations"]:
            if station == s["name"]:
                lines = s["line"]
                if isinstance(lines, (list, tuple)):
                    return [line for line in lines]
                else:
                    return [lines]
        return None

    def get_line_between(self, station1, station2):
        lines1 = self.get_lines(station1)
        lines2 = self.get_lines(station2)
        common = set(lines1) & set(lines2)
        return next(iter(common), None)

    def get_all_stations(self):
        stations = [s["name"] for s in self.data.get("stations", [])]
        return sorted(stations)

    def get_colors_of_path(self, path):
        lines = []
        for i in range(len(path) - 1):
            station1 = path[i]
            station2 = path[i + 1]
            line = self.get_line_between(station1, station2) if station1 != station2 else 0
            lines.append(line)
        return [self.LINE_COLORS.get(line) for line in lines]

    def get_line_interval(self, line):
        return self.LINE_INTERVALS.get(line)



In [79]:
import math
import heapq # priority queue
import datetime
import locale
from data import Data
from methods import Methods
from datetime import datetime, timedelta

class Al:
    velocity = 600  # 36 km/h = 600 m/min
    transshipment = 6  # 6 min
    stop_time = 0.5  # 30 s = 0.5 min
    opening_time = {0: 5 * 60, 1: 5 * 60, 2: 5 * 60, 3: 5 * 60, 4: 5 * 60, 5: 6 * 60,
                    6: 7 * 60}  # in minutes from midnight

    @staticmethod
    def h(graph, node1, node2):
        return math.dist(graph.nodes[node1]['coordinates'], graph.nodes[node2]['coordinates'])

    def astar_algorithm(self, graph, start_point, end_point, departure_date, departure_time):
        methods = Methods(Data())
        dt = datetime.strptime(f"{departure_date} {departure_time}", "%d %B %Y %H:%M")
        weekday = dt.weekday()
        openning = self.opening_time.get(weekday)
        dt_in_minites = dt.hour * 60 + dt.minute
        real_departure_minutes = max(dt_in_minites, openning)
        real_departure_dt = dt.replace(hour=real_departure_minutes // 60, minute=real_departure_minutes % 60)

        open_list = [(0, start_point)]  # (f, node)
        visited = {}

        g_acc = {station: float('inf') for station in graph.nodes()}
        g_acc[start_point] = 0

        f_acc = {station: float('inf') for station in graph.nodes()}
        f_acc[start_point] = g_acc[start_point] + self.h(graph, start_point, end_point)

        line_acc = {station: None for station in graph.nodes()}
        line_acc[start_point] = None

        while len(open_list) > 0:
            f, current = heapq.heappop(open_list)

            if current == end_point:
                path = []
                times = []
                while current != start_point:
                    predecessor, has_transshipment = visited[current]

                    path.append(current)
                    next_time = real_departure_dt + timedelta(minutes=g_acc[current])
                    next_time_str = next_time.strftime("%H:%M")
                    times.append(next_time_str)
                    if has_transshipment:
                        next_time = real_departure_dt + timedelta(minutes=g_acc[predecessor])
                        next_time += timedelta(minutes=self.transshipment)
                        next_time_str = next_time.strftime("%H:%M")
                        times.append(next_time_str)
                        path.append(predecessor)

                    # current = visited[current]
                    current = predecessor
                path.append(start_point)
                next_time = real_departure_dt
                next_time_str = next_time.strftime("%H:%M")
                times.append(next_time_str)
                path.reverse()
                times.reverse()
                locale.setlocale(locale.LC_TIME, 'es_ES.UTF-8')
                arrival_dt = real_departure_dt + timedelta(minutes=g_acc[end_point])
                real_departure_dt = real_departure_dt.strftime("%H:%M, %A, %d %B %Y")
                arrival_dt = arrival_dt.strftime("%H:%M, %A, %d %B %Y")

                return path, times, g_acc[end_point], real_departure_dt, arrival_dt

            for n in graph.neighbors(current):
                possible_g = g_acc[current] + graph.get_edge_data(current, n)['weight'] / self.velocity + self.stop_time
                line_between = methods.get_line_between(current, n)
                has_trasnshipment = False
                if line_acc[current] is not None and line_acc[current] != line_between:
                    has_trasnshipment = True
                    possible_g += self.transshipment
                    interval_between = methods.get_line_interval(int(line_between))
                    if interval_between is not None:
                        possible_g += (interval_between - (g_acc[current] % interval_between)) % interval_between

                if possible_g < g_acc[n]:
                    visited[n] = (current, has_trasnshipment)
                    g_acc[n] = possible_g
                    f_acc[n] = possible_g + self.h(graph, n, end_point)
                    line_acc[n] = line_between
                    heapq.heappush(open_list, (f_acc[n], n))

        return None, None, None, None, None

In [80]:
g = Data().get_graph()
path, times, time, dep_date, arr_date =  Al().astar_algorithm(g, 'Observatorio', 'Universidad', '24 Noviembre 2025', '08:00')
print('Path: %s\nTimes: %s\nTime: %.2f min\nDeparture Date: %s\nArrival Date: %s' % (path, times, time, dep_date, arr_date))

Path: ['Observatorio', 'Tacubaya', 'Tacubaya', 'San Pedro de los Pinos', 'San Antonio', 'Mixcoac', 'Mixcoac', 'Insurgentes Sur', 'Hospital 20 de Noviembre', 'Zapata', 'Zapata', 'CoyoacÃ¡n', 'Viveros', 'M.A. De Quevedo', 'Copilco', 'Universidad']
Times: ['08:00', '08:02', '08:08', '08:12', '08:13', '08:15', '08:21', '08:23', '08:25', '08:26', '08:32', '08:35', '08:37', '08:39', '08:41', '08:44']
Time: 44.64 min
Departure Date: 08:00, lunes, 24 noviembre 2025
Arrival Date: 08:44, lunes, 24 noviembre 2025
