In [1]:
import geopandas as gpd
import pandas as pd
import numpy as np
from shapely.geometry import shape
from copy import deepcopy
import json

# Ordinal rank of each lower-cased risk label, plus the inverse lookup from
# rank back to a capitalised display label.
classification_order = dict(zip(("low", "medium", "high"), range(3)))
reverse_classification_order = {
    rank: label.capitalize() for label, rank in classification_order.items()
}

# Read the ward boundaries (reprojected to EPSG:4326) and the prediction table
wards = gpd.read_file("wards.geojson").to_crs("EPSG:4326")
df = pd.read_csv("predicted_burglaries_2019_3 (1).csv")  # Replace with actual path or DataFrame source

# Fail fast if a column used further down is absent from the CSV
for required_column in ("WardName", "Predicted_Burglaries", "Classification"):
    assert required_column in df.columns

# Normalised copy of the labels: lower-cased, surrounding whitespace removed
df["Classification_Clean"] = df["Classification"].str.lower().str.strip()

# Define a name cleaning function
def clean_name(name):
    """Normalise a ward name into a join key.

    The name is lower-cased, trimmed, and stripped of a trailing directional
    qualifier (" north", " south", " east", " west", " central"). Each suffix
    is tested once, in that order.

    Parameters
    ----------
    name : str
        Raw ward name.

    Returns
    -------
    str
        The normalised name.
    """
    name = name.lower().strip()
    for suffix in (" north", " south", " east", " west", " central"):
        if name.endswith(suffix):
            # Slice the suffix off the end only; str.replace would also delete
            # interior occurrences (the " east" inside "upper eastfield east").
            name = name[: -len(suffix)]
    return name.strip()

# Derive the join key on both tables with the same normaliser
df["WardName_Clean"] = df["WardName"].map(clean_name)
wards["WardName_Clean"] = wards["NAME"].map(clean_name)

# Aggregate to highest classification per ward
def get_max_classification(classifications):
    """Return the highest-ranked risk label among ``classifications``.

    Each label is ranked through ``classification_order``; anything that is
    not a key of that mapping (missing values included) ranks as -1, so it
    loses to every recognised label.

    Parameters
    ----------
    classifications : iterable
        Cleaned classification labels for one ward.

    Returns
    -------
    str or None
        The display label from ``reverse_classification_order`` for the best
        rank, or ``None`` when the input is empty or holds no recognised
        label.
    """
    # default=-1 keeps an empty group from raising ValueError inside max().
    max_val = max(
        (classification_order.get(c, -1) for c in classifications),
        default=-1,
    )
    # Rank -1 has no entry in the reverse mapping; .get() turns what used to
    # be a KeyError into an explicit "no classification" result.
    return reverse_classification_order.get(max_val)

# Cleaned ward name -> highest classification label found among its df rows
df_class_grouped = {
    ward_key: get_max_classification(labels)
    for ward_key, labels in df.groupby("WardName_Clean")["Classification_Clean"]
}

def get_classification(clean_name):
    """Look up the aggregated classification for a cleaned ward name.

    An exact key in ``df_class_grouped`` wins. Failing that, the first key (in
    the dictionary's iteration order) that ``clean_name`` starts with is used.
    Returns ``None`` when neither lookup finds a key.
    """
    if clean_name in df_class_grouped:
        return df_class_grouped[clean_name]

    # No exact hit: fall back to the first key that is a prefix of the name
    prefix_key = next(
        (key for key in df_class_grouped if clean_name.startswith(key)),
        None,
    )
    if prefix_key is None:
        return None
    return df_class_grouped[prefix_key]

# Lookup table: cleaned ward name -> list of Predicted_Burglaries values from df
df_grouped = {
    ward_key: list(values)
    for ward_key, values in df.groupby("WardName_Clean")["Predicted_Burglaries"]
}

# Function to get predicted burglaries per GeoJSON feature
def get_prediction(clean_name, ward_geo_name):
    """Return the predicted burglary count for one GeoJSON ward.

    Parameters
    ----------
    clean_name : str
        Cleaned ward name of the GeoJSON feature.
    ward_geo_name : str
        Original GeoJSON ward name. Not used by the lookup; kept so existing
        callers keep working.

    Returns
    -------
    number or None
        * If ``clean_name`` is a key of ``df_grouped``: the sum of every df
          prediction stored under that key.
        * Otherwise, if some key of ``df_grouped`` is a prefix of
          ``clean_name``: the total prediction of the first such key, divided
          evenly over all GeoJSON wards that fall back to that same key.
        * ``None`` when nothing matches.
    """
    if clean_name in df_grouped:
        # One or more df rows share this key: report their combined total
        return sum(df_grouped[clean_name])

    # Fallback: the df only knows a base name that this ward's name starts with
    match = next((key for key in df_grouped if clean_name.startswith(key)), None)
    if match is None:
        return None  # No match

    # Count the GeoJSON wards that share this fallback key. Wards whose key is
    # itself in df_grouped take the exact branch above and claim no share, and
    # the ward being evaluated can never *equal* ``match`` here, so the count
    # has to use the same prefix test as the lookup.
    split_count = sum(
        1
        for n in wards["WardName_Clean"]
        if n not in df_grouped and n.startswith(match)
    )
    if split_count == 0:
        # Only reachable for a name that is not present in ``wards``
        return None

    # Divide the whole total for the base name, not just its first row
    return sum(df_grouped[match]) / split_count

