In [23]:
xmlhome = "https://grunddatamodel.datafordeler.dk/domaenemodeller/"
xmlfolder = "/Users/holmes/local_dev/semanticGIS/tests/semantictest/xml_models"



In [4]:
from html.parser import HTMLParser
from pathlib import Path
from urllib.parse import urljoin, unquote
from urllib.request import urlopen
import re


class _LinkCollector(HTMLParser):
    """Collects href targets from the simple directory listings."""

    def __init__(self):
        super().__init__()
        self.links: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag != "a":
            return
        href = dict(attrs).get("href")
        if href:
            self.links.append(href)


def _list_links(url: str) -> list[str]:
    with urlopen(url) as response:
        html = response.read().decode("utf-8", errors="ignore")
    parser = _LinkCollector()
    parser.feed(html)
    return parser.links


def _version_key(filename: str):
    name = Path(filename).name
    if not name.lower().endswith(".xml") or "_" not in name:
        return None
    version_candidate = name.split("_", 1)[0]
    if not re.fullmatch(r"\d+(?:\.\d+)*", version_candidate):
        return None
    return tuple(int(part) for part in version_candidate.split("."))


def _latest_xml(register_url: str):
    candidates = []
    for href in _list_links(register_url):
        if not href.lower().endswith(".xml") or "?" in href:
            continue
        version = _version_key(href)
        if version is None:
            continue
        candidates.append((version, href))
    if not candidates:
        return None
    candidates.sort(key=lambda item: item[0], reverse=True)
    return candidates[0][1]


base_url = xmlhome.rstrip("/") + "/"
output_root = Path(xmlfolder)
output_root.mkdir(parents=True, exist_ok=True)

register_dirs = [
    href for href in _list_links(base_url) if href.endswith("/") and href not in {"../", "./"}
 ]

completed = []
for href in register_dirs:
    register_url = urljoin(base_url, href)
    decoded_href = unquote(href.strip("/"))
    register_name = Path(decoded_href).name or "root"
    try:
        latest_file = _latest_xml(register_url)
    except Exception as exc:
        print(f"Failed to inspect {register_name}: {exc}")
        continue
    if not latest_file:
        print(f"No XML versions found for {register_name}, skipping.")
        continue
    download_url = urljoin(register_url, latest_file)
    destination_dir = output_root / register_name
    destination_dir.mkdir(parents=True, exist_ok=True)
    file_name = unquote(Path(latest_file).name)
    destination_path = destination_dir / file_name
    try:
        with urlopen(download_url) as response:
            destination_path.write_bytes(response.read())
        print(f"Downloaded {file_name} to {destination_path}")
        completed.append(register_name)
    except Exception as exc:
        print(f"Failed to download {register_name} ({latest_file}): {exc}")

if completed:
    print(f"Fetched latest XML for {len(completed)} registers.")
else:
    print("No XML models downloaded.")

No XML versions found for root, skipping.
Downloaded 2.4.2_BygningerOgBoliger.xml to /Users/holmes/local_dev/semanticGIS/tests/semantictest/xml_models/BygningerOgBoliger/2.4.2_BygningerOgBoliger.xml
Downloaded 2.2.0_CentraleVirksomhedsregister.xml to /Users/holmes/local_dev/semanticGIS/tests/semantictest/xml_models/CentraleVirksomhedsregister/2.2.0_CentraleVirksomhedsregister.xml
Downloaded 2.0.0_DAGI.xml to /Users/holmes/local_dev/semanticGIS/tests/semantictest/xml_models/DAGI/2.0.0_DAGI.xml
Downloaded 2.0.2_DanmarksAdresser.xml to /Users/holmes/local_dev/semanticGIS/tests/semantictest/xml_models/DanmarksAdresser/2.0.2_DanmarksAdresser.xml
Downloaded 1.2.1_DHMOprindelse.xml to /Users/holmes/local_dev/semanticGIS/tests/semantictest/xml_models/DHMOprindelse/1.2.1_DHMOprindelse.xml
Downloaded 1.3.0_Ejendomsbeliggenhed.xml to /Users/holmes/local_dev/semanticGIS/tests/semantictest/xml_models/Ejendomsbeliggenhed/1.3.0_Ejendomsbeliggenhed.xml
Downloaded 1.1.3_Ejendomsvurdering.xml to /Users/

