<a href="https://colab.research.google.com/github/Katy-Kittivibul/Sheffield-spatial-pattern-mining/blob/main/Sheffield_DT_SL_all_distances.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Spatial pattern mining of air quality diffusion tubes in Sheffield**

This notebook is a part of MSc Data Science dissertation by Kulisara Kittivibul.

This study aims to identify the spatial relationship between air quality monitoring diffusion tubes and sensitive locations (e.g. schools, kindergartens, hospitals, etc.) in Sheffield.

Dataset: https://sheffield-city-council-open-data-sheffieldcc.hub.arcgis.com/

## Import libraries and packages

In [None]:
# update the package for current session
!pip install --upgrade jupyter_client

In [None]:
# Install required packages
!pip install geopandas shapely mlxtend osmnx esda

import os
import osmnx as ox
import geopandas as gpd
import folium
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import itertools
import branca.colormap as cm
import libpysal as lp
import folium
from collections import defaultdict
from folium.plugins import MarkerCluster
from esda.moran import Moran, Moran_Local
from scipy.spatial import cKDTree
from shapely.geometry import Point, MultiPoint, LineString
from shapely.ops import nearest_points
from mlxtend.frequent_patterns import apriori, association_rules

In [None]:
#import warnings
#warnings.filterwarnings(
#    "ignore",
#    message="datetime.datetime.utcnow() is deprecated and scheduled for removal in a future version.*",
#    category=DeprecationWarning,
#    module='jupyter_client')

## Sensitive locations

In [None]:
# Define the place of interest
place_name = "Sheffield, UK"

# Define the tags for the sensitive locations you want to retrieve
# Refer to the OpenStreetMap Wiki for comprehensive tag documentation:
# https://wiki.openstreetmap.org/wiki/Map_features
tags = {"amenity": ["hospital",
                    "university",
                    "school",
                    "kindergarten",
                    "nursing_home",
                    "childcare"],
        "healthcare": "hospital"
        }

print(f"Retrieving data for {place_name}...")

# 1. Retrieve the data from OpenStreetMap using osmnx
sensitive_locations_gdf = ox.features.features_from_place(place_name, tags)

print(f"Retrieved {len(sensitive_locations_gdf)} features.")
print(f"Initial CRS: {sensitive_locations_gdf.crs}")

# Reset the index to make 'id' accessible columns
sensitive_locations_gdf = sensitive_locations_gdf.reset_index().copy()

# 2. Convert all geometries to points (centroids for polygons, original points for points)
point_geometries = sensitive_locations_gdf.geometry.apply(
    lambda geom: geom.centroid if geom.geom_type in ['Polygon', 'MultiPolygon', 'LineString', 'MultiLineString'] else geom)

# Replace the original 'geometry' column with the new point geometries
sensitive_locations_gdf.geometry = point_geometries

sensitive_locations_points = sensitive_locations_gdf[sensitive_locations_gdf.geometry.geom_type == 'Point'].copy()

if len(sensitive_locations_gdf) != len(sensitive_locations_points):
    print(f"Warning: {len(sensitive_locations_gdf) - len(sensitive_locations_points)} non-Point geometries were dropped after centroid conversion.")
else:
    print("All features successfully converted to Point geometries.")

# 3. Reproject to British National Grid (EPSG:27700)
target_crs = "EPSG:27700"
sensitive_locations_bng = sensitive_locations_points.to_crs(target_crs)
print(f"Reprojected CRS: {sensitive_locations_bng.crs}")

# Verify the coordinate ranges (should now be in BNG for Sheffield)
print("Reprojected Sensitive Locations X range (BNG): "
      f"{sensitive_locations_bng.geometry.x.min():.2f} to {sensitive_locations_bng.geometry.x.max():.2f}")
print("Reprojected Sensitive Locations Y range (BNG): "
      f"{sensitive_locations_bng.geometry.y.min():.2f} to {sensitive_locations_bng.geometry.y.max():.2f}")

sensitive_locations_bng.to_file("sheffield_sensitive_locations_BNG.geojson")
# sensitive_locations_bng.to_csv("sheffield_sensitive_locations_BNG.csv", index=False)

# Display a sample of the data
print("\nSample of retrieved data (first 5 rows):")
print(sensitive_locations_bng[['name', 'amenity', 'healthcare', 'geometry']].head())

check corrodinates

In [None]:
print(f"Sensitive Locations X range: {sensitive_locations_bng.geometry.x.min()} to {sensitive_locations_bng.geometry.x.max()}")
print(f"Sensitive Locations Y range: {sensitive_locations_bng.geometry.y.min()} to {sensitive_locations_bng.geometry.y.max()}")

In [None]:
# Define the output directory for categorised GeoJSON files
output_directory_categories = "categorised_sensitive_locations_geojson"
os.makedirs(output_directory_categories, exist_ok=True)

print(f"Saving categorized sensitive locations to: {output_directory_categories}\n")

# Define the categories and their corresponding OSM tag values
categories = {
    "hospital": ["hospital"],
    "university": ["university"],
    "school": ["school"],
    "kindergarten": ["kindergarten"],
    "nursing_home": ["nursing_home"], # This covers care homes
    "childcare": ["childcare"]
    }

# Process each category
for category_name, amenity_values in categories.items():
    filtered_gdf = sensitive_locations_bng[sensitive_locations_bng['amenity'].isin(amenity_values)].copy()

    # Special handling for 'hospital' to also include healthcare=hospital tag
    if category_name == "hospital":
        hospitals_by_healthcare_tag = sensitive_locations_bng[sensitive_locations_bng['healthcare'] == 'hospital'].copy()

        # Combine the two sets of hospitals
        combined_hospitals_gdf = pd.concat([filtered_gdf, hospitals_by_healthcare_tag])

        # Check if 'id' column exists before trying to deduplicate by it
        if 'id' in combined_hospitals_gdf.columns:
            filtered_gdf = combined_hospitals_gdf.drop_duplicates(subset=['id'], keep='first')
            print(f"    Deduplicated {category_name} using 'id'.")
        else:
            filtered_gdf = combined_hospitals_gdf
            print(f"    Warning: 'id' not found for {category_name}. Skipping deduplication.")


    if not filtered_gdf.empty:
        # Define the output file path
        file_path = os.path.join(output_directory_categories, f"{category_name}.geojson")

        # Save the filtered GeoDataFrame to a GeoJSON file
        try:
            filtered_gdf.to_file(file_path, driver="GeoJSON")
            print(f"Saved {len(filtered_gdf)} {category_name} features to {file_path}")
        except Exception as e:
            print(f"Error saving {category_name} to {file_path}: {e}")
    else:
        print(f"No {category_name} features found to save.")

print("\nCategorization and saving complete!")

In [None]:
# check data type
sensitive_locations_bng.info()

### Visualisation

In [None]:
# Create folium map
m_all_sensitive = folium.Map(location=[53.3810, -1.4701], zoom_start=13)

# Add the fixed GeoJson layer
folium.GeoJson(
    sensitive_locations_bng,
    name="Sensitive Locations",
    style_function=lambda x: {
        'fillColor': '#3186cc',
        'color': '#3186cc',
        'weight': 1,
        'fillOpacity': 0.5
    }
).add_to(m_all_sensitive)

# Show the map
m_all_sensitive # it will show all sensitive locations as points.

In [None]:
# Define the categories and their corresponding OSM tag values
categories = {
    "Hospital": ["hospital"],
    "University": ["university"],
    "School": ["school"],
    "Kindergarten": ["kindergarten"],
    "Nursing Home": ["nursing_home"], # This covers care homes
    "Childcare": ["childcare"]
}

# Define a color for each category for visual distinction on the map
category_colors = {
    "Hospital": "darkred",
    "University": "darkblue",
    "School": "yellow",
    "Kindergarten": "purple",
    "Nursing Home": "orange",
    "Childcare": "pink"
}

sensitive_locations_wgs84 = sensitive_locations_bng.to_crs(epsg=4326)
m_sensitive_locations = folium.Map(location=[53.3810, -1.4701], zoom_start=13)

# Add the city and ward boundary GeoJSON layers
try:
    sheffield_city_boundary_wgs84 = gpd.read_file("City_Boundary.geojson").to_crs(epsg=4326)
    sheffield_ward_boundary_wgs84 = gpd.read_file("Sheffield_Wards.geojson").to_crs(epsg=4326)

    # Add Sheffield City Boundary
    folium.GeoJson(
        sheffield_city_boundary_wgs84,
        name="Sheffield City Boundary",
        style_function=lambda x: {
            'color': 'black',
            'weight': 2,
            'fillColor': 'none',
            'fillOpacity': 0
        }
    ).add_to(m_sensitive_locations)

    # Add Sheffield Ward Boundary
    folium.GeoJson(
        sheffield_ward_boundary_wgs84,
        name="Sheffield Ward Boundary",
        style_function=lambda x: {
            'color': 'black',
            'weight': 1,
            'fillColor': 'none',
            'fillOpacity': 0
        }
    ).add_to(m_sensitive_locations)

except Exception as e:
    print(f"Error loading boundary GeoJSON files: {e}")


# Process each category and add a distinct layer
for category_name, amenity_values in categories.items():
    filtered_gdf = sensitive_locations_wgs84[sensitive_locations_wgs84['amenity'].isin(amenity_values)].copy()

    # Special handling for 'hospital' to also include healthcare=hospital tag
    if category_name == "Hospital":
        hospitals_by_healthcare_tag = sensitive_locations_wgs84[sensitive_locations_wgs84['healthcare'] == 'hospital'].copy()
        combined_hospitals_gdf = pd.concat([filtered_gdf, hospitals_by_healthcare_tag])

        # Check for 'id' column before deduplicating, as per your original code
        if 'id' in combined_hospitals_gdf.columns:
            filtered_gdf = combined_hospitals_gdf.drop_duplicates(subset=['id'], keep='first')
        else:
            filtered_gdf = combined_hospitals_gdf

    # Use a FeatureGroup to hold the layer, which is useful for grouping layers
    feature_group = folium.FeatureGroup(name=category_name)

    # Add each point as a CircleMarker to the feature group
    for index, row in filtered_gdf.iterrows():
        folium.CircleMarker(
            location=[row.geometry.y, row.geometry.x],
            radius=5, # You can adjust the circle size
            color=category_colors.get(category_name, 'gray'), # Use the color from the dictionary
            fill=True,
            fillColor=category_colors.get(category_name, 'gray'),
            fillOpacity=1,
            tooltip=f"<b>Name:</b> {row.get('name', 'N/A')}<br><b>Amenity:</b> {row.get('amenity', 'N/A')}"
        ).add_to(feature_group)

    # Add the feature group to the map
    feature_group.add_to(m_sensitive_locations)

# Add the LayerControl to the map to allow users to toggle layers
folium.LayerControl().add_to(m_sensitive_locations)

m_sensitive_locations.save("sheffield_map_all-sensitive.html")

# Display the map
m_sensitive_locations


## Diffusion tubes

In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
# Diffusion tubes locations
#diffusion = gpd.read_file("/content/drive/MyDrive/Diffusion_Tubes.geojson")
diffusion = gpd.read_file("/content/Diffusion_Tubes.geojson")

# Add type
diffusion['type'] = 'Diffusion_tube'

# Set CRS to EPSG:4326: This uses for plotting in lat-long coordinate
diffusion_gdf = diffusion.set_crs(epsg=4326, allow_override=True)

In [None]:
print(diffusion_gdf.head())
print(diffusion_gdf.crs)
print(diffusion_gdf.geometry.iloc[0])

In [None]:
diffusion_gdf["yr2023"].isnull().sum()

In [None]:
# To check if a coordinate pair likely represents Lat/Lon (EPSG:4326)
def looks_like_wgs84(x, y):
    """
    Checks if a given (x, y) coordinate pair falls within typical WGS84 (Lat/Lon) ranges.
    Returns True if it looks like WGS84, False otherwise.
    """
    return (-180 <= x <= 180) and (-90 <= y <= 90)

print(f"Checking coordinates for GeoDataFrame with declared CRS: {diffusion_gdf.crs}\n")

# Iterate through rows and print coordinates that do NOT look like Lat/Lon
for i, row in diffusion_gdf.iterrows():
    x_coord = row.geometry.x
    y_coord = row.geometry.y

    if not looks_like_wgs84(x_coord, y_coord):
        print(f"Index {i}: X={x_coord:.6f}, Y={y_coord:.6f} (Likely NOT Lat/Lon)")
    else:
        # Optional: Print those that DO look like Lat/Lon if you want to see them
        # print(f"Index {i}: X={x_coord:.6f}, Y={y_coord:.6f} (Likely Lat/Lon)")
        pass # Do nothing if it looks like Lat/Lon (as per your request)

In [None]:
# Center the map on Sheffield's coordinates (Latitude: 53.3810, Longitude: -1.4701)
sheffield_center = [53.3810, -1.4701]

# Create a Folium map centered on Sheffield
m_diffusion_tubes = folium.Map(location=sheffield_center, zoom_start=13)

# Plot each diffusion tube location
for _, row in diffusion_gdf.iterrows():
    folium.Marker(
        location=[row.geometry.y, row.geometry.x],
        popup="Diffusion Tube"
        ).add_to(m_diffusion_tubes)

# Display the map
m_diffusion_tubes

In [None]:
diffusion_bng = diffusion_gdf.to_crs(epsg=27700)

## Find distance threshold

In [None]:
# Check CRS
print(diffusion_bng.crs)
print(sensitive_locations_bng.crs)

### Diffusion tubes to sensitive locations

In [None]:
# Compute nearest distance from each diffusion tube to a sensitive location.
distance_DT = diffusion_bng.geometry.apply(
    lambda tube: sensitive_locations_bng.distance(tube).min()
    )

# Summary
print(distance_DT.describe())

In [None]:
# Perform spatial join: find each diffusion tube to nearest sensitive location
dt_with_nearest_sl = gpd.sjoin_nearest(
    diffusion_bng,                    # LEFT GeoDataFrame: diffusion tubes
    sensitive_locations_bng,          # RIGHT GeoDataFrame: sensitive locations
    how='left',
    max_distance=float('inf'),        # Consider all sensitive locations
    distance_col='distance_to_nearest_sensitive_location'
)

# Remove duplicate diffusion tubes if necessary (e.g., based on 'id' or index)
if 'id' in dt_with_nearest_sl.columns:
    diffusion_with_nearest_sl = dt_with_nearest_sl.drop_duplicates(subset=['id'], keep='first').copy()
    print("Deduplicated diffusion tubes using 'id' column.")
else:
    diffusion_with_nearest_sl = dt_with_nearest_sl.drop_duplicates(keep='first').copy()
    print("Warning: 'id' not found. Deduplicated diffusion tubes using index.")

# Compute Q1, Q3, and IQR for distances
Q1 = diffusion_with_nearest_sl['distance_to_nearest_sensitive_location'].quantile(0.25)
Q3 = diffusion_with_nearest_sl['distance_to_nearest_sensitive_location'].quantile(0.75)
IQR = Q3 - Q1

# Define upper bound for outlier detection
upper_bound_outlier = Q3 + 1.5 * IQR

# Output summary stats
print(f"\nQ1 (25th percentile): {Q1:.2f} m")
print(f"Q3 (75th percentile): {Q3:.2f} m")
print(f"IQR (Interquartile Range): {IQR:.2f} m")
print(f"Upper bound for outlier detection (Q3 + 1.5 * IQR): {upper_bound_outlier:.2f} m")

# Identify outlier diffusion tubes (those far from any sensitive location)
outlier_diffusion_tubes = diffusion_with_nearest_sl[
    diffusion_with_nearest_sl['distance_to_nearest_sensitive_location'] > upper_bound_outlier
].copy()

print(f"\nNumber of outlier diffusion tubes: {len(outlier_diffusion_tubes)}")

### Sensitive locations to diffusion tubes

In [None]:
# Compute nearest distance from each sensitive location to diffusion tubes.
distance_SL = sensitive_locations_bng.geometry.apply(
    lambda tube: diffusion_bng.distance(tube).min())

# Summary
print(distance_SL.describe())

In [None]:
print(f"Sensitive Locations: {len(sensitive_locations_bng)} features")
print(f"Diffusion Tubes: {len(diffusion_bng)} features")

In [None]:
# Calculate Q1 and Q3
q1_distance = distance_SL.quantile(0.25)
q3_distance = distance_SL.quantile(0.75)

# Calculate Interquartile Range (IQR)
iqr_distance = q3_distance - q1_distance

# Calculate Upper Bound (for outlier detection, using 1.5 * IQR rule)
upper_bound_distance = q3_distance + 1.5 * iqr_distance

print(f"Interquartile Range (IQR) of distances: {iqr_distance:.2f} meters")
print(f"Upper Bound for outlier detection: {upper_bound_distance:.2f} meters")

Calculate distance statistics for sensitive locations to ***nearest*** diffusion tube

In [None]:
# Calculate the distance from each sensitive location to its nearest diffusion tube

sl_with_nearest_dt = gpd.sjoin_nearest(
    sensitive_locations_bng, # LEFT: sensitive location
    diffusion_bng, # RIGHT: diffusion tubes
    how='left',
    max_distance=float('inf'), # Consider all diffusion tubes
    distance_col='distance_to_nearest_diffusion_tube'
    )

# Remove duplicate entries
if 'id' in sl_with_nearest_dt.columns:
    sensitive_locations_with_nearest_dt = sl_with_nearest_dt.drop_duplicates(subset=['id'], keep='first').copy()
    print("Deduplicated sensitive locations using 'osmid'.")