# One prediction per ward row, in row order; None marks wards without a match
predicted_values = [
    get_prediction(ward_key, geo_name)
    for ward_key, geo_name in zip(wards["WardName_Clean"], wards["NAME"])
]
wards["Predicted_Burglaries"] = predicted_values

# Bounds of the colour scale, taken over the wards that received a value
valid_values = list(filter(lambda val: val is not None, predicted_values))
vmin, vmax = np.nanmin(valid_values), np.nanmax(valid_values)

from matplotlib import colors
def color_scale(value, vmin=vmin, vmax=vmax):
    """Map a predicted value to a hex colour between light and dark blue.

    Parameters
    ----------
    value : number or None
        Predicted value for a ward.
    vmin, vmax : number
        Values mapped to the light ("#add8e6") and dark ("#00008b") ends of
        the scale. The defaults are bound to the module-level ``vmin`` and
        ``vmax`` at the moment this function is defined.

    Returns
    -------
    str
        Hex colour string; "#cccccc" when ``value`` is missing (None or NaN).
    """
    # NaN would otherwise flow through the colormap and come out as black
    if value is None or pd.isna(value):
        return "#cccccc"
    span = vmax - vmin
    # All wards share one value: pin to the low end rather than divide by zero
    norm = (value - vmin) / span if span else 0.0
    cmap = colors.LinearSegmentedColormap.from_list("", ["#add8e6", "#00008b"])
    rgb = cmap(norm)
    return colors.rgb2hex(rgb[:3])

# Work on a private copy of the GeoJSON mapping so the GeoDataFrame is untouched
wards_data = deepcopy(wards.__geo_interface__)

# Attach prediction, classification and a colour-scale style to every feature;
# features and predicted_values are paired by position.
for feature, pred in zip(wards_data["features"], predicted_values):
    props = feature["properties"]
    props.update({
        "Predicted_Burglaries": pred,
        "Classification": get_classification(clean_name(props["NAME"])),
        "style": {
            "fillColor": color_scale(pred),
            "color": "black",
            "weight": 1,
            "fillOpacity": 0.7
        },
    })

# Round-trip through JSON so numpy integer/float scalars become plain floats
serialised = json.dumps(
    wards_data,
    default=lambda x: float(x) if isinstance(x, (np.integer, np.floating)) else x,
)
wards_data = json.loads(serialised)

# One output list per risk level
low_features, medium_features, high_features = [], [], []

# Per-level style: same orange, opacity grows with the risk level
classification_styles = {
    "Low": {"fillColor": "orange", "color": "orange", "weight": 1, "fillOpacity": 0.1},
    "Medium": {"fillColor": "orange", "color": "orange", "weight": 1, "fillOpacity": 0.5},
    "High": {"fillColor": "orange", "color": "orange", "weight": 1, "fillOpacity": 0.9}
}

# Dispatch table from level label to the list that collects its features
buckets = {"Low": low_features, "Medium": medium_features, "High": high_features}

for feature in wards_data["features"]:
    props = feature["properties"]
    level = props.get("Classification", None)

    # Anything that is not a known level (including None) is filed under "Low"
    if level not in classification_styles:
        level = "Low"

    # This replaces any "style" entry the feature already carries
    props["style"] = classification_styles[level]
    buckets[level].append(feature)


# Wrap each list in a FeatureCollection so it is a valid GeoJSON document
wards_low_data, wards_medium_data, wards_high_data = (
    {"type": "FeatureCollection", "features": bucket}
    for bucket in (low_features, medium_features, high_features)
)

# Write each level's FeatureCollection to its own GeoJSON file
for geojson_path, collection in (
    ("wards_low.geojson", wards_low_data),
    ("wards_medium.geojson", wards_medium_data),
    ("wards_high.geojson", wards_high_data),
):
    with open(geojson_path, "w") as f:
        json.dump(collection, f)

In [2]:
import dash
from dash import dcc, html
import dash_leaflet as dl
from dash.dependencies import Input, Output, State
from shapely.geometry import Point, shape
import json

app = dash.Dash(__name__)

# Read back the three per-level GeoJSON files, in low / medium / high order
_layers = []
for geojson_path in ("wards_low.geojson", "wards_medium.geojson", "wards_high.geojson"):
    with open(geojson_path) as f:
        _layers.append(json.load(f))
