In [None]:
import geopandas as gpd

gdf = gpd.read_file(r"Areas-of-interest-POIs\merged_building_volumes_filtered.gpkg")

print(gdf.crs)

In [None]:
gdf.info()

In [None]:
import xml.etree.ElementTree as ET

tree = ET.parse(r"Areas-of-interest-POIs\BuildingFunctionTypeAdV.xml")
root = tree.getroot()

In [None]:
import xml.etree.ElementTree as ET
import pandas as pd

def read_adv_codelist(xml_path):
    """Read a GML code-list dictionary into a table of codes and labels.

    Each ``gml:Definition`` element is scanned for ``gml:name`` children: a
    name that carries a ``codeSpace`` attribute supplies the code, a name
    without it supplies the label. When several names of the same kind are
    present the last one wins. Definitions missing either value are skipped.

    Parameters
    ----------
    xml_path
        Location of the XML file, handed to ``ElementTree.parse``.

    Returns
    -------
    pandas.DataFrame
        Columns ``function`` (code) and ``label_de`` (label); one row per
        distinct code (first occurrence kept), sorted by code, with a fresh
        0..n-1 index.
    """
    namespaces = {"gml": "http://www.opengis.net/gml"}
    document_root = ET.parse(xml_path).getroot()

    records = []
    for definition in document_root.iterfind(".//gml:Definition", namespaces):
        code, label_de = None, None
        for name_el in definition.iterfind("gml:name", namespaces):
            text = (name_el.text or "").strip()
            if "codeSpace" in name_el.attrib:
                code = text
            else:
                label_de = text
        # Both pieces must be non-empty for the entry to be usable.
        if not (code and label_de):
            continue
        records.append((code, label_de))

    table = pd.DataFrame(records, columns=["function", "label_de"])
    table = table.drop_duplicates("function")
    table = table.sort_values("function")
    return table.reset_index(drop=True)

df_codes = read_adv_codelist(r"Areas-of-interest-POIs\BuildingFunctionTypeAdV.xml")
df_codes.info()

In [None]:
len(df_codes['label_de'].unique())

In [None]:
df_codes['label_de'].value_counts()

In [None]:
df_codes.head(10)

In [None]:
df_codes.to_csv(
    r"Areas-of-interest-POIs\building_function_codelist.csv",
    index=False,
    encoding="utf-8"
)

In [None]:
# from googletrans import Translator

# df = pd.read_csv(
#     r"Areas-of-interest-POIs\building_function_codelist.csv",
#     encoding="utf-8-sig"
# )

# translator = Translator()

# def translate(text):
#     if pd.isna(text):
#         return text
#     return translator.translate(text, src="de", dest="en").text

# df["label_en"] = df["label_de"].apply(translate)

# df.to_csv(
#     r"Areas-of-interest-POIs\building_function_codelist_de_en.csv",
#     index=False,
#     encoding="utf-8-sig"
# )

# df.head(10)

In [None]:
# Load the code list that carries both the German and the English label.
df = pd.read_csv(r"Areas-of-interest-POIs\building_function_codelist_de_en.csv")

# Left join on the function code: every building row is kept, and codes that
# are absent from the list leave label_de / label_en empty.
gdf = gdf.merge(
    df.loc[:, ["function", "label_de", "label_en"]],
    how="left",
    on="function",
)

In [None]:
gdf.tail()

In [None]:
osm_building_data = gpd.read_file(r"Areas-of-interest-POIs\Buildings-Area-of-study.gpkg")

osm_building_data.head()

In [None]:
osm_building_data.to_crs(gdf.crs, inplace=True)
print(osm_building_data.crs)

In [None]:
name_col = "name"

# Restrict the OSM layer to features whose name is present and not blank.
osm_named = osm_building_data.loc[
    osm_building_data[name_col].notna()
    & osm_building_data[name_col].astype(str).str.strip().ne("")
].copy()

# Left spatial join: one output row per (building, intersecting named OSM
# feature) pair; the building's original index travels along as "gdf_idx".
j = gpd.sjoin(
    gdf[["geometry"]].reset_index(names="gdf_idx"),
    osm_named[[name_col, "geometry"]],
    predicate="intersects",
    how="left",
)

# Collapse the pairs to one sorted list of distinct, stripped names per building.
names = (
    j.groupby("gdf_idx")[name_col]
     .apply(lambda s: sorted({str(x).strip() for x in s.dropna()} - {""}))
     .rename("osm_names")
)