else:
    if 'id' in sl_with_nearest_dt.columns:
        sensitive_locations_with_nearest_dt = sl_with_nearest_dt.drop_duplicates(subset=['id'], keep='first').copy()
        print("Deduplicated sensitive locations using 'id' column.")
    else:
        # Option 2: Fallback to the original index of the LEFT dataframe (the sensitive locations)
        sensitive_locations_with_nearest_dt = sl_with_nearest_dt.drop_duplicates(keep='first').copy()
        print("Warning: 'osmid' or 'id' not found. Deduplicated sensitive locations using the GeoDataFrame's index.")
        # sl_with_nearest_dt.sort_values(by='distance_to_nearest_diffusion_tube', inplace=True)

In [None]:
# Check the descriptive statistics for this new distance column
print("\nDistance statistics for Sensitive Locations to Nearest Diffusion Tube:")
print(sl_with_nearest_dt['distance_to_nearest_diffusion_tube'].describe())

Define outliers using the IQR method

In [None]:
# Calculate Q1, Q3, and IQR
Q1 = sensitive_locations_with_nearest_dt['distance_to_nearest_diffusion_tube'].quantile(0.25)
Q3 = sensitive_locations_with_nearest_dt['distance_to_nearest_diffusion_tube'].quantile(0.75)
IQR = Q3 - Q1

# Define the upper bound for outliers (using 1.5 * IQR rule)
# Outliers are typically values greater than Q3 + 1.5 * IQR
upper_bound_outlier = Q3 + 1.5 * IQR

print(f"\nQ1 (25th percentile): {Q1:.2f} m")
print(f"Q3 (75th percentile): {Q3:.2f} m")
print(f"IQR (Interquartile Range): {IQR:.2f} m")
print(f"Upper bound for outlier detection (Q3 + 1.5 * IQR): {upper_bound_outlier:.2f} m")

# Identify the outlier sensitive locations
outlier_sensitive_locations = sensitive_locations_with_nearest_dt[
    sensitive_locations_with_nearest_dt['distance_to_nearest_diffusion_tube'] > upper_bound_outlier].copy()

print(f"\nNumber of identified outlier sensitive locations: {len(outlier_sensitive_locations)}")

In [None]:
outlier_sensitive_locations[['amenity', 'name', 'distance_to_nearest_diffusion_tube', 'geometry']]

In [None]:
if not outlier_sensitive_locations.empty:
    print("\nOutlier Sensitive Locations breakdown by Amenity type:")
    print(outlier_sensitive_locations['amenity'].value_counts())
    print("\nDetails of Outlier Sensitive Locations (first 5 rows):")
    print(outlier_sensitive_locations[['amenity', 'name', 'distance_to_nearest_diffusion_tube', 'geometry']].head(20))
else:
    print("\nNo outlier sensitive locations identified to break down by amenity type.")

### Visualising the outliers



This will help to answer:
- Which specific locations are poorly covered.
- What types of facilities (schools, hospitals, etc.) are commonly found among these outliers.
- Where geographically these monitoring gaps are located.

In [None]:
# Reproject GeoDataFrames to WGS84 (EPSG:4326) for Folium
sensitive_locations_bng_wgs84 = sensitive_locations_bng.to_crs(epsg=4326)
outlier_sensitive_locations_wgs84 = outlier_sensitive_locations.to_crs(epsg=4326)

# Center the map on Sheffield (Latitude, Longitude)
sheffield_center = [53.3810, -1.4701]
m_outliers = folium.Map(location=sheffield_center, zoom_start=12)

# Add Diffusion Tubes (blue markers)
diffusion_tube_group = folium.FeatureGroup(name="Diffusion Tubes").add_to(m_outliers)
for _, row in diffusion_gdf.iterrows():
    folium.Marker(location=[row.geometry.y, row.geometry.x],
                  popup="Diffusion Tube").add_to(diffusion_tube_group)

# Add ALL Sensitive Locations (green circles)
all_sensitive_group = folium.FeatureGroup(name="All Sensitive Locations").add_to(m_outliers)
for _, row in sensitive_locations_bng_wgs84.iterrows():
    folium.CircleMarker(
        location=[row.geometry.y, row.geometry.x],
        radius=5, # pixels
        color='green',
        fill=True,
        fill_color='lightgreen',
        fill_opacity=0.7,
        popup=f"Sensitive Location: {row.get('amenity', 'N/A')}<br>Name: {row.get('name', 'N/A')}"
    ).add_to(all_sensitive_group)

# Add Outlier Sensitive Locations (larger red circles)
outlier_group = folium.FeatureGroup(name="Outlier Sensitive Locations").add_to(m_outliers)
if not outlier_sensitive_locations_wgs84.empty:
    for _, row in outlier_sensitive_locations_wgs84.iterrows():
        folium.CircleMarker(
            location=[row.geometry.y, row.geometry.x],
            radius=8, # Larger to stand out
            color='red',
            fill=True,
            fill_color='darkred',
            fill_opacity=0.9,
            popup=(f"OUTLIER! Type: {row.get('amenity', 'N/A')}<br>"
                   f"Name: {row.get('name', 'N/A')}<br>"
                   f"Distance to Tube: {row['distance_to_nearest_diffusion_tube']:.2f} m")
        ).add_to(outlier_group)
else:
    print("No outliers to plot on the map.")

# Add a Layer Control to toggle layers on/off (very useful for interactive maps)
folium.LayerControl().add_to(m_outliers)

m_outliers.save("sheffield_map.html")

# Display the map (will render in a Jupyter Notebook/environment)
m_outliers

In [None]:
import folium
import geopandas as gpd

# Load the GeoJSON file for the Sheffield City boundary
sheffield_city_boundary = gpd.read_file("City_Boundary.geojson")

# Load the GeoJSON file for the Sheffield Wards boundary
sheffield_ward_boundary = gpd.read_file("Sheffield_Wards.geojson")

# Reproject GeoDataFrames to WGS84 (EPSG:4326) for Folium
sensitive_locations_bng_wgs84 = sensitive_locations_bng.to_crs(epsg=4326)
outlier_sensitive_locations_wgs84 = outlier_sensitive_locations.to_crs(epsg=4326)
sheffield_city_boundary_wgs84 = sheffield_city_boundary.to_crs(epsg=4326)
sheffield_ward_boundary_wgs84 = sheffield_ward_boundary.to_crs(epsg=4326)

# Center the map on Sheffield (Latitude, Longitude)
sheffield_center = [53.3810, -1.4701]
m_outlier_2 = folium.Map(location=sheffield_center, zoom_start=12)

# Add Sheffield City Boundary
folium.GeoJson(
    sheffield_city_boundary_wgs84,
    name="Sheffield City Boundary",
    style_function=lambda x: {
        'color': 'black',
        'weight': 2,
        'fillColor': 'none',
        'fillOpacity': 0
    }
).add_to(m_outlier_2)

# Add Sheffield Ward Boundary
folium.GeoJson(
    sheffield_ward_boundary_wgs84,
    name="Sheffield Ward Boundary",
    style_function=lambda x: {
        'color': 'black',
        'weight': 1,
        'fillColor': 'none',
        'fillOpacity': 0
    }
).add_to(m_outlier_2)

# Add Diffusion Tubes (blue circles)
diffusion_tube_group = folium.FeatureGroup(name="Diffusion Tubes").add_to(m_outlier_2)
for _, row in diffusion_gdf.iterrows():
    folium.CircleMarker(
        location=[row.geometry.y, row.geometry.x],
        radius=5,  # pixels
        color='blue',
        fill=True,
        fill_color='blue',
        fill_opacity=0.7,
        popup="Diffusion Tube"
    ).add_to(diffusion_tube_group)

# Add ALL Sensitive Locations (green circles)
all_sensitive_group = folium.FeatureGroup(name="All Sensitive Locations").add_to(m_outlier_2)
for _, row in sensitive_locations_bng_wgs84.iterrows():
    folium.CircleMarker(
        location=[row.geometry.y, row.geometry.x],
        radius=5,  # pixels
        color='green',
        fill=True,
        fill_color='lightgreen',
        fill_opacity=0.7,
        popup=f"Sensitive Location: {row.get('amenity', 'N/A')}<br>Name: {row.get('name', 'N/A')}"
    ).add_to(all_sensitive_group)

# Add Outlier Sensitive Locations (larger red circles)
outlier_group = folium.FeatureGroup(name="Outlier Sensitive Locations").add_to(m_outlier_2)
if not outlier_sensitive_locations_wgs84.empty:
    for _, row in outlier_sensitive_locations_wgs84.iterrows():
        folium.CircleMarker(
            location=[row.geometry.y, row.geometry.x],
            radius=8,  # Larger to stand out
            color='red',
            fill=True,
            fill_color='darkred',
            fill_opacity=0.9,
            popup=(f"OUTLIER! Type: {row.get('amenity', 'N/A')}<br>"
                   f"Name: {row.get('name', 'N/A')}<br>"
                   f"Distance to Tube: {row['distance_to_nearest_diffusion_tube']:.2f} m")
        ).add_to(outlier_group)
else:
    print("No outliers to plot on the map.")

# Add a Layer Control to toggle layers on/off
folium.LayerControl().add_to(m_outlier_2)

# Save the map to an HTML file
m_outlier_2.save("sheffield_outlier_with_wards.html")

In [None]:
m_outlier_2

In [None]:
# Total count per amenity type
total_counts = sensitive_locations_with_nearest_dt['amenity'].value_counts()
print("\nTotal sensitive locations per amenity type:\n", total_counts)

# Outlier count per amenity type
outlier_counts = outlier_sensitive_locations['amenity'].value_counts()
print("\nOutlier sensitive locations per amenity type:\n", outlier_counts)

# Percentage of each amenity type that are outliers
percentage_outliers = (outlier_counts / total_counts * 100).fillna(0).sort_values(ascending=False)
print("\nPercentage of each amenity type that are outliers:\n", percentage_outliers.round(2))

# You might also want to look at average/median distance per amenity type for ALL locations
print("\nAverage distance to nearest tube per amenity type (all locations):\n",
      sensitive_locations_with_nearest_dt.groupby('amenity')['distance_to_nearest_diffusion_tube'].mean().round(2).sort_values(ascending=False))

## Other locations

Retrieve data of other locations such as industrial area, park, and main road.

In [None]:
diffusion_bng.info()

In [None]:
place_name = "Sheffield, UK"
target_crs = "EPSG:27700" # British National Grid

output_dir_colocation = "colocation_data"
os.makedirs(output_dir_colocation, exist_ok=True)

print("Starting co-location pattern mining setup...")
print("\nUsing existing sensitive_locations_bng and diffusion_gdf_bng.")
print(f"  Sensitive Locations: {len(sensitive_locations_bng)} features (CRS: {sensitive_locations_bng.crs})")
print(f"  Diffusion Tubes: {len(diffusion_bng)} features (CRS: {diffusion_bng.crs})")

# Ensure feature_type is set for these existing datasets for consistency
if 'feature_type' not in sensitive_locations_bng.columns:
    # This assumes 'amenity' or 'healthcare' exists for sensitive locations
    sensitive_locations_bng['feature_type'] = sensitive_locations_bng['amenity'].fillna(sensitive_locations_bng['healthcare']).copy()
    sensitive_locations_bng = sensitive_locations_bng[~sensitive_locations_bng['feature_type'].isna()].copy()
if 'feature_type' not in diffusion_bng.columns:
    diffusion_bng['feature_type'] = 'diffusion_tube'

# 2. Retrieve Other Location Types
print("\nRetrieving other location types (Industrial, Parks)...")

other_tags = {"landuse": ["industrial"],
              "leisure": ["park", "recreation_ground"]}
other_locations_gdf_raw = ox.features.features_from_place(place_name, other_tags)
other_locations_gdf_raw = other_locations_gdf_raw.reset_index().copy()

# Convert all geometries to points (centroids)
other_locations_gdf_raw['geometry'] = other_locations_gdf_raw.geometry.apply(
    lambda geom: geom.centroid if geom.geom_type in ['Polygon', 'MultiPolygon', 'LineString', 'MultiLineString', 'Point'] else None)
other_locations_gdf_raw = other_locations_gdf_raw.dropna(subset=['geometry']).copy()

# Assign feature_type based on original tags
other_locations_gdf_raw['feature_type'] = None
other_locations_gdf_raw.loc[other_locations_gdf_raw['landuse'] == 'industrial', 'feature_type'] = 'industrial_area'
other_locations_gdf_raw.loc[other_locations_gdf_raw['leisure'] == 'park', 'feature_type'] = 'park'
other_locations_gdf_raw.loc[other_locations_gdf_raw['leisure'] == 'recreation_ground', 'feature_type'] = 'park'

other_locations_gdf_points = other_locations_gdf_raw[
    ~other_locations_gdf_raw['feature_type'].isna()].copy()
other_locations_gdf_points = other_locations_gdf_points.to_crs(target_crs)
print(f"  Processed {len(other_locations_gdf_points)} other locations (industrial, parks).")

In [None]:
# Define target_crs and output_dir_colocation if not already defined
target_crs = "EPSG:27700"
output_dir_colocation = "colocation_data"
os.makedirs(output_dir_colocation, exist_ok=True) # Ensure the directory exists


print("\nCombining all processed features...")

# --- Add a generic 'uid' for consistent tracking across all data sources ---

# 1. Sensitive Locations
sensitive_uid_col_found = None
if 'id' in sensitive_locations_bng.columns:
    sensitive_uid_col_found = 'id'
elif 'osmid' in sensitive_locations_bng.columns:
    sensitive_uid_col_found = 'osmid'

if sensitive_uid_col_found:
    sensitive_locations_bng['uid'] = sensitive_locations_bng[sensitive_uid_col_found].astype(str) + '_sl'
    print(f"  Sensitive locations UID based on '{sensitive_uid_col_found}'.")
else:
    sensitive_locations_bng['uid'] = 'sl_' + sensitive_locations_bng.index.astype(str)
    print("  Warning: Neither 'id' nor 'osmid' found in sensitive_locations_bng. UID based on index.")


# 2. Diffusion Tubes
diffusion_uid_col_found = None
# Check for 'objectid' first as per your data
if 'objectid' in diffusion_bng.columns:
    diffusion_uid_col_found = 'objectid'
elif 'id' in diffusion_bng.columns: # Fallback to 'id'
    diffusion_uid_col_found = 'id'
elif 'osmid' in diffusion_bng.columns: # Fallback to 'osmid'
    diffusion_uid_col_found = 'osmid'

if diffusion_uid_col_found:
    diffusion_bng['uid'] = diffusion_bng[diffusion_uid_col_found].astype(str) + '_dt'
    print(f"  Diffusion tubes UID based on '{diffusion_uid_col_found}'.")
else:
    diffusion_bng['uid'] = 'dt_' + diffusion_bng.index.astype(str)
    print("  Warning: Neither 'objectid', 'id', nor 'osmid' found in diffusion_gdf_points. UID based on index.")


# 3. Other Locations
other_uid_col_found = None
if 'id' in other_locations_gdf_points.columns:
    other_uid_col_found = 'id'
elif 'osmid' in other_locations_gdf_points.columns:
    other_uid_col_found = 'osmid'

if other_uid_col_found:
    other_locations_gdf_points['uid'] = other_locations_gdf_points[other_uid_col_found].astype(str) + '_other'
    print(f"  Other locations UID based on '{other_uid_col_found}'.")
else:
    other_locations_gdf_points['uid'] = 'other_' + other_locations_gdf_points.index.astype(str)
    print("  Warning: Neither 'id' nor 'osmid' found in other_locations_gdf_points. UID based on index.")


# --- Select only the relevant columns for combination ---
cols_to_keep = ['uid', 'feature_type', 'geometry']

all_features_gdf = pd.concat([
    sensitive_locations_bng[cols_to_keep],
    diffusion_bng[cols_to_keep], # Use diffusion_gdf_points here as it's the point data
    other_locations_gdf_points[cols_to_keep]
]).reset_index(drop=True)

# Ensure the combined GeoDataFrame has the correct CRS
all_features_gdf.crs = target_crs

print(f"\nTotal features combined: {len(all_features_gdf)}")
print("Combined Feature Types Distribution:")
print(all_features_gdf['feature_type'].value_counts())
print(f"Combined GeoDataFrame CRS: {all_features_gdf.crs}")

# Save the combined data for future use
all_features_gdf.to_file(os.path.join(output_dir_colocation, "all_spatial_features_bng.geojson"), driver="GeoJSON")
print(f"All features saved to {os.path.join(output_dir_colocation, 'all_spatial_features_bng.geojson')}")

In [None]:
print(f"Sensitive Locations X range: {all_features_gdf.geometry.x.min()} to {all_features_gdf.geometry.x.max()}")
print(f"Sensitive Locations Y range: {all_features_gdf.geometry.y.min()} to {all_features_gdf.geometry.y.max()}")

## Co-location pattern mining

In [None]:
# --- Configuration for Co-location Mining ---
distance_threshold_m = [200, 300, 400, 500, 600]
pi_thresholds = [0.01, 0.05, 0.1, 0.2, 0.5]
pr_thresholds = [0]

print(f"\nCo-location mining with distance threshold: {distance_threshold_m} meters")
print(f"Min Participation Index (PI): {pi_thresholds}")
print(f"Min Participation Ratio (PR): {pr_thresholds}")

