In [None]:
import itertools
import os

import dotenv
import geopandas as gpd
import networkx as nx
import numpy as np
import tqdm
from shapely.geometry import LineString, Point, Polygon

import tfl.api
import tfl.exceptions
import tfl.models

In [None]:
dotenv.load_dotenv()

In [None]:
tf_client = tfl.api.Tfl(app_key=os.environ["FLATHUNT__TFL_API_KEY"])

In [None]:
stations_facilities = await tf_client.get_stations_facilities()

In [None]:
lines = await tf_client.get_lines_by_mode(
    [
        tfl.models.ModeId.TUBE,
        tfl.models.ModeId.OVERGROUND,
        tfl.models.ModeId.DLR,
        tfl.models.ModeId.ELIZABETH_LINE,
    ]
)

In [None]:
line_id_stop_points: dict[str, list[tfl.models.StopPointDetail]] = {}
for line in tqdm.tqdm(lines):
    line_id_stop_points[line.id] = await tf_client.get_stop_points_by_line(line.id)

In [None]:
line_id_stop_point_timetables: dict[
    str, dict[str, dict[tfl.api.Direction, tfl.models.TimetableResponse]]
] = {}
for line_id, stop_points in tqdm.tqdm(line_id_stop_points.items()):
    for stop_point in stop_points:
        for direction in tfl.api.Direction:
            print(line_id, stop_point.naptan_id, direction)
            try:
                result = await tf_client.get_timetable(
                    line_id, stop_point.naptan_id, direction
                )
            except tfl.exceptions.TflApiError as e:
                print(f"  -> TflApiError: {e}")
                continue
            line_id_stop_point_timetables.setdefault(line_id, {}).setdefault(
                stop_point.naptan_id, {}
            )[direction] = result

In [80]:
line_id_stop_point_timetables["bakerloo"]["940GZZLUWLO"][
    tfl.api.Direction.INBOUND
].timetable.routes[0].schedules

