This python model helps with computation of key metrics such as NQPD, needed for multifunctionality calculations.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
folder_path = '/content/drive/MyDrive/Boston_data_colab'


In [None]:
import heapq
from collections import defaultdict
import geopandas as gpd
import networkx as nx
import numpy as np
import pandas as pd
import math
import shapely
from shapely.geometry import LineString

**Step 1: Street Network Pre-processing + Angular bearing calculations**

In [None]:
streets = gpd.read_file('/content/drive/MyDrive/Boston_data_colab/boston_streets_good/boston_streets_good.shp')


num_segments = len(streets)
print(f"Number of segments: {num_segments}")

Number of segments: 18328


In [None]:
def preprocess_streets(streets):
  "Streets data preprocessing"

    def drop_z_coord(geom):
        coords_2d = [(x,y) for x, y, *rest in geom.coords]
        return LineString(coords_2d)

    streets["geometry"] = streets.geometry.apply(drop_z_coord)
    streets = streets.to_crs(epsg=32619)

    def get_endpoints(geom):
        coords = list(geom.coords)
        return (coords[0], coords[-1])

    streets["endpoints"] = streets.geometry.apply(get_endpoints)
    streets["start_point"] = streets["endpoints"].apply(lambda x: x[0])
    streets["end_point"] = streets["endpoints"].apply(lambda x: x[1])
    streets["length"] = streets.geometry.length


    return streets

In [None]:
def compute_angular_bearing(line):
    start, end = line.coords[0], line.coords[-1]
    dx, dy = end[0] - start[0],  end[1] - start[1]
    angle = math.degrees(math.atan2(dy, dx)) % 360
    return angle

**Step 2: Function For Optimized dual graph construction**

In [None]:
def build_optimized_graph(streets):
    """Building graph with adjacency lists for faster neighbor access"""
    G = nx.Graph()

    # Add segments as nodes WITH length attribute
    for sid in streets.index:
        G.add_node(sid, length=streets.loc[sid, 'length'])

    # Build adjacency structure for O(1) neighbor lookup
    endpts_to_segments = defaultdict(set)
    for i, row in streets.iterrows():
        endpts_to_segments[row["start_point"]].add(i)
        endpts_to_segments[row["end_point"]].add(i)

    def angular_cost(b1, b2):
        diff = abs(b1 - b2)
        return min(diff, 360 - diff)

    # Pre-compute edge weights and store in efficient structure
    edge_weights = {}
    for segment_group in endpts_to_segments.values():
        segment_list = list(segment_group)
        for i in range(len(segment_list)):
            for j in range(i + 1, len(segment_list)):
                node1, node2 = segment_list[i], segment_list[j]
                a1 = streets.iloc[node1]["bearing"]
                a2 = streets.iloc[node2]["bearing"]
                weight = angular_cost(a1, a2)
                G.add_edge(node1, node2, weight=weight)
                # Store for quick lookup
                edge_weights[(node1, node2)] = weight
                edge_weights[(node2, node1)] = weight

    return G, edge_weights

**Step 3: Function that helps calculate both angular distances and Euclidean path lengths from a source node to all other reachable nodes with a specified radius using Dijkstra's algorithm**

In [None]:
def heap_dijkstra_radius_constrained(G, source, radius_meters, node_lengths):
    """
    HEAP-OPTIMIZED Dijkstra's algorithm with radius constraint.
    Returns both angular distances and euclidean path lengths.
    """
    # Initialize distances and path lengths
    angular_distances = {source: 0}
    path_lengths = {source: 0}
    visited = set()

    # Min-heap: (angular_distance, node, euclidean_length)
    heap = [(0, source, 0)]

    while heap:
        current_angular_dist, current_node, current_length = heapq.heappop(heap)

        # Skip if already visited or exceeds radius
        if current_node in visited or current_length > radius_meters:
            continue

        visited.add(current_node)

        # Process all neighbors
        for neighbor in G.neighbors(current_node):
            if neighbor in visited:
                continue

            # Get edge weight and neighbor length
            edge_weight = G[current_node][neighbor]['weight']
            neighbor_length = node_lengths[neighbor]

            # Calculate new distances
            new_angular_dist = current_angular_dist + edge_weight
            new_length = current_length + neighbor_length

            # Check radius constraint
            if new_length > radius_meters:
                continue

            # Update if we found a better path
            if (neighbor not in angular_distances or
                new_angular_dist < angular_distances[neighbor]):

                angular_distances[neighbor] = new_angular_dist
                path_lengths[neighbor] = new_length
                heapq.heappush(heap, (new_angular_dist, neighbor, new_length))

    return angular_distances, path_lengths

