# Anhang: Implementierung der räumlichen Analyse

Dieser Anhang dokumentiert die methodisch relevanten Teile der Implementierung zur Evaluierung der Lösungskonzepte.

## 0. Datenbeschaffung: HU-DE via LGLN-WFS

Die Hausumringe (HU-DE) wurden direkt über den öffentlichen WFS-Dienst des Landesamts für Geoinformation und Landesvermessung Niedersachsen (LGLN) bezogen. Dieser Weg wurde gewählt, weil ein Massen-Download der HU-DE aus der GDI der Deutschen Telekom zum Analysezeitpunkt noch nicht möglich war.

Der WFS-Dienst liefert Features seitenweise (paginiert). Da die Gesamtzahl der Gebäude im Untersuchungsgebiet die maximale Seitengröße von 1.000 Features überschreitet, werden die Seiten in einer Schleife nacheinander abgerufen und anschließend zu einem einzigen GeoDataFrame zusammengeführt.

### 0.1 Einzelne WFS-Seite abrufen

`lade_lgln_layer` kapselt einen einzelnen paginierten WFS-Request. Die Antwort wird als temporäre GML-Datei zwischengespeichert, da `geopandas.read_file` eine Datei auf der Festplatte erwartet und nicht direkt aus einem Response-Objekt lesen kann.

In [None]:
import requests
import tempfile
import os
import logging

# Endpunkt des LGLN-WFS für vereinfachte ALKIS-Daten (Niedersachsen)
LGLN_WFS_URL = "https://opendata.lgln.niedersachsen.de/doorman/noauth/alkis_wfs_einfach"


def lade_lgln_layer(layer_name, bbox_utm32, count=1000, startindex=0):
    """
    Ruft eine einzelne Seite eines LGLN-WFS-Layers ab.

    Der WFS-Dienst gibt maximal 'count' Features pro Anfrage zurück.
    Über 'startindex' kann durch die Ergebnismenge paginiert werden.
    Die Antwort wird als temporäre GML-Datei gespeichert, da
    geopandas.read_file eine Datei auf der Festplatte benötigt.

    Args:
        layer_name  : WFS-Typname, z. B. 'ave:GebaeudeBauwerk'
        bbox_utm32  : Bounding Box [x_min, y_min, x_max, y_max] in EPSG:25832
        count       : Maximale Anzahl Features pro Request 
        startindex  : Startposition für die Paginierung 

    Returns:
        GeoDataFrame mit den geladenen Features oder None bei Fehler.
    """
    params = {
        'SERVICE'   : 'WFS',
        'REQUEST'   : 'GetFeature',
        'VERSION'   : '2.0.0',
        'TYPENAMES' : layer_name,
        'STARTINDEX': startindex,
        'COUNT'     : count,
        'SRSNAME'   : 'urn:ogc:def:crs:EPSG::25832',
        'BBOX'      : f"{bbox_utm32[0]},{bbox_utm32[1]},{bbox_utm32[2]},{bbox_utm32[3]},"
                      f"urn:ogc:def:crs:EPSG::25832",
        'NAMESPACES': 'xmlns(ave,http://repository.gdi-de.org/schemas/adv/produkt/alkis-vereinfacht/2.0)'
    }


    temp_file = tempfile.NamedTemporaryFile(mode='wb', suffix='.gml', delete=False)
    temp_filename = temp_file.name

    try:
        response = requests.get(LGLN_WFS_URL, params=params, timeout=120)

        if response.status_code != 200:
            temp_file.close()
            return None

        temp_file.write(response.content)
        temp_file.close()

        gdf = gpd.read_file(temp_filename, engine='fiona')
        return gdf

    except Exception as e:
        temp_file.close()
        return None

    finally:
        try:
            if os.path.exists(temp_filename):
                os.remove(temp_filename)
        except Exception as e:
            logger.warning(f'Temp-Datei konnte nicht gelöscht werden: {e}')

### 0.2 Alle Features einer BBox laden (Paginierung)

`lade_alle_features` ruft `lade_lgln_layer` wiederholt auf, bis keine weiteren Features mehr zurückgegeben werden. Gibt der Server weniger als `count` Features zurück, ist die letzte Seite erreicht und die Schleife wird beendet. Alle Teilresultate werden mit `pd.concat` zu einem einzigen GeoDataFrame zusammengeführt.