# Look the list up by building index and store it as a new column.
gdf["osm_names"] = gdf.index.to_series().map(names)

In [None]:
gdf[gdf["osm_names"].notna() & (gdf["osm_names"].str.len() > 10)].head()

In [None]:
osm_landuse_data = gpd.read_file(r"Areas-of-interest-POIs\Land-use_Area-of-study.gpkg")

osm_landuse_data.head()

In [None]:
osm_landuse_data['fclass'].value_counts()

In [None]:
osm_landuse_data['name'].value_counts()

In [None]:
# Reproject the OSM land-use layer into the buildings' CRS.
landuse = osm_landuse_data.to_crs(gdf.crs)

# Left spatial join: one row per (building, intersecting land-use polygon)
# pair, with the building's original index carried along as "gdf_idx".
j = gpd.sjoin(
    gdf[["geometry"]].reset_index(names="gdf_idx"),
    landuse[["fclass", "name", "geometry"]],
    predicate="intersects",
    how="left",
)

pairs_by_building = j.groupby("gdf_idx")

# Sorted distinct land-use classes per building.
class_lu = pairs_by_building["fclass"].apply(lambda s: sorted(set(s.dropna())))

# Sorted distinct, stripped, non-blank land-use names per building.
name_lu = pairs_by_building["name"].apply(
    lambda s: sorted({str(x).strip() for x in s.dropna()} - {""})
)

# Attach both lists to gdf; any value that is not a list is replaced by [].
gdf["class_landuse"] = (
    gdf.index.to_series()
       .map(class_lu)
       .apply(lambda x: x if isinstance(x, list) else [])
)
gdf["name_landuse"] = (
    gdf.index.to_series()
       .map(name_lu)
       .apply(lambda x: x if isinstance(x, list) else [])
)


In [None]:
gdf.head()

In [None]:
gdf[gdf["name_landuse"].notna() & (gdf["name_landuse"].str.len() > 0)].head()

In [None]:
# Lookup table keyed by "gfk_code".
df_map = pd.read_excel(r"Areas-of-interest-POIs\alkis_building_activity_map.xlsx")

# Left join so every building is kept; the join key from the lookup table is
# dropped afterwards because it only duplicates "function".
gdf = gdf.merge(df_map, how="left", left_on="function", right_on="gfk_code")
gdf = gdf.drop(columns=["gfk_code"])

In [None]:
gdf.head()

In [None]:
residencial_ALKIS = gpd.read_file('Areas-of-interest-POIs/Residencial-Landuse_ALKIS.gpkg')
print(residencial_ALKIS.crs)
residencial_ALKIS.head()

In [None]:
residencial_ALKIS.columns

In [None]:
import numpy as np

# Reproject the residential land-use polygons into the buildings' CRS.
residential = residencial_ALKIS[["geometry"]].to_crs(gdf.crs)

# Inner spatial join: keeps only (building, polygon) pairs whose geometries
# intersect. With predicate="intersects" a building that merely touches or
# partly overlaps a residential polygon is matched as well; switch to
# predicate="within" to require full containment.
j = gpd.sjoin(
    gdf[["geometry"]].reset_index(names="gdf_idx"),
    residential,
    how="inner",
    predicate="intersects"
)

# Unique building indices that matched at least one residential polygon.
res_idx = j["gdf_idx"].unique()

# Start from an all-missing column with *object* dtype. A plain `np.nan`
# assignment would create a float64 column, and writing strings into it is an
# incompatible-dtype setitem that pandas has deprecated.
gdf["ALKIS_Landuse_info"] = pd.Series(np.nan, index=gdf.index, dtype="object")

# Tag only the matching buildings; all others stay NaN.
gdf.loc[res_idx, "ALKIS_Landuse_info"] = "residence"

In [None]:
gdf.head()

In [None]:
gdf.info()

In [None]:
gdf['ALKIS_Landuse_info'].value_counts(dropna=False)
# 525479

In [None]:
commercial_ALKIS = gpd.read_file('Areas-of-interest-POIs/Commercial_Landuse_ALKIS.gpkg')
print(commercial_ALKIS.crs)
commercial_ALKIS.head()

In [None]:
# Reproject the commercial land-use polygons into the buildings' CRS.
commercial = commercial_ALKIS[["geometry"]].to_crs(gdf.crs)