**Step 4: Defining a function that calculates POI densities on each street segment. This will be helpful for gravity centrality calculations**

In [None]:
def calculate_poi_densities_on_segments(streets, poi_gdfs_dict, buffer_distance=50):
    """
    Calculate POI densities on each street segment for gravity centrality calculation.
    This gives us P(y) - the proportion/density of POIs on each segment.
    """
    print("Calculating POI densities on street segments...")

    poi_densities = {}

    for poi_category, poi_gdf in poi_gdfs_dict.items():
        print(f"  Processing {poi_category} POI density...")

        # Buffer street segments to capture nearby POIs
        streets_buffered = streets.copy()
        streets_buffered["geometry"] = streets_buffered.geometry.buffer(buffer_distance)

        # Use spatial index for efficiency
        poi_sindex = poi_gdf.sindex
        segment_poi_counts = {}

        for segment_id in streets.index:
            segment_geom = streets_buffered.loc[segment_id, 'geometry']

            # Find POIs within buffer using spatial index
            possible_matches_idx = list(poi_sindex.intersection(segment_geom.bounds))
            possible_matches = poi_gdf.iloc[possible_matches_idx]

            # Count POIs that actually intersect
            intersecting_pois = possible_matches[possible_matches.intersects(segment_geom)]
            poi_count = len(intersecting_pois)

            # Calculate density as POIs per unit length (or just count)
            segment_length = streets.loc[segment_id, 'length']
            density = poi_count / segment_length if segment_length > 0 else 0

            segment_poi_counts[segment_id] = {
                'count': poi_count,
                'density': density
            }

        poi_densities[poi_category] = segment_poi_counts

        # Add to streets dataframe for reference
        streets[f"poi_count_{poi_category}"] = [segment_poi_counts[sid]['count'] for sid in streets.index]
        streets[f"poi_density_{poi_category}"] = [segment_poi_counts[sid]['density'] for sid in streets.index]

    return poi_densities

In [None]:
def calculate_gravity_centrality_nqpd(G, streets, poi_densities, radius_meters=1000):
    """
    Calculate gravity centrality NQPD for each POI category using the simplified formula:
    NQPD(x) = Σ(y∈Rx) W(y) / d(x,y)

    Where:
    - W(y) = POI density on segment y
    - d(x,y) = network distance from segment x to segment y
    - Rx = all segments reachable within radius from segment x
    """
    print(f"Calculating Gravity Centrality NQPD for radius: {radius_meters}m")

    # Pre-compute node lengths for faster access
    node_lengths = {node: G.nodes[node]['length'] for node in G.nodes()}

    # Results dictionary for each POI category
    gravity_nqpd_results = {}

    # Process each POI category
    for poi_category, poi_density_data in poi_densities.items():
        print(f"  Processing gravity NQPD for {poi_category}...")

        gravity_values = {}
        total_segments = len(G.nodes())

        for i, source_segment in enumerate(G.nodes()):
            if i % 100 == 0:
                print(f"    Processed {i}/{total_segments} segments for {poi_category}")

            # Get all reachable segments within radius using network distance
            angular_distances, path_lengths = heap_dijkstra_radius_constrained(
                G, source_segment, radius_meters, node_lengths
            )

            # Calculate gravity centrality sum
            gravity_sum = 0.0

            for dest_segment, network_distance in path_lengths.items():
                if dest_segment == source_segment:
                    continue  # Skip self

                # W(y) - POI density on destination segment
                W_y = poi_density_data[dest_segment]['density']


                # P_y = 1

                # d(x,y) - network distance (use path length in meters)
                d_xy = network_distance

                # Avoid division by zero
                if d_xy > 0 and W_y > 0:
                    # Add to gravity sum: W(y) / d(x,y) (P ignored)
                    gravity_contribution = W_y / d_xy
                    gravity_sum += gravity_contribution

            gravity_values[source_segment] = gravity_sum

        gravity_nqpd_results[poi_category] = gravity_values

        # Print summary statistics
        values = list(gravity_values.values())
        if values:
            print(f"    {poi_category} gravity NQPD: mean={np.mean(values):.6f}, max={np.max(values):.6f}")

    return gravity_nqpd_results

**Step 5: Carrying out spatial joins with street segments and population polygons data**

