In [None]:
import os

import geopandas as gpd
import pandas as pd
from shapely.geometry import Point

detections_file = "../datasets/experiments/parkeren/detections/combined_detections_centrum.geojson"
enriched_detections_file = "../datasets/experiments/parkeren/detections/enriched_detections_centrum.geojson"
buurten_file = "../datasets/experiments/parkeren/bgt/Gebieden/GBD_buurt.shp"

bgt_folder = "../datasets/experiments/parkeren/bgt/Amsterdam_Centrum/"
bgt_voetpad_functions = ['voetpad', 'fietspad', 'voetpad op trap', 'voetgangersgebied']

parkeervlakken_file = "../datasets/experiments/parkeren/bgt//parkeervakken-parkeervakken-2025-09-12T15_32_00.255804+02_00.json"
beheerkaart_file = "../datasets/experiments/parkeren/bgt/beheerkaart_basis_eigendomsrecht.geojson"

RD_crs = "EPSG:28992"

car_dimensions = {
    "min_width": 1.4,
    "min_length": 3.0,
    "max_width": 3.1,
    "max_length": 12.0
}

conf_threshold = 0.04

In [None]:
def get_shape(geom):
    coords = geom.exterior.coords

    # get length of bounding box edges
    edge_length = (
        Point(coords[0]).distance(Point(coords[1])),
        Point(coords[1]).distance(Point(coords[2]))
    )

    return {
        "length": max(edge_length),
        "width": min(edge_length)
    }

def is_valid_detection(row):
    accept_conf = row["confidence"] >= conf_threshold
    accept_width = (
        (row["width"] >= car_dimensions["min_width"])
        and (row["width"] <= car_dimensions["max_width"])
    )
    accept_length = (
        (row["length"] >= car_dimensions["min_length"])
        and (row["length"] <= car_dimensions["max_length"])
    )
    return accept_conf and accept_width and accept_length

## Load data

In [None]:
# Load buurten shapes
buurten_gdf = gpd.read_file(buurten_file)
buurten_gdf = buurten_gdf[buurten_gdf["sdl_naam"] == "Centrum"]

# Load BGT wegdeel
_bgt_wgl_gdfs = []
_bgt_wgl_files = [file for file in os.listdir(bgt_folder) if file.startswith("BGT_WGL") and file.endswith(".shp")]
for file in _bgt_wgl_files:
    _bgt_wgl_gdfs.append(gpd.read_file(os.path.join(bgt_folder, file)))

bgt_wgl_gdf = gpd.GeoDataFrame(pd.concat(_bgt_wgl_gdfs))
bgt_wgl_gdf = bgt_wgl_gdf[bgt_wgl_gdf["eindreg"].isna()]

# Filter pedestrian zones
bgt_voetpad_gdf = bgt_wgl_gdf[bgt_wgl_gdf["bgtfunctie"].isin(bgt_voetpad_functions)]

# Load parkeervlakken
parkeervlakken_gdf = gpd.read_file(parkeervlakken_file)
parkeervlakken_gdf.to_crs(RD_crs, inplace=True)
parkeervlakken_gdf = parkeervlakken_gdf[parkeervlakken_gdf.is_valid]
parkeervlakken_gdf = parkeervlakken_gdf[parkeervlakken_gdf.intersects(buurten_gdf.union_all())]

# Load beheerkaart
beheerkaart_gdf = gpd.read_file(beheerkaart_file)
beheerkaart_gdf.to_crs(RD_crs, inplace=True)
beheerkaart_gdf = beheerkaart_gdf[beheerkaart_gdf.intersects(buurten_gdf.union_all())]
beheerkaart_shapes = beheerkaart_gdf.dissolve(by="indEigendomAmsterdam").loc[True].geometry

## Enrich detections

In [None]:
def get_buurt_and_wijk(row):
    buurt = buurten_gdf[buurten_gdf.contains(row["geometry"].centroid)]

    if len(buurt) > 0:
        buurt = buurt.iloc[0]
        return {
            "buurt_code": buurt["code"], 
            "buurt_naam": buurt["naam"], 
            "wijk_naam": buurt["wijk_naam"]
        }
    
    else:
        return {
            "buurt_code": None, 
            "buurt_naam": None, 
            "wijk_naam": None
        }