wards_low_data, wards_medium_data, wards_high_data = _layers

# Page layout: a status line above a full-width map. The map stacks one
# GeoJSON layer per risk level on top of the base tiles, followed by a layer
# group with no children of its own ("ward-tooltip-marker").
app.layout = html.Div([
    html.Div(id="prediction-output", style={"marginTop": "20px", "fontSize": "20px", "fontWeight": "bold"}),
    dl.Map(
        id="map",
        center=[51.5074, -0.1278],
        zoom=10,
        style={'width': '100%', 'height': '1029px'},
        children=[
            dl.TileLayer(),
            # The three ward layers differ only in id, data, fill colour and opacity
            *[
                dl.GeoJSON(
                    data=layer_data,
                    id=layer_id,
                    zoomToBoundsOnClick=True,
                    options={"interactive": True},
                    style={'color': 'blue', 'fillColor': fill_colour, 'fillOpacity': fill_opacity, 'weight': 1},
                    hoverStyle=dict(weight=3, color='#666', dashArray=''),
                )
                for layer_id, layer_data, fill_colour, fill_opacity in (
                    ("wards-low", wards_low_data, 'green', 0.3),
                    ("wards-medium", wards_medium_data, 'orange', 0.6),
                    ("wards-high", wards_high_data, 'red', 0.9),
                )
            ],
            dl.LayerGroup(id="ward-tooltip-marker"),
        ],
    ),
    dcc.Store(id="clicked-ward"),
])

def _ward_tooltip_marker(ward_name, prediction_text, position):
    """Build a marker at ``position`` with a permanent tooltip for one ward."""
    return dl.Marker(
        position=position,
        children=[
            dl.Tooltip(
                html.Div([
                    html.Div(f"Ward: {ward_name}", style={"fontSize": "16px", "fontWeight": "bold", "color": "#003366", "marginBottom": "5px"}),
                    html.Div("Predicted Burglaries:", style={"fontSize": "18px", "fontWeight": "bold", "color": "#b30000"}),
                    html.Div(f"{prediction_text}", style={"fontSize": "32px", "fontWeight": "bold", "color": "#b30000"})
                ]),
                permanent=True,
                direction="top",
                offset=[0, -20]
            )
        ]
    )


@app.callback(
    Output('prediction-output', 'children'),
    Output('ward-tooltip-marker', 'children'),
    Input("map", "clickData"),
    State('wards-low', 'data'),
    State('wards-medium', 'data'),
    State('wards-high', 'data')
)
def zoom_to_ward(clickData, wards_low, wards_medium, wards_high):
    """Show the prediction for the ward that contains the clicked map point.

    Parameters
    ----------
    clickData : dict or None
        Click payload of the map; read as ``clickData['latlng']['lat']`` and
        ``clickData['latlng']['lng']``.
    wards_low, wards_medium, wards_high : dict or None
        GeoJSON FeatureCollections currently held by the three ward layers.

    Returns
    -------
    tuple
        ``(status_text, marker)``: the text for "prediction-output" and the
        child for "ward-tooltip-marker". ``("", None)`` when there is no
        click, the payload cannot be read, or no ward contains the point.
    """
    if clickData is None:
        return "", None

    try:
        lat = clickData['latlng']['lat']
        lng = clickData['latlng']['lng']
        click_point = Point(lng, lat)  # shapely points are (x, y) = (lng, lat)
    except Exception:
        # Deliberate best effort: an unreadable click payload clears the output
        return "", None

    all_wards = (wards_low or {}).get("features", []) + \
                (wards_medium or {}).get("features", []) + \
                (wards_high or {}).get("features", [])

    for feature in all_wards:
        geometry = feature.get('geometry')
        if not geometry:
            # A feature without geometry cannot contain the click; skipping it
            # avoids shape() failing on None.
            continue

        ward_geom = shape(geometry)
        if not ward_geom.contains(click_point):
            continue

        properties = feature.get('properties') or {}
        ward_name = properties.get('NAME', 'unknown')
        prediction = properties.get("Predicted_Burglaries", None)
        # The key can be present with a None value, so a .get() default alone
        # would still surface "None" in the text.
        classification = properties.get("Classification") or "Unknown"

        # Test against None explicitly so a genuine prediction of 0 is shown;
        # the banner and the tooltip now share the same text.
        prediction_text = prediction if prediction is not None else 'No data'

        centroid = ward_geom.centroid
        tooltip_position = [centroid.y, centroid.x]  # leaflet order: [lat, lng]

        return (
            f"Predicted Burglaries: {prediction_text} | Predicted Risk Factor: {classification}",
            _ward_tooltip_marker(ward_name, prediction_text, tooltip_position)
        )

    return "", None

# Start the app only when this code is executed as a script (not when it is
# imported); debug mode is switched on.
if __name__ == "__main__":
    app.run(debug=True)