In [24]:
from __future__ import annotations

import json
from pathlib import Path
import xml.etree.ElementTree as ET


def _tag_name(elem):
    tag = elem.tag
    if isinstance(tag, str):
        if tag.startswith("{") and "}" in tag:
            return tag.split("}", 1)[1]
        if ":" in tag:
            return tag.split(":", 1)[1]
    return tag


def _attr(elem, name):
    if name in elem.attrib:
        return elem.attrib[name]
    for key, value in elem.attrib.items():
        if key.endswith(name):
            return value
        if "}" in key and key.split("}", 1)[1] == name:
            return value
        if ":" in key and key.split(":", 1)[1] == name:
            return value
    return None


def _xmi_type(elem):
    for key in (
        "{http://www.omg.org/spec/XMI/20131001}type",
        "{http://www.omg.org/spec/XMI/20110701}type",
        "xmi:type",
    ):
        if key in elem.attrib:
            return elem.attrib[key]
    for key, value in elem.attrib.items():
        if key.endswith("}type"):
            return value
    return None


def _index_elements(root):
    index = {}
    for elem in root.iter():
        elem_id = _attr(elem, "id")
        if not elem_id or elem_id in index:
            continue
        index[elem_id] = {
            "name": elem.attrib.get("name"),
            "umlType": elem.attrib.get("UMLType")
            or elem.attrib.get("umlType")
            or _xmi_type(elem)
            or _tag_name(elem),
            "xmiType": _xmi_type(elem),
            "tag": _tag_name(elem),
        }
    return index


def _version_tuple(filename: str) -> tuple[int, ...]:
    candidate = Path(filename).stem.split("_", 1)[0]
    parts = []
    for part in candidate.split("."):
        if not part.isdigit():
            return ()
        parts.append(int(part))
    return tuple(parts)


def _parse_attributes(class_elem, class_name, id_index):
    attrs = []
    for child in class_elem:
        if _tag_name(child) != "ownedAttribute":
            continue
        attr_name = child.attrib.get("name")
        if not attr_name:
            continue
        attr_kind = _xmi_type(child) or _tag_name(child)
        attr_id = _attr(child, "id")
        type_ref = child.attrib.get("type") or _attr(child, "type")
        association_id = child.attrib.get("association") or _attr(child, "association")
        lower = child.attrib.get("lower")
        upper = child.attrib.get("upper")
        for grandchild in child:
            tag = _tag_name(grandchild)
            if tag == "lowerValue":
                lower = grandchild.attrib.get("value")
            elif tag == "upperValue":
                upper = grandchild.attrib.get("value")
            elif tag == "type":
                type_ref = (
                    _attr(grandchild, "idref")
                    or grandchild.attrib.get("href")
                    or grandchild.attrib.get("value")
                    or type_ref
                )
        type_meta = id_index.get(type_ref or "")
        type_name = type_meta.get("name") if type_meta else None
        type_uml = None
        if type_meta:
            type_uml = (
                type_meta.get("umlType")
                or type_meta.get("xmiType")
                or type_meta.get("tag")
            )
        lower_disp = lower if lower not in (None, "") else "0"
        upper_disp = upper if upper not in (None, "", "-1") else "*"
        attrs.append(
            {
                "id": attr_id,
                "name": attr_name,
                "sourceClass": class_name,
                "typeId": type_ref,
                "typeName": type_name,
                "typeUml": type_uml,
                "type": attr_kind,
                "lowerMultiplicity": lower_disp,
                "upperMultiplicity": upper_disp,
                "cardinality": f"{lower_disp}..{upper_disp}",
                "associationId": association_id,
                "isAssociation": bool(association_id),
                "isForeignKey": bool(association_id),
            }
        )
    return attrs


def _dedup_attributes(attributes):
    seen = set()
    ordered = []
    for attr in attributes:
        key = (attr["name"], attr.get("sourceClass"))
        if key in seen:
            continue
        seen.add(key)
        ordered.append(dict(attr))
    return ordered