# Inner join keeps only buildings intersecting at least one commercial
# polygon ("within" would be the stricter alternative predicate).
j_com = gpd.sjoin(
    gdf[["geometry"]].reset_index(names="gdf_idx"),
    commercial,
    predicate="intersects",
    how="inner",
)

com_idx = j_com["gdf_idx"].unique()


def to_list(v):
    """Normalise one cell to a list: lists pass through, NaN gives [], anything else is wrapped."""
    if isinstance(v, list):
        return v
    is_missing = v is np.nan or (isinstance(v, float) and np.isnan(v))
    return [] if is_missing else [v]


# Hold a list in every cell so the new tag can be appended uniformly.
gdf["ALKIS_Landuse_info"] = gdf["ALKIS_Landuse_info"].apply(to_list)

# Append "commercial" for matched buildings unless it is already there.
gdf.loc[com_idx, "ALKIS_Landuse_info"] = gdf.loc[com_idx, "ALKIS_Landuse_info"].apply(
    lambda tags: tags + ["commercial"] if "commercial" not in tags else tags
)

# Buildings that still have no tag go back to NaN.
gdf["ALKIS_Landuse_info"] = gdf["ALKIS_Landuse_info"].apply(
    lambda tags: np.nan if isinstance(tags, list) and not tags else tags
)

In [None]:
gdf.head()

In [None]:
gdf.info()

In [None]:
# The column holds Python lists at this point and lists are unhashable, so
# value_counts() cannot count them directly. Count a comma-joined string per
# building instead; NaN stays NaN and is reported because dropna=False.
gdf['ALKIS_Landuse_info'].map(
    lambda tags: ", ".join(tags) if isinstance(tags, list) else tags
).value_counts(dropna=False)

In [None]:
industries_ALKIS = gpd.read_file('Areas-of-interest-POIs/Industries_Landuse_ALKIS.gpkg')
print(industries_ALKIS.crs)
industries_ALKIS.head()

In [None]:
# Reproject the industrial land-use polygons into the buildings' CRS.
industries = industries_ALKIS[["geometry"]].to_crs(gdf.crs)

# Inner join keeps only buildings intersecting at least one industrial
# polygon ("within" would be the stricter alternative predicate).
j_ind = gpd.sjoin(
    gdf[["geometry"]].reset_index(names="gdf_idx"),
    industries,
    predicate="intersects",
    how="inner",
)

ind_idx = j_ind["gdf_idx"].unique()


def to_list(v):
    """Normalise one cell to a list: lists pass through, NaN gives [], anything else is wrapped."""
    if isinstance(v, list):
        return v
    is_missing = v is np.nan or (isinstance(v, float) and np.isnan(v))
    return [] if is_missing else [v]


# Hold a list in every cell so the new tag can be appended uniformly.
gdf["ALKIS_Landuse_info"] = gdf["ALKIS_Landuse_info"].apply(to_list)

# Append "industrial" for matched buildings unless it is already there.
gdf.loc[ind_idx, "ALKIS_Landuse_info"] = gdf.loc[ind_idx, "ALKIS_Landuse_info"].apply(
    lambda tags: tags + ["industrial"] if "industrial" not in tags else tags
)

# Buildings that still have no tag go back to NaN.
gdf["ALKIS_Landuse_info"] = gdf["ALKIS_Landuse_info"].apply(
    lambda tags: np.nan if isinstance(tags, list) and not tags else tags
)

In [None]:
gdf.head()

In [None]:
# The column holds Python lists at this point and lists are unhashable, so
# value_counts() cannot count them directly. Count a comma-joined string per
# building instead; NaN stays NaN and is reported because dropna=False.
gdf['ALKIS_Landuse_info'].map(
    lambda tags: ", ".join(tags) if isinstance(tags, list) else tags
).value_counts(dropna=False)

In [None]:
gdf.info()

In [None]:
public_office_ALKIS = gpd.read_file('Areas-of-interest-POIs/Public-office_Landuse_ALKIS.gpkg')
print(public_office_ALKIS.crs)
public_office_ALKIS.head()

In [None]:
# Reproject the public-office land-use polygons into the buildings' CRS.
public_office = public_office_ALKIS[["geometry"]].to_crs(gdf.crs)