def is_in_parkeervlak(row):
    return any(parkeervlakken_gdf.contains(row["geometry"].centroid))

def is_in_beheer(row):
    return beheerkaart_shapes.contains(row["geometry"].centroid)

In [None]:
# Load detections from file
centrum_detections_gdf = gpd.read_file(detections_file).set_index("index")

# Enrich detections
centrum_detections_gdf[["length", "width"]] = centrum_detections_gdf.apply(lambda row: get_shape(row["geometry"]), axis="columns", result_type="expand")
centrum_detections_gdf["valid_detection"] = centrum_detections_gdf.apply(is_valid_detection, axis="columns")
centrum_detections_gdf[["buurt_code", "buurt_naam", "wijk_naam"]] = centrum_detections_gdf.apply(get_buurt_and_wijk, axis="columns", result_type="expand")
centrum_detections_gdf["in_parkeervlak"] = centrum_detections_gdf.apply(is_in_parkeervlak, axis="columns")
centrum_detections_gdf["in_beheer"] = centrum_detections_gdf.apply(is_in_beheer, axis="columns")

# Filter valid detections
valid_detections_gdf = centrum_detections_gdf[centrum_detections_gdf["valid_detection"] & ~centrum_detections_gdf["buurt_code"].isna()].copy()

## Compute wrongly parked cars

In [None]:
voetpad_shapes = bgt_voetpad_gdf[["bgtfunctie", "geometry"]].dissolve(by="bgtfunctie")
voetpad_shapes = voetpad_shapes.intersection(beheerkaart_shapes)

In [None]:
def get_overlap(row):
    overlap = voetpad_shapes.intersection(row["geometry"])
    overlap_ratio_fietspad = overlap.loc["fietspad"].area / row.geometry.area
    
    overlap_wo_fietspad = overlap.drop("fietspad")
    overlap_ratio_voetpad = overlap_wo_fietspad.union_all().area / row.geometry.area
    overlap_voetpad_type = overlap_wo_fietspad.index[overlap_wo_fietspad.area.argmax()]
    return {
        "overlap_fietspad": overlap_ratio_fietspad,
        "overlap_voetpad": overlap_ratio_voetpad,
        "overlap_voetpad_type": overlap_voetpad_type if overlap_ratio_voetpad > 0 else "nvt"
    }

In [None]:
valid_detections_gdf[["overlap_fietspad", "overlap_voetpad", "overlap_voetpad_type"]] = valid_detections_gdf.apply(get_overlap, axis="columns", result_type="expand")

In [None]:
valid_detections_gdf["wrongly_parked"] = valid_detections_gdf["overlap_voetpad"] >= 0.25

In [None]:
def get_classification(row):
    if row["in_parkeervlak"] & ~row["wrongly_parked"]:
        return "in_parkeervlak_correct"
    
    if row["in_parkeervlak"] & row["wrongly_parked"]:
        return "in_parkeervlak_incorrect"
    
    if ~row["in_parkeervlak"] & row["wrongly_parked"]:
        return "buiten_parkeervlak"
    
    return "overig"

In [None]:
valid_detections_gdf["classification"] = valid_detections_gdf.apply(get_classification, axis="columns")

In [None]:
## Save to file
valid_detections_gdf.to_file(enriched_detections_file)

## Analysis

In [None]:
# Load file
valid_detections_gdf = gpd.read_file(enriched_detections_file).set_index("index")

In [None]:
if "centrum_detections_gdf" in locals():
    n_detections = len(centrum_detections_gdf)
    print(f"Number of detections: {n_detections}")

n_parked = valid_detections_gdf["in_parkeervlak"].sum()
wrongly_parked = valid_detections_gdf["wrongly_parked"].sum()
total_parked = (valid_detections_gdf["wrongly_parked"] | valid_detections_gdf["in_parkeervlak"]).sum()
wrongly_parked_in_perkeervlak = (valid_detections_gdf["wrongly_parked"] & valid_detections_gdf["in_parkeervlak"]).sum()
print(f"Number of cars: {len(valid_detections_gdf)}")
print(f"Number of cars in parkeervlak: {n_parked}")
print(f"Wrongly parked: {wrongly_parked} ({(wrongly_parked / total_parked) * 100:.1f}%)")
print(f" - in parkeervlak: {wrongly_parked_in_perkeervlak}")
print(f" - outside parkeervlak: {wrongly_parked - wrongly_parked_in_perkeervlak}")