In [None]:
def lade_alle_features(layer_name, bbox_utm32,
                       count_per_request=1000, max_features=100000):
    """
    Lädt alle Features eines WFS-Layers innerhalb einer Bounding Box
    durch automatische Paginierung.

    Die Schleife endet, wenn:
      a) der Server weniger als count_per_request Features zurückgibt
         (letzte Seite erreicht), oder
      b) max_features überschritten werden (Sicherheitsabbruch).

    Args:
        layer_name        : WFS-Typname, z. B. 'ave:GebaeudeBauwerk'
        bbox_utm32        : Bounding Box [x_min, y_min, x_max, y_max] in EPSG:25832
        count_per_request : Features pro Seite 
        max_features      : Maximale Gesamtanzahl als Sicherheitsabbruch 

    Returns:
        GeoDataFrame mit allen Features oder None, falls nichts geladen wurde.
    """
    all_pages = []
    startindex = 0

    logger.info(f'Starte vollständigen Abruf für {layer_name}...')

    while startindex < max_features:
        page = lade_lgln_layer(
            layer_name, bbox_utm32,
            count=count_per_request,
            startindex=startindex
        )

 
        if page is None or len(page) == 0:
            break

        all_pages.append(page)
        total_so_far = sum(len(p) for p in all_pages)
       

        if len(page) < count_per_request:
            break

        startindex += count_per_request

    if not all_pages:
        return None

    result = gpd.GeoDataFrame(
        pd.concat(all_pages, ignore_index=True),
        crs=all_pages[0].crs
    )
    return result

## 1. Hilfsfunktionen für die räumliche Verschneidung

In [None]:
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from shapely.ops import unary_union
import numpy as np

`select_best_match` löst Mehrdeutigkeiten auf, die entstehen, wenn ein Punkt mehrere Gebäude berührt. Sie vergleicht die dokumentierte Standortadresse mit den Gebäudeadressen der Treffer und gibt das übereinstimmende Ergebnis zurück. Liegt kein Adress-Match vor, wird der erste gefundene Treffer zurückgegeben.

In [None]:

def select_best_match(group, search_col, ref_col):
    """
    Wählt bei mehreren Gebäudetreffern denjenigen aus,
    dessen Adresse mit der Standortadresse übereinstimmt.
    Gibt den ersten Treffer zurück, falls kein Match gefunden wird.
    """
    matches = group[group[search_col] == group[ref_col]]
    return matches.iloc[0] if len(matches) > 0 else group.iloc[0]

`perform_spatial_join` kapselt den GeoPandas-`sjoin`-Aufruf. Das Prädikat steuert die Art der räumlichen Beziehung: `intersects` erfasst alle Berührungen einschließlich Randpunkte, `within` erfordert die vollständige Lage innerhalb des Polygons. Bei Bedarf wird `select_best_match` gruppenweise angewendet, um je Punkt genau einen Gebäudeeintrag zu erhalten.

In [1]:
def perform_spatial_join(left_gdf, right_gdf, predicate='intersects', match_func=None, match_cols=None, operation_name="Join"):
    """
    Führt einen räumlichen Join (sjoin) zwischen zwei GeoDataFrames durch.
    predicate: 'intersects' berücksichtigt Randberührungen, 'within' erfordert vollständige Lage im Polygon.
    Falls match_func übergeben wird, wird select_best_match gruppenweise angewendet.
    """
    joined = gpd.sjoin(left_gdf, right_gdf, how='left', predicate=predicate)
    if match_func and match_cols:
        joined = joined.groupby(joined.index, group_keys=False).apply(match_func, **match_cols)
    matches = joined['index_right'].notna().sum()
    return joined

`match_addresses` kapselt den vollständigen Adressabgleich für einen räumlichen Join. Die Methode ruft `perform_spatial_join` auf und übergibt `select_best_match` als Auflösungsstrategie, sodass bei mehreren Gebäudetreffern derjenige Treffer bevorzugt wird, dessen normalisierte Adresse mit der Standortadresse übereinstimmt.

In [None]:
def match_addresses(self, points_gdf, buildings_gdf, point_addr_col, building_addr_col):
    """
    Führt einen räumlichen Join mit adressbasierter Mehrdeutigkeitsauflösung durch.

    Kombiniert perform_spatial_join mit select_best_match. Liegt ein Punkt in mehreren
    Gebäudepolygonen, wird dasjenige Gebäude bevorzugt, dessen normalisierte Adresse
    mit der Standortadresse übereinstimmt. Ohne Adresstrefferübereinstimmung wird der
    erste geometrische Treffer zurückgegeben.

    Args:
        points_gdf        : GeoDataFrame der Standortpunkte (z. B. KLS, Gf-AP, HK-DE)
        buildings_gdf     : GeoDataFrame der Gebäudepolygone
        point_addr_col    : Spaltenname der normalisierten Adresse im points_gdf
        building_addr_col : Spaltenname der normalisierten Adresse im buildings_gdf

    Returns:
        GeoDataFrame mit den Ergebnissen des räumlichen Joins inkl. 'index_right'
        (Gebäude-Index) und allen Attributspalten des buildings_gdf.
    """
    return perform_spatial_join(
        points_gdf,
        buildings_gdf,
        predicate='intersects',
        match_func=select_best_match,
        match_cols={
            'search_col': point_addr_col,
            'ref_col':    building_addr_col,
        },
        operation_name='Adressabgleich'
    )