# Inner join keeps only buildings intersecting at least one public-office
# polygon ("within" would be the stricter alternative predicate).
j_pub = gpd.sjoin(
    gdf[["geometry"]].reset_index(names="gdf_idx"),
    public_office,
    predicate="intersects",
    how="inner",
)

pub_idx = j_pub["gdf_idx"].unique()


def to_list(v):
    """Normalise one cell to a list: lists pass through, NaN gives [], anything else is wrapped."""
    if isinstance(v, list):
        return v
    is_missing = v is np.nan or (isinstance(v, float) and np.isnan(v))
    return [] if is_missing else [v]


# Hold a list in every cell so the new tag can be appended uniformly.
gdf["ALKIS_Landuse_info"] = gdf["ALKIS_Landuse_info"].apply(to_list)

# Append "public_office" for matched buildings unless it is already there.
gdf.loc[pub_idx, "ALKIS_Landuse_info"] = gdf.loc[pub_idx, "ALKIS_Landuse_info"].apply(
    lambda tags: tags + ["public_office"] if "public_office" not in tags else tags
)

# Buildings that still have no tag go back to NaN.
gdf["ALKIS_Landuse_info"] = gdf["ALKIS_Landuse_info"].apply(
    lambda tags: np.nan if isinstance(tags, list) and not tags else tags
)

In [None]:
gdf.head()

In [None]:
# The column holds Python lists at this point and lists are unhashable, so
# value_counts() cannot count them directly. Count a comma-joined string per
# building instead; NaN stays NaN and is reported because dropna=False.
gdf['ALKIS_Landuse_info'].map(
    lambda tags: ", ".join(tags) if isinstance(tags, list) else tags
).value_counts(dropna=False)

In [None]:
sport_ALKIS = gpd.read_file('Areas-of-interest-POIs/Sports-area_Landuse_ALKIS.gpkg')
print(sport_ALKIS.crs)
sport_ALKIS.head()

In [None]:
# Reproject the sports-area land-use polygons into the buildings' CRS.
sport = sport_ALKIS[["geometry"]].to_crs(gdf.crs)

# Inner join keeps only buildings intersecting at least one sports-area
# polygon ("within" would be the stricter alternative predicate).
j_sport = gpd.sjoin(
    gdf[["geometry"]].reset_index(names="gdf_idx"),
    sport,
    predicate="intersects",
    how="inner",
)

sport_idx = j_sport["gdf_idx"].unique()


def to_list(v):
    """Normalise one cell to a list: lists pass through, NaN gives [], anything else is wrapped."""
    if isinstance(v, list):
        return v
    is_missing = v is np.nan or (isinstance(v, float) and np.isnan(v))
    return [] if is_missing else [v]


# Hold a list in every cell so the new tag can be appended uniformly.
gdf["ALKIS_Landuse_info"] = gdf["ALKIS_Landuse_info"].apply(to_list)

# Append "sport" for matched buildings unless it is already there.
gdf.loc[sport_idx, "ALKIS_Landuse_info"] = gdf.loc[sport_idx, "ALKIS_Landuse_info"].apply(
    lambda tags: tags + ["sport"] if "sport" not in tags else tags
)

# Buildings that still have no tag go back to NaN.
gdf["ALKIS_Landuse_info"] = gdf["ALKIS_Landuse_info"].apply(
    lambda tags: np.nan if isinstance(tags, list) and not tags else tags
)

In [None]:
gdf.head()

In [None]:
# The column holds Python lists at this point and lists are unhashable, so
# value_counts() cannot count them directly. Count a comma-joined string per
# building instead; NaN stays NaN and is reported because dropna=False.
gdf['ALKIS_Landuse_info'].map(
    lambda tags: ", ".join(tags) if isinstance(tags, list) else tags
).value_counts(dropna=False)

In [None]:
gdf[gdf['ALKIS_Landuse_info'].isna()].head()

In [None]:
# gdf[gdf['ALKIS_Landuse_info'].isna()].to_file('Buildings-with-no-ALKIS-tags-intersect.gpkg')

In [None]:
# gdf[gdf['ALKIS_Landuse_info'].isna()].info()

In [None]:
gdf.columns

In [None]:
gdf.head()

In [None]:
# Percentage of buildings without an English label. isna() returns a boolean
# Series of the same length as gdf, so its len() is always len(gdf); the mean
# of the booleans is the actual share of missing values.
gdf['label_en'].isna().mean() * 100