In [None]:
# Define path for saving files
output_dir_colocation = "colocation_data"
os.makedirs(output_dir_colocation, exist_ok=True)

In [None]:
# Loads and processes all_features_gdf
try:
    if 'all_features_gdf' not in locals() and 'all_features_gdf' not in globals():
        print("`all_features_gdf` not found. Attempting to load from 'all_spatial_features_bng.geojson'...")

        output_dir_colocation = "colocation_data"
        geojson_path = os.path.join(output_dir_colocation, "all_spatial_features_bng.geojson")

        if os.path.exists(geojson_path):
            all_features_gdf = gpd.read_file(geojson_path)
            target_crs = "EPSG:27700"
            all_features_gdf.crs = target_crs
            print(f"Loaded {len(all_features_gdf)} features from {geojson_path}.")
        else:
            raise FileNotFoundError(f"Combined GeoJSON not found at: {geojson_path}. Please run data prep first or provide actual all_features_gdf.")
except Exception as e:
    print(f"Error loading all_features_gdf: {e}")
    print("Please ensure `all_features_gdf` is properly loaded/defined before this step.")
    raise

In [None]:
# Ensure geometries are points and CRS is correct before cKDTree
if all_features_gdf.crs != target_crs:
    print(f"WARNING: CRS is {all_features_gdf.crs}, expected {target_crs}. Reprojecting...")
    all_features_gdf = all_features_gdf.to_crs(target_crs)
    print(f"Reprojected to: {all_features_gdf.crs}")

In [None]:
# Convert non-point geometries to centroids if necessary, and filter to only points
geom_types = all_features_gdf.geometry.geom_type.unique()
if 'Point' not in geom_types or len(geom_types) > 1:
    print("Converting non-Point geometries to centroids for cKDTree compatibility...")
    all_features_gdf['geometry'] = all_features_gdf.geometry.apply(
        lambda geom: geom.centroid if geom.geom_type != 'Point' else geom)

    # Filter out any rows where centroid conversion might have failed or resulted in non-points
    all_features_gdf = all_features_gdf[all_features_gdf.geometry.geom_type == 'Point'].copy()
    print(f"Geometry Types after conversion and filtering: {all_features_gdf.geometry.geom_type.unique()}")
    if all_features_gdf.empty:
        raise ValueError("No valid point geometries remaining after processing for cKDTree. Cannot proceed.")

### Extract coordinates

In [None]:
# Extract coordinates for cKDTree
coords = np.array([(p.x, p.y) for p in all_features_gdf.geometry])
tree = cKDTree(coords)

# Get unique feature types
unique_feature_types = all_features_gdf['feature_type'].unique().tolist()
print(f"\nUnique feature types for mining: {unique_feature_types}")

### Defind neighbouring pairs (instance-level) function

In [None]:
def build_neighborhood_graph(current_distance_threshold_m, all_features_gdf, coords, tree):
    """
    Builds the instance-level neighborhood graph based on a distance threshold.

    Args:
        current_distance_threshold_m (float): The current distance threshold in meters.
        all_features_gdf (gpd.GeoDataFrame): GeoDataFrame containing all feature instances.
        coords (np.array): Nx2 array of coordinates for KDTree.
        tree (cKDTree): KDTree built from the coordinates.

    Returns:
        dict: A dictionary where keys are instance UIDs and values are
              lists of (neighbor_uid, neighbor_type) tuples.
    """
    print(f"\n--- Building Neighborhood Graph for Distance Threshold: {current_distance_threshold_m}m ---")
    neighbors_map = {}
    idx_to_uid_type = all_features_gdf[['uid', 'feature_type']].to_dict('index')

    for i in range(len(all_features_gdf)):
        current_uid = all_features_gdf.iloc[i]['uid']
        # Query neighbors for the point at current_idx in the coords array
        neighbor_indices = tree.query_ball_point(coords[i], current_distance_threshold_m)

        current_instance_neighbors = []
        for neighbor_idx in neighbor_indices:
            if neighbor_idx != i: # Don't count self as neighbor
                neighbor_data = idx_to_uid_type.get(neighbor_idx)
                if neighbor_data:
                    neighbor_uid = neighbor_data['uid']
                    neighbor_type = neighbor_data['feature_type']
                    current_instance_neighbors.append((neighbor_uid, neighbor_type))
        neighbors_map[current_uid] = current_instance_neighbors

    print(f"Built neighborhood graph (instance-level) for {current_distance_threshold_m}m. Found {len(neighbors_map)} instances with neighbors.")
    return neighbors_map

### Define PR/PI function

In [None]:
# Function to calculate PI/PR for a given pattern
def calculate_pattern_prevalence(pattern_types, neighbors_map, all_features_gdf):
    """
    Calculates the Participation Index (PI) and Participation Ratio (PR) for a given pattern.
    Assumes a star-based co-location model: an instance participates if it's a central
    node that has neighbors of ALL other types in the pattern.
    """
    pattern_pi_values = []
    feature_type_counts = all_features_gdf['feature_type'].value_counts().to_dict()

    for f_type in pattern_types:
        total_instances_of_type = feature_type_counts.get(f_type, 0)
        if total_instances_of_type == 0:
            pattern_pi_values.append(0.0)
            continue

        participating_count = 0
        instances_of_f_type = all_features_gdf[all_features_gdf['feature_type'] == f_type]

        for _, instance_row in instances_of_f_type.iterrows():
            instance_uid = instance_row['uid']

            neighbors_of_instance = {n_type for _, n_type in neighbors_map.get(instance_uid, [])}
            other_types_in_pattern = [t for t in pattern_types if t != f_type]

            if all(ot in neighbors_of_instance for ot in other_types_in_pattern):
                participating_count += 1

        pattern_pi_values.append(participating_count / total_instances_of_type)

    pattern_pi = min(pattern_pi_values) if pattern_pi_values else 0.0
    pattern_pr = sum(pattern_pi_values) / len(pattern_pi_values)

    return pattern_pi, pattern_pr

### Define Conditional probability (CP) function

In [None]:
# Function to calculate conditional probability
def calculate_conditional_probability(target_type, given_type, neighbors_map, all_features_gdf):
    """
    Calculates the Conditional Probability P(target_type | given_type).

    Args:
        target_type (str): The feature type we are checking for (e.g., 'school').
        given_type (str): The feature type that must be present (e.g., 'diffusion_tube').
        neighbors_map (dict): The neighborhood graph.
        all_features_gdf (gpd.GeoDataFrame): GeoDataFrame of all feature instances.

    Returns:
        float: The calculated conditional probability.
    """
    # 1. Get the total number of instances of the 'given' feature type (the denominator).
    total_given_instances = len(all_features_gdf[all_features_gdf['feature_type'] == given_type])
    if total_given_instances == 0:
        print(f"No instances of '{given_type}' found.")
        return 0.0

    # 2. Count the number of 'given' instances that have at least one 'target' neighbor.
    co_located_count = 0
    instances_of_given_type = all_features_gdf[all_features_gdf['feature_type'] == given_type]

    for _, instance_row in instances_of_given_type.iterrows():
        instance_uid = instance_row['uid']
        # Use a set comprehension for efficient lookup of neighbor types
        neighbors_of_instance = {n_type for _, n_type in neighbors_map.get(instance_uid, [])}

        # Check if the target type is in the set of neighbors
        if target_type in neighbors_of_instance:
            co_located_count += 1

    # 3. Calculate the conditional probability.
    conditional_prob = co_located_count / total_given_instances
    return conditional_prob

def calculate_all_conditional_probabilities_for_prevalent_patterns(prevalent_patterns, neighbors_map, all_features_gdf):
    """
    Calculates conditional probabilities for all 2-item prevalent patterns.

    Args:
        prevalent_patterns (dict): A dictionary of prevalent patterns, e.g.,
                                   {frozenset({'A', 'B'}): {'pi': 0.5, 'pr': 0.8}}
        neighbors_map (dict): The neighborhood graph.
        all_features_gdf (gpd.GeoDataFrame): GeoDataFrame of all feature instances.

    Returns:
        dict: A dictionary with conditional probabilities for each pair.
    """
    conditional_probs = {}
    for pattern_key in prevalent_patterns.keys():
        f_type1, f_type2 = list(pattern_key)

        # Calculate P(f_type2 | f_type1)
        prob_f2_given_f1 = calculate_conditional_probability(f_type2, f_type1, neighbors_map, all_features_gdf)
        conditional_probs[f"{f_type2} | {f_type1}"] = prob_f2_given_f1

        # Calculate P(f_type1 | f_type2)
        prob_f1_given_f2 = calculate_conditional_probability(f_type1, f_type2, neighbors_map, all_features_gdf)
        conditional_probs[f"{f_type1} | {f_type2}"] = prob_f1_given_f2

    return conditional_probs

### Calculate Co-location Patterns

In [None]:
# --- Co-location Pattern Mining Loop (including distance threshold) ---
print("\n--- Phase 3: Running Co-location Pattern Mining for different thresholds ---")

# Lists to collect all results across ALL distance, PI, and PR combinations
all_prevalent_patterns_combined_list = [] # For combined prevalent patterns CSV
all_maximal_patterns_combined_list = [] # For combined maximal patterns CSV

# Outer loop for distance thresholds
for current_distance_threshold_m in distance_threshold_m:
    print(f"\n===== Running Co-location for Distance: {current_distance_threshold_m}m =====")

    # Recalculate neighbors_map for the current distance threshold
    neighbors_map = build_neighborhood_graph(current_distance_threshold_m, all_features_gdf, coords, tree)

    # Nested loops to iterate through all PI and PR combinations
    for current_min_pr in pr_thresholds: # Outer loop for PR
        for current_min_pi in pi_thresholds: # Inner loop for PI
            print(f"\n--- Current Thresholds (D={current_distance_threshold_m}m): Min PI={current_min_pi:.2f}, Min PR={current_min_pr:.2f} ---")

            # Dictionary to store prevalent patterns for this specific run (PI/PR/Distance)
            all_prevalent_patterns_current_run = {
                2: {},
                3: {},
                4: {}
            }

            # --- Mining 2-item patterns ---
            filtered_unique_feature_types = [
                f_type for f_type in unique_feature_types
                if f_type in all_features_gdf['feature_type'].unique()
            ]
            candidate_2_item_patterns = list(itertools.combinations(filtered_unique_feature_types, 2))
            print(f"   Searching for 2-item co-location patterns ({len(candidate_2_item_patterns)} candidates)...")

            for F1_type, F2_type in candidate_2_item_patterns:
                pattern_key = frozenset([F1_type, F2_type])
                pattern_pi, pattern_pr = calculate_pattern_prevalence(
                    pattern_key, neighbors_map, all_features_gdf
                )

                if pattern_pi >= current_min_pi and pattern_pr >= current_min_pr:
                    all_prevalent_patterns_current_run[2][pattern_key] = {'pi': pattern_pi, 'pr': pattern_pr}
            print(f"   Total prevalent 2-item patterns found: {len(all_prevalent_patterns_current_run[2])}")


            # --- Mining 3-item patterns (Apriori-like) ---
            print("\n   Searching for 3-item co-location patterns...")
            frequent_2_item_keys = all_prevalent_patterns_current_run[2].keys()
            frequent_2_item_types = [list(p) for p in frequent_2_item_keys]

            candidate_3_item_patterns_set = set()
            for i in range(len(frequent_2_item_types)):
                for j in range(i + 1, len(frequent_2_item_types)):
                    p1 = set(frequent_2_item_types[i])
                    p2 = set(frequent_2_item_types[j])
                    union_pattern = frozenset(p1.union(p2))
                    if len(union_pattern) == 3:
                        is_candidate = True
                        for subset in itertools.combinations(union_pattern, 2):
                            if frozenset(subset) not in frequent_2_item_keys:
                                is_candidate = False
                                break
                        if is_candidate:
                            candidate_3_item_patterns_set.add(union_pattern)

            candidate_3_item_patterns = [list(p) for p in candidate_3_item_patterns_set]

            for pattern_types in candidate_3_item_patterns:
                pattern_key = frozenset(pattern_types)
                pattern_pi, pattern_pr = calculate_pattern_prevalence(
                    pattern_key, neighbors_map, all_features_gdf
                )

                if pattern_pi >= current_min_pi and pattern_pr >= current_min_pr:
                    all_prevalent_patterns_current_run[3][pattern_key] = {'pi': pattern_pi, 'pr': pattern_pr}
            print(f"   Total prevalent 3-item patterns found: {len(all_prevalent_patterns_current_run[3])}")


            # --- Mining 4-item patterns (Apriori-like) ---
            print("\n   Searching for 4-item co-location patterns...")
            frequent_3_item_keys = all_prevalent_patterns_current_run[3].keys()
            frequent_3_item_types = [list(p) for p in frequent_3_item_keys]

            candidate_4_item_patterns_set = set()
            for i in range(len(frequent_3_item_types)):
                for j in range(i + 1, len(frequent_3_item_types)):
                    p1 = set(frequent_3_item_types[i])
                    p2 = set(frequent_3_item_types[j])
                    union_pattern = frozenset(p1.union(p2))
                    if len(union_pattern) == 4:
                        is_candidate = True
                        for subset in itertools.combinations(union_pattern, 3):
                            if frozenset(subset) not in frequent_3_item_keys:
                                is_candidate = False
                                break
                        if is_candidate:
                            candidate_4_item_patterns_set.add(union_pattern)

            candidate_4_item_patterns = [list(p) for p in candidate_4_item_patterns_set]

            for pattern_types in candidate_4_item_patterns:
                pattern_key = frozenset(pattern_types)
                pattern_pi, pattern_pr = calculate_pattern_prevalence(
                    pattern_key, neighbors_map, all_features_gdf
                )

                if pattern_pi >= current_min_pi and pattern_pr >= current_min_pr:
                    all_prevalent_patterns_current_run[4][pattern_key] = {'pi': pattern_pi, 'pr': pattern_pr}
            print(f"   Total prevalent 4-item patterns found: {len(all_prevalent_patterns_current_run[4])}")

            # --- Summary and Saving for the Current Threshold Combination (PI/PR/Distance) ---
            print(f"\n   --- Summary for D={current_distance_threshold_m}m, Min PI={current_min_pi:.2f}, Min PR={current_min_pr:.2f} ---")
            summary_data_prevalent_current_run = [] # Renamed for clarity

            found_any_patterns_current_run = False

            for k in sorted(all_prevalent_patterns_current_run.keys()):
                if all_prevalent_patterns_current_run[k]:
                    found_any_patterns_current_run = True
                    print(f"\n   Prevalent {k}-item Patterns:")
                    sorted_patterns = sorted(all_prevalent_patterns_current_run[k].items(), key=lambda item: item[1]['pi'], reverse=True)
                    for pattern, metrics in sorted_patterns:
                        pattern_set = set(pattern)
                        print(f"    Pattern: {pattern_set}, PI: {metrics['pi']:.4f}, PR: {metrics['pr']:.4f}")

                        summary_data_prevalent_current_run.append({
                            'distance_threshold_m': current_distance_threshold_m,
                            'min_pi_threshold': current_min_pi,
                            'min_pr_threshold': current_min_pr,
                            'pattern_length': k,
                            'pattern': ", ".join(sorted(pattern_set)),
                            'participation_index': metrics['pi'],
                            'participation_ratio': metrics['pr']
                        })

            if not found_any_patterns_current_run:
                print("   No prevalent co-location patterns found for this combination of thresholds.")

            # Save individual prevalent patterns CSV
            if summary_data_prevalent_current_run:
                patterns_summary_df_current_run = pd.DataFrame(summary_data_prevalent_current_run)
                patterns_summary_df_current_run = patterns_summary_df_current_run.sort_values(
                    by=['distance_threshold_m', 'pattern_length', 'participation_index'],
                    ascending=[True, True, False]
                ).reset_index(drop=True)

                formatted_dist_str = f"{current_distance_threshold_m}m"
                formatted_pi_str = f"{current_min_pi:.2f}".replace('.', '')
                formatted_pr_str = f"{current_min_pr:.2f}".replace('.', '')
                output_filename = f"patterns_summary_D{formatted_dist_str}_PI{formatted_pi_str}_PR{formatted_pr_str}.csv"
                output_filepath = os.path.join(output_dir_colocation, output_filename)

                patterns_summary_df_current_run.to_csv(output_filepath, index=False)
                print(f"   Prevalent co-location pattern summary saved to: {output_filepath}")
            else:
                print("   No prevalent patterns CSV file saved for this run as no patterns were found.")

            # Extend the list for the final combined CSV for ALL prevalent patterns
            all_prevalent_patterns_combined_list.extend(summary_data_prevalent_current_run)


            # --- Identify Maximal Co-location Patterns for the Current Thresholds ---
            print(f"\n   --- Identifying Maximal Co-location Patterns for D={current_distance_threshold_m}m, PI={current_min_pi:.2f}, PR={current_min_pr:.2f} ---")

            # Flatten all prevalent patterns for the CURRENT RUN into a list: (pattern, metrics)
            # This is crucial for maximal pattern finding - only consider patterns from the same run
            all_patterns_for_maximal_check = []
            for k_len in sorted(all_prevalent_patterns_current_run.keys(), reverse=True):
                for pattern, metrics in all_prevalent_patterns_current_run[k_len].items():
                    all_patterns_for_maximal_check.append((pattern, metrics))

            maximal_patterns = []
            summary_data_maximal_current_run = [] # Collects data for current run's maximal CSV

            if not all_patterns_for_maximal_check:
                print("   No prevalent patterns found, thus no maximal patterns for this run.")
            else:
                for i, (pattern_i, metrics_i) in enumerate(all_patterns_for_maximal_check):
                    is_maximal = True
                    for j, (pattern_j, _) in enumerate(all_patterns_for_maximal_check):
                        if i != j:
                            # Check if pattern_i is a subset of pattern_j AND pattern_j is larger
                            if pattern_i.issubset(pattern_j) and len(pattern_i) < len(pattern_j):
                                is_maximal = False
                                break # Not maximal, found a larger super-pattern
                    if is_maximal:
                        maximal_patterns.append((pattern_i, metrics_i))
                        summary_data_maximal_current_run.append({
                            'distance_threshold_m': current_distance_threshold_m, # ADDED: Distance threshold
                            'min_pi_threshold': current_min_pi,
                            'min_pr_threshold': current_min_pr,
                            'pattern_length': len(pattern_i),
                            'pattern': ", ".join(sorted(pattern_i)),
                            'participation_index': metrics_i['pi'],
                            'participation_ratio': metrics_i['pr']
                        })

                print(f"   Total maximal co-location patterns found: {len(maximal_patterns)}")

                if maximal_patterns:
                    print("\n   Maximal Co-location Patterns:")
                    for pattern, metrics in sorted(maximal_patterns, key=lambda x: x[1]['pi'], reverse=True):
                        print(f"    Pattern: {set(pattern)}, PI: {metrics['pi']:.4f}, PR: {metrics['pr']:.4f}")

                    # Save individual maximal patterns CSV
                    df_maximal_current_run = pd.DataFrame(summary_data_maximal_current_run).sort_values(
                        by=['distance_threshold_m', 'pattern_length', 'participation_index'], # ADDED: Sort by distance
                        ascending=[True, True, False]).reset_index(drop=True)

                    formatted_dist_str = f"{current_distance_threshold_m}m" # NEW: Format distance
                    pi_str = f"{current_min_pi:.2f}".replace('.', '')
                    pr_str = f"{current_min_pr:.2f}".replace('.', '')
                    filename = f"maximal_patterns_D{formatted_dist_str}_PI{pi_str}_PR{pr_str}.csv" # UPDATED FILENAME
                    filepath = os.path.join(output_dir_colocation, filename)

                    df_maximal_current_run.to_csv(filepath, index=False)
                    print(f"   Maximal co-location pattern summary saved to: {filepath}")
                else:
                    print("   No maximal co-location patterns found for this combination of thresholds.")

            # Extend the list for the final combined CSV for ALL maximal patterns
            all_maximal_patterns_combined_list.extend(summary_data_maximal_current_run)