## 2. Point-in-Polygon-Analyse

Für jeden der 405 Standorte wird geprüft, ob KLS-Koordinate, Gf-AP-Koordinate und HK-DE innerhalb eines Gebäudepolygons liegen. Die Verschneidung erfolgt zunächst gegen die ALKIS-Gebäude. KLS-Koordinaten ohne ALKIS-Treffer werden anschließend gegen die HU-DE geprüft.

### 2.1 KLS-Koordinate gegen ALKIS-Gebäude

In [None]:
joined_kls = processor.match_addresses(
    gdf_points_25832,
    gdf_buildings_25832,
    'adresse_kls', #Name der spalte der normaliserten Adresse der KLS
    'adresse_normalisiert' #Name der spalte der normaliserten Adresse des Gebäudes
)
joined_kls['Im_Gebaeude'] = joined_kls['index_right'].notna()
joined_kls['BID_KLS'] = joined_kls['index_right'] #BID= BuildingID

### 2.2 KLS-Koordinate gegen HU-DE

In [None]:
# Punkte, die nicht im ALKIS-Bestand liegen, werden gegen die HU-DE-Polygone geprüft.
gdf_points_outside = gdf_points_25832[~joined_kls['Im_Gebaeude']].copy()

if len(gdf_points_outside) > 0 and len(gdf_zshh25832) > 0:
    joined_zshh = processor.match_addresses(
        gdf_points_outside,
        gdf_zshh25832,
        'adresse_kls',
        'adresse_normalisiert'
    )
    joined_zshh['ImGebaeudeZSHH'] = joined_zshh['index_right'].notna()
    joined_zshh['BID_ZSHH'] = joined_zshh['index_right']

else:
    joined_zshh = gdf_points_25832.copy()
    joined_zshh['ImGebaeudeZSHH'] = False
    joined_zshh['BID_ZSHH'] = None

### 2.3 Gf-AP-Koordinate gegen ALKIS-Gebäude

In [None]:
# Gf-AP-Koordinaten werden individuell mit ALKIS-Gebäuden verschnitten.
# Jede AP-Zeile bleibt erhalten, damit pro Standort mehrere APs verglichen werden können.
joined_ap = processor.match_addresses(
    gdf_ap,
    gdf_buildings_25832,
    'adresse_ap',
    'adresse_normalisiert',
)
joined_ap['BID_AP'] = joined_ap['index_right']
joined_ap['BID_from_AP'] = joined_ap['BID_AP']

### 2.4 HK-DE gegen ALKIS-Gebäude

In [None]:
# HK-DE-Koordinaten werden mit ALKIS-Gebäuden verschnitten.
joined_hauskoord = processor.match_addresses(
    gdf_hauskoord,
    gdf_buildings_25832,
    'adresse_kls',
    'adresse_normalisiert'
)
joined_hauskoord['BID_HK'] = joined_hauskoord['index_right']
joined_hauskoord['Hauskoord_Im_Gebaeude'] = joined_hauskoord['BID_HK'].notna()

## 3. HK-AP-Übereinstimmungscheck

Geprüft wird, ob HK-DE und Gf-AP im selben ALKIS-Gebäude liegen. Die Bedingung `Check_HK_AP_Match` ist nur dann erfüllt, wenn zusätzlich die KLS-Koordinate außerhalb des Gebäudes liegt, da nur in diesem Fall eine Korrektur über die HK-AP-Übereinstimmung methodisch relevant ist.

In [None]:
# Prüft ob KLS außerhalb und HK-DE innerhalb eines Gebäudes liegt.
cond_kls_out_hk_in = (
    (result_df['Im_Gebaeude'] == False) &
    (result_df['Hauskoord_Im_Gebaeude'] == True)
)
result_df['Check_KLS_out_HK_in'] = cond_kls_out_hk_in.map({True: 'Ja', False: 'Nein'})

cond_hk_ap_match = pd.Series(False, index=result_df.index)

# Vergleich nur für Zeilen, in denen beide Gebäude-IDs vorhanden sind.
mask_valid = (result_df['BID_HK'].notna()) & (result_df['BID_from_AP'].notna())

if mask_valid.any():
    cond_hk_ap_match.loc[mask_valid] = (
        cond_kls_out_hk_in.loc[mask_valid] &
        (result_df.loc[mask_valid, 'BID_HK'] == result_df.loc[mask_valid, 'BID_from_AP'])
    )

result_df['Check_HK_AP_Match'] = cond_hk_ap_match.map({True: 'Ja', False: 'Nein'})
result_df['HK_Status_Detail'] = result_df.apply(get_hk_status, axis=1)

## 4. Fehlerkategorisierung

Ein Standort gilt als Problemfall, wenn die KLS-Koordinate nicht dem dokumentierten Gebäude zugeordnet werden kann. Es werden vier Fehlerkategorien unterschieden, die sich gegenseitig ausschließen. `Ist_Problemfall` fasst alle vier zu einem binären Flag zusammen.

