In [None]:
# Import necessary libraries
import geopandas as gpd
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import osmnx as ox
import networkx as nx
from shapely.geometry import Point
from libpysal.weights import Queen, KNN
from esda.moran import Moran

# -----------------------------
# Task A - Load data
# -----------------------------

# 1. Get the road network of Leeds (projected to EPSG:27700)
G_leeds = ox.graph_from_place("Leeds, UK", network_type="drive")
G_leeds = ox.project_graph(G_leeds, to_crs='EPSG:27700')

# 2. Convert the road network to GeoDataFrame (edges)
road_edges = ox.graph_to_gdfs(G_leeds, nodes=False, edges=True)

# 3. Validate the accident layer before it is combined with the projected road network.
# NOTE(review): accident_gdf is not created anywhere in this cell; it must already exist
# in the kernel (presumably loaded by an earlier cell — confirm), otherwise the next line
# raises NameError on a fresh "Restart & Run All".
# A layer without any CRS would otherwise fail with an opaque AttributeError on `.to_epsg()`.
assert accident_gdf.crs is not None, "accident_gdf has no CRS set; expected EPSG:27700"
assert accident_gdf.crs.to_epsg() == 27700, "accident_gdf must use EPSG:27700 projection"

# -----------------------------
# Task B1 - Visualize roads and accident distribution
# -----------------------------

# Draw the street network first, then overlay the accident points on the same axes.
fig, ax = plt.subplots(figsize=(12, 10))
ox.plot_graph(
    G_leeds,
    ax=ax,
    node_size=5,
    edge_linewidth=0.3,
    bgcolor='white',
    show=False,   # do not render yet: the accident layer still has to be added
    close=False,  # keep the figure open for the overlay below
)
accident_gdf.plot(ax=ax, color='red', markersize=3, alpha=0.5, label='Accidents')

# Annotate through the Axes object that was just drawn on.
ax.legend()
ax.set_title("Distribution of Road Accidents in Leeds")
plt.show()

# -----------------------------
# Task B2 - Create buffer zones and count accidents
# -----------------------------

# Ensure unique road_id for each road
# (when the column is missing, the edge index itself is used as the identifier).
if 'road_id' not in road_edges.columns:
    road_edges['road_id'] = road_edges.index

# Create 20-meter buffers
road_edges["buffer"] = road_edges.geometry.buffer(20)

# Build the polygon layer used for the join with the buffer as its ONLY geometry column.
# Joining the full road frame would carry the road line geometry along as a second
# "geometry" column, which collides with the accident layer's own "geometry" column in
# sjoin (depending on the geopandas version both get suffixed and the joined frame loses
# its point geometry column).
line_geometry_column = road_edges.geometry.name
road_attributes = pd.DataFrame(road_edges).drop(columns=[line_geometry_column, "buffer"])
buffered_roads = gpd.GeoDataFrame(
    road_attributes, geometry=road_edges["buffer"], crs=road_edges.crs
)

# Perform spatial join to count accidents within buffers.
# how="left" keeps accidents that fall in no buffer (their road_id is NaN). The join
# yields one row per (accident, buffer) match, so an accident lying inside several
# buffers is counted once for each of those roads.
road_accident_counts = gpd.sjoin(accident_gdf, buffered_roads, how="left", predicate="within")

# Group and count (rows with a NaN road_id are ignored by groupby)
accident_counts = road_accident_counts.groupby("road_id").size().reset_index(name="accident_count")

# Merge counts back to roads; roads without any matched accident get 0.
road_edges = road_edges.merge(accident_counts, on="road_id", how="left")
# The left merge introduces NaN and therefore a float column; counts are integers.
road_edges["accident_count"] = road_edges["accident_count"].fillna(0).astype(int)

# -----------------------------
# Task B3 - Calculate Moran's I spatial autocorrelation
# -----------------------------

# Moran's I divides by the variance of the variable, so a constant column cannot be analysed.
if road_edges['accident_count'].var() == 0:
    print("Accident counts are constant across all roads. Moran's I cannot be computed.")
