In [None]:
import urllib

import contextily as ctx
import geopandas as gpd
import matplotlib.pyplot as plt
import osmnx as ox
import pandas as pd
import polars as pl
import requests
from brdr.aligner import Aligner
from brdr.enums import OpenbaarDomeinStrategy
from brdr.loader import DictLoader
from shapely.ops import unary_union
from shapely.wkt import loads

In [None]:
datasette_base_url = "https://datasette.planning.data.gov.uk/conservation-area.csv"

query = """
select * 
from entity
"""
encoded_query = urllib.parse.urlencode({"sql": query})

r = requests.get(f"{datasette_base_url}?{encoded_query}", auth=("user", "pass"))

filename = "datasette_data.csv"
with open(filename, "wb") as f_out:
    f_out.write(r.content)

data = pl.read_csv(filename)

In [None]:
def download_osm_polygons(
    polygon_for_osm,
    osm_tags,
    osm_query_crs,
    plot_crs,
):
    # Download Open Street Map polygons
    osm_features_df = gpd.GeoDataFrame(geometry=[], crs=osm_query_crs)
    osm_features_base = ox.features_from_polygon(polygon_for_osm, osm_tags)
    osm_features_df = osm_features_base[
        osm_features_base.geometry.geom_type.isin(
            ["Polygon", "MultiPolygon", "LineString"]
        )
    ]
    # This ensures that LineStrings are also interpreted as v.small polygons - shortcut for all in one solution.
    osm_features_df = osm_features_df.to_crs(plot_crs).buffer(0.00000000001)
    print(f"Downloaded {len(osm_features_df)} OSM polygons.")

    # Align OSM co-ordinate systems
    if not osm_features_df.empty:
        if osm_features_df.crs == plot_crs:
            osm_features_proj = osm_features_df
        else:
            osm_features_proj = osm_features_df.to_crs(plot_crs)
    else:
        osm_features_proj = gpd.GeoDataFrame(geometry=[], crs=plot_crs)
    return osm_features_proj

In [None]:
def process_osm_polygons(osm_features_proj, used_osm_indices):
    reference_geom = None

    if not osm_features_proj.empty:
        geometries_to_combine = None

        if used_osm_indices is None:
            # Case 1: Use ALL downloaded features
            print("Using all polygons.")
            geometries_to_combine = osm_features_proj.geometry
        elif isinstance(used_osm_indices, list) and used_osm_indices:
            # Case 2: Use features at specific indices
            print(f"Selecting polygons at indices {used_osm_indices}.")
            geometries_to_combine = osm_features_proj.iloc[used_osm_indices].geometry

        # Combine if geometries were selected
        if geometries_to_combine is not None and not geometries_to_combine.empty:
            # Combine all the geometries we need
            combined_reference = unary_union(geometries_to_combine)  # .buffer(0)
            if not combined_reference.is_empty:
                reference_geom = combined_reference
            else:
                print("Empty osm polygons when combining")
        else:
            print("Indices may be out of scope")

    else:
        print("No OSM polygons downloaded.")
    return reference_geom

In [None]:
def get_snapped_polygon(
    reference_geom,
    original_proj,
    brdr_distance,
    brdr_strategy,
    brdr_threshold,
    plot_crs,
):
    aligned_df = None
    if reference_geom is not None and not original_proj.empty:
        thematic_geom = original_proj["geometry"].iloc[0]
        if thematic_geom.is_valid or thematic_geom.buffer(0).is_valid:
            if not thematic_geom.is_valid:
                thematic_geom = thematic_geom.buffer(0)

            # alignment logic
            aligner = Aligner(crs=plot_crs)
            loader_thematic = DictLoader({"theme_id_1": thematic_geom})
            aligner.load_thematic_data(loader_thematic)
            loader_reference = DictLoader({"ref_id_1": reference_geom})
            aligner.load_reference_data(loader_reference)

            process_result = aligner.process(
                relevant_distance=brdr_distance,
                od_strategy=brdr_strategy,
                threshold_overlap_percentage=brdr_threshold,
            )
            aligned_geom = process_result["theme_id_1"][brdr_distance]["result"]
            if not aligned_geom.is_valid:
                aligned_geom = aligned_geom.buffer(0)

            else:
                aligned_df = gpd.GeoDataFrame(
                    [1], geometry=[aligned_geom], crs=plot_crs
                )
    return aligned_df

