In [341]:
import numpy as np
import pandas as pd
from typing import List, Dict, Tuple, Callable
from dataclasses import dataclass
from datetime import time
import time as tm
import sys

class Stop:
    def __init__(self, name: str, latitude: float, longitude: float) -> None:
        self.name = name
        self.latitude = latitude
        self.longitude = longitude
    def __str__(self) -> str:
        return self.name + "[" + str(self.latitude) + ", " + str(self.longitude) + "]"
    def __repr__(self) -> str:
        return self.name + ", " + str(self.latitude) + "," + str(self.longitude)
    def __hash__(self) -> int:
        return hash(str(self.name))
    def __eq__(self, __value: object) -> bool:
        return isinstance(__value, Stop) and str(self.name) == str(__value.name)
    
@dataclass
class Route:
    line: str
    departure_minutes: int
    arrival_minutes: int

@dataclass
class StopRecord():
    min_arrival_minutes: int
    last_stop: Stop
    last_route: Route
    
def time_to_minutes(time_str: str) -> int:
    time_arr = time_str.split(':')
    hour=int(time_arr[0])
    minute=int(time_arr[1])
    return 60 * (hour % 24) + minute
def format_time(abnormal_time: int) -> str:
    return str(abnormal_time // 60).zfill(2) + ":" + str(abnormal_time % 60).zfill(2)

In [342]:
from geopy.distance import geodesic
def euclidean(next_node: Stop, end_node: Stop) -> float:
    return geodesic((next_node.latitude,next_node.longitude), (end_node.latitude,end_node.longitude)).kilometers / 60

In [343]:
real_timetable = pd.read_csv("connection_graph (1).csv")
real_timetable.set_index(keys='Unnamed: 0', inplace=True)
real_timetable.drop_duplicates(ignore_index=True, inplace=True)

  real_timetable = pd.read_csv("connection_graph (1).csv")


In [344]:
graph: Dict[Stop, Dict[Stop, Dict[str,List[Route]]]] = {}
for stop in real_timetable.itertuples():
    start_stop: Stop = Stop(stop.start_stop, stop.start_stop_lat, stop.start_stop_lon) 
    end_stop: Stop = Stop(stop.end_stop, stop.end_stop_lat, stop.end_stop_lon)
    route: Route = Route(line=stop.line, departure_minutes=time_to_minutes(stop.departure_time), arrival_minutes=time_to_minutes(stop.arrival_time))
    if start_stop not in graph.keys():
        graph[start_stop] = {end_stop: {route.line: [route]}}
    elif end_stop in graph[start_stop].keys():
        if route.line in graph[start_stop][end_stop].keys():
            graph[start_stop][end_stop][route.line].append(route)
        else:
            graph[start_stop][end_stop][route.line] = [route]
    else:
        graph[start_stop][end_stop] = {route.line: [route]}
    if end_stop not in graph.keys():
        graph[end_stop] = {}
for stop, neighbors in graph.items():
    for _, lines in neighbors.items():
        for _, routes in lines.items():
            routes.sort(key=lambda rt: rt.arrival_minutes)   

In [356]:
class Solution:    
    def __init__(self) -> None:
        self.graph = graph     
    def _proceed(self, a_start: Stop, b_end:Stop, start_time: str, heuristic: Callable[[Stop,Stop],float] = euclidean):
        change_minutes = 1
        time = time_to_minutes(start_time)
        self.stops_records: Dict[Stop, StopRecord] = {} 
        for key in self.graph.keys():
            self.stops_records[key] = StopRecord(1e10, 1e10, None, None)
        self.stops_records[a_start] = StopRecord(time, time, None, None)
        unseen_stops = list(self.graph.keys())
        seen_stops = []
        while(len(unseen_stops) > 0):
            curr_stop = unseen_stops[0]
            for stop in unseen_stops:
                if self.stops_records[stop].f < self.stops_records[curr_stop].f:
                    curr_stop = stop
            if curr_stop == b_end:
                return
            unseen_stops.remove(curr_stop)
            seen_stops.append(curr_stop)
            # normalize arrival time to 24h
            g_partial = self.stops_records[curr_stop].g % (24*60) 
            for neighbor, routes in self.graph[curr_stop].items():             
                min_arrival_id = -1
                for i in range(len(routes)):
                    # take time for a change of line
                    change_fine = 0 if self.stops_records[curr_stop].last_route is not None and routes[i].line == self.stops_records[curr_stop].last_route.line else change_minutes
                    if g_partial + change_fine <= routes[i].departure_minutes:
                        min_arrival_id = i
                        break
                # if there are no more this day, take the first one after midnight...
                g = routes[min_arrival_id].arrival_minutes if min_arrival_id > -1 else 24*60+routes[0].arrival_minutes
                # ...therefore, this is the chosen route id
                min_arrival_id = max(min_arrival_id,0)
                # derive real cost including days already travelled (anti-prior-normalization)
                g += (self.stops_records[curr_stop].g // (24*60)) * 24*60
                # derive real cost adding day if you were on a road during midnight
                g += 0 if routes[min_arrival_id].arrival_minutes >= routes[min_arrival_id].departure_minutes else 24*60
                
                if neighbor not in unseen_stops and neighbor not in seen_stops:
                    unseen_stops.append(neighbor)
                    self.stops_records[neighbor].g = g
                    self.stops_records[neighbor].f = g + heuristic(neighbor, b_end)
                    self.stops_records[neighbor].last_stop = curr_stop
                    self.stops_records[neighbor].last_route = routes[min_arrival_id]
                else:
                    if g < self.stops_records[neighbor].g:
                        self.stops_records[neighbor].g = g
                        self.stops_records[neighbor].f = g + heuristic(neighbor, b_end)
                        self.stops_records[neighbor].last_stop = curr_stop
                        self.stops_records[neighbor].last_route = routes[min_arrival_id]
                        if neighbor in seen_stops:
                            unseen_stops.append(neighbor)
                            seen_stops.remove(neighbor)

In [357]:
start = Stop("Trawowa",51.09727033,16.94939119)
end = Stop("Stanisławowska (W.K. Formaty)",0,0)
start_time = '23:17:00'
s = Solution()
s._proceed(a_start=start, b_end=end, start_time=start_time)

TypeError: StopRecord.__init__() takes 4 positional arguments but 5 were given

In [269]:
print("Values range for departure time: " + real_timetable['departure_time'].min() + " - " + real_timetable['departure_time'].max())
print("Values range for arrival time: " + real_timetable['arrival_time'].min() + " - " + real_timetable['arrival_time'].max())
print("Values range for start-stop latitude: " + str(real_timetable['start_stop_lat'].min()) + " - " + str(real_timetable['start_stop_lat'].max()))
print("Values range for start-stop longitude: " + str(real_timetable['start_stop_lon'].min()) + " - " + str(real_timetable['start_stop_lon'].max()))

Values range for departure time: 03:34:00 - 30:03:00
Values range for arrival time: 03:36:00 - 30:05:00
Values range for start-stop latitude: 50.98042601 - 51.26556584
Values range for start-stop longitude: 16.695853 - 17.2855453