else:
    # Spatial weights can only be built from non-empty, valid geometries.
    road_edges = road_edges[~road_edges.geometry.is_empty & road_edges.geometry.is_valid]

    # p_sim is a permutation-based pseudo p-value, so it changes from run to run unless
    # the random state is fixed.
    # NOTE(review): this assumes esda draws its permutations from NumPy's global RNG —
    # confirm for the installed esda version.
    np.random.seed(42)

    # Queen contiguity Moran's I
    # Errors are reported rather than raised so that the KNN variant below still runs.
    try:
        w_queen = Queen.from_dataframe(road_edges)
        w_queen.transform = 'r'  # row-standardise the weights
        moran_queen = Moran(road_edges["accident_count"], w_queen)
        print(f"Moran's I (Queen contiguity): {moran_queen.I:.4f}, p-value: {moran_queen.p_sim:.4f}")
    except Exception as e:
        print(f"Queen Moran's I error: {e}")

    # KNN Moran's I (5 nearest neighbours per road)
    try:
        w_knn = KNN.from_dataframe(road_edges, k=5)
        w_knn.transform = 'r'  # row-standardise the weights
        moran_knn = Moran(road_edges["accident_count"], w_knn)
        print(f"Moran's I (KNN): {moran_knn.I:.4f}, p-value: {moran_knn.p_sim:.4f}")
    except Exception as e:
        print(f"KNN Moran's I error: {e}")

# Quick sanity check of the per-road counts that were analysed above.
print(road_edges[["road_id", "accident_count"]].head())
print("Unique accident count values:", road_edges["accident_count"].unique())


In [None]:

import numpy as np
import matplotlib.pyplot as plt
from shapely.geometry import box
from pointpats import PointPattern, PoissonPointProcess
from scipy.spatial import distance_matrix

def calculate_k_function(accident_gdf, max_dist=1000, step=50, n_simulations=99, seed=None):
    """Plot Ripley's K-function of a point layer against a simulated CSR envelope.

    K(r) is estimated as ``area * pairs(r) / (n * (n - 1))``, where ``pairs(r)`` is the
    number of ordered point pairs strictly closer than ``r`` and ``area`` is the area of
    the points' bounding box. No edge correction is applied. The complete-spatial-
    randomness (CSR) simulations are drawn uniformly in the same bounding box and
    estimated the same way.

    Parameters
    ----------
    accident_gdf : GeoDataFrame-like
        Must expose ``total_bounds`` as (minx, miny, maxx, maxy) and a ``geometry``
        iterable of points with ``x`` / ``y`` attributes.
    max_dist, step : number
        The radii evaluated are ``np.arange(step, max_dist + step, step)``.
    n_simulations : int
        Number of CSR point patterns used for the 2.5%-97.5% envelope.
    seed : int or None
        When given, the simulations use their own seeded generator and the envelope is
        reproducible. When None (default) NumPy's global random state is used, exactly
        as before.

    Returns
    -------
    None
        The figure is shown with ``plt.show()``.

    Raises
    ------
    ValueError
        If the layer holds fewer than two points (K is undefined in that case).
    """
    # Local import so the function does not depend on an edit to the notebook's import cell.
    from scipy.spatial import cKDTree

    min_x, min_y, max_x, max_y = accident_gdf.total_bounds
    area = (max_x - min_x) * (max_y - min_y)

    # Original point coordinates
    coords = np.array([(geom.x, geom.y) for geom in accident_gdf.geometry], dtype=float)
    n_points = len(coords)
    if n_points < 2:
        raise ValueError("Ripley's K-function needs at least two points.")

    # Distance range
    r_values = np.arange(step, max_dist + step, step)
    # count_neighbors is inclusive (d <= r); shrinking every radius by one float step
    # keeps the strict "d < r" comparison this function has always used.
    strict_radii = np.nextafter(r_values.astype(float), 0.0)
    scale = area / (n_points * (n_points - 1))

    def _k_estimate(points):
        """Return K(r) for every radius using a single KD-tree traversal."""
        tree = cKDTree(points)
        # The tree-vs-itself count contains every ordered pair plus the n self-pairs.
        ordered_pairs = tree.count_neighbors(tree, strict_radii) - n_points
        return scale * ordered_pairs

    # Compute observed K(r)
    k_values_obs = _k_estimate(coords)

    # CSR simulations: same number of points, uniform in the bounding box
    rng = np.random if seed is None else np.random.default_rng(seed)
    k_simulations = []
    for _ in range(n_simulations):
        x_sim = rng.uniform(min_x, max_x, n_points)
        y_sim = rng.uniform(min_y, max_y, n_points)
        k_simulations.append(_k_estimate(np.column_stack((x_sim, y_sim))))

    # Compute 95% CSR envelope
    k_simulations = np.array(k_simulations)
    k_lower = np.percentile(k_simulations, 2.5, axis=0)
    k_upper = np.percentile(k_simulations, 97.5, axis=0)

    # Plot K-function
    plt.figure(figsize=(8, 6))
    plt.plot(r_values, k_values_obs, label="Observed K(r)", color="blue")
    plt.fill_between(r_values, k_lower, k_upper, color='gray', alpha=0.4, label="CSR Envelope (95%)")
    plt.plot(r_values, np.pi * r_values**2, linestyle="--", color="black", label="CSR Expected (πr²)")
    plt.xlabel("Distance r (meters)")
    plt.ylabel("K(r)")
    plt.title("Ripley's K-function for Road Accidents")
    plt.legend()
    plt.grid(True)
    plt.show()