In [None]:
# osm_names holds a list for every building (empty when nothing matched), so
# notna() would select all rows. Select buildings with at least one name.
gdf[gdf['osm_names'].str.len() > 0].head()

In [None]:
gdf['gfk_class'].value_counts()

In [None]:
# Draw a reproducible sample: a fixed random_state gives the same buildings on
# every run, and the size is capped so small frames do not raise.
random_samples = gdf.sample(n=min(50, len(gdf)), random_state=42)

# Keep only the columns that are sent to the classifier or shown on the map.
random_samples = random_samples[['gml_id', 'Stadt', 'Strasse', 'HausNr', 'Name', 
                                 'area_m2', 'volume_m3', 'geometry', 'label_de', 'label_en', 
                                 'osm_names', 'class_landuse', 'name_landuse','gfk_class',
                                 'gfk_name', 'ALKIS_Landuse_info']]

In [None]:
random_samples.head()

In [None]:
import os

import requests

# Read the Hugging Face access token from the environment instead of storing
# it in the notebook. Any token that was previously committed in this file
# must be treated as leaked and revoked in the Hugging Face account settings.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    raise RuntimeError(
        "Set the HF_TOKEN environment variable to a valid Hugging Face access token."
    )

# Smoke test: list the models reachable through the router with this token.
r = requests.get(
    "https://router.huggingface.co/v1/models",
    headers={"Authorization": f"Bearer {HF_TOKEN}"},
    timeout=30,
)

# Print status and the start of the body before raising, so a failing request
# still shows what the server answered.
print("STATUS:", r.status_code)
print(r.text[:500])
r.raise_for_status()

data = r.json()
print("Models returned:", len(data.get("data", [])))
for m in data.get("data", [])[:30]:
    print(m.get("id"))


In [None]:
"""
Classify each row of `random_samples` through the Hugging Face Router using the
chat model configured in `MODEL` (currently meta-llama/Meta-Llama-3-8B-Instruct).

- Sends ONE row at a time (as JSON) to the model
- Model must return ONLY valid JSON: {"gml_id": ..., "labels": [...], "short_reason": "..."}
- Multi-label allowed; empty list allowed
- Drops the heavy geometry column by default
- Robust JSON parsing + retry logic
"""

import json
import time
import re
import ast
import requests
import pandas as pd

# -------------------------
# CONFIG
# -------------------------
# Model identifier sent in the "model" field of every chat-completion request.
MODEL = "meta-llama/Meta-Llama-3-8B-Instruct"
# Chat-completions endpoint of the Hugging Face router.
URL = "https://router.huggingface.co/v1/chat/completions"

# Request headers; HF_TOKEN must already be defined by an earlier cell.
HEADERS = {
    "Authorization": f"Bearer {HF_TOKEN}",
    "Content-Type": "application/json",
}

# Closed label vocabulary. Model answers are validated against these strings
# by exact match, and the same strings are interpolated into the prompts.
# NOTE(review): "kindergarden" is spelled this way in the prompt text as well;
# change it everywhere or nowhere.
TARGET_LABELS = [
    "workplace",
    "university",
    "kindergarden",
    "shopping (non essential)",
    "essentials activity",
    "leisure",
]

# -------------------------
# DETAILED SYSTEM PROMPT
# -------------------------
SYSTEM_PROMPT = f"""
You are an expert annotator for urban building function classification.

You will be given ONE building record as JSON with fields like:
- gml_id (unique building id)
- city/address fields (Stadt, Strasse/Road, HausNr)
- names (Name, label_de, label_en, osm_names)
- landuse hints (class_landuse, name_landuse, ALKIS_Landuse_info, gfk_class, gfk_name)
- size proxies (area_m2, volume_m3)
Some fields may be missing (null) or empty lists.

Your task:
Assign ZERO OR MORE labels from this exact allowed list:
{TARGET_LABELS}

Important:
- Think holistically: interpret the record like a human reading a bundle of clues.
- Do NOT blindly trust any single field (landuse/ALKIS/gfk may be misleading or generic).
- Prefer the real-world "what people go there for" function when possible.
- Multi-label is allowed ONLY when the building genuinely supports multiple functions.
- If there is not enough evidence for any label, return an empty list [].

Label meanings (use these interpretations):
- workplace: office/administration/industrial/logistics/production/company premises.
- university: university, campus buildings, institutes, lecture halls, mensa/student services strongly tied to higher education.
- kindergarden: Kita, Kindergarten, Krippe, daycare, early childhood education facilities.
- essentials activity: everyday essential services (supermarket/grocery, pharmacy, doctor/clinic/hospital, basic banking/post, etc.).
- shopping (non essential): retail that is typically discretionary (fashion, electronics, furniture, specialty retail).
- leisure: recreation/culture/sport/entertainment (sports facilities, gyms, riding halls, museums, theaters, cinemas, etc.).

Output format STRICTNESS:
Return ONLY valid JSON. No markdown. No extra text.
Must be exactly:
{{
  "gml_id": "<string or number as provided>",
  "labels": ["<zero or more labels from the allowed list>"],
  "short_reason": "<one short sentence explaining the main evidence>"
}}

Validation rules:
- "labels" must be an array.
- Each label must match one of the allowed labels EXACTLY.
- short_reason should be concise (max ~25 words).
"""