In [None]:
valid_detections_gdf[valid_detections_gdf["wrongly_parked"]]["overlap_voetpad_type"].value_counts()

In [None]:
def get_perc_wrongly_parked(perc):
    # wrongly_parked = valid_detections_gdf["overlap_voetpad"] >= perc
    # total_parked = (wrongly_parked |  valid_detections_gdf["in_parkeervlak"]).sum()
    wrongly_parked = valid_detections_gdf[valid_detections_gdf["in_parkeervlak"]]["overlap_voetpad"] >= perc
    total_parked = valid_detections_gdf["in_parkeervlak"].sum()
    return (wrongly_parked.sum() / total_parked) * 100

percentages = [x / 20 for x in range(1, 21)]

data = {
    p: get_perc_wrongly_parked(p) for p in percentages
}

_df = pd.Series(data=data)

In [None]:
ax = _df.plot()
ax.set(xlabel="Overlap met Voetpad", ylabel="% Wrongly Parked")
ax.set_xticks([x / 10 for x in range(1, 11)]);
ax.figure.savefig("../datasets/experiments/parkeren/plot_perc_wrong.png", dpi=150)

In [None]:
wijken = (
    valid_detections_gdf[valid_detections_gdf["classification"]!="overig"][["wijk_naam", "classification"]]
    .groupby("wijk_naam")
    .value_counts()
    .reset_index()
    .pivot(index="wijk_naam", columns="classification", values="count")
)

buurten = (
    valid_detections_gdf[valid_detections_gdf["classification"]!="overig"][["buurt_naam", "classification"]]
    .groupby("buurt_naam")
    .value_counts()
    .reset_index()
    .pivot(index="buurt_naam", columns="classification", values="count")
    .fillna(0)
    .astype("int64")
)

In [None]:
wijken["percentage_incorrect"] = (wijken.drop(columns="in_parkeervlak_correct").sum(axis="columns") / wijken.sum(axis="columns"))
buurten["percentage_incorrect"] = (buurten.drop(columns="in_parkeervlak_correct").sum(axis="columns") / buurten.sum(axis="columns"))

In [None]:
buurten_output = "../datasets/experiments/parkeren/detections/telling_per_buurt.xlsx"
wijken_output = "../datasets/experiments/parkeren/detections/telling_per_wijk.xlsx"

buurten.to_excel(buurten_output)
wijken.to_excel(wijken_output)

In [None]:
columns_to_keep = [
    'source_file', 'confidence', 'geometry', 'length', 'width', 
    'buurt_code', 'buurt_naam', 'wijk_naam', 
    'in_parkeervlak', 'in_beheer', 
    'overlap_fietspad', 'overlap_voetpad', 'overlap_voetpad_type',
    'classification'
]

In [None]:
final_gdf = valid_detections_gdf[columns_to_keep]

In [None]:
final_gdf.to_file("../datasets/experiments/parkeren/detections/20250922_foutparkeren_centrum_analyse_cvt.geojson", driver='GeoJSON')

In [None]:
with pd.ExcelWriter("../datasets/experiments/parkeren/detections/20250922_foutparkeren_centrum_analyse_cvt.xlsx") as writer:
    final_gdf.to_excel(writer, sheet_name="Detecties", index=False)
    wijken.to_excel(writer, sheet_name="Per wijk", index=True)
    buurten.to_excel(writer, sheet_name="Per buurt", index=True)

## Plotting

In [None]:
buurten_plot = gpd.GeoDataFrame(buurten.join(other=buurten_gdf[["naam", "geometry"]].set_index("naam")))

In [None]:
ax = buurten_plot.plot(column="percentage_incorrect", cmap="RdYlGn_r", legend=True, figsize=(10,8))
ax.set_title("Percentage foutparkeerders")
ax.axis('off');
ax.figure.savefig("../datasets/experiments/parkeren/buurten_percentage.png", dpi=150)

In [None]:
buurten_plot["totaal_incorrect"] = buurten_plot[["in_parkeervlak_incorrect", "buiten_parkeervlak"]].sum(axis="columns")
ax = buurten_plot.plot(column="totaal_incorrect", cmap="RdYlGn_r", legend=True, figsize=(10,8))
ax.set_title("Totaal aantal foutparkeerders")
ax.axis('off');
ax.figure.savefig("../datasets/experiments/parkeren/buurten_totaal.png", dpi=150)