In [None]:
def heap_optimized_spatial_joins(streets, population_gdf, poi_gdfs_dict):
    """
    Optimized spatial joins using spatial indexing (original function)
    """
    print("Performing HEAP-OPTIMIZED spatial joins...")

    # Step 1: Network -> Population (30m) with spatial index
    print("Step 1: Network -> Population (30m radius) - OPTIMIZED")
    streets_pop = streets.copy()
    streets_pop["geometry"] = streets_pop.geometry.buffer(30)

    # Use spatial index for faster intersection
    pop_sindex = population_gdf.sindex
    pop_results = []

    for idx, street_geom in streets_pop.geometry.items():
        # Use spatial index to find potential matches
        possible_matches_idx = list(pop_sindex.intersection(street_geom.bounds))
        possible_matches = population_gdf.iloc[possible_matches_idx]

        # Precise intersection test
        intersects = possible_matches[possible_matches.intersects(street_geom)]
        total_pop = intersects["POP20"].sum() if len(intersects) > 0 else 0
        pop_results.append(total_pop)

    streets["POP20"] = pop_results

    # Step 2: Result -> POIs (400m) with spatial indexing
    print("Step 2: Result -> POIs (400m radius) - OPTIMIZED")
    streets_poi = streets.copy()
    streets_poi["geometry"] = streets_poi.geometry.buffer(400)

    # Process each POI category with spatial indexing
    for category_name, poi_gdf in poi_gdfs_dict.items():
        print(f"  Processing {category_name} with spatial index...")

        poi_sindex = poi_gdf.sindex
        poi_counts = []

        for idx, street_geom in streets_poi.geometry.items():
            # Use spatial index
            possible_matches_idx = list(poi_sindex.intersection(street_geom.bounds))
            possible_matches = poi_gdf.iloc[possible_matches_idx]

            # Precise containment test
            contains = possible_matches[possible_matches.within(street_geom)]
            count = len(contains)
            poi_counts.append(count)

        streets[f"poisdsty_{category_name}"] = poi_counts

    return streets

**Step 6: Overall Function that runs the gravity centrality NQPD analysis for each POI category**

