In [1]:
# Import necessary libraries
import json
from pyld import jsonld
from rdflib import Graph, URIRef, Literal, BNode, Namespace
from rdflib.namespace import RDF, DCTERMS, SKOS, DCAT, XSD, RDFS # Import common RDF namespaces


In [4]:
shacl_graph = Graph()
shacl_graph.parse(r'..\\_sources\\dcat-ap-nl-dataset\\rules.shacl', format='turtle')

<Graph identifier=Nbadba0a878f2489ea86d92963d2b325b (<class 'rdflib.graph.Graph'>)>

In [5]:
print(f"SHACL graph has {len(shacl_graph)} triples.")
print("\n" + "="*50 + "\n")

SHACL graph has 70 triples.




In [6]:
jsonld_context = {
    "@context": []
}

# Add common prefix definitions
prefix_mapping = {}
for prefix, namespace_uri in shacl_graph.namespace_manager.namespaces():
    # Only include common prefixes, or those explicitly used in SHACL paths
    if str(prefix) not in ["dcatapnl-sh", "eush", "sh", "rdf", "rdfs", "xsd"]: # Exclude SHACL internal prefixes
        prefix_mapping[str(prefix)] = str(namespace_uri)
jsonld_context["@context"].append(prefix_mapping)

In [7]:
# Create a dictionary for term definitions
term_definitions = {}

# Iterate through sh:property declarations to extract information
for s, p, o in shacl_graph.triples((None, RDF.type, URIRef("http://www.w3.org/ns/shacl#PropertyShape"))):
    path_iri = shacl_graph.value(s, URIRef("http://www.w3.org/ns/shacl#path"))
    sh_name = shacl_graph.value(s, URIRef("http://www.w3.org/ns/shacl#name"))

    if path_iri:
        # Generate a reasonable compact term (JSON key)
        # Prioritize sh:name if available, otherwise derive from IRI
        if sh_name:
            compact_term = str(sh_name).replace(" ", "").replace("_", "").lower()
        else:
            compact_term = path_iri.split('/')[-1].split('#')[-1] # Get last part of IRI

        term_entry = {"@id": str(path_iri)}

        # Check for @type (datatype)
        datatype = shacl_graph.value(s, URIRef("http://www.w3.org/ns/shacl#datatype"))
        if datatype:
            # Map XSD datatypes to JSON-LD @type
            if datatype == XSD.string:
                term_entry["@type"] = "xsd:string"
            elif datatype == XSD.integer:
                term_entry["@type"] = "xsd:integer"
            elif datatype == XSD.boolean:
                term_entry["@type"] = "xsd:boolean"
            elif datatype == XSD.dateTime:
                term_entry["@type"] = "xsd:dateTime"
            else:
                # For unknown or resource types, treat as @id
                term_entry["@type"] = "@id"
        else:
            # If sh:nodeKind is BlankNodeOrIRI, typically means it's a reference to another resource
            node_kind = shacl_graph.value(s, URIRef("http://www.w3.org/ns/shacl#nodeKind"))
            if node_kind == URIRef("http://www.w3.org/ns/shacl#BlankNodeOrIRI"):
                term_entry["@type"] = "@id" # Represent as URI in JSON

        # Check for @container (cardinality)
        min_count = shacl_graph.value(s, URIRef("http://www.w3.org/ns/shacl#minCount"))
        max_count = shacl_graph.value(s, URIRef("http://www.w3.org/ns/shacl#maxCount"))

        if max_count and max_count.value > 1:
            term_entry["@container"] = "@set" # Assume @set for multiple values if not specifically @list
        elif not max_count and min_count and min_count.value > 1:
            term_entry["@container"] = "@set" # If no max but min > 1, implies multiple
        elif not max_count and not min_count: # No cardinality specified, assume can be multiple
            term_entry["@container"] = "@set"


        # Add the term definition
        term_definitions[compact_term] = term_entry