In [None]:
from aerial_image_detection.raster_utils import RasterData

full_image_path = f"../datasets/experiments/parkeren/luchtfotos/luchtfotos_centrum_2025/2025_121000_487000_RGB_JPEG_hrl.tif"

raster_data = RasterData(full_image_path)

x = 0
y = 900
w = 100
h = 100

cropped_img, cropped_poly = raster_data.get_relative_crop((x, y, x+w, y+h))

cropped_detections = valid_detections_gdf[valid_detections_gdf.intersects(cropped_poly)]

In [None]:
import ast
import numpy as np

from aerial_image_detection.plot_utils import plot_obb_boxes_on_image

import cv2
import IPython

def show_bgr(img):
    _, ret = cv2.imencode('.jpg', img) 
    i = IPython.display.Image(data=ret)
    IPython.display.display(i)


def transform_box(box):
    box_array = np.array(box)
    box_array[:, 0] = box_array[:, 0] - (x * 12.5)
    box_array[:, 1] = cropped_img.shape[1] - (12500 - box_array[:, 1] - (y * 12.5))
    return box_array


colors = {
    # "overig": "grey",
    "in_parkeervlak_correct": (0, 151, 22),
    "in_parkeervlak_incorrect": (0, 171, 255),
    "buiten_parkeervlak": (0, 0, 255),
}

[x_min, y_min, x_max, y_max] = map(int, cropped_poly.bounds)

image_with_obb = np.flip(cropped_img.copy(), 2)

for name, color in colors.items():

    name_gdf = cropped_detections[cropped_detections["classification"]==name]
    obb_boxes = np.concatenate(
        [
            [
                transform_box(ast.literal_eval(box)) for box in list(cropped_detections["bounding_box"])
                for box in name_gdf["bounding_box"]
            ]
        ]
    ).tolist()
    
    obb_cls = [0]*len(obb_boxes)

    image_with_obb = plot_obb_boxes_on_image(
        image_with_obb,
        obb_cls,
        obb_boxes,
        single_color=color,
        line_width=3,
    )

In [None]:
# show_bgr(image_with_obb)

In [None]:
filename = f"../datasets/experiments/parkeren/output_images/2025_{x_min}_{y_min}_{w}_hr.jpg"

_ = cv2.imwrite(filename=filename, img=image_with_obb)

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import numpy as np

padding = 10

fig, ax = plt.subplots(1, figsize=(10, 10), constrained_layout=True)

[x_min, y_min, x_max, y_max] = map(int, cropped_poly.bounds)

ax.imshow(cropped_img, extent=[x_min, x_max, y_min, y_max])

colors = {
    # "overig": "grey",
    "in_parkeervlak_correct": "green",
    "in_parkeervlak_incorrect": "orange",
    "buiten_parkeervlak": "red"
}
labels = {
    # "overig": "Overig",
    "in_parkeervlak_correct": "Parkeervlak: correct",
    "in_parkeervlak_incorrect": "Parkeervlak: incorrect",
    "buiten_parkeervlak": "Buiten parkeervlak"
}
legend_items = []
for key in labels.keys():
    detections_for_key = cropped_detections[cropped_detections["classification"]==key]
    if len(detections_for_key) > 0:
        detections_for_key.boundary.plot(ax=ax, color=colors[key])
    legend_items.append(plt.Line2D([0], [0], color=colors[key], label=labels[key]))

ax.set_xlabel('X')
ax.set_ylabel('Y')

ax.set_xticks(range(x_min, x_max+1, 100))
ax.set_xticklabels(range(x_min, x_max+1, 100))
ax.set_yticks(range(y_min, y_max+1, 100))
ax.set_yticklabels(range(y_min, y_max+1, 100))

ax.set_xlim((x_min - padding, x_max + padding))
ax.set_ylim((y_min - padding, y_max + padding))
ax.set_aspect('equal', adjustable='box')

ax.legend(handles=legend_items, loc='upper center')

filename = f"../datasets/experiments/parkeren/output_images/issues_2025_{x_min}_{y_min}_{w}.jpg"
plt.savefig(filename, dpi=150)

plt.show()