In [None]:
# Ripley's K for the accident points, evaluated at radii 100, 200, ..., 1500
# (n_simulations is left at the function default of 99 CSR simulations).
calculate_k_function(accident_gdf, max_dist=1500, step=100)


In [None]:
from shapely.geometry import LineString
import numpy as np

def calculate_position_fraction_within_buffer(road_accident_counts, road_edges):
    """Attach each accident's relative position along its matched road.

    For every row of ``road_accident_counts`` the accident geometry is projected onto
    the line stored for its ``road_id`` in ``road_edges``; the projected distance is
    divided by the line length. Rows get NaN when the ``road_id`` is missing or unknown,
    or when the stored geometry is not a ``LineString`` or has zero length.

    The result is written to a ``position_fraction`` column on the frame that was passed
    in, and that same frame is returned.
    """
    lines_by_road = road_edges.set_index('road_id')['geometry'].to_dict()

    def _fraction_along(road_id, point):
        """Return the share of the road's length at which ``point`` projects, or NaN."""
        # No usable match for this accident.
        if pd.isna(road_id) or road_id not in lines_by_road:
            return np.nan

        road_line = lines_by_road[road_id]
        # Only simple, non-degenerate lines can be measured along.
        if not isinstance(road_line, LineString) or road_line.length == 0:
            return np.nan

        return road_line.project(point) / road_line.length

    road_accident_counts["position_fraction"] = [
        _fraction_along(record.get("road_id", None), record.geometry)
        for _, record in road_accident_counts.iterrows()
    ]

    return road_accident_counts


In [None]:
# Compute relative position of each accident point along the road.
# The helper writes a "position_fraction" column onto the frame passed in and returns
# that same frame; accidents without a usable matched road line get NaN.
road_accident_counts = calculate_position_fraction_within_buffer(road_accident_counts, road_edges)


In [None]:
# Plot histogram of accident positions along roads.
# position_fraction is the projected distance from the START of the road line divided by
# the line length, so 0 and 1 are both road ends. Fold it around the midpoint so that
# the plotted value is the fraction of road length to the NEAREST end
# (0 = at an end, 0.5 = mid-block), which is what the axis label describes.
# NOTE(review): this treats both ends of every road line as intersections, as the axis
# label already does — confirm for the network in use.
position_fraction = road_accident_counts['position_fraction'].dropna()
nearest_end_fraction = np.minimum(position_fraction, 1 - position_fraction)

nearest_end_fraction.hist(
    bins=30, figsize=(8,5), color='orange', edgecolor='black'
)
plt.xlabel("Fraction of road length from nearest intersection")
plt.ylabel("Number of accidents")
plt.title("Where Do Accidents Occur Along Roads?")
plt.grid(True)
plt.show()

# Optional: calculate mean position (on the folded 0-0.5 scale)
mean_pos = nearest_end_fraction.mean()
print(f"Mean relative position of accident points: {mean_pos:.2f} (0 = at intersection, 0.5 = mid-block)")