In [None]:
result_df["Problem_Status"] = "OK"

im_gebaeude = result_df["Im_Gebaeude"].fillna(False).astype(bool)
ap_im_gebaeude = result_df["AP_Im_Gebaeude"].fillna(False).astype(bool)

if "KLS_Gebaeude_hat_APs" in result_df.columns:
    kls_hat_aps = result_df["KLS_Gebaeude_hat_APs"].fillna(False).astype(bool)
else:
    kls_hat_aps = pd.Series(False, index=result_df.index)

bid_kls = result_df["BID_KLS"]

if 'BID_AP_Specific' in result_df.columns:
    bid_ap = result_df["BID_AP_Specific"]
else:
    bid_ap = result_df["BID_from_AP"]

mask_both_exist = bid_kls.notna() & bid_ap.notna()

bid_unequal = pd.Series(False, index=result_df.index)
bid_unequal.loc[mask_both_exist] = (
    bid_kls.loc[mask_both_exist].astype(str) != bid_ap.loc[mask_both_exist].astype(str)
)

# Fehlerkategorien: P1a = KLS außerhalb, AP innen; P1b = beide außerhalb;
# P2 = verschiedene Gebäude; P3 = fremder AP im KLS-Gebäude
cond_p1a = (~im_gebaeude) & ap_im_gebaeude
cond_p1b = (~im_gebaeude) & (~ap_im_gebaeude)
cond_falsches_gebaeude_basis = im_gebaeude & mask_both_exist & bid_unequal
cond_p3 = cond_falsches_gebaeude_basis & kls_hat_aps
cond_p2 = cond_falsches_gebaeude_basis & ~cond_p3

result_df.loc[cond_p1a, "Problem_Status"] = "PROBLEM KLS außerhalb, AP im Gebäude"
result_df.loc[cond_p1b, "Problem_Status"] = "PROBLEM KLS und AP außerhalb"
result_df.loc[cond_p2, "Problem_Status"] = "PROBLEM Falsches Gebäude"
result_df.loc[cond_p3, "Problem_Status"] = "PROBLEM KLS mit fremdem AP im Gebäude"

result_df["Ist_Problemfall"] = 0
result_df.loc[cond_p1a | cond_p1b | cond_p2 | cond_p3, "Ist_Problemfall"] = 1


## 5. Lösungsweg-Prüfung und Priorisierung

Für jeden Problemfall wird geprüft, welches der Lösungskonzepte greift. Die Konzepte werden zunächst unabhängig voneinander bewertet und dann in einer festen Prioritätsreihenfolge zu einem eindeutigen Lösungsweg zusammengeführt. Jeder Standort erhält genau einen Lösungsweg oder den Status 'Ungelöst'.

In [None]:
pid_kls_safe = result_df['PID_KLS'].fillna(-9999).astype('Int64') if 'PID_KLS' in result_df.columns else pd.Series(-9999, index=result_df.index)
pid_ap_safe  = result_df['PID_AP'].fillna(-9999).astype('Int64') if 'PID_AP' in result_df.columns else pd.Series(-9999, index=result_df.index)
bid_hk_safe  = result_df['BID_HK'].fillna(-9999).astype('Int64') if 'BID_HK' in result_df.columns else pd.Series(-9999, index=result_df.index)
bid_ap_safe  = result_df['BID_from_AP'].fillna(-9999).astype('Int64') if 'BID_from_AP' in result_df.columns else pd.Series(-9999, index=result_df.index)

mask_problem = (result_df['Ist_Problemfall'] == 1)

# Flurstück-Logik: lösbar wenn KLS und Gf-AP auf demselben Flurstück liegen und nHK-DE = 1.
if {'PID_KLS', 'PID_AP', 'Im_Flurstueck', 'Anzahl_HK_Geometrisch'}.issubset(result_df.columns):
    result_df['Check_CanSolve_Flurstueck'] = (
        mask_problem &
        (result_df['Im_Flurstueck'] == True) &
        (result_df['PID_KLS'].notna()) &
        (result_df['PID_AP'].notna()) &
        (pid_kls_safe == pid_ap_safe) &
        (result_df['Anzahl_HK_Geometrisch'] == 1)
    )
else:
    result_df['Check_CanSolve_Flurstueck'] = False

# HU-DE-Integration: lösbar wenn die KLS-Koordinate innerhalb eines HU-DE-Polygons liegt.
if 'ImGebaeudeZSHH' in result_df.columns:
    result_df['Check_CanSolve_ZSHH'] = (
        mask_problem &
        (result_df['ImGebaeudeZSHH'] == True)
    )
else:
    result_df['Check_CanSolve_ZSHH'] = False