In [None]:
def run_gravity_centrality_boston_analysis():
    """
    Boston analysis with Gravity Centrality NQPD calculations for each POI category.
    """
    print("=== GRAVITY CENTRALITY NQPD BOSTON ANALYSIS ===")

    # Load data
    streets = gpd.read_file('/content/drive/MyDrive/Boston_data_colab/boston_streets_good/boston_streets_good.shp')

    # Preprocess
    streets = preprocess_streets(streets)
    streets["bearing"] = streets.geometry.apply(compute_angular_bearing)

    # Build OPTIMIZED graph
    print("Building optimized graph structure...")
    G, edge_weights = build_optimized_graph(streets)

    # Load POI and population data
    poi_gdfs = {
        'work': gpd.read_file("/content/drive/MyDrive/Boston_data_colab/boston_POIS_work/Boston_POIS_work.shp").to_crs(epsg=32619),
        'educivic': gpd.read_file("/content/drive/MyDrive/Boston_data_colab/boston_POIS_educivic/boston_POIS_educivic.shp").to_crs(epsg=32619),
        'leisure': gpd.read_file("/content/drive/MyDrive/Boston_data_colab/boston_POIS_leisure/boston_POIS_leisure.shp").to_crs(epsg=32619),
        'shopping': gpd.read_file("/content/drive/MyDrive/Boston_data_colab/boston_POIS_shopping/boston_POIS_shopping.shp").to_crs(epsg=32619)
    }

    pop = gpd.read_file("/content/drive/MyDrive/Boston_data_colab/boston_pop_blocks/boston_pop_blocks.shp").to_crs(epsg=32619)

    # OPTIMIZED spatial joins (original POI density calculations)
    streets = heap_optimized_spatial_joins(streets, pop, poi_gdfs)

    # Calculate POI densities on segments (P(y) values)
    print("\n--- CALCULATING POI DENSITIES ON SEGMENTS ---")
    poi_densities = calculate_poi_densities_on_segments(streets, poi_gdfs, buffer_distance=50)

    # Calculate Gravity Centrality NQPD at 1000m radius
    print("\n--- CALCULATING GRAVITY CENTRALITY NQPD (1000m) ---")
    gravity_nqpd_results = calculate_gravity_centrality_nqpd(G, streets, poi_densities, radius_meters=1000)

    # Add Gravity Centrality NQPD results to streets dataframe
    for poi_category, nqpd_values in gravity_nqpd_results.items():
        col_name = f"gravity_nqpd_{poi_category}_1000m"
        streets[col_name] = streets.index.map(nqpd_values).fillna(0)
        print(f"Added column: {col_name}")

    # Calculate multifunctionality as sum of gravity NQPD values
    multifunctionality_cols = [f"gravity_nqpd_{cat}_1000m" for cat in poi_gdfs.keys()]
    streets['multifunctionality_1000m'] = streets[multifunctionality_cols].sum(axis=1)
    print("Added column: multifunctionality_1000m")


    all_results = {}

    # Export results
    print("\nExporting results...")

    # Including only Gravity NQPD columns, multifunctionality, and basic POI data
    gravity_cols = [f"gravity_nqpd_{cat}_1000m" for cat in poi_gdfs.keys()]
    poi_density_cols = [col for col in streets.columns if col.startswith('poi_count_') or col.startswith('poi_density_')]
    basic_poi_cols = [col for col in streets.columns if col.startswith('poisdsty_')]
    all_output_cols = gravity_cols + poi_density_cols + basic_poi_cols + ['multifunctionality_1000m', 'POP20']

    export_df = streets[['geometry'] + all_output_cols]

    # Save as text file
    text_output = export_df.drop(columns=['geometry'])
    text_output.to_csv('/content/drive/MyDrive/Boston_data_colab/boston_gravity_centrality_results.txt', sep='\t', index=True)

    # Save as shapefile
    export_df.to_file('/content/drive/MyDrive/Boston_data_colab/boston_gravity_centrality_results.shp')

    print("Gravity Centrality Analysis complete!")
    print(f"Results saved to boston_gravity_centrality_results.txt and boston_gravity_centrality_results.shp")

    # Print summary of Gravity Centrality NQPD results
    print("\n=== GRAVITY CENTRALITY NQPD SUMMARY ===")
    for poi_category in poi_gdfs.keys():
        col_name = f"gravity_nqpd_{poi_category}_1000m"
        if col_name in streets.columns:
            mean_val = streets[col_name].mean()
            max_val = streets[col_name].max()
            std_val = streets[col_name].std()
            print(f"{poi_category}: mean={mean_val:.6f}, max={max_val:.6f}, std={std_val:.6f}")

    # Multifunctionality summary
    print(f"\nMultifunctionality: mean={streets['multifunctionality_1000m'].mean():.6f}, max={streets['multifunctionality_1000m'].max():.6f}")

    return streets, all_results, gravity_nqpd_results


In [None]:
# Run the gravity centrality analysis
if __name__ == "__main__":
    streets_results, analysis_results, gravity_nqpd_results = run_gravity_centrality_boston_analysis()

**Step 7: Exporting Result as shp file for GIS**

In [None]:
import geopandas as gpd
import pandas as pd

# === Extract endpoints into x1, y1, x2, y2 ===
streets_results[['x1', 'y1']] = streets_results['endpoints'].apply(lambda ep: pd.Series(ep[0]))
streets_results[['x2', 'y2']] = streets_results['endpoints'].apply(lambda ep: pd.Series(ep[1]))

# === Rename long NQPD columns to shapefile-safe names ===
rename_map = {
    'gravity_nqpd_work_1000m': 'nqpd_work',
    'gravity_nqpd_educivic_1000m': 'nqpd_edu',
    'gravity_nqpd_shopping_1000m': 'nqpd_shop',
    'gravity_nqpd_leisure_1000m': 'nqpd_leis',
    'multifunctionality_1000m': 'nqpd_total',
    'POP20': 'pop20'
}
streets_results = streets_results.rename(columns=rename_map)

# === Define export columns (must include geometry) ===
columns_to_export = [
    'x1', 'y1', 'x2', 'y2',
    'pop20',
    'nqpd_work', 'nqpd_edu', 'nqpd_shop', 'nqpd_leis', 'nqpd_total',
    'geometry'
]

# === Create shapefile GeoDataFrame ===
export_shp = streets_results[columns_to_export].copy()

# === Optional: round for clean output ===
numeric_cols = [col for col in export_shp.columns if col != 'geometry']
export_shp[numeric_cols] = export_shp[numeric_cols].round(6)

# === Export shapefile ===
export_shp.to_file(
    '/content/drive/MyDrive/Boston_data_colab/boston_nqpd_shapefile.shp',
    driver='ESRI Shapefile'
)

print("Shapefile exported as: boston_nqpd_shapefile.shp (with shortened column names)")