# -------------------------
# HELPERS
# -------------------------
def safe_to_jsonable(v):
    """Convert one cell value into something ``json.dumps`` can serialise.

    Rules, applied in order:

    * ``None`` and scalar missing values (anything ``pd.isna`` reports as
      missing) become ``None``.
    * Lists and dicts are rebuilt with every element / value converted
      recursively, so NaN or numpy scalars nested inside them are sanitised
      too instead of being passed through untouched.
    * Objects exposing a callable ``.item()`` (numpy scalars) are unwrapped to
      the plain Python value.
    * Strings that look like a list literal (``"['a', 'b']"``) are parsed with
      ``ast.literal_eval``; if parsing fails or does not yield a list, the
      original string is returned.
    * Everything else is returned unchanged.

    Parameters
    ----------
    v : any
        Raw cell value.

    Returns
    -------
    The converted value (``None``, list, dict, Python scalar or the input).
    """
    if v is None:
        return None

    # Containers are handled before the NaN test: pd.isna() on a list returns
    # an array whose truth value is ambiguous (or wrongly collapses a
    # one-element list such as [nan] to None).
    if isinstance(v, dict):
        return {key: safe_to_jsonable(val) for key, val in v.items()}
    if isinstance(v, list):
        return [safe_to_jsonable(item) for item in v]

    # Scalar missing values (float NaN, numpy NaN, pandas NA/NaT).
    try:
        if pd.isna(v):
            return None
    except (TypeError, ValueError):
        # Array-likes give an ambiguous truth value; they are not "missing".
        pass

    # Convert numpy scalars to python scalars
    if hasattr(v, "item") and callable(v.item):
        try:
            return v.item()
        except Exception:
            # Best effort: .item() is only meaningful for size-1 objects.
            pass

    # Try parse strings that look like lists: "['a','b']"
    if isinstance(v, str):
        s = v.strip()
        if s.startswith("[") and s.endswith("]"):
            try:
                parsed = ast.literal_eval(s)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                return v
            if isinstance(parsed, list):
                return parsed
    return v


def row_to_prompt_dict(row: pd.Series, drop_geometry=True) -> dict:
    """Turn one table row into a plain dict of JSON-friendly values.

    Every value is passed through ``safe_to_jsonable``. When ``drop_geometry``
    is true, the entry whose key equals "geometry" (case-insensitive) is
    left out.
    """
    return {
        column: safe_to_jsonable(value)
        for column, value in row.to_dict().items()
        if not (drop_geometry and column.lower() == "geometry")
    }


def build_messages(row_dict: dict):
    """Assemble the chat messages for one building record.

    Returns a two-element list: a system message carrying the stripped
    ``SYSTEM_PROMPT`` and a user message that repeats the allowed labels and
    embeds ``row_dict`` as JSON (non-ASCII characters are kept as-is).
    """
    record_json = json.dumps(row_dict, ensure_ascii=False)
    user_content = "".join([
        "Classify this building record.\n",
        f"Allowed labels: {TARGET_LABELS}\n\n",
        "Building record (JSON):\n",
        record_json,
        "\n\nReturn only JSON.",
    ])
    system_message = {"role": "system", "content": SYSTEM_PROMPT.strip()}
    user_message = {"role": "user", "content": user_content}
    return [system_message, user_message]