# HU-DE via HK+AP: lösbar wenn HK-DE und mindestens ein Gf-AP im selben HU-DE-Gebäude liegen.
if {'BIDZSHH_HK', 'BID_from_AP_ZSHH', 'Hauskoord_Im_GebaeudeZSHH'}.issubset(result_df.columns):

    def check_zshh_hkap_match(group):
        """Prüft ob mindestens ein Gf-AP derselben KLS-ID im gleichen HU-DE-Gebäude wie die HK-DE liegt."""
        bid_hk_zshh = group['BIDZSHH_HK'].iloc[0] if group['BIDZSHH_HK'].notna().any() else None
        hk_in_zshh  = group['Hauskoord_Im_GebaeudeZSHH'].iloc[0] if group['Hauskoord_Im_GebaeudeZSHH'].notna().any() else False
        kls_in_zshh = group['ImGebaeudeZSHH'].iloc[0] if group['ImGebaeudeZSHH'].notna().any() else True

        if not hk_in_zshh or pd.isna(bid_hk_zshh) or kls_in_zshh:
            return pd.Series(False, index=group.index)

        ap_matches_hk = (group['BID_from_AP_ZSHH'] == bid_hk_zshh) & group['BID_from_AP_ZSHH'].notna()
        return pd.Series(ap_matches_hk.any(), index=group.index)

    result_df['Check_CanSolve_ZSHH_HKAP'] = False
    if mask_problem.any():
        problem_rows = result_df[mask_problem]
        matched = problem_rows.groupby('KLS_ID', group_keys=False).apply(check_zshh_hkap_match)
        result_df.loc[mask_problem, 'Check_CanSolve_ZSHH_HKAP'] = matched
else:
    result_df['Check_CanSolve_ZSHH_HKAP'] = False

# HK-AP-Übereinstimmung (ALKIS): lösbar wenn HK-DE und Gf-AP im selben ALKIS-Gebäude liegen.
if {'BID_HK', 'BID_from_AP', 'Hauskoord_Im_Gebaeude'}.issubset(result_df.columns):
    result_df['Check_CanSolve_HK_AP'] = (
        mask_problem &
        (result_df['Hauskoord_Im_Gebaeude'] == True) &
        (result_df['BID_HK'].notna()) &
        (result_df['BID_from_AP'].notna()) &
        (bid_hk_safe == bid_ap_safe)
    )
else:
    result_df['Check_CanSolve_HK_AP'] = False

# Fremder Gf-AP identifizieren.
result_df['Check_CanSolve_FremdAP'] = (mask_problem & cond_p3) if 'cond_p3' in locals() else False

# Gf-AP-Konzept: lösbar wenn KLS außerhalb liegt, der Gf-AP geometrisch innerhalb eines Gebäudes liegt und die in Megaplan dokumentierte Adresse des Gf-AP mit der amtlichen ALKIS-Adresse 
# dieses Gebäudes übereinstimmt (keine fehlerhafte Dokumentation).
if {'AP_Im_Gebaeude', 'adresse_ap', 'Adresse_Gebaeude_AP'}.issubset(result_df.columns):
    cond_ap_adresse_korrekt = (
        result_df['adresse_ap'].notna() &
        result_df['Adresse_Gebaeude_AP'].notna() &
        (
            result_df['adresse_ap'].astype(str).str.strip() ==
            result_df['Adresse_Gebaeude_AP'].astype(str).str.strip()
        )
    )
    result_df['Check_CanSolve_GfAP'] = (
        mask_problem &
        (~result_df['Im_Gebaeude'].fillna(False).astype(bool)) &
        (result_df['AP_Im_Gebaeude'].fillna(False).astype(bool)) &
        cond_ap_adresse_korrekt
    )
else:
    result_df['Check_CanSolve_GfAP'] = False

# Priorisierung: Flurstück > HU-DE > HK+AP (ALKIS) > HU-DE via HK+AP > Gf-AP-Konzept
result_df['Loesungsweg'] = 'OK'
result_df.loc[mask_problem, 'Loesungsweg'] = 'Ungelöst'

mask_p1 = result_df['Check_CanSolve_Flurstueck']
result_df.loc[mask_p1, 'Loesungsweg'] = 'Gelöst: Flurstück'

mask_p2 = result_df['Check_CanSolve_ZSHH'] & ~mask_p1
result_df.loc[mask_p2, 'Loesungsweg'] = 'Gelöst: HU-DE'

mask_p3 = result_df['Check_CanSolve_HK_AP'] & ~mask_p1 & ~mask_p2
result_df.loc[mask_p3, 'Loesungsweg'] = 'Gelöst: HK-DE und Gf-AP im selben Gebäude'

mask_p4 = result_df['Check_CanSolve_ZSHH_HKAP'] & ~mask_p1 & ~mask_p2 & ~mask_p3
result_df.loc[mask_p4, 'Loesungsweg'] = 'Gelöst: HU-DE via HK+AP'