[Schedule(name='Saturday (also Good Friday)', known_journeys=[KnownJourney(hour='6', minute='0', interval_id=0), KnownJourney(hour='6', minute='12', interval_id=0), KnownJourney(hour='6', minute='23', interval_id=0), KnownJourney(hour='6', minute='31', interval_id=0), KnownJourney(hour='6', minute='37', interval_id=0), KnownJourney(hour='6', minute='42', interval_id=0), KnownJourney(hour='6', minute='47', interval_id=0), KnownJourney(hour='6', minute='52', interval_id=0), KnownJourney(hour='6', minute='57', interval_id=0), KnownJourney(hour='7', minute='2', interval_id=0), KnownJourney(hour='7', minute='7', interval_id=0), KnownJourney(hour='7', minute='12', interval_id=0), KnownJourney(hour='7', minute='17', interval_id=0), KnownJourney(hour='7', minute='22', interval_id=0), KnownJourney(hour='7', minute='27', interval_id=0), KnownJourney(hour='7', minute='32', interval_id=0), KnownJourney(hour='7', minute='37', interval_id=0), KnownJourney(hour='7', minute='42', interval_id=0), Known

In [None]:
stations = {}
for station in stations_facilities.stations.station:
    for stop_point in stop_points:
        if station.id == stop_point.ics_code:
            if station.placemark is None:
                raise ValueError(f"Station {station.id} has no placemark data")
            if station.serving_lines is None:
                raise ValueError(f"Station {station.id} has no serving lines data")
            stations[station.id] = {
                "naptan_id": stop_point.naptan_id,
                "x": station.placemark.point.longitude,
                "y": station.placemark.point.latitude,
                "serving_line": station.serving_lines.serving_line,
            }

In [None]:
roads_gdf = gpd.read_file(
    "/home/cemlyn/Downloads/greater-london-251126-free.shp/gis_osm_roads_free_1.shp"
)

In [None]:
def haversine(lon1, lat1, lon2, lat2):
    """Calculate the great circle distance between two points on earth (in km)

    All inputs can be scalars or numpy arrays. Arrays will be broadcast together.
    """
    lon1 = np.radians(lon1)
    lat1 = np.radians(lat1)
    lon2 = np.radians(lon2)
    lat2 = np.radians(lat2)
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
    c = 2 * np.arcsin(np.sqrt(a))
    km = 6371 * c
    return km


In [None]:
graph = nx.Graph()
for _, road in tqdm.tqdm(roads_gdf.iterrows(), total=len(roads_gdf)):
    for i, ((x1, y1), (x2, y2)) in enumerate(itertools.pairwise(road.geometry.coords)):
        if (x1, y1) not in graph:
            graph.add_node((x1, y1), x=x1, y=y1)
        if (x2, y2) not in graph:
            graph.add_node((x2, y2), x=x2, y=y2)
        if not graph.has_edge((x1, y1), (x2, y2)):
            graph.add_edge(
                (x1, y1),
                (x2, y2),
                length=(haversine(x1, y1, x2, y2) * 1000).item(),
                geometry=LineString([(x1, y1), (x2, y2)]),
            )  # in meters

In [None]:
for node_index, node_attributes in graph.nodes.items():
    if "station" in node_attributes:
        print(node_attributes["station"])

In [None]:
# add an edge attribute for time in minutes required to traverse each edge
travel_speed = 4.5  # walking speed in km/hour
meters_per_minute = travel_speed * 1000 / 60  # km per hour to m per minute
for a, b, data in graph.edges(data=True):
    data["time"] = data["length"] / meters_per_minute

In [None]:
def make_iso_poly(G, node, trip_time, edge_buff, node_buff, infill=False):
    subgraph = nx.ego_graph(G, node, radius=trip_time, distance="time")
    node_points = [
        Point((data["x"], data["y"])) for node, data in subgraph.nodes(data=True)
    ]
    nodes_gdf = gpd.GeoDataFrame({"id": list(subgraph.nodes)}, geometry=node_points)
    nodes_gdf = nodes_gdf.set_index("id")
    edge_lines = []
    for n_fr, n_to in subgraph.edges():
        edge_lookup = G.get_edge_data(n_fr, n_to)["geometry"]
        edge_lines.append(edge_lookup)
    n = nodes_gdf.buffer(node_buff).geometry
    e = gpd.GeoSeries(edge_lines).buffer(edge_buff).geometry
    all_gs = list(n) + list(e)
    new_iso = gpd.GeoSeries(all_gs).union_all()
    # try to fill in surrounded areas so shapes will appear solid and
    # blocks without white space inside them
    if infill:
        new_iso = Polygon(new_iso.exterior)
    return new_iso


iso_poly = make_iso_poly(graph, (-0.1778304, 51.6368388), 30, 0.001, 0.001, False)
type(iso_poly)

In [None]:
iso_poly.exterior

In [None]:
import shapely.plotting

shapely.plotting.plot_line(iso_poly.exterior)

In [None]:
transport_gdf = gpd.read_file(
    "/home/cemlyn/Downloads/greater-london-251126-free.shp/gis_osm_transport_free_1.shp"
)
transport_gdf = transport_gdf[
    transport_gdf["fclass"].isin(["railway_station", "tram_stop"])
]

In [None]:
def find_nearest_node(x, y):
    """Find the nearest node to a given (x, y) coordinate."""
    distances = haversine(x, y, points[:, 0], points[:, 1])
    return distances.argmin(axis=0).item()


points = np.array([(data["x"], data["y"]) for node, data in graph.nodes(data=True)])

transport_closest_node = {}
for index, transport in tqdm.tqdm(transport_gdf.iterrows(), total=len(transport_gdf)):
    x, y = transport.geometry.x, transport.geometry.y
    closest = find_nearest_node(x, y)
    transport_closest_node[index] = closest

In [None]:
node_ids = list(graph.nodes)
for transport_index, node_index in tqdm.tqdm(
    transport_closest_node.items(), total=len(transport_closest_node)
):
    transport = transport_gdf.loc[transport_index]
    x, y = transport.geometry.x, transport.geometry.y
    if (x, y) in graph.nodes:
        graph.nodes[(x, y)]["station"] = transport.to_dict()
    else:
        graph.add_node((x, y), x=x, y=y, station=transport.to_dict())
    if not graph.has_edge(node_index, (x, y)):
        node_id = node_ids[node_index]
        graph.add_edge(
            node_index,
            (x, y),
            length=haversine(x, y, graph.nodes[node_id]["x"], graph.nodes[node_id]["y"])
            * 1000,
        )  # in meters