def _collect_inherited(class_id, classes, memo, stack=None):
    # Walk the generalization tree to gather inherited attributes once per class.
    if class_id in memo:
        return [dict(attr) for attr in memo[class_id]]
    if stack is None:
        stack = set()
    if class_id in stack:
        return []
    stack.add(class_id)
    inherited = []
    for parent_id in classes.get(class_id, {}).get("parents", []):
        parent = classes.get(parent_id)
        if not parent:
            continue
        inherited.extend(dict(attr) for attr in parent["ownAttributes"])
        inherited.extend(_collect_inherited(parent_id, classes, memo, stack))
    stack.remove(class_id)
    memo[class_id] = _dedup_attributes(inherited)
    return [dict(attr) for attr in memo[class_id]]


def _find_register_package(root):
    for elem in root:
        if _tag_name(elem) != "Model":
            continue
        for child in elem:
            if _tag_name(child) == "packagedElement" and _xmi_type(child) == "uml:Package":
                return _attr(child, "id"), child.attrib.get("name")
        break
    return None, None


def _summarize_collection(xml_path, register_name):
    tree = ET.parse(xml_path)
    root = tree.getroot()
    id_index = _index_elements(root)
    register_package_id, register_package_name = _find_register_package(root)

    classes = {}
    for elem in root.iter():
        if _tag_name(elem) != "packagedElement":
            continue
        if _xmi_type(elem) != "uml:Class":
            continue
        class_id = _attr(elem, "id")
        if not class_id:
            continue
        class_name = elem.attrib.get("name") or class_id
        parents = []
        for child in elem:
            if _tag_name(child) == "generalization":
                parent_id = _attr(child, "general")
                if parent_id:
                    parents.append(parent_id)
        classes[class_id] = {
            "id": class_id,
            "name": class_name,
            "isAbstract": (_attr(elem, "isAbstract") or "").lower() in {"true", "1"},
            "parents": parents,
            "ownAttributes": _parse_attributes(elem, class_name, id_index),
        }

    inheritance_cache = {}
    class_entries = []
    for class_id, class_info in sorted(classes.items(), key=lambda item: item[1]["name"]):
        inherited = _collect_inherited(class_id, classes, inheritance_cache, set())
        own_attrs = [dict(attr) for attr in class_info["ownAttributes"]]
        all_attrs = _dedup_attributes(own_attrs + inherited)
        class_entries.append(
            {
                "id": class_id,
                "register": register_name,
                "name": class_info["name"],
                "isAbstract": class_info["isAbstract"],
                "ownAttributes": own_attrs,
                "inheritedAttributes": inherited,
                "attributes": all_attrs,
                "attributeCount": len(all_attrs),
            }
        )

    abstract_names = sorted(
        class_info["name"]
        for class_info in classes.values()
        if class_info["isAbstract"]
    )
    register_package = None
    if register_package_id or register_package_name:
        register_package = {
            "id": register_package_id,
            "name": register_package_name,
        }
    return {
        "xmlFile": xml_path.name,
        "registerPackage": register_package,
        "classes": class_entries,
        "abstractClasses": abstract_names,
    }


root_dir = Path(xmlfolder)
if not root_dir.exists():
    raise FileNotFoundError(f"XML folder not found: {root_dir}")

collection_summaries = {}
for register_dir in sorted((p for p in root_dir.iterdir() if p.is_dir()), key=lambda p: p.name):
    xml_files = sorted(
        register_dir.glob("*.xml"),
        key=lambda path: (_version_tuple(path.name), path.name),
    )
    if not xml_files:
        continue
    xml_path = xml_files[-1]
    try:
        collection_summaries[register_dir.name] = _summarize_collection(xml_path, register_dir.name)
    except Exception as exc:
        print(f"Failed to summarize {xml_path}: {exc}")

if not collection_summaries:
    print(f"No XML models found under {root_dir}")