# --- Final Combined CSV Files ---
print("\n--- Phase 4: Generating combined CSV files for all prevalent and maximal patterns ---")

# Combine all prevalent patterns into one DataFrame
if all_prevalent_patterns_combined_list: # Renamed from all_results_summary_df_list
    final_prevalent_patterns_df = pd.DataFrame(all_prevalent_patterns_combined_list)
    final_prevalent_patterns_df = final_prevalent_patterns_df.sort_values(
        by=['distance_threshold_m', 'min_pi_threshold', 'min_pr_threshold', 'pattern_length', 'participation_index'], # Added distance for sorting
        ascending=[True, True, True, True, False]
    ).reset_index(drop=True)

    combined_prevalent_output_filepath = os.path.join(output_dir_colocation, "all_prevalent_patterns_combined.csv")
    final_prevalent_patterns_df.to_csv(combined_prevalent_output_filepath, index=False)
    print(f"All prevalent patterns combined and saved to: {combined_prevalent_output_filepath}")
else:
    print("No prevalent patterns found across all runs to combine.")

# Combine all maximal patterns into one DataFrame
if all_maximal_patterns_combined_list: # Renamed from all_maximal_patterns_df_list
    final_maximal_patterns_df = pd.DataFrame(all_maximal_patterns_combined_list)
    final_maximal_patterns_df = final_maximal_patterns_df.sort_values(
        by=['distance_threshold_m', 'min_pi_threshold', 'min_pr_threshold', 'pattern_length', 'participation_index'], # Added distance for sorting
        ascending=[True, True, True, True, False]
    ).reset_index(drop=True)

    combined_maximal_output_filepath = os.path.join(output_dir_colocation, "all_maximal_patterns_combined.csv")
    final_maximal_patterns_df.to_csv(combined_maximal_output_filepath, index=False)
    print(f"All maximal patterns combined and saved to: {combined_maximal_output_filepath}")
else:
    print("No maximal patterns found across all runs to combine.")

print("\n--- Co-location Pattern Mining Process Completed ---")

### Calculate conditional probability (cp) of size-2 patterns

In [None]:
# --- Phase 5: Calculating and Saving Conditional Probabilities ---

def calculate_and_save_conditional_probabilities(combined_prevalent_filepath, output_dir, neighbors_map, all_features_gdf):
    """
    Reads the combined prevalent patterns file, calculates conditional probabilities for 2-item patterns,
    and saves the results to a new CSV file.
    """
    print("\n--- Phase 5: Calculating Conditional Probabilities for all 2-item prevalent patterns ---")

    if not os.path.exists(combined_prevalent_filepath):
        print(f"File not found: {combined_prevalent_filepath}. Skipping conditional probability calculation.")
        return

    # Read the combined prevalent patterns CSV
    prevalent_df = pd.read_csv(combined_prevalent_filepath)
    # Filter for only 2-item patterns
    two_item_patterns_df = prevalent_df[prevalent_df['pattern_length'] == 2].copy()

    if two_item_patterns_df.empty:
        print("No 2-item prevalent patterns found in the combined file to calculate conditional probabilities for.")
        return

    # Create a new list to store the results
    conditional_prob_data = []

    # Iterate through each unique 2-item pattern to calculate probabilities
    unique_patterns = two_item_patterns_df.drop_duplicates(subset=['distance_threshold_m', 'pattern'])

    for _, row in unique_patterns.iterrows():
        pattern_str = row['pattern']
        # The pattern string is like "A, B", split and strip whitespace
        f_type1, f_type2 = [s.strip() for s in pattern_str.split(',')]

        # Re-build or access the correct map for the current distance threshold if not already available
        current_distance_threshold_m = row['distance_threshold_m']
        current_neighbors_map = build_neighborhood_graph(current_distance_threshold_m, all_features_gdf, coords, tree)

        # Calculate P(f_type2 | f_type1)
        prob_f2_given_f1 = calculate_conditional_probability(f_type2, f_type1, current_neighbors_map, all_features_gdf)

        # Calculate P(f_type1 | f_type2)
        prob_f1_given_f2 = calculate_conditional_probability(f_type1, f_type2, current_neighbors_map, all_features_gdf)

        conditional_prob_data.append({
            'distance_threshold_m': current_distance_threshold_m,
            'pattern': pattern_str,
            'P(f_type2 | f_type1)': f"{f_type2} | {f_type1}",
            'conditional_probability_value': prob_f2_given_f1
        })
        conditional_prob_data.append({
            'distance_threshold_m': current_distance_threshold_m,
            'pattern': pattern_str,
            'P(f_type1 | f_type2)': f"{f_type1} | {f_type2}",
            'conditional_probability_value': prob_f1_given_f2
        })

    # Create a DataFrame and save to CSV
    if conditional_prob_data:
        conditional_probs_df = pd.DataFrame(conditional_prob_data)
        conditional_probs_df.to_csv(os.path.join(output_dir, "conditional_probabilities_combined.csv"), index=False)
        print(f"Conditional probabilities for all 2-item patterns saved to: {os.path.join(output_dir, 'conditional_probabilities_combined.csv')}")
    else:
        print("No conditional probabilities were calculated.")

# --- Execute the new function after the main loop finishes ---
if all_prevalent_patterns_combined_list:
    calculate_and_save_conditional_probabilities(
        combined_prevalent_output_filepath,
        output_dir_colocation,
        neighbors_map, # This will be the last map generated by the loop, but it will be re-built inside the function
        all_features_gdf
    )
else:
    print("Skipping conditional probability calculation because no prevalent patterns were found.")

print("\n--- All Processes Completed ---")

In [None]:
cp_read = pd.read_csv("/content/colocation_data/conditional_probabilities_combined.csv")
display(cp_read)

In [None]:
# --- Final Combined CSV Files ---
print("\n--- Phase 4: Generating combined CSV files for all prevalent and maximal patterns ---")

# Combine all prevalent patterns into one DataFrame
if all_prevalent_patterns_combined_list:
    final_prevalent_patterns_df = pd.DataFrame(all_prevalent_patterns_combined_list)
    final_prevalent_patterns_df = final_prevalent_patterns_df.sort_values(
        by=['distance_threshold_m', 'min_pi_threshold', 'min_pr_threshold', 'pattern_length', 'participation_index'],
        ascending=[True, True, True, True, False]
    ).reset_index(drop=True)

    combined_prevalent_output_filepath = os.path.join(output_dir_colocation, "all_prevalent_patterns_combined.csv")
    final_prevalent_patterns_df.to_csv(combined_prevalent_output_filepath, index=False)
    print(f"All prevalent patterns combined and saved to: {combined_prevalent_output_filepath}")
    print("\nFirst 10 rows of the combined prevalent patterns summary:")
    print(final_prevalent_patterns_df.head(10)) # Added head print for prevalent
else:
    print("No prevalent patterns found across all runs to combine.")

# Combine all maximal patterns into one DataFrame
if all_maximal_patterns_combined_list:
    final_maximal_patterns_df = pd.DataFrame(all_maximal_patterns_combined_list)
    final_maximal_patterns_df = final_maximal_patterns_df.sort_values(
        by=['distance_threshold_m', 'min_pi_threshold', 'min_pr_threshold', 'pattern_length', 'participation_index'],
        ascending=[True, True, True, True, False]
    ).reset_index(drop=True)

    combined_maximal_output_filepath = os.path.join(output_dir_colocation, "all_maximal_patterns_combined.csv")
    final_maximal_patterns_df.to_csv(combined_maximal_output_filepath, index=False)
    print(f"All maximal patterns combined and saved to: {combined_maximal_output_filepath}")
    print("\nFirst 10 rows of the combined maximal patterns summary:")
    print(final_maximal_patterns_df.head(10)) # Added head print for maximal
else:
    print("No maximal patterns found across all runs to combine.")

print("\n--- Co-location Pattern Mining Process Completed ---")

In [None]:
import shutil

folder_to_zip = "colocation_data"
zip_filename = "colocation_results_archive"

# Define the output path for the zip file.
output_zip_path = os.path.join(os.getcwd(), zip_filename)

print(f"Attempting to zip folder: {folder_to_zip}")

try:
    shutil.make_archive(output_zip_path, 'zip', folder_to_zip)
    print(f"Successfully created zip file: {output_filename}.zip")
    print(f"The zip file is located at: {output_zip_path}.zip")

except FileNotFoundError:
    print(f"Error: The folder '{folder_to_zip}' was not found.")
except Exception as e:
    print(f"An error occurred during zipping: {e}")

### Visualisation

compare most and least frequent patterns

In [None]:
# --- Configuration for Visualization ---
output_dir_colocation = "colocation_data"

# Define the specific thresholds for the comparison
D_COMPARE = 600  # meters
MIN_PI_COMPARE = 0.5

# Define pattern size for comparison (e.g., compare only size-2 patterns)
PATTERN_SIZE_COMPARE = 2

# --- Load the combined prevalent patterns data ---
combined_prevalent_output_filepath = os.path.join(output_dir_colocation, "all_prevalent_patterns_combined.csv")

try:
    df_all_prevalent = pd.read_csv(combined_prevalent_output_filepath)
    print(f"Loaded combined prevalent patterns from: {combined_prevalent_output_filepath}")
except FileNotFoundError:
    print(f"Error: Combined prevalent patterns file not found at {combined_prevalent_output_filepath}")
    exit() # Exit if the file isn't found

# --- Filter for the specific comparison scenario ---
df_scenario = df_all_prevalent[
    (df_all_prevalent['distance_threshold_m'] == D_COMPARE) &
    (df_all_prevalent['min_pi_threshold'] == MIN_PI_COMPARE) &
    (df_all_prevalent['pattern_length'] == PATTERN_SIZE_COMPARE) # Filter by pattern size
].copy() # Use .copy() to avoid SettingWithCopyWarning

if df_scenario.empty:
    print(f"\nNo prevalent patterns found for D={D_COMPARE}m, Min PI={MIN_PI_COMPARE}, Size={PATTERN_SIZE_COMPARE}.")
else:
    # --- Identify Most and Least Frequent Patterns ---
    df_scenario_sorted = df_scenario.sort_values(by='participation_index', ascending=False)

    most_frequent_pattern = df_scenario_sorted.iloc[0]
    # Ensure there's more than one pattern to pick a 'least' from
    if len(df_scenario_sorted) > 1:
        least_frequent_pattern = df_scenario_sorted.iloc[-1]
    else:
        least_frequent_pattern = None # Only one pattern, cannot compare least

    print(f"\n--- Comparison for D={D_COMPARE}m, Min PI={MIN_PI_COMPARE}, Size={PATTERN_SIZE_COMPARE} ---")

    if most_frequent_pattern is not None:
        print("\nMost Frequent Pattern:")
        print(f"  Pattern: {{{most_frequent_pattern['pattern']}}}, PI: {most_frequent_pattern['participation_index']:.4f}, PR: {most_frequent_pattern['participation_ratio']:.4f}")

    if least_frequent_pattern is not None:
        print("\nLeast Frequent Pattern:")
        print(f"  Pattern: {{{least_frequent_pattern['pattern']}}}, PI: {least_frequent_pattern['participation_index']:.4f}, PR: {least_frequent_pattern['participation_ratio']:.4f}")
    elif len(df_scenario_sorted) == 1:
        print("Only one pattern found for this scenario, cannot compare most vs. least.")
    else:
        print("No patterns found to compare.")

    # --- Visualization (Bar Chart) ---
    if most_frequent_pattern is not None and least_frequent_pattern is not None:
        patterns_to_plot = pd.DataFrame([most_frequent_pattern, least_frequent_pattern])

        plt.figure(figsize=(10, 6))
        sns.barplot(x='pattern', y='participation_index', data=patterns_to_plot, palette='viridis')
        plt.title(f'Most vs. Least Frequent Size-{PATTERN_SIZE_COMPARE} Co-location Patterns\n'
                  f'(D={D_COMPARE}m, Min PI={MIN_PI_COMPARE})')
        plt.xlabel('Co-location Pattern')
        plt.ylabel('Participation Index (PI)')
        plt.ylim(0, 1) # PI is between 0 and 1
        plt.xticks(rotation=45, ha='right')
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()

        # Save the plot
        plot_filename = f"most_least_frequent_D{D_COMPARE}min_PI{str(MIN_PI_COMPARE).replace('.', '')}_Size{PATTERN_SIZE_COMPARE}.png"
        plot_filepath = os.path.join(output_dir_colocation, plot_filename)
        plt.savefig(plot_filepath)
        print(f"\nVisualization saved to: {plot_filepath}")
        plt.show()

    elif most_frequent_pattern is not None:
        print("Cannot create comparison plot: Only one prevalent pattern found for this scenario.")

compare most and least frequent patterns (diffusion tube)

In [None]:
# --- Configuration for Visualization ---
output_dir_colocation = "colocation_data" # Ensure this matches your output folder

# Define the specific thresholds for the comparison
D_COMPARE = 600  # meters
MIN_PI_COMPARE = 0.01
TARGET_FEATURE_TYPE = 'diffusion_tube'
PATTERN_SIZE_COMPARE = 2 # choose: 2, 3, 4, or None

# --- Load the combined prevalent patterns data ---
combined_prevalent_output_filepath = os.path.join(output_dir_colocation, "all_prevalent_patterns_combined.csv")

try:
    df_all_prevalent = pd.read_csv(combined_prevalent_output_filepath)
    print(f"Loaded combined prevalent patterns from: {combined_prevalent_output_filepath}")
except FileNotFoundError:
    print(f"Error: Combined prevalent patterns file not found at {combined_prevalent_output_filepath}")
    exit() # Exit if the file isn't found

# --- Filter for the specific comparison scenario (distance, PI, PR) ---
df_scenario = df_all_prevalent[(df_all_prevalent['distance_threshold_m'] == D_COMPARE) &
 (df_all_prevalent['min_pi_threshold'] == MIN_PI_COMPARE)].copy()

# --- NEW FILTER: Filter for patterns containing the target feature type ---
# The 'pattern' column is stored as a comma-separated string, so we check for substring.
df_target_feature_patterns = df_scenario[df_scenario['pattern'].str.contains(TARGET_FEATURE_TYPE, na=False)].copy()

# Optional: Further filter by pattern size if PATTERN_SIZE_COMPARE is set
if PATTERN_SIZE_COMPARE is not None:
    df_target_feature_patterns = df_target_feature_patterns[
        df_target_feature_patterns['pattern_length'] == PATTERN_SIZE_COMPARE
    ].copy()

if df_target_feature_patterns.empty:
    print(f"\nNo prevalent patterns containing '{TARGET_FEATURE_TYPE}' found for D={D_COMPARE}m, Min PI={MIN_PI_COMPARE}.")
    if PATTERN_SIZE_COMPARE is not None:
        print(f"Consider adjusting PATTERN_SIZE_COMPARE or setting it to None.")