mask_p5 = result_df['Check_CanSolve_GfAP'] & ~mask_p1 & ~mask_p2 & ~mask_p3 & ~mask_p4
result_df.loc[mask_p5, 'Loesungsweg'] = 'Gelöst: MP-Konzept'


Nach der Lösungsweg-Zuweisung enthält `result_df` für jeden KLS-Standort eine Zeile pro Gf-AP. Für die weiteren Auswertungen wird je Standort genau ein repräsentativer Eintrag benötigt. Die Auswahl folgt einer dreistufigen Priorität: Ein Gf-AP, der geometrisch innerhalb eines Gebäudes liegt, wird einem außenliegenden vorgezogen. Bei Gleichstand gewinnt der Gf-AP mit der geringsten metrischen Distanz zur KLS-Koordinate.

In [None]:
# AP_Im_Gebaeude als Bool sicherstellen (True=1 > False=0 bei absteigender Sortierung)
result_df['AP_Im_Gebaeude'] = result_df['AP_Im_Gebaeude'].fillna(False).astype(bool)

# Distanz_m mit Fallback befüllen, falls Spalte fehlt
if 'Distanz_m' not in result_df.columns:
    result_df['Distanz_m'] = 9999

# Sortierung: KLS_ID aufsteigend, AP_Im_Gebaeude absteigend ("Im Gebäude" gewinnt),
# Distanz_m aufsteigend (kleinere Distanz gewinnt bei Gleichstand)
result_df = result_df.sort_values(
    by=['KLS_ID', 'AP_Im_Gebaeude', 'Distanz_m'],
    ascending=[True, False, True]
)

# Reduktion auf einen repräsentativen Eintrag pro KLS-Standort
result_df = result_df.drop_duplicates(subset='KLS_ID', keep='first').reset_index(drop=True)


## 6. Erkennung fehlerhaft dokumentierter Gf-APs

Ein Gf-AP gilt als fehlerhaft dokumentiert, wenn er geometrisch in einem Gebäude liegt, dessen Adresse nicht mit der Adresse des HK-DE-Gebäudes übereinstimmt. Ergänzend wird geprüft, ob eine andere HK-DE im AP-Gebäude existiert, was auf eine Verwechslung zweier Standorte hindeuten kann.

In [None]:
result_df['Check_AP_Falsches_Gebaeude'] = 'Nein'
result_df['Andere_HK_Im_AP_Gebaeude'] = 'Nein'

# Ein Gf-AP gilt als fehlerhaft dokumentiert, wenn HK-DE und Gf-AP in unterschiedlichen Gebäuden liegen.
hk_im_gebaeude_mit_adresse = (
    (result_df['Hauskoord_Im_Gebaeude'] == True) &
    (result_df['Adresse_Gebaeude_HK'].notna())
)

ap_im_anderen_gebaeude = (
    (result_df['Adresse_Gebaeude_AP'].notna()) &
    (result_df['Adresse_Gebaeude_HK'].astype(str).str.strip() !=
     result_df['Adresse_Gebaeude_AP'].astype(str).str.strip())
)

ap_falsches_gebaeude = hk_im_gebaeude_mit_adresse & ap_im_anderen_gebaeude
result_df.loc[ap_falsches_gebaeude, 'Check_AP_Falsches_Gebaeude'] = 'Ja'

# Prüft zusätzlich, ob eine andere HK-DE im AP-Gebäude existiert (Hinweis auf Vertauschung).
if ap_falsches_gebaeude.any():
    hk_in_gebaeude = result_df[
        (result_df['Hauskoord_Im_Gebaeude'] == True) &
        (result_df['Adresse_Gebaeude_HK'].notna())
    ].copy()

    hk_pro_gebaeude = hk_in_gebaeude.groupby('Adresse_Gebaeude_HK')['KLS_ID'].apply(set).to_dict()

    for idx in result_df[ap_falsches_gebaeude].index:
        eigene_kls_id = result_df.loc[idx, 'KLS_ID']
        ap_gebaeude_adresse = result_df.loc[idx, 'Adresse_Gebaeude_AP']

        if pd.notna(ap_gebaeude_adresse) and ap_gebaeude_adresse in hk_pro_gebaeude:
            andere_hks = hk_pro_gebaeude[ap_gebaeude_adresse] - {eigene_kls_id}
            if len(andere_hks) > 0:
                result_df.loc[idx, 'Andere_HK_Im_AP_Gebaeude'] = 'Ja'

## 7. Versatz-Analyse (Nearest-Neighbor-Distanzen)

`compute_versatz_analysis` berechnet für jeden Standort die metrische Distanz zur nächstgelegenen Gebäudegeometrie mittels `gpd.sjoin_nearest`. Die Funktion klassifiziert das Ergebnis anschließend in drei Kategorien:

