In [None]:
import json

import geopandas as gpd
import networkx as nx
import numpy as np
import pandas as pd
import shapely
from common import load_crs, load_stops
from shapely import Point, Polygon


def compute_ellipticity(points: np.array) -> float:
    """
    Compute ellipticity of a set of points.

    Parameters:
    - points (numpy array): Array of shape (n, 2) representing (x, y) coordinates of points.

    Returns:
    - ellipticity (float): Ellipticity value.
    """

    # Calculate the covariance matrix of the points
    cov_matrix = np.cov(points, rowvar=False)

    # Calculate eigenvalues and eigenvectors of the covariance matrix
    eigenvalues, eigenvectors = np.linalg.eigh(cov_matrix)

    # Sort eigenvalues in descending order
    sorted_indices = np.argsort(eigenvalues)[::-1]
    eigenvalues = eigenvalues[sorted_indices]
    eigenvectors = eigenvectors[:, sorted_indices]

    # Major and minor axis lengths are square roots of eigenvalues
    major_axis_length = np.sqrt(eigenvalues[0])
    minor_axis_length = np.sqrt(eigenvalues[1])

    # Compute ellipticity
    ellipticity = 1.0 - (minor_axis_length / major_axis_length)

    return ellipticity


def ellipticity(points: list[Point], threshold: int = 10, decimals: int = 4) -> float:
    points = [(i.x, i.y) for i in points]
    if len(points) < threshold:
        return None

    return np.round(compute_ellipticity(points), decimals)


def determine_stop_geometries(
    stops: gpd.GeoDataFrame,
    subgraphs: dict,
    time_marker: int = 39,
    suffix: str = "",
    concaveness_ratio: float = 0.2,
    include_empty: bool = False,
) -> pd.DataFrame:
    """
    Calculates convex and concave hulls of the accessible network, and also the ellipticity of the stops.

    While the convex hull is unambiguous, multiple concave hulls can be constructed.
    """
    records = []
    for row in stops.itertuples():
        accessible_stops = list(
            subgraphs.get(f"{row.stop_id}_network_{time_marker}", nx.Graph())
        )
        accessible_stops = stops[stops["stop_id"].isin(accessible_stops)].copy()
        if len(accessible_stops) == 0:
            if include_empty:
                records.append([row.stop_id, Polygon(), 0, Polygon(), 0, 0])
            continue
        points = accessible_stops.union_all()
        cv = shapely.convex_hull(points)
        cc = shapely.concave_hull(points, ratio=concaveness_ratio)
        el = ellipticity(accessible_stops.geometry.tolist())

        records.append(
            [
                row.stop_id,
                cv,
                round(cv.area / 1e6, 3),
                cc,
                round(cc.area / 1e6, 3),
                el,
            ]
        )
    columns = ["stop_id"] + [
        i + suffix
        for i in [
            "convex",
            "convex_area",
            "concave",
            "concave_area",
            "ellipticity",
        ]
    ]

    return pd.DataFrame.from_records(records, columns=columns)


def determine_stop_geometries_from_walk(
    stops: gpd.GeoDataFrame,
    isochrones: gpd.GeoDataFrame,
    accessible_stops,
    crs: int = 23700,
    ellipticity_threshold: int = 2,
) -> gpd.GeoDataFrame:
    records = []
    for row in stops.itertuples():
        if row.stop_id not in accessible_stops:
            continue
        accessible = stops[stops["stop_id"].isin(accessible_stops[row.stop_id])].copy()

        el = ellipticity(accessible.geometry.tolist(), threshold=ellipticity_threshold)
        accessible_area = isochrones[
            (isochrones["stop_id"].isin(accessible_stops[row.stop_id]))
            & (isochrones["costing"] == "walk")
            & (isochrones["range"] == 5)
        ].copy()
        accessible_area_crs = accessible_area.to_crs(crs).union_all()
        records.append(
            [
                row.stop_id,
                accessible_area.union_all(),
                round(accessible_area_crs.area / 1e6, 3),
                el,
                len(accessible),
            ]
        )
    df = pd.DataFrame.from_records(
        records,
        columns=[
            "stop_id",
            "geometry",
            "area",
            "ellipticity",
            "number_of_accessible_stops",
        ],
    )
    return gpd.GeoDataFrame(df, crs=4326)

In [3]:
CITY = "paris"
ELLIPTICITY_THRESHOLD = 5

In [None]:
crs = load_crs()

with open(f"../data/stops/{CITY}/accessible_stops.json", "r") as fp:
    accessible_stops = json.load(fp)

# all_stops = set([i for k, v in accessible_stops.items() for i in v])
# len(all_stops)

isochrones = pd.read_csv(f"../output/{CITY}/isochrones.csv", dtype={"stop_id": str})
isochrones["geometry"] = isochrones["geometry"].apply(shapely.from_wkt)
isochrones = gpd.GeoDataFrame(isochrones, geometry="geometry", crs=4326)

stops = load_stops(CITY)

In [None]:
sgfw = determine_stop_geometries_from_walk(
    stops,
    isochrones.query("costing == 'walk' & range == 5"),
    accessible_stops,
    crs=crs[CITY],
    ellipticity_threshold=ELLIPTICITY_THRESHOLD,
)
sgfw.to_csv(f"../output/{CITY}/stop_geometries_from_walk.csv", index=False)
sgfw.to_file(f"../output/{CITY}/stop_geometries_from_walk.geojson")

  minor_axis_length = np.sqrt(eigenvalues[1])


In [16]:
sgfw

Unnamed: 0,stop_id,geometry,area,ellipticity,number_of_accessible_stops
0,9375374,"POLYGON ((2.55953 49.01458, 2.56197 49.0135, 2...",0.306,,2
1,8185379,"POLYGON ((2.14701 48.81059, 2.14714 48.81084, ...",2.874,0.9237,21
2,4109795,"POLYGON ((2.50654 48.88261, 2.50452 48.88437, ...",1.893,0.6463,17
3,4109789,"POLYGON ((2.51062 48.88033, 2.50981 48.88104, ...",2.106,0.5963,29
4,7647719,"POLYGON ((2.38138 48.89728, 2.37856 48.89745, ...",1.730,0.8079,17
...,...,...,...,...,...
7007,9296791,"POLYGON ((2.4619 48.77467, 2.46126 48.77497, 2...",2.339,0.6576,38
7008,3619886,"POLYGON ((2.46283 48.77166, 2.46247 48.77279, ...",2.234,0.6056,34
7009,8219623,"POLYGON ((2.32971 48.79304, 2.32956 48.79441, ...",2.226,0.6391,33
7010,4036670,"POLYGON ((2.28068 48.89295, 2.27615 48.89334, ...",1.694,0.6491,25


In [43]:
# stop = "009461"
# # stop = "009749"
# fig, ax = plt.subplots()
# sgfw[sgfw["stop_id"] == stop].plot(ax=ax, fc="#afdfff", ec="#00aaff")
# stops[
#     (stops["stop_id"].isin(accessible_stops[stop])) & (stops["stop_id"] != stop)
# ].plot(ax=ax, color="#2d2d2d", markersize=15, zorder=5)
# stops[stops["stop_id"] == stop].plot(ax=ax, color="red", markersize=20, zorder=10)