else:
    # --- Identify Most and Least Frequent Patterns among the filtered set ---
    df_filtered_sorted = df_target_feature_patterns.sort_values(by='participation_index', ascending=False)

    most_frequent_pattern = df_filtered_sorted.iloc[0]
    if len(df_filtered_sorted) > 1:
        least_frequent_pattern = df_filtered_sorted.iloc[-1]
    else:
        least_frequent_pattern = None # Only one pattern, cannot compare least

    print(f"\n--- Comparison for patterns containing '{TARGET_FEATURE_TYPE}' at D={D_COMPARE}m, Min PI={MIN_PI_COMPARE} ---")
    if PATTERN_SIZE_COMPARE is not None:
        print(f"--- (Only considering size-{PATTERN_SIZE_COMPARE} patterns) ---")


    if most_frequent_pattern is not None:
        print("\nMost Frequent Pattern:")
        print(f"  Pattern: {{{most_frequent_pattern['pattern']}}}, PI: {most_frequent_pattern['participation_index']:.4f}, PR: {most_frequent_pattern['participation_ratio']:.4f}")

    if least_frequent_pattern is not None:
        print("\nLeast Frequent Pattern:")
        print(f"  Pattern: {{{least_frequent_pattern['pattern']}}}, PI: {least_frequent_pattern['participation_index']:.4f}, PR: {least_frequent_pattern['participation_ratio']:.4f}")
    elif len(df_filtered_sorted) == 1:
        print(f"Only one pattern containing '{TARGET_FEATURE_TYPE}' found for this scenario, cannot compare most vs. least.")
    else:
        print(f"No patterns containing '{TARGET_FEATURE_TYPE}' found to compare.")

    # --- Visualization (Bar Chart) ---
    if most_frequent_pattern is not None and least_frequent_pattern is not None:
        patterns_to_plot = pd.DataFrame([most_frequent_pattern, least_frequent_pattern])

        plt.figure(figsize=(10, 6))
        sns.barplot(x='pattern', y='participation_index', data=patterns_to_plot, palette='viridis')
        plt.title(f'Most vs. Least Frequent Co-location Patterns with "{TARGET_FEATURE_TYPE}"\n'
                  f'(D={D_COMPARE}m, Min PI={MIN_PI_COMPARE})' +
                  (f' - Size {PATTERN_SIZE_COMPARE} Patterns' if PATTERN_SIZE_COMPARE else ''))
        plt.xlabel('Co-location Pattern')
        plt.ylabel('Participation Index (PI)')
        plt.ylim(0, 1) # PI is between 0 and 1
        plt.xticks(rotation=0, ha='center')
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.tight_layout()

        # Save the plot
        plot_filename = f"most_least_{TARGET_FEATURE_TYPE}_D{D_COMPARE}m_PI{str(MIN_PI_COMPARE).replace('.', '')}"
        if PATTERN_SIZE_COMPARE is not None:
            plot_filename += f"_Size{PATTERN_SIZE_COMPARE}"
        plot_filename += ".png"

        plot_filepath = os.path.join(output_dir_colocation, plot_filename)
        plt.savefig(plot_filepath)
        print(f"\nVisualization saved to: {plot_filepath}")
        plt.show()
    else:
        print("Cannot create comparison plot as there are not enough patterns for comparison.")

In [None]:
# Reproject to a local projected CRS (e.g., British National Grid) for accurate distance calculation
local_crs = "EPSG:27700"
all_features_gdf = all_features_gdf.to_crs(local_crs)

# --- Define the Most Prevalent Pattern and its Colors ---
MOST_PREVALENT_PATTERN = ['diffusion_tube', 'kindergarten', 'park', 'school']
FEATURE_COLORS = {
    'diffusion_tube': 'blue',
    'kindergarten': 'purple',
    'park': 'green',
    'school': 'red'
}
FEATURE_RADIUS = 2

# --- 2. Load and Reproject Boundaries for the Map ---
print("\nLoading and reprojecting boundaries...")
sheffield_city_boundary_wgs84 = gpd.read_file("City_Boundary.geojson").to_crs("EPSG:4326")
sheffield_ward_boundary_wgs84 = gpd.read_file("Sheffield_Wards.geojson").to_crs("EPSG:4326")

# Reproject all features for Folium plotting (back to WGS84)
all_features_gdf_wgs84 = all_features_gdf.to_crs("EPSG:4326")

# --- 3. Create a Folium Map Object and Add Boundaries ---
sheffield_center = [53.3810, -1.4701]
m_prevalent_pattern = folium.Map(location=sheffield_center, zoom_start=12)

folium.GeoJson(sheffield_city_boundary_wgs84, name="Sheffield City Boundary", style_function=lambda x: {'color': 'black', 'weight': 2, 'fillColor': 'none'}).add_to(m_prevalent_pattern)
folium.GeoJson(sheffield_ward_boundary_wgs84, name="Sheffield Ward Boundary", style_function=lambda x: {'color': 'gray', 'weight': 1, 'fillColor': 'none'}).add_to(m_prevalent_pattern)

# --- 4. Plot Each Feature in the Prevalent Pattern with Distinct Colors ---
print(f"Plotting the most prevalent pattern: {MOST_PREVALENT_PATTERN}")
for feature_type in MOST_PREVALENT_PATTERN:
    # Filter the GeoDataFrame for the current feature type
    gdf_filtered = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == feature_type]

    if not gdf_filtered.empty:
        # Create a FeatureGroup for each type to enable layer control
        feature_group = folium.FeatureGroup(name=f"{feature_type.title()} Locations").add_to(m_prevalent_pattern)

        # Plot each point as a circle marker
        for _, row in gdf_filtered.iterrows():
            folium.CircleMarker(
                location=[row.geometry.y, row.geometry.x],
                radius=FEATURE_RADIUS,
                color=FEATURE_COLORS[feature_type],
                fill=True,
                fill_color=FEATURE_COLORS[feature_type],
                fill_opacity=0.8,
                popup=f"Type: {feature_type.title()}"
            ).add_to(feature_group)

# Add a Layer Control to toggle layers on/off
folium.LayerControl().add_to(m_prevalent_pattern)

# Save the map as an HTML file
map_filename = "most_prevalent_pattern_map.html"
m_prevalent_pattern.save(map_filename)
print(f"\nInteractive map saved to: {map_filename}")

In [None]:
m_prevalent_pattern

## Non-prevalent patterns

### calculate non-prevalent patterns for specific distance and min-prev thresholds

In [None]:
# --- Finding Potential Non-prevalent Patterns ---
print("\n--- Identifying Potential Non-prevalent Patterns (2-item with very low PI) ---")

# Define the distance threshold for evaluating "non-prevalent" proximity.
D_FOR_NEGATIVE_CHECK = 600 # meters
LOW_PI_THRESHOLD = 0.01 # Define what "very low PI" means (e.g., < 1%)

# Build the neighborhood graph for this specific distance
neighbors_map_for_nonprev_check = build_neighborhood_graph(D_FOR_NEGATIVE_CHECK, all_features_gdf, coords, tree)

# Get all unique feature types present in your data
actual_unique_feature_types = all_features_gdf['feature_type'].unique().tolist()

# Generate all possible 2-item candidate patterns
all_2_item_candidates = list(itertools.combinations(actual_unique_feature_types, 2))
print(f"Checking {len(all_2_item_candidates)} total 2-item candidate patterns for low PI at D={D_FOR_NEGATIVE_CHECK}m.")

nonprev_patterns_candidates = []

for F1_type, F2_type in all_2_item_candidates:
    pattern_key = frozenset([F1_type, F2_type])
    pattern_pi, pattern_pr = calculate_pattern_prevalence(
        pattern_key, neighbors_map_for_nonprev_check, all_features_gdf
    )

    # Condition to identify "non-prevalent" (very low PI) patterns
    # Includes patterns with PI strictly less than LOW_PI_THRESHOLD OR PI exactly 0
    if pattern_pi < LOW_PI_THRESHOLD:
        nonprev_patterns_candidates.append({
            'distance_threshold_m': D_FOR_NEGATIVE_CHECK,
            'pattern': ", ".join(sorted(pattern_key)),
            'participation_index': pattern_pi,
            'participation_ratio': pattern_pr
        })


if nonprev_patterns_candidates:
    df_nonprev_patterns = pd.DataFrame(nonprev_patterns_candidates)
    df_nonprev_patterns = df_nonprev_patterns.sort_values(
        by='participation_index', ascending=True
    ).reset_index(drop=True)

    output_filename = f"potential_non-prevalent_2_item_patterns_D{D_FOR_NEGATIVE_CHECK}m_PI_lt_{str(LOW_PI_THRESHOLD).replace('.', '')}.csv"
    output_filepath = os.path.join(output_dir_colocation, output_filename)
    df_nonprev_patterns.to_csv(output_filepath, index=False)

    print(f"\nFound {len(nonprev_patterns_candidates)} potential non-prevalent 2-item patterns (PI < {LOW_PI_THRESHOLD}).")
    print(f"Results saved to: {output_filepath}")
    print("\nAll patterns with lowest PI (most 'non-prevalent patterns'):")
    # --- UPDATED LINE TO PRINT ALL RESULTS ---
    print(df_nonprev_patterns.to_string()) # Use .to_string() to print the entire DataFrame without truncation
else:
    print(f"No 2-item patterns found with PI less than {LOW_PI_THRESHOLD} at {D_FOR_NEGATIVE_CHECK}m.")

### calculate non-prevalent patterns from all distance and min_prev thresholds.

In [None]:
# --- Phase 5: Finding Infrequent (Non-Prevalent) Patterns ---
print("\n--- Phase 5: Running Infrequent Pattern Mining for different thresholds ---")

# List to collect all infrequent patterns across ALL distance, PI, and PR combinations
all_infrequent_patterns_combined_list = []

# Outer loop for distance thresholds
for current_distance_threshold_m in distance_threshold_m:
    print(f"\n===== Running Infrequent Pattern Mining for Distance: {current_distance_threshold_m}m =====")

    # Recalculate neighbors_map for the current distance threshold
    neighbors_map = build_neighborhood_graph(current_distance_threshold_m, all_features_gdf, coords, tree)

    # Nested loops to iterate through all PI and PR combinations
    for current_max_pr in pr_thresholds: # Renamed to current_max_pr for clarity
        for current_max_pi in pi_thresholds: # Renamed to current_max_pi for clarity
            print(f"\n--- Current Thresholds (D={current_distance_threshold_m}m): Max PI={current_max_pi:.2f}, Max PR={current_max_pr:.2f} (for infrequent patterns) ---")

            infrequent_patterns_current_run = {
                2: {},
                3: {},
                4: {}
            }

            # --- Generate and check all 2-item candidates ---
            # For infrequent patterns, we need to check ALL combinations, not just Apriori-derived ones.
            # Ensures we only consider existing feature types
            filtered_unique_feature_types = [
                f_type for f_type in unique_feature_types
                if f_type in all_features_gdf['feature_type'].unique()
            ]
            candidate_2_item_patterns = list(itertools.combinations(filtered_unique_feature_types, 2))
            print(f"   Checking {len(candidate_2_item_patterns)} total 2-item candidate patterns...")

            for F1_type, F2_type in candidate_2_item_patterns:
                pattern_key = frozenset([F1_type, F2_type])
                pattern_pi, pattern_pr = calculate_pattern_prevalence(
                    pattern_key, neighbors_map, all_features_gdf
                )

                # --- CORRECTED CONDITION FOR INFREQUENT PATTERNS ---
                # A pattern is infrequent if it is NOT prevalent
                # (i.e., its PI is below max_pi OR its PR is below max_pr)
                if not (pattern_pi >= current_max_pi and pattern_pr >= current_max_pr):
                    infrequent_patterns_current_run[2][pattern_key] = {'pi': pattern_pi, 'pr': pattern_pr}
            print(f"   Total infrequent 2-item patterns found: {len(infrequent_patterns_current_run[2])}")


            # --- Generate and check all 3-item candidates ---
            print("\n   Checking all 3-item candidate patterns...")
            candidate_3_item_patterns = list(itertools.combinations(filtered_unique_feature_types, 3))

            for pattern_types in candidate_3_item_patterns:
                pattern_key = frozenset(pattern_types)
                pattern_pi, pattern_pr = calculate_pattern_prevalence(
                    pattern_key, neighbors_map, all_features_gdf
                )

                if not (pattern_pi >= current_max_pi and pattern_pr >= current_max_pr):
                    infrequent_patterns_current_run[3][pattern_key] = {'pi': pattern_pi, 'pr': pattern_pr}
            print(f"   Total infrequent 3-item patterns found: {len(infrequent_patterns_current_run[3])}")


            # --- Generate and check all 4-item candidates ---
            print("\n   Checking all 4-item candidate patterns...")
            candidate_4_item_patterns = list(itertools.combinations(filtered_unique_feature_types, 4))

            for pattern_types in candidate_4_item_patterns:
                pattern_key = frozenset(pattern_types)
                pattern_pi, pattern_pr = calculate_pattern_prevalence(
                    pattern_key, neighbors_map, all_features_gdf
                )

                if not (pattern_pi >= current_max_pi and pattern_pr >= current_max_pr):
                    infrequent_patterns_current_run[4][pattern_key] = {'pi': pattern_pi, 'pr': pattern_pr}
            print(f"   Total infrequent 4-item patterns found: {len(infrequent_patterns_current_run[4])}")


            # --- Summary and Saving for the Current Infrequent Run (PI/PR/Distance) ---
            print(f"\n   --- Summary for D={current_distance_threshold_m}m, Max PI={current_max_pi:.2f}, Max PR={current_max_pr:.2f} (Infrequent) ---")
            summary_data_infrequent_current_run = []
            found_any_infrequent_patterns_current_run = False

            for k in sorted(infrequent_patterns_current_run.keys()):
                if infrequent_patterns_current_run[k]:
                    found_any_infrequent_patterns_current_run = True
                    print(f"\n   Infrequent {k}-item Patterns:")
                    sorted_patterns = sorted(infrequent_patterns_current_run[k].items(), key=lambda item: item[1]['pi'], reverse=False) # Sort PI ascending for infrequent
                    for pattern, metrics in sorted_patterns:
                        pattern_set = set(pattern)
                        print(f"    Pattern: {pattern_set}, PI: {metrics['pi']:.4f}, PR: {metrics['pr']:.4f}")

                        summary_data_infrequent_current_run.append({
                            'distance_threshold_m': current_distance_threshold_m,
                            'max_pi_threshold': current_max_pi, # Renamed to max_pi_threshold for clarity
                            'max_pr_threshold': current_max_pr, # Renamed to max_pr_threshold for clarity
                            'pattern_length': k,
                            'pattern': ", ".join(sorted(pattern_set)),
                            'participation_index': metrics['pi'],
                            'participation_ratio': metrics['pr']
                        })

            if not found_any_infrequent_patterns_current_run:
                print("   No infrequent co-location patterns found for this combination of thresholds.")

            # Save individual infrequent patterns CSV
            if summary_data_infrequent_current_run:
                infrequent_patterns_df_current_run = pd.DataFrame(summary_data_infrequent_current_run)
                infrequent_patterns_df_current_run = infrequent_patterns_df_current_run.sort_values(
                    by=['distance_threshold_m', 'pattern_length', 'participation_index'],
                    ascending=[True, True, True] # Sort PI ascending for infrequent
                ).reset_index(drop=True)

                formatted_dist_str = f"{current_distance_threshold_m}m"
                formatted_pi_str = f"{current_max_pi:.2f}".replace('.', '')
                formatted_pr_str = f"{current_max_pr:.2f}".replace('.', '')
                output_filename = f"infrequent_patterns_D{formatted_dist_str}_MaxPI{formatted_pi_str}_MaxPR{formatted_pr_str}.csv"
                output_filepath = os.path.join(output_dir_colocation, output_filename)

                infrequent_patterns_df_current_run.to_csv(output_filepath, index=False)
                print(f"   Infrequent co-location pattern summary saved to: {output_filepath}")
            else:
                print("   No infrequent patterns CSV file saved for this run as no patterns were found.")

            # Extend the list for the final combined CSV for ALL infrequent patterns
            all_infrequent_patterns_combined_list.extend(summary_data_infrequent_current_run)


# --- Final Combined CSV for Infrequent Patterns ---
print("\n--- Phase 6: Generating combined CSV file for all infrequent patterns ---")

if all_infrequent_patterns_combined_list:
    final_infrequent_patterns_df = pd.DataFrame(all_infrequent_patterns_combined_list)
    final_infrequent_patterns_df = final_infrequent_patterns_df.sort_values(
        by=['distance_threshold_m', 'max_pi_threshold', 'max_pr_threshold', 'pattern_length', 'participation_index'],
        ascending=[True, True, True, True, True] # Sort PI ascending for overall infrequent
    ).reset_index(drop=True)

    combined_infrequent_output_filepath = os.path.join(output_dir_colocation, "all_infrequent_patterns_combined.csv")
    final_infrequent_patterns_df.to_csv(combined_infrequent_output_filepath, index=False)
    print(f"All infrequent patterns combined and saved to: {combined_infrequent_output_filepath}")
else:
    print("No infrequent patterns found across all runs to combine.")

print("\n--- Infrequent Pattern Mining Process Completed ---")

In [None]:
final_infrequent_patterns_df

In [None]:
filtered_df = final_infrequent_patterns_df[
    final_infrequent_patterns_df['pattern'].apply(lambda x: 'diffusion_tube' in x) &
    (final_infrequent_patterns_df['pattern_length'] == 2)]