- **Im Polygon (OK):** Die KLS-Koordinate liegt innerhalb eines ALKIS-Gebäudepolygons.
- **Verdacht Digitalisierungsversatz (<= 3 m):** Die Koordinate liegt knapp außerhalb, was auf einen systematischen Digitalisierungsversatz hinweist.
- **Falschverortung (> 3 m):** Die Koordinate liegt deutlich außerhalb jedes Gebäudes.

In [None]:
def compute_versatz_analysis(points_gdf, buildings_gdf, result_df,
                             klsid_col='KLSID',
                             in_building_col='ImGebaeude',
                             building_addr_col='adresse_normalisiert',
                             schwellenwert_m=3.0):
    """
    Berechnet die metrische Distanz jeder KLS-Koordinate zum nächstgelegenen
    ALKIS-Gebäudepolygon und klassifiziert den Versatzstatus.

    Vorgehen:
      1. gpd.sjoin_nearest sucht für jeden Punkt das geometrisch nächste Gebäude
         und liefert die Distanz in Metern.
      2. Duplikate (Punkt hat identischen Abstand zu zwei Gebäuden) werden durch
         Behalten des ersten Treffers aufgelöst.
      3. Klassifizierung nach Schwellenwert:
           - 'Im Polygon (OK)'                  : Punkt bereits als ImGebaeude=True bekannt
           - 'Verdacht Digitalisierungsversatz'  : 0.01 m <= Distanz <= schwellenwert_m
           - 'Falschverortung'                   : Distanz > schwellenwert_m

    Args:
        points_gdf        : GeoDataFrame der KLS-Punkte 
        buildings_gdf     : GeoDataFrame der ALKIS-Gebäudepolygone
        result_df         : resultdf, in das die neuen Spalten geschrieben werden
        klsid_col         : Name der KLS-ID-Spalte
        in_building_col   : Name der Bool-Spalte 'Punkt im Gebäude' 
        building_addr_col : Name der Adressspalte in buildings_gdf
        schwellenwert_m   : Distanz-Schwellenwert in Metern 

    Returns:
        result_df mit drei neuen Spalten:
          - 'DistanzzuAdressPolygon' : Distanz in Metern zum nächsten Gebäude
          - 'VersatzStatus'           : Klassifizierung (s.o.)
          - 'AdresseNaechstesGebaeude': Adresse des nächsten Gebäudes
    """

    points_work = points_gdf.copy()
    bldgs_work  = buildings_gdf.copy()


    def _unpack_addr(val):
        if isinstance(val, (set, list, tuple)):
            return str(list(val)[0]) if len(val) > 0 else ''
        return str(val)

    if building_addr_col in bldgs_work.columns:
        bldgs_work['adresse_nearest_display'] = (
            bldgs_work[building_addr_col].apply(_unpack_addr)
        )
    else:
        bldgs_work['adresse_nearest_display'] = 'Keine Adresse'


    # distance_col speichert die euklidische Distanz in Metern
    joined_nearest = gpd.sjoin_nearest(
        points_work,
        bldgs_work[['geometry', 'adresse_nearest_display']],
        how='left',
        distance_col='dist_nearest'
    )

    joined_nearest = joined_nearest.drop_duplicates(
        subset=klsid_col, keep='first'
    )


    dist_map = joined_nearest.set_index(klsid_col)['dist_nearest']
    addr_map = joined_nearest.set_index(klsid_col)['adresse_nearest_display']

    result_df['DistanzzuAdressPolygon'] = result_df[klsid_col].map(dist_map)
    result_df['AdresseNaechstesGebaeude'] = result_df[klsid_col].map(addr_map)

    # Punkte, die bereits als 'ImGebaeude=True' erkannt wurden, erhalten die Distanz 0.0 
    result_df.loc[result_df[in_building_col] == True,
                  'DistanzzuAdressPolygon'] = 0.0

    def _classify_versatz(row):
        dist   = row['DistanzzuAdressPolygon']
        imgeb  = row.get(in_building_col, False)

        if imgeb:
            return 'Im Polygon (OK)'

        if pd.isna(dist):
            return 'Kein Gebäude in der Nähe gefunden'

        if dist < 0.01:
            return 'Im Polygon (OK)'

        if dist <= schwellenwert_m:
            return f'Verdacht Digitalisierungsversatz (<={schwellenwert_m:.0f}m)'

        return f'Falschverortung (>{schwellenwert_m:.0f}m)'

    result_df['VersatzStatus'] = result_df.apply(_classify_versatz, axis=1)


---

## 8. Flurstücksverschneidung

> **Hinweis zur Einordnung:** Dieser Abschnitt implementiert das in Kapitel 4.3 konzipierte Verschneidungsverfahren für zusammenhängende Flurstücke. Das Ablaufdiagramm des vollständigen Algorithmus befindet sich im Anhang.