jsonld_context["@context"].append(term_definitions)

In [8]:
print("--- Inferred JSON-LD Context ---")
print(json.dumps(jsonld_context, indent=2))
print("\n" + "="*50 + "\n")

--- Inferred JSON-LD Context ---
{
  "@context": [
    {
      "brick": "https://brickschema.org/schema/Brick#",
      "csvw": "http://www.w3.org/ns/csvw#",
      "dc": "http://purl.org/dc/elements/1.1/",
      "dcat": "http://www.w3.org/ns/dcat#",
      "dcmitype": "http://purl.org/dc/dcmitype/",
      "dcam": "http://purl.org/dc/dcam/",
      "doap": "http://usefulinc.com/ns/doap#",
      "foaf": "http://xmlns.com/foaf/0.1/",
      "geo": "http://www.opengis.net/ont/geosparql#",
      "odrl": "http://www.w3.org/ns/odrl/2/",
      "org": "http://www.w3.org/ns/org#",
      "prof": "http://www.w3.org/ns/dx/prof/",
      "prov": "http://www.w3.org/ns/prov#",
      "qb": "http://purl.org/linked-data/cube#",
      "schema": "https://schema.org/",
      "skos": "http://www.w3.org/2004/02/skos/core#",
      "sosa": "http://www.w3.org/ns/sosa/",
      "ssn": "http://www.w3.org/ns/ssn/",
      "time": "http://www.w3.org/2006/time#",
      "vann": "http://purl.org/vocab/vann/",
      "void": "h

In [9]:
# --- 🛠️ Step 4: Infer JSON Schema from SHACL ---
json_schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "Dataset Schema (Inferred from SHACL)",
    "description": "JSON Schema inferred from SHACL constraints for dcat:Dataset.",
    "type": "object",
    "properties": {},
    "required": []
}

In [11]:
# Identify the main target class from SHACL (e.g., dcat:Dataset)
target_class = shacl_graph.value(URIRef("http://modellen.geostandaarden.nl/dcat-ap-nl/id/shape/DatasetShape"), URIRef("http://www.w3.org/ns/shacl#targetClass"))

if target_class:
    json_schema["title"] = f"{target_class.split('/')[-1]} Schema (Inferred from SHACL)"
    json_schema["description"] = f"JSON Schema inferred from SHACL constraints for {target_class}."

    # Iterate over sh:property links from the main NodeShape
    for shape_s, sh_prop_p, prop_shape_id in shacl_graph.triples((URIRef("http://modellen.geostandaarden.nl/dcat-ap-nl/id/shape/DatasetShape"), URIRef("http://www.w3.org/ns/shacl#property"), None)):

        # Now get the details of each individual property shape (e.g., dcatapnl-sh:DatasetShape_accessRights_minCount)
        path_iri = shacl_graph.value(prop_shape_id, URIRef("http://www.w3.org/ns/shacl#path"))
        sh_name = shacl_graph.value(prop_shape_id, URIRef("http://www.w3.org/ns/shacl#name"))
        min_count = shacl_graph.value(prop_shape_id, URIRef("http://www.w3.org/ns/shacl#minCount"))
        max_count = shacl_graph.value(prop_shape_id, URIRef("http://www.w3.org/ns/shacl#maxCount"))
        datatype = shacl_graph.value(prop_shape_id, URIRef("http://www.w3.org/ns/shacl#datatype"))
        node_kind = shacl_graph.value(prop_shape_id, URIRef("http://www.w3.org/ns/shacl#nodeKind"))
        sh_description = shacl_graph.value(prop_shape_id, URIRef("http://www.w3.org/ns/shacl#description"))

        if path_iri:
            if sh_name:
                compact_term = str(sh_name).replace(" ", "").replace("_", "").lower()
            else:
                compact_term = path_iri.split('/')[-1].split('#')[-1]

            property_schema = {"description": str(sh_description) if sh_description else f"Corresponds to RDF property {path_iri}"}

            # Handle cardinality for JSON Schema (array or single value)
            is_array = False
            if max_count and max_count.value > 1:
                is_array = True
            elif not max_count and min_count and min_count.value > 1:
                is_array = True
            elif compact_term in ["accessrights", "theme"]: # These were arrays in your previous data
                is_array = True

            if is_array:
                property_schema["type"] = "array"
                item_schema = {}
                # Infer item type based on datatype or nodeKind
                if datatype == XSD.string or (node_kind and node_kind == RDFS.Resource): # RDFS.Resource for URIs
                    item_schema = {"type": "string"}
                elif datatype == XSD.integer:
                    item_schema = {"type": "integer"}
                elif node_kind == URIRef("http://www.w3.org/ns/shacl#BlankNodeOrIRI"):
                    item_schema = {"type": "string", "format": "uri"} # It's a URI reference
                else:
                    item_schema = {"type": "string"} # Default

                # Specific handling for complex nested structures like 'theme'
                if compact_term == "theme":
                    item_schema = {
                        "type": "object",
                        "properties": {
                            "concept": {
                                "type": "object",
                                "properties": {
                                    "source": {"type": "string", "format": "uri", "description": "IRI for the concept source."},
                                    "prefLabel": {
                                        "type": "object",
                                        "description": "Language-tagged preferred labels for the concept.",
                                        "patternProperties": {
                                            "^[a-z]{2}(-[A-Z]{2})?$": {"type": "string"}
                                        },
                                        "additionalProperties": False
                                    }
                                },
                                "required": ["source", "prefLabel"]
                            }
                        },
                        "required": ["concept"]
                    }

                property_schema["items"] = item_schema
                # If min_count is present for an array, it means minItems
                if min_count and min_count.value > 0:
                    property_schema["minItems"] = min_count.value


            else: # Single value
                if datatype == XSD.string or (node_kind and node_kind == RDFS.Resource):
                    property_schema["type"] = "string"
                elif datatype == XSD.integer:
                    property_schema["type"] = "integer"
                elif datatype == XSD.boolean:
                    property_schema["type"] = "boolean"
                elif datatype == XSD.dateTime:
                    # FIX: This line caused the SyntaxError. It should be a dictionary.
                    property_schema.update({"type": "string", "format": "date-time"})
                elif node_kind == URIRef("http://www.w3.org/ns/shacl#BlankNodeOrIRI"):
                    property_schema["type"] = "string"
                    property_schema["format"] = "uri"
                else:
                    property_schema["type"] = "string" # Default

            json_schema["properties"][compact_term] = property_schema

            # Mark as required if minCount is 1 or more
            if min_count and min_count.value >= 1:
                json_schema["required"].append(compact_term)

print("--- Inferred JSON Schema ---")
print(json.dumps(json_schema, indent=2))
print("\n" + "="*50 + "\n")

--- Inferred JSON Schema ---
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "dcat#Dataset Schema (Inferred from SHACL)",
  "description": "JSON Schema inferred from SHACL constraints for http://www.w3.org/ns/dcat#Dataset.",
  "type": "object",
  "properties": {
    "accessrights": {
      "description": "Corresponds to RDF property http://purl.org/dc/terms/accessRights",
      "type": "array",
      "items": {
        "type": "string"
      },
      "minItems": 1
    },
    "applicablelegislation": {
      "description": "Corresponds to RDF property http://data.europa.eu/r5r/applicableLegislation",
      "type": "string",
      "format": "uri"
    },
    "contactpoint": {
      "description": "Corresponds to RDF property http://www.w3.org/ns/dcat#contactPoint",
      "type": "string"
    },
    "creator": {
      "description": "Corresponds to RDF property http://purl.org/dc/terms/creator",
      "type": "string"
    },
    "hvdcategory": {
      "description": "T