filtered_df

## Results

### size-2 prevelent pattern: diffusion tube and school

In [None]:
# --- 1. Define the Pattern and Distance Range of Interest ---
TARGET_FEATURE_TYPE_A = 'school'
TARGET_FEATURE_TYPE_B = 'diffusion_tube'

# The distance threshold for co-location analysis (in meters)
DISTANCE_THRESHOLD = 600

print(f"\n--- Analyzing co-location between '{TARGET_FEATURE_TYPE_A}' and '{TARGET_FEATURE_TYPE_B}' ---")
print(f"Using a distance threshold of {DISTANCE_THRESHOLD}m.")

# --- 2. Load and Reproject Data (Assumed to be pre-loaded) ---
# Reproject for the plotting part later
all_features_gdf_wgs84 = all_features_gdf.to_crs("EPSG:4326")

# Extract GeoDataFrames for the target feature types
gdf_type_A = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_B].copy()

if gdf_type_A.empty or gdf_type_B.empty:
    print(f"Error: One or both target feature types ('{TARGET_FEATURE_TYPE_A}', '{TARGET_FEATURE_TYPE_B}') not found.")
    exit()

# Prepare coordinates for cKDTree for TARGET_FEATURE_TYPE_B (the "nearest to" feature)
coords_B = np.array([[p.x, p.y] for p in gdf_type_B.geometry])
tree_B = cKDTree(coords_B)

# --- 3. Calculate Co-location Instances within the Distance Threshold ---
co_location_pairs = []
print(f"Calculating co-location instances within {DISTANCE_THRESHOLD}m...")

for idx_A, row_A in gdf_type_A.iterrows():
    point_A = row_A.geometry
    # Query the KDTree to find the nearest neighbor within the threshold
    distance, index = tree_B.query(np.array([point_A.x, point_A.y]), k=1, distance_upper_bound=DISTANCE_THRESHOLD)

    # Check if a neighbor was found
    if np.isfinite(distance):
        co_location_pairs.append({
            'source_uid': row_A['uid'],
            'source_type': row_A['feature_type'],
            'target_uid': gdf_type_B.iloc[index]['uid'],
            'target_type': gdf_type_B.iloc[index]['feature_type'],
            'distance_m': distance
        })

if not co_location_pairs:
    print(f"\nNo co-location instances found between '{TARGET_FEATURE_TYPE_A}' and '{TARGET_FEATURE_TYPE_B}' within {DISTANCE_THRESHOLD}m.")
    # Exit or continue with a map that has no lines
else:
    # Create a DataFrame from the valid pairs for easy plotting
    df_plot_lines = pd.DataFrame(co_location_pairs)
    print(f"\nFound {len(df_plot_lines)} co-location pairs to plot.")

# --- 4. Plotting the Result on a Folium Map ---
print("\nGenerating Folium map to visualize all locations...")

# Reproject GeoDataFrames for plotting on Folium
gdf_type_A_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_B].copy()

# Load and reproject boundaries
sheffield_city_boundary_wgs84 = gpd.read_file("City_Boundary.geojson").to_crs("EPSG:4326")
sheffield_ward_boundary_wgs84 = gpd.read_file("Sheffield_Wards.geojson").to_crs("EPSG:4326")

# Create a Folium map object
sheffield_center = [53.3810, -1.4701]
m_prevalent_1 = folium.Map(location=sheffield_center, zoom_start=12)

# Add boundaries to the map
folium.GeoJson(sheffield_city_boundary_wgs84, name="Sheffield City Boundary", style_function=lambda x: {'color': 'black', 'weight': 2, 'fillColor': 'none'}).add_to(m_prevalent_1)
folium.GeoJson(sheffield_ward_boundary_wgs84, name="Sheffield Ward Boundary", style_function=lambda x: {'color': 'black', 'weight': 1, 'fillColor': 'none'}).add_to(m_prevalent_1)

# Create a feature group for the connecting lines
lines_group = folium.FeatureGroup(name=f'Co-location Pairs ({DISTANCE_THRESHOLD}m)').add_to(m_prevalent_1)

# Plot all schools and diffusion tubes that are part of the co-location pairs
if 'df_plot_lines' in locals():
    # Get the UIDs of the features that participate in the pairs
    source_uids = df_plot_lines['source_uid'].unique()
    target_uids = df_plot_lines['target_uid'].unique()

    # Plot schools (Feature A)
    schools_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_A.title()} Locations").add_to(m_prevalent_1)
    for _, row in gdf_type_A_wgs84[gdf_type_A_wgs84['uid'].isin(source_uids)].iterrows():
        folium.CircleMarker(
            location=[row.geometry.y, row.geometry.x],
            radius=5,
            color='orange',
            fill=True,
            fill_color='orange',
            fill_opacity=0.8,
            popup=f"{TARGET_FEATURE_TYPE_A.title()} (UID: {row['uid']})"
        ).add_to(schools_group)

    # Plot diffusion tubes (Feature B)
    diffusion_tube_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_B.title()} Locations").add_to(m_prevalent_1)
    for _, row in gdf_type_B_wgs84[gdf_type_B_wgs84['uid'].isin(target_uids)].iterrows():
        folium.CircleMarker(
            location=[row.geometry.y, row.geometry.x],
            radius=5,
            color='blue',
            fill=True,
            fill_color='blue',
            fill_opacity=0.8,
            popup=f"{TARGET_FEATURE_TYPE_B.title()} (UID: {row['uid']})"
        ).add_to(diffusion_tube_group)

    # Draw the connecting lines
    for _, row in df_plot_lines.iterrows():
        source_coords = gdf_type_A_wgs84[gdf_type_A_wgs84['uid'] == row['source_uid']].geometry.iloc[0]
        target_coords = gdf_type_B_wgs84[gdf_type_B_wgs84['uid'] == row['target_uid']].geometry.iloc[0]

        folium.PolyLine(
            locations=[[source_coords.y, source_coords.x], [target_coords.y, target_coords.x]],
            color='red',
            weight=2,
            opacity=1,
            dash_array='5, 5',
            tooltip=f"Distance: {row['distance_m']:.2f}m"
        ).add_to(lines_group)
else:
    print("No co-location pairs to plot. The map will show boundaries only.")

# Add a Layer Control to toggle layers on/off
folium.LayerControl().add_to(m_prevalent_1)

# Save the map as an HTML file
map_filename = "size-2_prevalent_school_diffusion_tube_map.html"
m_prevalent_1.save(map_filename)
print(f"\nInteractive map saved to: {map_filename}")

In [None]:
m_prevalent_1

In [None]:
# --- 1. Define the Pattern and Distance Range of Interest ---
TARGET_FEATURE_TYPE_A = 'school'
TARGET_FEATURE_TYPE_B = 'diffusion_tube'

# The distance threshold for co-location analysis (in meters)
DISTANCE_THRESHOLD = 600

print(f"\n--- Analyzing co-location between '{TARGET_FEATURE_TYPE_A}' and '{TARGET_FEATURE_TYPE_B}' ---")
print(f"Using a distance threshold of {DISTANCE_THRESHOLD}m.")

# --- 2. Load and Reproject Data (Assumed to be pre-loaded) ---
# Reproject for the plotting part later
all_features_gdf_wgs84 = all_features_gdf.to_crs("EPSG:4326")

# Extract GeoDataFrames for the target feature types
gdf_type_A = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_B].copy()

if gdf_type_A.empty or gdf_type_B.empty:
    print(f"Error: One or both target feature types ('{TARGET_FEATURE_TYPE_A}', '{TARGET_FEATURE_TYPE_B}') not found.")
    # In a real script, you might raise an error or exit here.
    # For this example, we'll continue to generate a map with no lines.

# Prepare coordinates for cKDTree for TARGET_FEATURE_TYPE_B (the "nearest to" feature)
# The tree should be built on the geometry of the CRS that the distance is calculated in (e.g., BNG)
# For this example, we'll assume the provided lon/lat are sufficient.
coords_B = np.array([[p.x, p.y] for p in gdf_type_B.geometry])
tree_B = cKDTree(coords_B)

# --- 3. Calculate Co-location Instances within the Distance Threshold ---
co_location_pairs = []
print(f"Calculating co-location instances within {DISTANCE_THRESHOLD}m...")

for idx_A, row_A in gdf_type_A.iterrows():
    point_A = row_A.geometry
    # Query the KDTree to find the nearest neighbor within the threshold
    distance, index = tree_B.query(np.array([point_A.x, point_A.y]), k=1, distance_upper_bound=DISTANCE_THRESHOLD)

    # Check if a neighbor was found
    if np.isfinite(distance):
        co_location_pairs.append({
            'source_uid': row_A['uid'],
            'source_type': row_A['feature_type'],
            'target_uid': gdf_type_B.iloc[index]['uid'],
            'target_type': gdf_type_B.iloc[index]['feature_type'],
            'distance_m': distance
        })

if not co_location_pairs:
    print(f"\nNo co-location instances found between '{TARGET_FEATURE_TYPE_A}' and '{TARGET_FEATURE_TYPE_B}' within {DISTANCE_THRESHOLD}m.")
    df_plot_lines = pd.DataFrame() # Create an empty DataFrame
else:
    # Create a DataFrame from the valid pairs for easy plotting
    df_plot_lines = pd.DataFrame(co_location_pairs)
    print(f"\nFound {len(df_plot_lines)} co-location pairs to plot.")

# --- 4. Plotting the Result on a Folium Map ---
print("\nGenerating Folium map to visualize all locations...")

# Reproject GeoDataFrames for plotting on Folium
gdf_type_A_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_B].copy()

# Get the UIDs of the features that participate in the pairs
source_uids = df_plot_lines['source_uid'].unique() if not df_plot_lines.empty else []
target_uids = df_plot_lines['target_uid'].unique() if not df_plot_lines.empty else []

# Load and reproject boundaries (assuming files exist)
try:
    sheffield_city_boundary_wgs84 = gpd.read_file("City_Boundary.geojson").to_crs("EPSG:4326")
    sheffield_ward_boundary_wgs84 = gpd.read_file("Sheffield_Wards.geojson").to_crs("EPSG:4326")
except Exception as e:
    print(f"Warning: Could not load boundary files. {e}")
    sheffield_city_boundary_wgs84 = gpd.GeoDataFrame()
    sheffield_ward_boundary_wgs84 = gpd.GeoDataFrame()

# Create a Folium map object
sheffield_center = [53.3810, -1.4701]
m_prevalent_1 = folium.Map(location=sheffield_center, zoom_start=12)

# Add boundaries to the map
if not sheffield_city_boundary_wgs84.empty:
    folium.GeoJson(sheffield_city_boundary_wgs84, name="Sheffield City Boundary", style_function=lambda x: {'color': 'black', 'weight': 2, 'fillColor': 'none'}).add_to(m_prevalent_1)
if not sheffield_ward_boundary_wgs84.empty:
    folium.GeoJson(sheffield_ward_boundary_wgs84, name="Sheffield Ward Boundary", style_function=lambda x: {'color': 'black', 'weight': 1, 'fillColor': 'none'}).add_to(m_prevalent_1)

# Create a feature group for the connecting lines
lines_group = folium.FeatureGroup(name=f'Co-location Pairs ({DISTANCE_THRESHOLD}m)').add_to(m_prevalent_1)

# Plot ALL schools and diffusion tubes, distinguishing paired from non-paired
schools_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_A.title()} Locations").add_to(m_prevalent_1)
for _, row in gdf_type_A_wgs84.iterrows():
    # --- EDITED: Changed the color based on whether the school is a source in a pair ---
    color = 'orange' if row['uid'] in source_uids else 'gray'
    folium.CircleMarker(
        location=[row.geometry.y, row.geometry.x],
        radius=5,
        color=color,
        fill=True,
        fill_color=color,
        fill_opacity=0.8,
        popup=f"{TARGET_FEATURE_TYPE_A.title()} (UID: {row['uid']})"
    ).add_to(schools_group)

diffusion_tube_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_B.title()} Locations").add_to(m_prevalent_1)
for _, row in gdf_type_B_wgs84.iterrows():
    # --- EDITED: Changed the color based on whether the diffusion tube is a target in a pair ---
    color = 'blue' if row['uid'] in target_uids else 'lightgray'
    folium.CircleMarker(
        location=[row.geometry.y, row.geometry.x],
        radius=5,
        color=color,
        fill=True,
        fill_color=color,
        fill_opacity=0.8,
        popup=f"{TARGET_FEATURE_TYPE_B.title()} (UID: {row['uid']})"
    ).add_to(diffusion_tube_group)

# Draw the connecting lines
if not df_plot_lines.empty:
    for _, row in df_plot_lines.iterrows():
        source_coords = gdf_type_A_wgs84[gdf_type_A_wgs84['uid'] == row['source_uid']].geometry.iloc[0]
        target_coords = gdf_type_B_wgs84[gdf_type_B_wgs84['uid'] == row['target_uid']].geometry.iloc[0]
        folium.PolyLine(
            locations=[[source_coords.y, source_coords.x], [target_coords.y, target_coords.x]],
            color='red',
            weight=2,
            opacity=1,
            dash_array='5, 5',
            tooltip=f"Distance: {row['distance_m']:.2f}m"
        ).add_to(lines_group)

# Add a Layer Control to toggle layers on/off
folium.LayerControl().add_to(m_prevalent_1)

# Save the map as an HTML file
map_filename = "size-2_prevalent_school_diffusion_tube_map.html"
m_prevalent_1.save(map_filename)
print(f"\nInteractive map saved to: {map_filename}")

### size-3 prevalent pattern: diffusion tube, school, and kindergarten

In [None]:
import folium
import geopandas as gpd
import numpy as np
from scipy.spatial import cKDTree
import pandas as pd
import os

# --- 1. Define the Patterns and Distance Range of Interest ---
TARGET_FEATURE_TYPE_A = 'school'
TARGET_FEATURE_TYPE_B = 'diffusion_tube'
TARGET_FEATURE_TYPE_C = 'kindergarten' # NEW: Added kindergarten as a target feature

# The distance threshold for co-location analysis (in meters)
DISTANCE_THRESHOLD = 600

print(f"\n--- Analyzing co-location between '{TARGET_FEATURE_TYPE_A}', '{TARGET_FEATURE_TYPE_C}' and '{TARGET_FEATURE_TYPE_B}' ---")
print(f"Using a distance threshold of {DISTANCE_THRESHOLD}m.")

# --- 2. Load and Reproject Data (Assumed to be pre-loaded) ---

# Reproject for the plotting part later
all_features_gdf_wgs84 = all_features_gdf.to_crs("EPSG:4326")

# Extract GeoDataFrames for the target feature types
gdf_type_A = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_B].copy()
gdf_type_C = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_C].copy() # NEW: DataFrame for kindergartens

if gdf_type_A.empty or gdf_type_B.empty or gdf_type_C.empty:
    print("Error: One or more target feature types not found.")
    exit()

# Prepare coordinates for cKDTree for TARGET_FEATURE_TYPE_B (the "nearest to" feature)
coords_B = np.array([[p.x, p.y] for p in gdf_type_B.geometry])
tree_B = cKDTree(coords_B)

# --- 3. Calculate Co-location Instances within the Distance Threshold ---
co_location_pairs = []
print(f"Calculating co-location instances within {DISTANCE_THRESHOLD}m...")

# Loop for schools (Type A)
for idx_A, row_A in gdf_type_A.iterrows():
    point_A = row_A.geometry
    distance, index = tree_B.query(np.array([point_A.x, point_A.y]), k=1, distance_upper_bound=DISTANCE_THRESHOLD)
    if np.isfinite(distance):
        co_location_pairs.append({
            'source_uid': row_A['uid'], 'source_type': row_A['feature_type'],
            'target_uid': gdf_type_B.iloc[index]['uid'], 'target_type': gdf_type_B.iloc[index]['feature_type'],
            'distance_m': distance
        })

# NEW: Loop for kindergartens (Type C)
for idx_C, row_C in gdf_type_C.iterrows():
    point_C = row_C.geometry
    distance, index = tree_B.query(np.array([point_C.x, point_C.y]), k=1, distance_upper_bound=DISTANCE_THRESHOLD)
    if np.isfinite(distance):
        co_location_pairs.append({
            'source_uid': row_C['uid'], 'source_type': row_C['feature_type'],
            'target_uid': gdf_type_B.iloc[index]['uid'], 'target_type': gdf_type_B.iloc[index]['feature_type'],
            'distance_m': distance
        })


if not co_location_pairs:
    print(f"\nNo co-location instances found between target features and '{TARGET_FEATURE_TYPE_B}' within {DISTANCE_THRESHOLD}m.")
else:
    df_plot_lines = pd.DataFrame(co_location_pairs)
    print(f"\nFound {len(df_plot_lines)} co-location pairs to plot.")

# --- 4. Plotting the Result on a Folium Map ---
print("\nGenerating Folium map to visualize all locations...")

# Reproject GeoDataFrames for plotting on Folium
gdf_type_A_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_B].copy()
gdf_type_C_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_C].copy() # NEW: DataFrame for plotting kindergartens

# Load and reproject boundaries
sheffield_city_boundary_wgs84 = gpd.read_file("City_Boundary.geojson").to_crs("EPSG:4326")
sheffield_ward_boundary_wgs84 = gpd.read_file("Sheffield_Wards.geojson").to_crs("EPSG:4326")

