In [23]:
import json
import re # For regular expressions, useful for cleaning blank node IDs
from rdflib import Graph, Literal, URIRef, BNode # Import Graph, Literal, URIRef, BNode from rdflib
from rdflib.namespace import RDF, DCAT, DCTERMS, FOAF, PROV, SKOS, XSD, Namespace # Import necessary namespaces


In [16]:
dcat_turtle_document = """
@prefix cnt: <http://www.w3.org/2011/content#> .
@prefix dcat: <http://www.w3.org/ns/dcat#> .
@prefix dcterms: <http://purl.org/dc/terms/> .
@prefix dqv: <http://www.w3.org/ns/dqv#> .
@prefix foaf: <http://xmlns.com/foaf/0.1/> .
@prefix geo: <http://www.opengis.net/ont/geosparql#> .
@prefix locn: <http://www.w3.org/ns/locn#> .
@prefix owl: <http://www.w3.org/2002/07/owl#> .
@prefix prov: <http://www.w3.org/ns/prov#> .
@prefix schema1: <http://schema.org/> .
@prefix skos: <http://www.w3.org/2004/02/skos/core#> .
@prefix vcard: <http://www.w3.org/2006/vcard/ns#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix dcatapnl: <http://modellen.geostandaarden.nl/dcat-ap-nl/> .

<http://publications.europa.eu/resource/authority/frequency/IRREG> a dcterms:Frequency .

<http://www.opengis.net/def/crs/EPSG/0/4326> a dcterms:Standard ;
    dcterms:type <http://inspire.ec.europa.eu/glossary/SpatialReferenceSystem> .

<https://dataplatform.knmi.nl/catalog/datasets/index.html?x-dataset=waarneemstations&x-dataset-version=5> a foaf:Document ;
    dcterms:title "KNMI Data Platform"@en .

<http://publications.europa.eu/resource/authority/language/ENG> a dcterms:LinguisticSystem .

<http://inspire.ec.europa.eu/metadata-codelist/ResponsiblePartyRole/pointOfContact> a dcat:Role .

[] a dcat:Dataset ;
    dcterms:accrualPeriodicity <http://publications.europa.eu/resource/authority/frequency/IRREG> ;
    dcterms:description "KNMI collects observations from the automatic weather stations situated in the Netherlands and BES islands on locations such as aerodromes, North Sea platforms and wind poles. This dataset provides metadata on these weather stations, such as location, name and type. The data in this dataset is formatted as NetCDF. It is also available as a CSV file in this dataset: https://dataplatform.knmi.nl/dataset/waarneemstations-csv-1-0."@en,
        "Het KNMI verzamelt gegevens van automatische waarneemstations in Nederland en BES eilanden op locaties zoals luchthavens (Aerodrome), Noordzee platformen en windpalen. Deze dataset bevat de metadata van de weerstations, zoals locatie, naam en type. De data in deze dataset is beschikbaar in NetCDF formaat. De data is ook beschikbaar als CSV bestand in de volgende dataset: https://dataplatform.knmi.nl/dataset/waarneemstations-csv-1-0."@nl ;
    dcterms:identifier "urn:xkdc:ds:nl.knmi::waarneemstations/5/"^^xsd:anyURI ;
    dcterms:issued "2023-09-28"^^xsd:date ;
    dcterms:language <http://publications.europa.eu/resource/authority/language/ENG> ;
    dcterms:provenance [ a dcterms:ProvenanceStatement ;
            dcterms:description "Station archive KNMI"@en,
                "Stationsarchief KNMI"@nl ] ;
     dcterms:spatial [ a dcterms:Location ;
            dcat:bbox "POLYGON((3.3130 50.7500, 7.2275 50.7500, 7.2275 53.6755, 3.3130 53.6755, 3.3130 50.7500))"^^geo:wktLiteral ] ;
    dcterms:temporal [ a dcterms:PeriodOfTime ;
            schema1:endDate "9999-12-31"^^xsd:date ;
            schema1:startDate "2023-09-28"^^xsd:date ;
            dcat:endDate "9999-12-31"^^xsd:date ;
            dcat:startDate "2023-09-28"^^xsd:date ] ;
    dcterms:title "Meteo data - information on stations in the KNMI observations network"@en,
        "Meteo data - informatie over waarneemstations in het KNMI netwerk"@nl ;
    dcat:contactPoint [ a vcard:Organization, vcard:Kind ;
            vcard:fn "Koninklijk Nederlands Meteorologisch Instituut (KNMI)"@en,
                "Koninklijk Nederlands Meteorologisch Instituut (KNMI)"@nl ;
            vcard:hasEmail <mailto:opendata@knmi.nl> ] ;
    dcat:keyword "Inspire dataset"@en,
        "Inspire dataset"@nl ;
    dcat:landingPage <https://dataplatform.knmi.nl/catalog/datasets/index.html?x-dataset=waarneemstations&x-dataset-version=5> ;
    dcat:theme [ a skos:Concept ;
            skos:inScheme [ a skos:ConceptScheme ;
                    dcterms:issued "2013-01-01"^^xsd:date ;
                    dcterms:title "DG Discovery Properties Vocabulary 1.0"@en ] ;
            skos:prefLabel "Observation station"@en,
                "Waarneemstation"@nl ],
        [ a skos:Concept ;
            skos:inScheme [ a skos:ConceptScheme ;
                    dcterms:issued "2008-06-01"^^xsd:date ;
                    dcterms:title "GEMET - INSPIRE themes, version 1.0"@en ] ;
            skos:prefLabel "Environmental monitoring facilities"@en,
                "Milieubewakingsvoorzieningen"@nl ],
        [ a skos:Concept ;
            skos:inScheme [ a skos:ConceptScheme ;
                    dcterms:issued "2013-01-01"^^xsd:date ;
                    dcterms:title "DG Discovery Properties Vocabulary 1.0"@en ] ;
            skos:prefLabel "Network"@en,
                "Netwerk"@nl ],
        [ a skos:Concept ;
            skos:inScheme [ a skos:ConceptScheme ;
                    dcterms:issued "2013-01-01"^^xsd:date ;
                    dcterms:title "DG Discovery Properties Vocabulary 1.0"@en ] ;
            skos:prefLabel "Sensors"@en,
                "Sensoren"@nl ] ;
    dcat:spatialResolutionInMeters "1"^^xsd:decimal ;
    prov:qualifiedAttribution [ a prov:Attribution ;
            dcterms:type <http://inspire.ec.europa.eu/metadata-codelist/ResponsiblePartyRole/pointOfContact> ;
            dcat:hadRole <http://inspire.ec.europa.eu/metadata-codelist/ResponsiblePartyRole/pointOfContact> ;
            prov:agent [ a prov:Agent,
                        foaf:Organization ;
                    foaf:mbox <mailto:opendata@knmi.nl> ;
                    foaf:name "Koninklijk Nederlands Meteorologisch Instituut (KNMI)"@en,
                        "Koninklijk Nederlands Meteorologisch Instituut (KNMI)"@nl ] ] ;
    .


"""