**Designentscheidung:** Das Verfahren basiert bewusst auf rein geometrischen Verschneidungsoperationen und verzichtet auf adressbasiertes Matching, da das Attribut `Lagebezeichnung` in den ALKIS-Flurstücksdaten qualitativ inkonsistent ist (teils vollständige Adresse, teils nur Straßenname, teils leer).

In [None]:
def aggregate_parcels_per_building(
    gdf_buildings: gpd.GeoDataFrame,
    gdf_parcels: gpd.GeoDataFrame,
    min_building_fraction: float = 0.20
) -> gpd.GeoDataFrame:
    """
    Aggregiert Flurstücke pro Gebäude (20 %-Regel) und behält alle
    nicht zugeordneten Flurstücke unverändert bei.

    Vorgehen:
      1. gpd.overlay (Intersection) berechnet die Schnittfläche
         zwischen jedem Gebäude und jedem Flurstück.
      2. Die 20 %-Regel filtert Flurstücke mit zu geringer Überdeckung.
      3. Bei Konflikt (Flurstück unter mehreren Gebäuden) gewinnt das
         Gebäude mit der größten Schnittfläche.
      4. Valide Flurstücke werden pro Gebäude per unary_union verschmolzen.
      5. Übrige Flurstücke werden ohne Verschmelzung angehängt.

    Args:
        gdf_buildings         : ALKIS-Gebäudepolygone 
        gdf_parcels           : ALKIS-Flurstücke 
        min_building_fraction : 20 %-Schwellenwert 

    Returns:
        GeoDataFrame mit verschmolzenen Flurstücken (hat_gebaeude=True)
        und übrigen Einzel-Flurstücken (hat_gebaeude=False).
        Neue Spalten: bid, anzahl_flurstuecke, hat_gebaeude.
    """

    buildings = gdf_buildings.copy().reset_index(drop=True)
    buildings['bid']    = buildings.index
    buildings['b_area'] = buildings.geometry.area

    parcels = gdf_parcels.copy().reset_index(drop=True)
    parcels['pid'] = parcels.index

    # Räumliche Verschneidung (Intersection)

    overlay = gpd.overlay(
        buildings[['bid', 'b_area', 'geometry']],
        parcels[['pid', 'geometry']],
        how='intersection',
        keep_geom_type=True,
        make_valid=True
    )

    building_to_parcels = {}
    processed_pids      = set()

    if not overlay.empty:
        overlay['intersection_area'] = overlay.geometry.area
        overlay['frac'] = overlay['intersection_area'] / overlay['b_area']

        # 20 %-Regel anwenden
        valid = overlay[overlay['frac'] >= min_building_fraction].copy()

        # Konfliktlösung: Bei Flurstück unter mehreren Gebäuden gewinnt das Gebäude mit der größten Schnittfläche
        valid = valid.sort_values('intersection_area', ascending=False)
        valid_unique = valid.drop_duplicates(subset=['pid'], keep='first')

        building_to_parcels = valid_unique.groupby('bid')['pid'].apply(set).to_dict()
        processed_pids      = set(valid_unique['pid'].unique())



    # Flurstücke pro Gebäude verschmelzen

    parcel_cols = ['flstkennz', 'gemarkung', 'flur', 'lagebeztxt']
    result_rows = []

    for bid, pids in building_to_parcels.items():
        b_row    = buildings.loc[bid]
        p_subset = parcels[parcels['pid'].isin(pids)]
        if p_subset.empty:
            continue

        # Union-Operation eliminiert interne Flurstücksgrenzen
        merged_geom = unary_union(p_subset.geometry.tolist())

        entry = {
            'bid'               : bid,
            'anzahl_flurstuecke': len(pids),
            'hat_gebaeude'      : True,
            'geometry'          : merged_geom
        }
        # Flurstücks-Attribute zusammenfassen (Duplikate entfernen)
        for col in parcel_cols:
            if col in p_subset.columns:
                vals = {str(x).strip() for x in p_subset[col] if pd.notna(x)}
                entry[col] = ', '.join(sorted(vals)) if vals else None

        result_rows.append(entry)

    gdf_merged = gpd.GeoDataFrame(result_rows, crs=parcels.crs)

    # Übrige Flurstücke ohne Gebäudezuordnung anhängen

    leftovers = parcels[~parcels['pid'].isin(processed_pids)].copy()
    leftovers['bid']                = None
    leftovers['anzahl_flurstuecke'] = 1
    leftovers['hat_gebaeude']       = False


    # Spalten angleichen, damit pd.concat keine Fehler wirft
    for c in gdf_merged.columns:
        if c != 'geometry' and c not in leftovers.columns:
            leftovers[c] = None

    # Finales GeoDataFrame zusammenführen
    final_gdf = gpd.GeoDataFrame(
        pd.concat(
            [gdf_merged, leftovers[[*gdf_merged.columns]]],
            ignore_index=True
        ),
        geometry='geometry',
        crs=parcels.crs
    )

    return final_gdf