# Create a Folium map object
sheffield_center = [53.3810, -1.4701]
m_prevalent_2 = folium.Map(location=sheffield_center, zoom_start=12)

# Add boundaries to the map
folium.GeoJson(sheffield_city_boundary_wgs84, name="Sheffield City Boundary", style_function=lambda x: {'color': 'black', 'weight': 2, 'fillColor': 'none'}).add_to(m_prevalent_2)
folium.GeoJson(sheffield_ward_boundary_wgs84, name="Sheffield Ward Boundary", style_function=lambda x: {'color': 'black', 'weight': 1, 'fillColor': 'none'}).add_to(m_prevalent_2)

# Create a feature group for the connecting lines
lines_group = folium.FeatureGroup(name=f'Co-location Pairs ({DISTANCE_THRESHOLD}m)').add_to(m_prevalent_2)

if 'df_plot_lines' in locals():
    # Plot schools (Feature A)
    schools_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_A.title()} Locations").add_to(m_prevalent_2)
    for _, row in gdf_type_A_wgs84.iterrows():
        folium.CircleMarker(location=[row.geometry.y, row.geometry.x], radius=5, color='orange', fill=True, fill_color='orange', fill_opacity=0.8, popup=f"{TARGET_FEATURE_TYPE_A.title()} (UID: {row['uid']})").add_to(schools_group)

    # Plot kindergartens (Feature C) - NEW
    kindergarten_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_C.title()} Locations").add_to(m_prevalent_2)
    for _, row in gdf_type_C_wgs84.iterrows():
        folium.CircleMarker(location=[row.geometry.y, row.geometry.x], radius=5, color='purple', fill=True, fill_color='purple', fill_opacity=0.8, popup=f"{TARGET_FEATURE_TYPE_C.title()} (UID: {row['uid']})").add_to(kindergarten_group)

    # Plot diffusion tubes (Feature B)
    diffusion_tube_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_B.title()} Locations").add_to(m_prevalent_2)
    for _, row in gdf_type_B_wgs84.iterrows():
        folium.CircleMarker(location=[row.geometry.y, row.geometry.x], radius=5, color='blue', fill=True, fill_color='blue', fill_opacity=0.8, popup=f"{TARGET_FEATURE_TYPE_B.title()} (UID: {row['uid']})").add_to(diffusion_tube_group)

    # Draw the connecting lines
    for _, row in df_plot_lines.iterrows():
        source_uid = row['source_uid']
        source_type = row['source_type']
        target_uid = row['target_uid']

        # Get the coordinates for the source point
        if source_type == TARGET_FEATURE_TYPE_A:
            source_coords = gdf_type_A_wgs84[gdf_type_A_wgs84['uid'] == source_uid].geometry.iloc[0]
        elif source_type == TARGET_FEATURE_TYPE_C: # NEW: Logic for kindergartens
            source_coords = gdf_type_C_wgs84[gdf_type_C_wgs84['uid'] == source_uid].geometry.iloc[0]
        else:
            continue # Skip if source type is not a target type

        target_coords = gdf_type_B_wgs84[gdf_type_B_wgs84['uid'] == target_uid].geometry.iloc[0]

        # Determine line color based on the source feature
        line_color = 'red' if source_type == TARGET_FEATURE_TYPE_A else 'purple' if source_type == TARGET_FEATURE_TYPE_C else 'green'

        folium.PolyLine(
            locations=[[source_coords.y, source_coords.x], [target_coords.y, target_coords.x]],
            color=line_color,
            weight=2,
            opacity=1,
            dash_array='5, 5',
            tooltip=f"Distance: {row['distance_m']:.2f}m"
        ).add_to(lines_group)
else:
    print("No co-location pairs to plot. The map will show boundaries only.")

# Add a Layer Control to toggle layers on/off
folium.LayerControl().add_to(m_prevalent_2)

# Save the map as an HTML file
map_filename = "size-3_prevalent_school_kindergarten_map.html"
m_prevalent_2.save(map_filename)
print(f"\nInteractive map saved to: {map_filename}")

In [None]:
m_prevalent_2

### size-4 prevalent pattern: diffusion tube, school, kindergarten, and park

In [None]:
# --- 1. Define the Patterns and Distance Range of Interest ---
TARGET_FEATURE_TYPE_A = 'school'
TARGET_FEATURE_TYPE_B = 'diffusion_tube'
TARGET_FEATURE_TYPE_C = 'kindergarten'
TARGET_FEATURE_TYPE_D = 'park' # NEW: Added park as a target feature

# The distance threshold for co-location analysis (in meters)
DISTANCE_THRESHOLD = 600

print(f"\n--- Analyzing co-location between '{TARGET_FEATURE_TYPE_A}', '{TARGET_FEATURE_TYPE_C}', '{TARGET_FEATURE_TYPE_D}' and '{TARGET_FEATURE_TYPE_B}' ---")
print(f"Using a distance threshold of {DISTANCE_THRESHOLD}m.")

# --- 2. Load and Reproject Data (Assumed to be pre-loaded) ---

# Reproject for the plotting part later
all_features_gdf_wgs84 = all_features_gdf.to_crs("EPSG:4326")

# Extract GeoDataFrames for the target feature types
gdf_type_A = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_B].copy()
gdf_type_C = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_C].copy()
gdf_type_D = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_D].copy() # NEW: DataFrame for parks

if gdf_type_A.empty or gdf_type_B.empty or gdf_type_C.empty or gdf_type_D.empty:
    print("Error: One or more target feature types not found.")
    exit()

# Prepare coordinates for cKDTree for TARGET_FEATURE_TYPE_B (the "nearest to" feature)
coords_B = np.array([[p.x, p.y] for p in gdf_type_B.geometry])
tree_B = cKDTree(coords_B)

# --- 3. Calculate Co-location Instances within the Distance Threshold ---
co_location_pairs = []
print(f"Calculating co-location instances within {DISTANCE_THRESHOLD}m...")

# Loop for schools (Type A)
for idx_A, row_A in gdf_type_A.iterrows():
    point_A = row_A.geometry
    distance, index = tree_B.query(np.array([point_A.x, point_A.y]), k=1, distance_upper_bound=DISTANCE_THRESHOLD)
    if np.isfinite(distance):
        co_location_pairs.append({
            'source_uid': row_A['uid'], 'source_type': row_A['feature_type'],
            'target_uid': gdf_type_B.iloc[index]['uid'], 'target_type': gdf_type_B.iloc[index]['feature_type'],
            'distance_m': distance
        })

# Loop for kindergartens (Type C)
for idx_C, row_C in gdf_type_C.iterrows():
    point_C = row_C.geometry
    distance, index = tree_B.query(np.array([point_C.x, point_C.y]), k=1, distance_upper_bound=DISTANCE_THRESHOLD)
    if np.isfinite(distance):
        co_location_pairs.append({
            'source_uid': row_C['uid'], 'source_type': row_C['feature_type'],
            'target_uid': gdf_type_B.iloc[index]['uid'], 'target_type': gdf_type_B.iloc[index]['feature_type'],
            'distance_m': distance
        })

# NEW: Loop for parks (Type D)
for idx_D, row_D in gdf_type_D.iterrows():
    point_D = row_D.geometry
    distance, index = tree_B.query(np.array([point_D.x, point_D.y]), k=1, distance_upper_bound=DISTANCE_THRESHOLD)
    if np.isfinite(distance):
        co_location_pairs.append({
            'source_uid': row_D['uid'], 'source_type': row_D['feature_type'],
            'target_uid': gdf_type_B.iloc[index]['uid'], 'target_type': gdf_type_B.iloc[index]['feature_type'],
            'distance_m': distance
        })

if not co_location_pairs:
    print(f"\nNo co-location instances found between target features and '{TARGET_FEATURE_TYPE_B}' within {DISTANCE_THRESHOLD}m.")
else:
    df_plot_lines = pd.DataFrame(co_location_pairs)
    print(f"\nFound {len(df_plot_lines)} co-location pairs to plot.")

# --- 4. Plotting the Result on a Folium Map ---
print("\nGenerating Folium map to visualize all locations...")

# Reproject GeoDataFrames for plotting on Folium
gdf_type_A_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_B].copy()
gdf_type_C_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_C].copy()
gdf_type_D_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_D].copy() # NEW: DataFrame for plotting parks

# Load and reproject boundaries
sheffield_city_boundary_wgs84 = gpd.read_file("City_Boundary.geojson").to_crs("EPSG:4326")
sheffield_ward_boundary_wgs84 = gpd.read_file("Sheffield_Wards.geojson").to_crs("EPSG:4326")

# Create a Folium map object
sheffield_center = [53.3810, -1.4701]
m_prevalent_3 = folium.Map(location=sheffield_center, zoom_start=12)

# Add boundaries to the map
folium.GeoJson(sheffield_city_boundary_wgs84, name="Sheffield City Boundary", style_function=lambda x: {'color': 'black', 'weight': 2, 'fillColor': 'none'}).add_to(m_prevalent_3)
folium.GeoJson(sheffield_ward_boundary_wgs84, name="Sheffield Ward Boundary", style_function=lambda x: {'color': 'black', 'weight': 1, 'fillColor': 'none'}).add_to(m_prevalent_3)

# Create a feature group for the connecting lines
lines_group = folium.FeatureGroup(name=f'Co-location Pairs ({DISTANCE_THRESHOLD}m)').add_to(m_prevalent_3)

if 'df_plot_lines' in locals():
    # Plot schools (Feature A)
    schools_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_A.title()} Locations").add_to(m_prevalent_3)
    for _, row in gdf_type_A_wgs84.iterrows():
        folium.CircleMarker(location=[row.geometry.y, row.geometry.x], radius=5, color='orange', fill=True, fill_color='orange', fill_opacity=0.8, popup=f"{TARGET_FEATURE_TYPE_A.title()} (UID: {row['uid']})").add_to(schools_group)

    # Plot kindergartens (Feature C)
    kindergarten_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_C.title()} Locations").add_to(m_prevalent_3)
    for _, row in gdf_type_C_wgs84.iterrows():
        folium.CircleMarker(location=[row.geometry.y, row.geometry.x], radius=5, color='purple', fill=True, fill_color='purple', fill_opacity=0.8, popup=f"{TARGET_FEATURE_TYPE_C.title()} (UID: {row['uid']})").add_to(kindergarten_group)

    # Plot parks (Feature D) - NEW
    parks_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_D.title()} Locations").add_to(m_prevalent_3)
    for _, row in gdf_type_D_wgs84.iterrows():
        folium.CircleMarker(location=[row.geometry.y, row.geometry.x], radius=5, color='green', fill=True, fill_color='green', fill_opacity=0.8, popup=f"{TARGET_FEATURE_TYPE_D.title()} (UID: {row['uid']})").add_to(parks_group)

    # Plot diffusion tubes (Feature B)
    diffusion_tube_group = folium.FeatureGroup(name=f"{TARGET_FEATURE_TYPE_B.title()} Locations").add_to(m_prevalent_3)
    for _, row in gdf_type_B_wgs84.iterrows():
        folium.CircleMarker(location=[row.geometry.y, row.geometry.x], radius=5, color='blue', fill=True, fill_color='blue', fill_opacity=0.8, popup=f"{TARGET_FEATURE_TYPE_B.title()} (UID: {row['uid']})").add_to(diffusion_tube_group)

    # Draw the connecting lines
    for _, row in df_plot_lines.iterrows():
        source_uid = row['source_uid']
        source_type = row['source_type']
        target_uid = row['target_uid']

        # Get the coordinates for the source point
        if source_type == TARGET_FEATURE_TYPE_A:
            source_coords = gdf_type_A_wgs84[gdf_type_A_wgs84['uid'] == source_uid].geometry.iloc[0]
        elif source_type == TARGET_FEATURE_TYPE_C:
            source_coords = gdf_type_C_wgs84[gdf_type_C_wgs84['uid'] == source_uid].geometry.iloc[0]
        elif source_type == TARGET_FEATURE_TYPE_D: # NEW: Logic for parks
            source_coords = gdf_type_D_wgs84[gdf_type_D_wgs84['uid'] == source_uid].geometry.iloc[0]
        else:
            continue # Skip if source type is not a target type

        target_coords = gdf_type_B_wgs84[gdf_type_B_wgs84['uid'] == target_uid].geometry.iloc[0]

        # Determine line color based on the source feature
        if source_type == TARGET_FEATURE_TYPE_A:
            line_color = 'orange'
        elif source_type == TARGET_FEATURE_TYPE_C:
            line_color = 'purple'
        elif source_type == TARGET_FEATURE_TYPE_D: # NEW: Line color for parks
            line_color = 'green'
        else:
            line_color = 'red' # Default color

        folium.PolyLine(
            locations=[[source_coords.y, source_coords.x], [target_coords.y, target_coords.x]],
            color=line_color,
            weight=2,
            opacity=1,
            dash_array='5, 5',
            tooltip=f"Distance: {row['distance_m']:.2f}m"
        ).add_to(lines_group)
else:
    print("No co-location pairs to plot. The map will show boundaries only.")

# Add a Layer Control to toggle layers on/off
folium.LayerControl().add_to(m_prevalent_3)

# Save the map as an HTML file
map_filename = "size-4_prevalent_map.html"
m_prevalent_3.save(map_filename)
print(f"\nInteractive map saved to: {map_filename}")

In [None]:
m_prevalent_3

In [None]:
from sklearn.cluster import DBSCAN

# Reproject to a local projected CRS (e.g., British National Grid) for accurate distance calculation
local_crs = "EPSG:27700"
all_features_gdf = all_features_gdf.to_crs(local_crs)

# --- Define the Most Prevalent Pattern and its Colors ---
MOST_PREVALENT_PATTERN = ['diffusion_tube', 'kindergarten', 'park', 'school']
FEATURE_COLORS = {
    'diffusion_tube': 'blue',
    'kindergarten': 'purple',
    'park': 'green',
    'school': 'red'
    }
FEATURE_RADIUS = 3

# The maximum distance to consider points part of the same cluster (in meters)
CLUSTER_DISTANCE_THRESHOLD = 600
MIN_POINTS_IN_CLUSTER = len(MOST_PREVALENT_PATTERN)

# --- 2. Load and Reproject Boundaries for the Map ---
print("\nLoading and reprojecting boundaries...")
sheffield_city_boundary_wgs84 = gpd.read_file("City_Boundary.geojson").to_crs("EPSG:4326")
sheffield_ward_boundary_wgs84 = gpd.read_file("Sheffield_Wards.geojson").to_crs("EPSG:4326")

# Reproject all features for Folium plotting (back to WGS84)
all_features_gdf_wgs84 = all_features_gdf.to_crs("EPSG:4326")

# --- 3. Create a Folium Map Object and Add Boundaries ---
sheffield_center = [53.3810, -1.4701]
m_prevalent_pattern = folium.Map(location=sheffield_center, zoom_start=12)

folium.GeoJson(sheffield_city_boundary_wgs84, name="Sheffield City Boundary", style_function=lambda x: {'color': 'black', 'weight': 2, 'fillColor': 'none'}).add_to(m_prevalent_pattern)
folium.GeoJson(sheffield_ward_boundary_wgs84, name="Sheffield Ward Boundary", style_function=lambda x: {'color': 'black', 'weight': 1, 'fillColor': 'none'}).add_to(m_prevalent_pattern)

# --- 4. Identify and Plot Co-location Clusters as Polygons ---
print(f"Identifying and plotting clusters for the most prevalent pattern: {MOST_PREVALENT_PATTERN}")

# Filter the data for the prevalent pattern features
prevalent_features_gdf = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'].isin(MOST_PREVALENT_PATTERN)]
prevalent_features_gdf_projected = prevalent_features_gdf.to_crs(local_crs)

# Use DBSCAN to find clusters. A cluster is defined as a group of points
# where each point is within `eps` distance of at least `min_samples` other points.
coords = np.array([[p.x, p.y] for p in prevalent_features_gdf_projected.geometry])
db = DBSCAN(eps=CLUSTER_DISTANCE_THRESHOLD, min_samples=MIN_POINTS_IN_CLUSTER).fit(coords)
labels = db.labels_

# Create a FeatureGroup for the cluster polygons
cluster_group = folium.FeatureGroup(name="Co-location Clusters").add_to(m_prevalent_pattern)
cluster_id_counter = 0

for cluster_id in set(labels):
    if cluster_id != -1:  # Ignore noise points (outliers)
        cluster_points_gdf = prevalent_features_gdf_projected[labels == cluster_id]

        # We are looking for patterns with a mix of all four types.
        # Check if this cluster contains all feature types in the prevalent pattern
        cluster_feature_types = set(cluster_points_gdf['feature_type'])
        if cluster_feature_types.issuperset(set(MOST_PREVALENT_PATTERN)):
            # Create a convex hull from the cluster points
            multi_point = MultiPoint(list(cluster_points_gdf.geometry))
            cluster_polygon = multi_point.convex_hull

            # Create a GeoSeries with the polygon and then reproject it
            cluster_polygon_geoseries = gpd.GeoSeries(cluster_polygon, crs=local_crs)
            cluster_polygon_wgs84 = cluster_polygon_geoseries.to_crs("EPSG:4326")

            # Plot the polygon on the map
            folium.GeoJson(cluster_polygon_wgs84,
                           name=f"Cluster {cluster_id_counter}",
                           style_function=lambda x: {
                               'color': 'darkorange',
                               'weight': 2,
                               'fillColor': 'orange',
                               'fillOpacity': 0.2
                               }
                           ).add_to(cluster_group)