In [24]:
def dcat_turtle_to_ogc_records_feature(turtle_string: str) -> dict:
    """
    Converts a DCAT Dataset RDF Turtle string to an OGC API Records compatible
    GeoJSON Feature structure by directly parsing the Turtle.

    Args:
        turtle_string (str): The input DCAT Dataset RDF data in Turtle format.

    Returns:
        dict: A GeoJSON Feature dictionary representing the OGC API Record.
              Returns an empty dictionary if parsing or conversion fails.
    """
    g = Graph()
    try:
        # Parse the Turtle string into an RDF graph
        g.parse(data=turtle_string, format="turtle")
    except Exception as e:
        print(f"Error parsing Turtle string: {e}")
        return {}

    # Define common prefixes for cleaner JSON keys
    # Add any other prefixes you expect in your data
    prefixes = {
        str(DCAT): "dcat",
        str(DCTERMS): "dcterms",
        str(FOAF): "foaf",
        str(PROV): "prov",
        str(SKOS): "skos",
        str(XSD): "xsd",
        str(Namespace("http://www.w3.org/2006/vcard/ns#")): "vcard",
        str(Namespace("http://www.w3.org/2011/content#")): "cnt", # content namespace
        str(Namespace("http://www.w3.org/ns/dqv#")): "dqv", # dqv namespace
        str(Namespace("http://www.opengis.net/ont/geosparql#")): "geo", # geosparql namespace
        str(Namespace("http://www.w3.org/2006/vcard/ns#")): "locn", # locn namespace (corrected in Turtle)
        str(Namespace("http://www.w3.org/2002/07/owl#")): "owl", # owl namespace
        str(Namespace("http://schema.org/")): "schema1" # For schema1:endDate etc.
    }

    def get_prefixed_name(uri_ref: URIRef) -> str:
        """Converts a full URI into a prefixed name (e.g., http://www.w3.org/ns/dcat#Dataset -> dcat:Dataset)."""
        uri_str = str(uri_ref)
        for prefix_uri, prefix_name in prefixes.items():
            if uri_str.startswith(prefix_uri):
                return f"{prefix_name}:{uri_str[len(prefix_uri):]}"
        return uri_str # Return full URI if no prefix found

    def process_node_to_json(node, graph, visited_bnodes: set):
        """
        Recursively processes an RDF node (URI, Literal, or Blank Node) into a JSON value.
        `visited_bnodes` is used to prevent infinite recursion for cyclic graphs.
        """
        if isinstance(node, URIRef):
            return str(node)
        elif isinstance(node, Literal):
            # Handle literals with language tags or datatypes
            if node.language:
                return {node.language: str(node)}
            elif node.datatype:
                # Attempt to convert to Python native types based on xsd:datatype
                if node.datatype == XSD.integer:
                    return int(node)
                elif node.datatype == XSD.decimal or node.datatype == XSD.double:
                    return float(node)
                elif node.datatype == XSD.boolean:
                    return str(node).lower() == 'true'
                elif node.datatype == URIRef("http://www.opengis.net/ont/geosparql#geoJSONLiteral"):
                    try:
                        return json.loads(str(node))
                    except json.JSONDecodeError:
                        return str(node) # Fallback if not valid JSON
                else:
                    return str(node) # Default to string for other datatypes
            else:
                # Simple string literal
                return str(node)
        elif isinstance(node, BNode):
            # Handle blank nodes by recursively processing their properties
            if node in visited_bnodes:
                # Avoid infinite recursion for cyclic graphs by returning its ID
                return f"_:{node}" 
            
            visited_bnodes.add(node) # Mark as visited

            bnode_obj = {}
            # Iterate over predicate-object pairs for this blank node
            for p, o in graph.predicate_objects(node):
                prop_key = get_prefixed_name(p)
                value = process_node_to_json(o, graph, visited_bnodes)

                if prop_key not in bnode_obj:
                    bnode_obj[prop_key] = value
                else:
                    # If property already exists, convert to list if not already
                    if not isinstance(bnode_obj[prop_key], list):
                        bnode_obj[prop_key] = [bnode_obj[prop_key]]
                    # Handle multi-language values correctly if adding to a list
                    if isinstance(value, dict) and len(value) == 1 and list(value.keys())[0].isalpha() and len(list(value.keys())[0]) <= 3 and \
                       isinstance(bnode_obj[prop_key], list) and \
                       all(isinstance(item, dict) and len(item) == 1 and list(item.keys())[0].isalpha() and len(list(item.keys())[0]) <= 3 for item in bnode_obj[prop_key]):
                        # If all existing items and new value are single-language dicts, merge them
                        merged_lang_dict = {}
                        for item in bnode_obj[prop_key]:
                            merged_lang_dict.update(item)
                        merged_lang_dict.update(value)
                        bnode_obj[prop_key] = merged_lang_dict
                    else:
                        bnode_obj[prop_key].append(value)
            return bnode_obj
        else:
            return str(node) # Fallback for unexpected node types


    # Find the main dcat:Dataset. In your Turtle, it's a blank node.
    main_dataset_subject = None
    for s, p, o in g.triples((None, RDF.type, DCAT.Dataset)):
        main_dataset_subject = s
        break # Assuming one main dataset for this transformation

    if not main_dataset_subject:
        print("Error: Could not find the main dcat:Dataset in the Turtle content.")
        return {}

    # Start processing from the main dataset subject
    # Use a fresh set for visited_bnodes for the top-level processing
    processed_dataset = process_node_to_json(main_dataset_subject, g, set())

    # Initialize the OGC API Records Feature structure
    ogc_record = {
        "type": "Feature",
        # Use dcterms:identifier if available, otherwise a generated ID or blank node ID
        "id": processed_dataset.get("dcterms:identifier", str(main_dataset_subject)),
        "geometry": None, # Will be populated from dcat:bbox
        "properties": {}
    }
    
    # Iterate through the processed dataset and map properties
    properties = ogc_record["properties"]

    for key, value in processed_dataset.items():
        # Skip internal JSON-LD/RDF properties that don't map directly
        if key in ["@type", "@id"]:
            continue

        # Map dcat:bbox to geometry
        if key == "dcat:bbox":
            ogc_record["geometry"] = value
            continue

        # Handle specific mappings for OGC API Records
        if key == "dcterms:title":
            properties["title"] = value
        elif key == "dcterms:description":
            properties["description"] = value
        elif key == "dcterms:issued":
            properties["issued"] = value
        elif key == "dcterms:language":
            properties["language"] = value
        elif key == "dcat:keyword":
            # Keywords might be a language map or a list of strings
            if isinstance(value, dict):
                properties["keywords"] = list(value.values())
            elif isinstance(value, list):
                properties["keywords"] = value
            else:
                properties["keywords"] = [value]
        elif key == "dcat:landingPage":
            if value:
                ogc_record.setdefault("links", []).append({
                    "rel": "about",
                    "href": value,
                    "type": "text/html",
                    "title": properties.get("title", {}).get("en", "Landing Page")
                })
        elif key == "dcat:spatialResolutionInMeters":
            properties["spatialResolutionInMeters"] = value
        elif key == "dcterms:provenance":
            # Provenance is a nested object
            properties["provenance"] = value
        elif key == "dcterms:temporal":
            # Temporal is a nested object
            properties["temporalExtent"] = {
                "startDate": value.get("schema1:startDate") or value.get("dcat:startDate"),
                "endDate": value.get("schema1:endDate") or value.get("dcat:endDate")
            }
        elif key == "dcat:contactPoint":
            # ContactPoint is a nested object
            properties["contactPoint"] = {
                "organizationName": value.get("vcard:fn"),
                "email": value.get("vcard:hasEmail")
            }
        elif key == "dcat:theme":
            # Theme is a list of nested objects
            properties["themes"] = []
            if isinstance(value, list):
                for theme_item in value:
                    theme_obj = {"prefLabel": theme_item.get("skos:prefLabel")}
                    if "skos:inScheme" in theme_item:
                        theme_obj["inScheme"] = {"title": theme_item["skos:inScheme"].get("dcterms:title")}
                    properties["themes"].append(theme_obj)
            else: # Handle single theme
                theme_obj = {"prefLabel": value.get("skos:prefLabel")}
                if "skos:inScheme" in value:
                    theme_obj["inScheme"] = {"title": value["skos:inScheme"].get("dcterms:title")}
                properties["themes"].append(theme_obj)
        elif key == "prov:qualifiedAttribution":
            # Qualified Attribution is a nested object
            agent_data = value.get("prov:agent")
            if agent_data:
                properties["publisher"] = {
                    "name": agent_data.get("foaf:name"),
                    "email": agent_data.get("foaf:mbox")
                }
        else:
            # For any other top-level properties not explicitly mapped, add them directly
            properties[get_prefixed_name(URIRef(key))] = value # Ensure key is prefixed

    # If the ID is still a blank node ID, replace it with a placeholder for API compatibility
    if ogc_record["id"].startswith("_:"):
        ogc_record["id"] = "urn:ogc:record:knmi-waarneemstations-v5"

    return ogc_record

In [25]:
ogc_records_feature = dcat_turtle_to_ogc_records_feature(dcat_turtle_document)

print(json.dumps(ogc_records_feature, indent=2))

{
  "type": "Feature",
  "id": "urn:xkdc:ds:nl.knmi::waarneemstations/5/",
  "geometry": null,
  "properties": {
    "http://www.w3.org/1999/02/22-rdf-syntax-ns#type": "http://www.w3.org/ns/dcat#Dataset",
    "dcterms:accrualPeriodicity": "http://publications.europa.eu/resource/authority/frequency/IRREG",
    "description": {
      "en": "KNMI collects observations from the automatic weather stations situated in the Netherlands and BES islands on locations such as aerodromes, North Sea platforms and wind poles. This dataset provides metadata on these weather stations, such as location, name and type. The data in this dataset is formatted as NetCDF. It is also available as a CSV file in this dataset: https://dataplatform.knmi.nl/dataset/waarneemstations-csv-1-0.",
      "nl": "Het KNMI verzamelt gegevens van automatische waarneemstations in Nederland en BES eilanden op locaties zoals luchthavens (Aerodrome), Noordzee platformen en windpalen. Deze dataset bevat de metadata van de weersta