def extract_json_object(text: str) -> str:
    """Return the first complete JSON object embedded in ``text``.

    Models sometimes wrap the JSON in prose or markdown fences. The text is
    scanned for ``{`` characters, and at each one a JSON decoder tries to
    parse a full object starting there; the substring of the first successful
    parse is returned. Unlike a greedy ``{.*}`` match, braces appearing in
    text after the object do not get glued onto the result.

    Parameters
    ----------
    text : str
        Raw model output.

    Returns
    -------
    str
        The JSON object exactly as it appears in ``text``.

    Raises
    ------
    ValueError
        If no position in ``text`` starts a parseable JSON object.
    """
    text = text.strip()
    decoder = json.JSONDecoder()

    start = text.find("{")
    while start != -1:
        try:
            # raw_decode parses one JSON value at `start` and reports where it
            # ends, ignoring whatever follows.
            _, end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            start = text.find("{", start + 1)
            continue
        return text[start:end]

    raise ValueError("No JSON object found in model output.")


def validate_result(obj: dict, original_gml_id):
    """Check a parsed model answer and pin its ``gml_id`` to the request's id.

    Parameters
    ----------
    obj : dict
        Parsed JSON answer from the model.
    original_gml_id
        ``gml_id`` of the record that was sent, or ``None`` if it had none.

    Returns
    -------
    bool
        Always ``True`` when validation passes.

    Raises
    ------
    ValueError
        If ``obj`` is not a dict, lacks one of the required keys, ``labels``
        is not a list, or a label is not in ``TARGET_LABELS``.

    Notes
    -----
    When ``original_gml_id`` is given and the model echoed a different value
    (or a different type), ``obj["gml_id"]`` is overwritten in place with the
    original. A mismatch is therefore repaired, never treated as an error.
    """
    if not isinstance(obj, dict):
        raise ValueError("Result is not a JSON object.")
    if "gml_id" not in obj or "labels" not in obj or "short_reason" not in obj:
        raise ValueError("Missing required keys (gml_id, labels, short_reason).")
    if not isinstance(obj["labels"], list):
        raise ValueError('"labels" must be a list.')
    for lab in obj["labels"]:
        if lab not in TARGET_LABELS:
            raise ValueError(f'Invalid label: {lab}')
    # Keep gml_id consistent with the record that was actually classified.
    if original_gml_id is not None and obj["gml_id"] != original_gml_id:
        obj["gml_id"] = original_gml_id
    return True


def classify_row_with_llm(row_dict: dict, max_retries=3, backoff_sec=2.0):
    """Classify one building record with the chat model, retrying on failure.

    Parameters
    ----------
    row_dict : dict
        JSON-serialisable building record.
    max_retries : int
        Maximum number of attempts.
    backoff_sec : float
        Base delay; the wait before the next attempt is
        ``backoff_sec * attempt`` seconds.

    Returns
    -------
    dict
        The validated model answer (keys ``gml_id``, ``labels``,
        ``short_reason``).

    Raises
    ------
    RuntimeError
        If every attempt failed; the last underlying error is attached as
        the exception's cause.
    """
    payload = {
        "model": MODEL,
        "messages": build_messages(row_dict),
        "temperature": 0.2,
        "max_tokens": 300,
    }

    original_gml_id = row_dict.get("gml_id", None)

    last_err = None
    for attempt in range(1, max_retries + 1):
        try:
            r = requests.post(URL, headers=HEADERS, json=payload, timeout=60)
            r.raise_for_status()
            data = r.json()
            content = data["choices"][0]["message"]["content"]
            json_str = extract_json_object(content)
            obj = json.loads(json_str)
            validate_result(obj, original_gml_id)
            return obj
        except Exception as e:
            # Any failure (HTTP, malformed answer, invalid labels) is retried.
            last_err = e
            # Back off only when another attempt follows; sleeping after the
            # final failure would just delay the error.
            if attempt < max_retries:
                time.sleep(backoff_sec * attempt)

    raise RuntimeError(
        f"Failed to classify row after {max_retries} retries: {last_err}"
    ) from last_err


# -------------------------
# MAIN: classify random_sample
# -------------------------
# Expect you already have: random_sample = <your dataframe>
# Example:
# random_sample = pd.read_csv("your_file.csv")