else:
    summary_path = root_dir / "class_summary.json"
    summary_path.write_text(
        json.dumps(collection_summaries, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    print(f"Wrote summaries for {len(collection_summaries)} registers to {summary_path}")

    preview_count = 3
    for register_name in list(collection_summaries)[:preview_count]:
        info = collection_summaries[register_name]
        print(
            f"{register_name}: {len(info['classes'])} classes / "
            f"{len(info['abstractClasses'])} abstract"
        )
        for cls in info["classes"][:3]:
            sample = ", ".join(attr["name"] for attr in cls["attributes"][:3])
            if not sample:
                sample = "no attributes"
            print(f"  - {cls['name']} ({cls['attributeCount']} attrs) {sample}")
        print()


Wrote summaries for 17 registers to /Users/holmes/local_dev/semanticGIS/tests/semantictest/xml_models/class_summary.json
BygningerOgBoliger: 12 classes / 1 abstract
  - BBRSag (34 attrs) sag001Byggesagsnummer, sag002Byggesagsdato, sag003Byggetilladelsesdato
  - Bygning (90 attrs) byg007Bygningsnummer, byg021BygningensAnvendelse, byg024AntalLejlighederMedKøkken
  - Bygværkselement (12 attrs) forretningshændelse, forretningsområde, status

CentraleVirksomhedsregister: 27 classes / 4 abstract
  - AndenDeltager (11 attrs) personNavn, cvrAdresse, andenDeltagerType
  - CVRAdresse (26 attrs) adresseFritekst, bogstav, bygningsnummer
  - CVREnhed (8 attrs) id, registreringFra, registreringsaktør

DAGI: 54 classes / 1 abstract
  - AdministrativInddeling (11 attrs) id, navn, registreringFra
  - Afstemningsområde (17 attrs) afstemningsområdenummer, afstemningsstedNavn, afstemningsstedAdresse
  - Afstemningsområde_2000000 (17 attrs) afstemningsområdenummer, afstemningsstedNavn, afstemningsstedAdres

In [25]:
TARGET_DOC_TAGS = {
    "definition (da)": "definitionDa",
    "example (da)": "exampleDa",
    "comment (da)": "commentDa",
}


def _clean_doc_text(raw: str | None) -> str | None:
    if not raw:
        return None
    text = raw.replace("&lt;memo&gt;", "").replace("&lt;/memo&gt;", "")
    text = text.replace("<memo>", "").replace("</memo>", "")
    if "#NOTES#" in text:
        text = text.split("#NOTES#", 1)[1]
    text = text.replace("&#xA;", "\n").strip()
    return text or None


def _extract_tag_bundle(tags_elem) -> dict[str, str] | None:
    if tags_elem is None:
        return None
    bundle = {}
    for tag in tags_elem:
        tag_name = tag.attrib.get("name")
        target_key = TARGET_DOC_TAGS.get(tag_name)
        if not target_key:
            continue
        value = _clean_doc_text(tag.attrib.get("notes") or tag.attrib.get("value"))
        if value:
            bundle[target_key] = value
    return bundle or None


def _collect_documentation(xml_path: Path) -> dict[str, dict[str, str]]:
    tree = ET.parse(xml_path)
    root = tree.getroot()
    extension = None
    for elem in root:
        if _tag_name(elem) == "Extension":
            extension = elem
            break
    if extension is None:
        return {}
    elements_container = None
    for child in extension:
        if _tag_name(child) == "elements":
            elements_container = child
            break
    if elements_container is None:
        return {}

    doc_map: dict[str, dict[str, str]] = {}
    for element in elements_container:
        elem_id = _attr(element, "idref") or _attr(element, "id")
        if not elem_id:
            continue
        tag_node = next((child for child in element if _tag_name(child) == "tags"), None)
        element_doc = _extract_tag_bundle(tag_node)
        if element_doc:
            doc_map[elem_id] = element_doc
        for child in element:
            if _tag_name(child) != "attributes":
                continue
            for attr_elem in child:
                attr_id = _attr(attr_elem, "idref") or _attr(attr_elem, "id")
                if not attr_id:
                    continue
                attr_tags = next((gc for gc in attr_elem if _tag_name(gc) == "tags"), None)
                attr_doc = _extract_tag_bundle(attr_tags)
                if attr_doc:
                    doc_map[attr_id] = attr_doc
    return doc_map


summary_path = Path(xmlfolder) / "class_summary.json"
if not summary_path.exists():
    raise FileNotFoundError(f"Summary not found, run the previous cell first: {summary_path}")

data = json.loads(summary_path.read_text(encoding="utf-8"))

for register_name, register_info in data.items():
    xml_file = register_info.get("xmlFile")
    if not xml_file:
        continue
    xml_path = Path(xmlfolder) / register_name / xml_file
    if not xml_path.exists():
        print(f"Missing XML for {register_name}, skipping descriptions")
        continue
    doc_map = _collect_documentation(xml_path)
    register_pkg = register_info.get("registerPackage") or {}
    register_pkg_id = register_pkg.get("id") if isinstance(register_pkg, dict) else None
    if register_pkg_id and register_pkg_id in doc_map:
        register_info["registerDocumentation"] = doc_map[register_pkg_id]
    for cls in register_info.get("classes", []):
        cls_doc = doc_map.get(cls.get("id"))
        if cls_doc:
            cls["documentation"] = cls_doc
        for bucket in ("ownAttributes", "inheritedAttributes", "attributes"):
            for attr in cls.get(bucket, []):
                attr_doc = doc_map.get(attr.get("id"))
                if attr_doc:
                    attr["documentation"] = attr_doc

summary_path.write_text(
    json.dumps(data, ensure_ascii=False, indent=2),
    encoding="utf-8",
)
print("Enriched documentation for classes, attributes, and registers.")

Enriched documentation for classes, attributes, and registers.


In [26]:
from collections import defaultdict

summary_path = Path(xmlfolder) / "class_summary.json"
if not summary_path.exists():
    raise FileNotFoundError(f"Summary not found, run the previous cell first: {summary_path}")

data = json.loads(summary_path.read_text(encoding="utf-8"))
class_index: dict[str, dict[str, str | None]] = {}
for register_name, register_info in data.items():
    for cls in register_info.get("classes", []):
        class_index[cls.get("id")] = {
            "register": register_name,
            "className": cls.get("name"),
            "registerXml": register_info.get("xmlFile"),
        }

def _enrich_attribute(attr: dict) -> bool:
    if not attr.get("isForeignKey"):
        attr.pop("foreignKeyTarget", None)
        return False
    target_id = attr.get("typeId")
    target_meta = class_index.get(target_id)
    if target_meta:
        attr["foreignKeyTarget"] = {
            "classId": target_id,
            "className": target_meta.get("className"),
            "register": target_meta.get("register"),
            "registerXml": target_meta.get("registerXml"),
        }
    else:
        attr["foreignKeyTarget"] = {
            "classId": target_id,
            "className": attr.get("typeName"),
            "register": None,
            "registerXml": None,
        }
    return True

fk_counts = defaultdict(int)
for register_name, register_info in data.items():
    for cls in register_info.get("classes", []):
        for bucket in ("ownAttributes", "inheritedAttributes", "attributes"):
            for attr in cls.get(bucket, []):
                if _enrich_attribute(attr):
                    fk_counts[register_name] += 1

summary_path.write_text(
    json.dumps(data, ensure_ascii=False, indent=2),
    encoding="utf-8",
)
print(
    f"Annotated foreign keys for {sum(fk_counts.values())} attributes across "
    f"{len([k for k in fk_counts if fk_counts[k]])} registers"
)
for register_name, count in sorted(fk_counts.items()):
    if count:
        print(f"  - {register_name}: {count}")


Annotated foreign keys for 787 attributes across 12 registers
  - BygningerOgBoliger: 84
  - CentraleVirksomhedsregister: 256
  - DAGI: 64
  - DanmarksAdresser: 34
  - Ejendomsbeliggenhed: 8
  - Ejendomsvurdering: 16
  - Ejerfortegnelsen: 64
  - GeoDanmark: 12
  - Matrikel: 100
  - Person: 107
  - SkatteforvaltningensVirksomhedsregister: 26
  - Stednavne: 16