# --- 5. Plot Each Feature Individually on Top of the Polygons ---
for feature_type in MOST_PREVALENT_PATTERN:
    gdf_filtered = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == feature_type]

    if not gdf_filtered.empty:
        feature_group = folium.FeatureGroup(name=f"{feature_type.title()} Locations").add_to(m_prevalent_pattern)

        for _, row in gdf_filtered.iterrows():
            folium.CircleMarker(
                location=[row.geometry.y, row.geometry.x],
                radius=FEATURE_RADIUS,
                color=FEATURE_COLORS[feature_type],
                fill=True,
                fill_color=FEATURE_COLORS[feature_type],
                fill_opacity=0.8,
                popup=f"Type: {feature_type.title()}"
            ).add_to(feature_group)

# Add a Layer Control to toggle layers on/off
folium.LayerControl().add_to(m_prevalent_pattern)

# Save the map as an HTML file
map_filename = "most_prevalent_pattern_map.html"
m_prevalent_pattern.save(map_filename)
print(f"\nInteractive map saved to: {map_filename}")

In [None]:
m_prevalent_pattern

### Non-prevalent pattern: diffusion tube and nursing home

In [None]:
# --- 1. Define the Pattern and Distance Range of Interest ---
TARGET_FEATURE_TYPE_A = 'nursing_home'
TARGET_FEATURE_TYPE_B = 'diffusion_tube'

# The distance threshold for co-location analysis (in meters)
DISTANCE_THRESHOLD = 200

print(f"\n--- Analyzing co-location between '{TARGET_FEATURE_TYPE_A}' and '{TARGET_FEATURE_TYPE_B}' ---")
print(f"Using a distance threshold of {DISTANCE_THRESHOLD}m.")

# --- 2. Load and Reproject Data (You must have these GeoDataFrames loaded) ---

# Reproject to a local projected CRS (e.g., British National Grid) for accurate distance calculation
all_features_gdf_wgs84 = all_features_gdf.to_crs("EPSG:4326")

# Extract GeoDataFrames for the target feature types
gdf_type_A = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_B].copy()

if gdf_type_A.empty or gdf_type_B.empty:
    print(f"Error: One or both target feature types ('{TARGET_FEATURE_TYPE_A}', '{TARGET_FEATURE_TYPE_B}') not found.")
    exit()

# Prepare coordinates for cKDTree for TARGET_FEATURE_TYPE_B (the "nearest to" feature)
coords_B = np.array([[p.x, p.y] for p in gdf_type_B.geometry])
tree_B = cKDTree(coords_B)

# --- 3. Calculate Co-location Instances within the Distance Threshold ---
co_location_pairs = []
print(f"Calculating co-location instances within {DISTANCE_THRESHOLD}m...")

for idx_A, row_A in gdf_type_A.iterrows():
    point_A = row_A.geometry
    # Query the KDTree to find the nearest neighbor within the threshold
    distance, index = tree_B.query(np.array([point_A.x, point_A.y]), k=1, distance_upper_bound=DISTANCE_THRESHOLD)

    if np.isfinite(distance):
        # A co-location instance is found
        co_location_pairs.append({
            'source_uid': row_A['uid'],
            'source_type': row_A['feature_type'],
            'target_uid': gdf_type_B.iloc[index]['uid'],
            'target_type': gdf_type_B.iloc[index]['feature_type'],
            'distance_m': distance
        })

if not co_location_pairs:
    print(f"\nNo co-location instances found between '{TARGET_FEATURE_TYPE_A}' and '{TARGET_FEATURE_TYPE_B}' within {DISTANCE_THRESHOLD}m.")
    print("This confirms the pattern has a low prevalence, including a PI of 0.")
else:
    pass

# --- 4. Plotting the Result on a Folium Map ---
print("\nGenerating Folium map to visualize all locations...")

# Reproject for Folium plotting (back to WGS84)
all_features_gdf_wgs84 = all_features_gdf.to_crs("EPSG:4326")
gdf_type_A_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == 'nursing_home'].copy()
gdf_type_B_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == 'diffusion_tube'].copy()

# Load and reproject boundaries (assuming files are in the same directory)
sheffield_city_boundary_wgs84 = gpd.read_file("City_Boundary.geojson").to_crs("EPSG:4326")
sheffield_ward_boundary_wgs84 = gpd.read_file("Sheffield_Wards.geojson").to_crs("EPSG:4326")

# Create a Folium map object
sheffield_center = [53.3810, -1.4701]
m_non_prevalent = folium.Map(location=sheffield_center, zoom_start=12)

# Add boundaries to the map
folium.GeoJson(sheffield_city_boundary_wgs84, name="Sheffield City Boundary", style_function=lambda x: {'color': 'black', 'weight': 2, 'fillColor': 'none'}).add_to(m_non_prevalent)
folium.GeoJson(sheffield_ward_boundary_wgs84, name="Sheffield Ward Boundary", style_function=lambda x: {'color': 'black', 'weight': 1, 'fillColor': 'none'}).add_to(m_non_prevalent)

# Create a feature group for the connecting lines
lines_group = folium.FeatureGroup(name=f'Nearest Neighbors (for reference)').add_to(m_non_prevalent)

# Plot all nursing homes (orange circles)
nursing_home_group = folium.FeatureGroup(name="Nursing Homes").add_to(m_non_prevalent)
for _, row_wgs84 in gdf_type_A_wgs84.iterrows():
    folium.CircleMarker(
        location=[row_wgs84.geometry.y, row_wgs84.geometry.x],
        radius=5,
        color='orange',
        fill=True,
        fill_color='orange',
        fill_opacity=0.8,
        popup=f"Nursing Home (UID: {row_wgs84['uid']})"
    ).add_to(nursing_home_group)

    # To get the distance, you must query the KDTree built with the local CRS data
    point_A_local = gdf_type_A[gdf_type_A['uid'] == row_wgs84['uid']].geometry.iloc[0]
    distance, index = tree_B.query(np.array([point_A_local.x, point_A_local.y]), k=1)

    # Get the nearest diffusion tube in WGS84 for plotting
    nearest_diffusion_tube_wgs84 = gdf_type_B_wgs84.iloc[index]

    # Draw a line to the nearest diffusion tube, regardless of the distance, for a complete picture
    folium.PolyLine(
        locations=[[row_wgs84.geometry.y, row_wgs84.geometry.x], [nearest_diffusion_tube_wgs84.geometry.y, nearest_diffusion_tube_wgs84.geometry.x]],
        color='red',
        weight=2,
        opacity=1,
        dash_array='5, 5',
        tooltip=f"Distance: {distance:.2f}m"  # This is the line that adds the distance tooltip
    ).add_to(lines_group)


# Plot all diffusion tubes (blue circles)
diffusion_tube_group = folium.FeatureGroup(name="Diffusion Tubes").add_to(m_non_prevalent)
for _, row in gdf_type_B_wgs84.iterrows():
    folium.CircleMarker(
        location=[row.geometry.y, row.geometry.x],
        radius=5,
        color='blue',
        fill=True,
        fill_color='blue',
        fill_opacity=0.8,
        popup=f"Diffusion Tube (UID: {row['uid']})"
    ).add_to(diffusion_tube_group)

# Add a Layer Control to toggle layers on/off
folium.LayerControl().add_to(m_non_prevalent)

# Save the map as an HTML file
map_filename = "non_prevalent_nursing_homes_map.html"
m_non_prevalent.save(map_filename)
print(f"\nInteractive map saved to: {map_filename}")

In [None]:
m_non_prevalent

In [None]:
# Define the path to your Sheffield boundary GeoJSON file
SHEFFIELD_BOUNDARY_GEOJSON_PATH = "/content/City_Boundary.geojson"
output_dir_colocation = "colocation_data"
os.makedirs(output_dir_colocation, exist_ok=True)

# --- 1. Define the Pattern and Distance Range of Interest ---
TARGET_FEATURE_TYPE_A = 'nursing_home'
TARGET_FEATURE_TYPE_B = 'diffusion_tube' # The feature type you want to find neighbors of

# Define the distance range to filter by (in meters)
DISTANCE_MIN_FOR_PLOT = 10 # Minimum distance to consider for plotting
DISTANCE_MAX_FOR_PLOT = 400 # Maximum distance to consider for plotting

print(f"\n--- Analyzing distances for '{TARGET_FEATURE_TYPE_A}' to nearest '{TARGET_FEATURE_TYPE_B}' ---")
print(f"Filtering results for distances between {DISTANCE_MIN_FOR_PLOT}m and {DISTANCE_MAX_FOR_PLOT}m.")

# --- 2. Extract GeoDataFrames for the Target Feature Types (using WGS84 for Folium) ---

new_crs = "EPSG:4326"   # WGS84 for Folium
all_features_gdf_wgs84 = all_features_gdf.to_crs(new_crs)

gdf_type_A_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B_wgs84 = all_features_gdf_wgs84[all_features_gdf_wgs84['feature_type'] == TARGET_FEATURE_TYPE_B].copy()

# Use original CRS data for KDTree query as distance calculations are done in projected CRS
gdf_type_A_original_crs = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_A].copy()
gdf_type_B_original_crs = all_features_gdf[all_features_gdf['feature_type'] == TARGET_FEATURE_TYPE_B].copy()


if gdf_type_A_wgs84.empty or gdf_type_B_wgs84.empty:
    print(f"Error: One or both target feature types ('{TARGET_FEATURE_TYPE_A}', '{TARGET_FEATURE_TYPE_B}') not found in your data.")
    exit()

# Prepare coordinates for cKDTree for TARGET_FEATURE_TYPE_B (the "nearest to" feature)
# IMPORTANT: Use original CRS for distance calculation with KDTree
coords_B_original_crs = np.array([[p.x, p.y] for p in gdf_type_B_original_crs.geometry])
tree_B_original_crs = cKDTree(coords_B_original_crs)


# --- 3. Calculate Nearest Neighbor Distances and Prepare Plotting Data ---
plot_data_lines = []

print(f"Calculating nearest '{TARGET_FEATURE_TYPE_B}' for each '{TARGET_FEATURE_TYPE_A}' instance...")

for idx_A, row_A in gdf_type_A_original_crs.iterrows(): # Iterate over original CRS for query
    point_A_original_crs = row_A.geometry
    uid_A = row_A['uid']
    type_A = row_A['feature_type']

    # Query the KDTree to find the nearest neighbor in gdf_type_B
    distance, index = tree_B_original_crs.query(np.array([point_A_original_crs.x, point_A_original_crs.y]),
                                                k=1, distance_upper_bound=DISTANCE_MAX_FOR_PLOT + 1)

    if np.isfinite(distance):
        # Get the corresponding point in WGS84 for plotting
        point_A_wgs84 = gdf_type_A_wgs84.loc[idx_A].geometry
        point_B_wgs84 = gdf_type_B_wgs84.iloc[index].geometry
        uid_B = gdf_type_B_wgs84.iloc[index]['uid']
        type_B = gdf_type_B_wgs84.iloc[index]['feature_type']

        # Filter by the specified distance range for plotting
        if DISTANCE_MIN_FOR_PLOT <= distance <= DISTANCE_MAX_FOR_PLOT:
            plot_data_lines.append({
                'source_uid': uid_A,
                'source_type': type_A,
                'source_lat': point_A_wgs84.y,
                'source_lon': point_A_wgs84.x,
                'target_uid': uid_B,
                'target_type': type_B,
                'target_lat': point_B_wgs84.y,
                'target_lon': point_B_wgs84.x,
                'distance_m': distance,
            })

if not plot_data_lines:
    print(f"No '{TARGET_FEATURE_TYPE_A}' instances found with a nearest '{TARGET_FEATURE_TYPE_B}' within {DISTANCE_MIN_FOR_PLOT}-{DISTANCE_MAX_FOR_PLOT}m.")
else:
    df_plot_lines = pd.DataFrame(plot_data_lines)
    print(f"\nFound {len(df_plot_lines)} pairs of '{TARGET_FEATURE_TYPE_A}' and '{TARGET_FEATURE_TYPE_B}' within the specified distance range.")
    print("\nSample of calculated distances for plotting:")
    print(df_plot_lines[['source_uid', 'target_uid', 'distance_m']].head())


    # --- 4. Plotting the Results using Folium (UPDATED SECTION) ---
    print("\nGenerating Folium map plot...")

    # Load the Sheffield boundary GeoJSON
    try:
        gdf_sheffield_boundary = gpd.read_file(SHEFFIELD_BOUNDARY_GEOJSON_PATH)
        # Ensure the boundary is in WGS84 for Folium
        gdf_sheffield_boundary_wgs84 = gdf_sheffield_boundary.to_crs(new_crs)
        print(f"Loaded Sheffield boundary from {SHEFFIELD_BOUNDARY_GEOJSON_PATH}")
    except FileNotFoundError:
        print(f"Warning: Sheffield boundary GeoJSON not found at {SHEFFIELD_BOUNDARY_GEOJSON_PATH}. Skipping boundary layer.")
        gdf_sheffield_boundary_wgs84 = None
    except Exception as e:
        print(f"Error loading Sheffield boundary: {e}. Skipping boundary layer.")
        gdf_sheffield_boundary_wgs84 = None


    # Calculate map center (centroid of all features in WGS84, or use Sheffield boundary centroid)
    if gdf_sheffield_boundary_wgs84 is not None and not gdf_sheffield_boundary_wgs84.empty:
        map_center_lat = gdf_sheffield_boundary_wgs84.geometry.centroid.y.mean()
        map_center_lon = gdf_sheffield_boundary_wgs84.geometry.centroid.x.mean()
    else:
        map_center_lat = all_features_gdf_wgs84.geometry.y.mean()
        map_center_lon = all_features_gdf_wgs84.geometry.x.mean()

    m = folium.Map(location=[map_center_lat, map_center_lon], zoom_start=12, control_scale=True)

    # Create FeatureGroups for better organization in the map's layer control
    all_features_group = folium.FeatureGroup(name='All Features (Context)').add_to(m)
    type_A_group = folium.FeatureGroup(name=f'{TARGET_FEATURE_TYPE_A} Locations').add_to(m)
    type_B_group = folium.FeatureGroup(name=f'{TARGET_FEATURE_TYPE_B} Locations').add_to(m)
    lines_group = folium.FeatureGroup(name=f'Nearest Neighbors ({DISTANCE_MIN_FOR_PLOT}-{DISTANCE_MAX_FOR_PLOT}m)').add_to(m)

    # --- Add Sheffield Boundary Layer ---
    if gdf_sheffield_boundary_wgs84 is not None and not gdf_sheffield_boundary_wgs84.empty:
        folium.GeoJson(
            gdf_sheffield_boundary_wgs84.__geo_interface__,
            name='Sheffield Boundary',
            style_function=lambda x: {
                'fillColor': '#cccccc', # Light grey fill
                'color': 'black',      # Black border
                'weight': 2,           # Border thickness
                'fillOpacity': 0.1     # Semi-transparent fill
            }
        ).add_to(m) # Add directly to map or a dedicated feature group if desired

    # Add all features as light grey circle markers for context
    for idx, row in all_features_gdf_wgs84.iterrows():
        folium.CircleMarker(
            location=[row.geometry.y, row.geometry.x],
            radius=2,
            color='lightgrey',
            fill=True,
            fill_color='lightgrey',
            fill_opacity=0.6,
            tooltip=f"{row['feature_type']} (UID: {row['uid']})"
        ).add_to(all_features_group)

    # Add markers for TARGET_FEATURE_TYPE_A (e.g., Nursing Homes)
    for idx, row in gdf_type_A_wgs84.iterrows():
        folium.CircleMarker(
            location=[row.geometry.y, row.geometry.x],
            radius=5,
            color='red',
            fill=True,
            fill_color='red',
            fill_opacity=0.8,
            tooltip=f"{row['feature_type']} (UID: {row['uid']})"
        ).add_to(type_A_group)

    # Add markers for TARGET_FEATURE_TYPE_B (e.g., Diffusion Tubes)
    for idx, row in gdf_type_B_wgs84.iterrows():
        folium.CircleMarker(
            location=[row.geometry.y, row.geometry.x],
            radius=5,
            color='blue',
            fill=True,
            fill_color='blue',
            fill_opacity=0.8,
            tooltip=f"{row['feature_type']} (UID: {row['uid']})"
        ).add_to(type_B_group)

    # Add connecting lines
    for idx, row in df_plot_lines.iterrows():
        folium.PolyLine(
            locations=[[row['source_lat'], row['source_lon']], [row['target_lat'], row['target_lon']]],
            color='green',
            weight=1.5,
            opacity=0.7,
            dash_array='5, 5', # Dashed line
            tooltip=f"Distance: {row['distance_m']:.2f}m<br>{row['source_type']} (UID:{row['source_uid']})<br>{row['target_type']} (UID:{row['target_uid']})"
        ).add_to(lines_group)

    # Add layer control to the map
    folium.LayerControl().add_to(m)

    # Save the map as an HTML file
    map_filename = f"nearest_distances_folium_{TARGET_FEATURE_TYPE_A}_to_{TARGET_FEATURE_TYPE_B}_{DISTANCE_MIN_FOR_PLOT}-{DISTANCE_MAX_FOR_PLOT}m_with_boundary.html"
    map_filepath = os.path.join(output_dir_colocation, map_filename)
    m.save(map_filepath)
    print(f"Interactive map saved to: {map_filepath}")

print("\nDistance analysis and Folium plotting complete.")

In [None]:
# Display
m