def classify_dataframe(random_sample: pd.DataFrame) -> pd.DataFrame:
    """Classify every row of ``random_sample`` and return an annotated copy.

    Rows are sent to the model one at a time, without their geometry. The
    input frame is not modified.

    Parameters
    ----------
    random_sample : pandas.DataFrame
        Building records to classify.

    Returns
    -------
    pandas.DataFrame
        Copy of ``random_sample`` with two extra columns:
        ``assigned_classes`` (list of labels) and ``llm_reasoning`` (str).

    Raises
    ------
    RuntimeError
        Propagated from ``classify_row_with_llm`` when a row cannot be
        classified after all retries.
    """
    assigned = []
    reasons = []
    total = len(random_sample)

    # Iterate over the frame passed in, not over a notebook-level global.
    for _, row in random_sample.iterrows():
        row_dict = row_to_prompt_dict(row, drop_geometry=True)
        result = classify_row_with_llm(row_dict)

        assigned.append(result.get("labels", []))
        reasons.append(result.get("short_reason", ""))

        # Optional: progress print
        if (len(assigned) % 10) == 0:
            print(f"Classified {len(assigned)} / {total} rows...")

    out = random_sample.copy()
    out["assigned_classes"] = assigned
    out["llm_reasoning"] = reasons
    return out


# ---- RUN ----
classified_df = classify_dataframe(random_samples)
# classified_df.to_csv("random_sample_classified.csv", index=False)
# print("Saved -> random_sample_classified.csv")

In [None]:
classified_df.head()

In [None]:
import pandas as pd
import geopandas as gpd
import folium
from folium.features import GeoJsonTooltip

# Work on a copy so the classification results themselves stay untouched.
df2 = classified_df.copy()

# robust geometry handling: accept a few common geometry column names
geom_col = next(
    (col for col in df2.columns if str(col).strip().lower() in ["geometry", "geom", "wkt", "the_geom"]),
    None,
)
if geom_col is None:
    raise ValueError(f"No geometry column found. Columns: {list(df2.columns)}")
if geom_col != "geometry":
    df2 = df2.rename(columns={geom_col: "geometry"})

# Geometries stored as WKT text are parsed into shapes.
if pd.api.types.is_string_dtype(df2["geometry"]):
    df2["geometry"] = gpd.GeoSeries.from_wkt(df2["geometry"])

# NOTE(review): this rebinds the notebook-wide name `gdf` to the classified
# sample, so earlier cells re-run after this one no longer see the full
# building table. Consider a dedicated name once downstream use is confirmed.
gdf = gpd.GeoDataFrame(df2, geometry="geometry")
gdf = gdf[~gdf.geometry.isna()].copy()
if gdf.empty:
    raise ValueError("No rows with a geometry left to plot.")
# buffer(0) is applied to every geometry before plotting.
gdf["geometry"] = gdf.geometry.buffer(0)

# Frames that carry no CRS are assumed to be in EPSG:25832.
if gdf.crs is None:
    gdf = gdf.set_crs(25832, allow_override=True)

# Compute centroids BEFORE reprojecting: centroids taken in a geographic CRS
# (degrees) are inaccurate and make geopandas emit a warning. The centroid
# points are then reprojected to WGS84 for the map centre.
c = gdf.geometry.centroid.to_crs(4326)

gdf = gdf.to_crs(4326)

# flatten list column for styling/tooltip: first label, or "unknown"
if "assigned_classes" in gdf.columns:
    gdf["assigned_class"] = gdf["assigned_classes"].apply(
        lambda x: x[0] if isinstance(x, list) and len(x) else "unknown"
    )
else:
    gdf["assigned_class"] = "unknown"

# center map on the mean building centroid (lat, lon order for folium)
m = folium.Map(location=[float(c.y.mean()), float(c.x.mean())], zoom_start=11, tiles="OpenStreetMap")

# Only show tooltip fields that actually exist in the frame.
tooltip_cols = [
    col
    for col in ["gml_id", "Stadt", "Strasse", "HausNr", "label_en", "area_m2", "volume_m3", "assigned_class"]
    if col in gdf.columns
]

folium.GeoJson(
    gdf,
    name="buildings",
    style_function=lambda feat: {"weight": 1, "fillOpacity": 0.5},
    tooltip=GeoJsonTooltip(fields=tooltip_cols, aliases=tooltip_cols, sticky=False)
).add_to(m)

folium.LayerControl().add_to(m)

m  # in notebook