In [None]:
def plot_all_areas(aligned_df, original_proj, osm_features_proj, osm_tags, plot_crs):
    fig, ax = plt.subplots(figsize=(12, 12))

    plot_layers_for_bounds = [original_proj]
    if aligned_df is not None:
        plot_layers_for_bounds.append(aligned_df)
    combined_bounds_df = pd.concat(
        [df for df in plot_layers_for_bounds if not df.empty], ignore_index=True
    )
    plot_xlim, plot_ylim = None, None
    if not combined_bounds_df.empty:
        xmin, ymin, xmax, ymax = combined_bounds_df.total_bounds
        if xmax > xmin and ymax > ymin:  # Basic check
            x_padding = (xmax - xmin) * 0.1
            y_padding = (ymax - ymin) * 0.1
            plot_xlim = (xmin - x_padding, xmax + x_padding)
            plot_ylim = (ymin - y_padding, ymax + y_padding)

    # Plot original area
    original_proj.boundary.plot(
        ax=ax,
        color="black",
        linestyle="--",
        linewidth=2.0,
        label="Original Area",
        zorder=2,
        alpha=0.8,
    )

    # Plot new boundary
    if aligned_df is not None:
        aligned_df.boundary.plot(
            ax=ax, color="red", linewidth=2.0, label="New Area", zorder=4, alpha=0.9
        )

    # Plot OSM polygons
    if not osm_features_proj.empty:
        osm_features_proj.plot(
            ax=ax,
            label=f"OSM Features ({list(osm_tags.keys())[0]})",
            zorder=3,
            edgecolor="blue",
            facecolor="lightblue",
            linewidth=0.5,
            alpha=0.6,
        )

    # Set Limits (if calculated)
    if plot_xlim and plot_ylim:
        ax.set_xlim(plot_xlim)
        ax.set_ylim(plot_ylim)

    # Add Basemap
    ctx.add_basemap(
        ax,
        source=ctx.providers.OpenStreetMap.Mapnik,
        crs=plot_crs,
        zoom="auto",
        zorder=1,
    )

    # Final Appearance
    ax.set_axis_off()
    ax.set_title(
        f"Original & Aligned Area with base Features ({osm_tags})\n(Plot CRS: {plot_crs})"
    )
    ax.legend(loc="best")  # Adjust legend location if needed
    plt.tight_layout()
    plt.show()
    print("Plot displayed.")

In [None]:
def align_and_plot_area(
    original_wkt: str,
    initial_crs: str = "EPSG:4326",
    osm_tags: dict = {"building": True},
    brdr_distance: float = 20,
    brdr_threshold: float = 10,
    brdr_strategy=OpenbaarDomeinStrategy.SNAP_ONLY_VERTICES,
    plot_crs: str = "EPSG:3857",
    osm_query_crs: str = "EPSG:4326",
    used_osm_indices: list | None = None,
    polygon_detection_buffer: float = 0.00001,
):

    # Load datasette polygon for area
    original_geom = loads(original_wkt)
    original_df = gpd.GeoDataFrame([1], geometry=[original_geom], crs=initial_crs)

    # Align co-ordinate systems
    if original_df.crs == osm_query_crs:
        df_for_query = original_df
    else:
        df_for_query = original_df.to_crs(osm_query_crs)
    # Need to add a small buffer to ensure that all nearby features are captured. May need testing.
    polygon_for_osm = df_for_query.geometry.iloc[0].buffer(polygon_detection_buffer)

    if original_df.crs == plot_crs:
        original_proj = original_df
    else:
        original_proj = original_df.to_crs(plot_crs)

    # # Download Open Street Map polygons
    osm_features_proj = download_osm_polygons(
        polygon_for_osm=polygon_for_osm,
        osm_tags=osm_tags,
        osm_query_crs=osm_query_crs,
        plot_crs=plot_crs,
    )

    # Prep natural features
    reference_geom = process_osm_polygons(osm_features_proj, used_osm_indices)

    # Brdr for snapping polygons
    aligned_df = get_snapped_polygon(
        reference_geom=reference_geom,
        original_proj=original_proj,
        brdr_distance=brdr_distance,
        brdr_strategy=brdr_strategy,
        brdr_threshold=brdr_threshold,
        plot_crs=plot_crs,
    )

    # Plot results
    plot_all_areas(
        aligned_df=aligned_df,
        original_proj=original_proj,
        osm_features_proj=osm_features_proj,
        osm_tags=osm_tags,
        plot_crs=plot_crs,
    )

    return aligned_df

In [None]:
input_tags = {
    "landuse": ["residential"],
    # 'landuse': ['farmyard'],
    # 'highway': ['track'],
    # 'waterway': ['drain'],
}
input_brdr_distance = 20
input_brdr_threshold = 10
# input_brdr_strategy = OpenbaarDomeinStrategy.SNAP_ONLY_VERTICES
input_brdr_strategy = OpenbaarDomeinStrategy.SNAP_PREFER_VERTICES
initial_crs = "EPSG:4326"
target_crs = "EPSG:3857"  # CRS for brdr and plotting
target_osm_crs = "EPSG:4326"  # CRS for osmnx input polygon


# --- Run the Function ---
aligned_result_df = align_and_plot_area(
    original_wkt=data["geometry"][4],
    initial_crs=initial_crs,
    osm_tags=input_tags,
    brdr_distance=input_brdr_distance,
    brdr_threshold=input_brdr_threshold,
    brdr_strategy=input_brdr_strategy,
    plot_crs=target_crs,
    osm_query_crs=target_osm_crs,
    used_osm_indices=None,
)