In [22]:
from rdflib import Graph, Namespace, RDF, Literal, URIRef, RDFS
from urllib.parse import urlparse
import json

In [2]:
# Define namespaces
sphn = Namespace("https://www.biomedit.ch/rdf/sphn-schema/sphn#")
sulo = Namespace("https://aidava-dev.github.io/sulo/")
rdfs = Namespace("http://www.w3.org/2000/01/rdf-schema#")
xsd = Namespace("http://www.w3.org/2001/XMLSchema#")

# Load SPHN RDF file
g = Graph()
g.bind("sphn", sphn)
g.bind("sulo", sulo)
g.bind("rdfs", rdfs)
g.bind("xsd", xsd)
g.parse("turtles/sphn_classes.ttl", format="turtle")
print(f"{len(g)} Triples")

2195 Triples


In [3]:
# Find all classes in the graph
sphn_classes = set()

for s in g.subjects(RDF.type, rdfs.Class):
    if str(s).startswith(str(sphn)):  # Only SPHN classes
        sphn_classes.add(s)

print("SPHN Classes found:")
for cls in sphn_classes:
    print(cls)


SPHN Classes found:
https://www.biomedit.ch/rdf/sphn-schema/sphn#Deprecated
https://www.biomedit.ch/rdf/sphn-schema/sphn#samplePrimarycontainer
https://www.biomedit.ch/rdf/sphn-schema/sphn#CardiacOutputMeasurement
https://www.biomedit.ch/rdf/sphn-schema/sphn#Measurement
https://www.biomedit.ch/rdf/sphn-schema/sphn#Quantity
https://www.biomedit.ch/rdf/sphn-schema/sphn#TumorStageAssessmentResult
https://www.biomedit.ch/rdf/sphn-schema/sphn#Chromosome
https://www.biomedit.ch/rdf/sphn-schema/sphn#MicroorganismIdentificationLabTestEvent
https://www.biomedit.ch/rdf/sphn-schema/sphn#SingleNucleotideVariation
https://www.biomedit.ch/rdf/sphn-schema/sphn#Catheter
https://www.biomedit.ch/rdf/sphn-schema/sphn#GenomicTranslocation
https://www.biomedit.ch/rdf/sphn-schema/sphn#TumorSpecimen
https://www.biomedit.ch/rdf/sphn-schema/sphn#FollowUp
https://www.biomedit.ch/rdf/sphn-schema/sphn#adverseeventOutcome
https://www.biomedit.ch/rdf/sphn-schema/sphn#DataDetermination
https://www.biomedit.ch/rdf/sp

In [4]:
# Extract all properties
sphn_properties = set()

for s in g.subjects(RDF.type, RDF.Property):
    if str(s).startswith(str(sphn)):
        sphn_properties.add(s)

print("SPHN Properties found:")
for prop in sorted(sphn_properties):
    print(prop)

SPHN Properties found:
https://www.biomedit.ch/rdf/sphn-schema/sphn#dctermsconformsTo
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAcquisitionCode
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasActiveIngredient
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAdministrationRouteCode
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAdministrativeCase
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAdmission
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAgentCode
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAlgorithm
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAlleleOriginCode
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAllergen
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAlternateAllele
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasArticle
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAssessment
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAssociatedCountry
https://www.biomedit.ch/rdf/sphn-schema/sphn#hasAssociatedEvent
https://www.biomedit.ch/rdf/sphn-schema/s

In [5]:
def extract_class_and_properties(g, sphn_class):
    result = {}
    result['class'] = str(sphn_class)
    result['label'] = str(g.value(sphn_class, rdfs.label))
    result['description'] = str(g.value(sphn_class, sphn.description))
    result['properties'] = []

    for prop in g.objects(sphn_class, sphn.properties):
        prop_label = str(g.value(prop, RDFS.label))
        prop_range = g.value(prop, RDFS.range)
        if isinstance(prop_range, Literal) and prop_range.value.startswith("https://"):
            prop_range_uri = URIRef(prop_range.value)
        else:
            prop_range_uri = prop_range

        result['properties'].append({
            'property': str(prop),
            'label': prop_label,
            'range': str(prop_range_uri)
        })

        # recursion if range is class:
        if prop_range_uri and (prop_range_uri, RDF.type, RDFS.Class) in g:
            result['properties'][-1]['range_details'] = extract_class_and_properties(g, prop_range_uri)

    return result

admin_case = extract_class_and_properties(g, sphn.AdministrativeCase)
allergy = extract_class_and_properties(g, sphn.Allergy)

### Helper Methods

In [6]:
### Helper: Extract list of sulo classes from mappings (new data structure)
def extract_sulo_classes(mappings):
    sulo_class_list = []
    for m in mappings:
        sulo_class_list.extend(m["mapping"])
    return sulo_class_list

def get_sulo_property(domain_sulo_class, sulo_classes, sulo_data):
    sulo_class_list = extract_sulo_classes(sulo_classes)

    proposed_property = "sulo:hasPart"  # default fallback

    if set(sulo_class_list) == {"sulo:Role", "sulo:InformationObject"}:
        proposed_property = "sulo:hasParticipant"
    elif sulo_class_list == ["sulo:Role"]:
        proposed_property = "sulo:hasParticipant"
    elif sulo_class_list == ["sulo:InformationObject"]:
        proposed_property = "sulo:hasFeature"
    elif set(sulo_class_list) == {"sulo:Quality", "sulo:InformationObject"}:
        proposed_property = "sulo:hasFeature"
    elif set(sulo_class_list) == {"sulo:InformationObject", "sulo:SharedIdentifier"}:
        proposed_property = "sulo:hasFeature"
    elif sulo_class_list == ["sulo:Quantity"]:
        proposed_property = "sulo:hasFeature"
    elif sulo_class_list == ["sulo:Process"]:
        proposed_property = "sulo:hasPart"
    elif sulo_class_list == ["sulo:Object"]:
        proposed_property = "sulo:hasPart"
    elif sulo_class_list == ["sulo:Time"]:
        proposed_property = "sulo:atTime"
    elif sulo_class_list == ["sulo:SpatialObject"]:
        proposed_property = "sulo:isLocatedIn"

    # Validate against sulo_data
    domain_label = domain_sulo_class.split("/")[-1].replace(".html", "")
    sulo_class_info = next((item for item in sulo_data if item.get("SULO Class") == domain_label), None)
    if sulo_class_info:
        valid_properties = [p["property"] for p in sulo_class_info.get("usedProperties", [])]
        if proposed_property not in valid_properties:
            return "sulo:hasPart"
    return proposed_property

### Mapping pattern detector
def detect_mapping_pattern(domain_sulo_class, sulo_classes):
    sulo_class_list = extract_sulo_classes(sulo_classes)
    if sulo_class_list == ["sulo:InformationObject"]:
        return "SOLID"
    elif sulo_class_list == ["sulo:Role"]:
        return "PRO"
    elif set(sulo_class_list) == {"sulo:Role", "sulo:InformationObject"}:
        return "PRO+SOLID"
    else:
        return "DIRECT"

In [52]:
def get_var_name(uri):
    return uri.split("#")[-1]

def get_prefix(uri):
    if uri.startswith("https://www.biomedit.ch/rdf/sphn-schema/sphn#"):
        return "sphn"
    else:
        return "unknown"

def generate_where_block(class_data, mapping_data, parent_var=None):
    lines = []
    
    class_uri = class_data['class']
    class_short = get_var_name(class_uri)
    
    # Determine variable name
    if parent_var is None:
        parent_var = f"?{class_short}"
        target_class = mapping_data[class_uri]['targetClass']
        lines.append(f"{parent_var} a {target_class} .")
    
    # Process each property
    for prop in class_data['properties']:
        try:
            prop_uri = prop['property']
        except:
            prop_uri = prop
        prop_short = get_var_name(prop_uri)
        
        if class_uri not in mapping_data:
            continue  # no mapping for this class
        
        class_mapping = mapping_data[class_uri]
        if 'properties' not in class_mapping or prop_uri not in class_mapping['properties']:
            continue  # no mapping for this property
        
        prop_mapping = class_mapping['properties'][prop_uri]['mappings']
        
        for mapping in prop_mapping:
            target_var = f"?{prop_short}"
            if mapping['type'] == 'object':
                target_class_uri = class_mapping['properties'][prop_uri]['targetClass']
                next_class = mapping_data[target_class_uri]
                next_class['class'] = target_class_uri
                
                lines.append(" " * 2 + "\nOPTIONAL {")
                lines.append(" " * (2 + 2) + f"{parent_var} {get_prefix(prop_uri)}:{prop_short} {target_var} .")
                # lines.append(" " * (indent + 2) + f"{target_var} sphn:{target_class_short} .")
                #TODO: Generalize
                
                # Recurse if nested
                nested_block = generate_where_block(next_class, mapping_data, target_var)
                lines.extend(nested_block)
                
                lines.append(" " * 2 + "}")
            else:
                lines.append(" " * (4) + f"{parent_var} {get_prefix(prop_uri)}:{prop_short} {target_var} .")
    
    return lines

# Usage Example:
# Load your sulo_data.json once globally
with open("C:\GitHub\SULO_Mapping_LLM\data\sulo_data.json") as f:
    sulo_data = json.load(f)
with open("mapping_classes_2025_recursive.json") as f:
    mapping_table = json.load(f)

admin_case = extract_class_and_properties(g, sphn.AdministrativeCase)
where_block = generate_where_block(admin_case, mapping_table)
block="\n".join(where_block)

prefixes = """
PREFIX sphn: <https://www.biomedit.ch/rdf/sphn-schema/sphn#>
PREFIX sulo: <https://aidava-dev.github.io/sulo/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
"""

sparql_query = f"""{prefixes}

{block}
"""

with open(f"queries/AdministrativeCase_query.rq", "w") as f:
        f.write(sparql_query)

In [50]:
# transformation_query = open("queries/AdministrativeCase_query.rq").read()
transformation_query = open("queries/test.rq").read()

# Load SPHN RDF file
sphn_test_g = Graph()
sphn_test_g.bind("sphn", sphn)
sphn_test_g.bind("sulo", sulo)
sphn_test_g.bind("rdfs", rdfs)
sphn_test_g.bind("xsd", xsd)
sphn_test_g.parse("turtles/sphn_instance_administrativecase_v2025.ttl", format="turtle")

transformed_result = sphn_test_g.query(transformation_query)
sulo_graph = transformed_result.graph

sulo_graph.serialize(destination="output/administrative_case_transformed_sulo.ttl", format='turtle')

<Graph identifier=N941f6d97a69f4e229b3701345cc80a59 (<class 'rdflib.graph.Graph'>)>

Step	Description	Comments
- 1	Load SPHN ontology (TTL)	You already have the SPHN TTL
- 2	Extract classes and instances	Use rdflib/SPARQL to select instances of each SPHN class
- 3	Create SPHN-to-SULO mapping table	For each SPHN class → identify corresponding SULO class
- 4	Create property mapping table	For each SPHN property → identify corresponding SULO property
- 5	Write transformation logic	Apply mapping tables to generate SULO triples
- 6	Serialize output	Export generated SULO triples as TTL or JSON-LD


In [100]:
# Create a new graph for transformed data
# Define SPARQL transformation query
# This graph contains blank node structures that miss specific variable bindings

transformation_query = """
PREFIX sphn: <https://biomedit.ch/rdf/sphn-ontology/sphn#>
PREFIX sulo: <https://w3id.org/sulo/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX snomed: <http://snomed.info/id/>
PREFIX : <https://rdf.aidava.eu/resource/>

CONSTRUCT {
  ?case a sphn:AdministrativeCase .
  ?case sulo:hasPart [ a snomed:305056002;  sulo:isLocatedIn ?originLocation ;sulo:occursIn [ a sulo:Time; sulo:hasValue ?admissionDateTime ] ].
  ?case sulo:hasPart [ a snomed:58000006;  sulo:isLocatedIn ?dischargeLocation ; sulo:occursIn [ a sulo:Time; sulo:hasValue ?dischargeDateTime ] ].
  ?case sulo:isLocatedIn ?location.
  ?case sulo:hasParticipant [ a :PatientRole; sulo:isFeatureOf ?subjectPseudoIdentifier ].
}

WHERE {
  OPTIONAL { ?case sphn:hasAdmissionDateTime ?admissionDateTime. }
  OPTIONAL { ?case sphn:hasOriginLocation ?originLocation. }

  OPTIONAL { ?case sphn:hasDischargeDateTime ?dischargeDateTime. }
  OPTIONAL { ?case sphn:hasLocation ?location. }

  OPTIONAL { ?case sphn:hasDischargeLocation ?dischargeLocation. }
  OPTIONAL { ?case sphn:hasSubjectPseudoIdentifier ?subjectPseudoIdentifier. }
}
"""

# Create a new graph for transformed data
transformed_g = g.query(transformation_query)
sulo_graph = transformed_g.graph
sulo_graph.bind("sphn", sphn)
sulo_graph.bind("sulo", sulo)
sulo_graph.bind("snomed", snomed)
sulo_graph.bind("aidava", aidava)

sulo_graph.serialize(destination="transformed_sulo.ttl", format='turtle')

NameError: name 'snomed' is not defined

## Transform the RDF in SPHN to SULO

In [None]:
### TESTING

# Define SPARQL transformation query
transformation_query = """
PREFIX sphn: <https://biomedit.ch/rdf/sphn-ontology/sphn#>
PREFIX sulo: <https://w3id.org/sulo/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX snomed: <http://snomed.info/id/>
PREFIX : <https://rdf.aidava.eu/resource/>

CONSTRUCT {
  ?case a sphn:AdministrativeCase .
  ?case sulo:hasPart ?admission .

  # ?admission a snomed:305056002 .
  # ?admission sulo:isLocatedIn ?originLocation .
  # ?admission sulo:occursIn ?admission_time .
  # ?admission_time a sulo:Time .
  # ?admission_time sulo:hasValue ?admissionDateTime .

  # ?case sulo:hasPart [ a snomed:305056002;  sulo:isLocatedIn ?originLocation ;sulo:occursIn [ a sulo:Time; sulo:hasValue ?admissionDateTime ] ].
  # ?case sulo:hasPart [ a snomed:58000006;  sulo:isLocatedIn ?dischargeLocation ; sulo:occursIn [ a sulo:Time; sulo:hasValue ?dischargeDateTime ] ].
  # ?case sulo:isLocatedIn ?location.
  # ?case sulo:hasParticipant [ a :PatientRole; sulo:isFeatureOf ?subjectPseudoIdentifier ].
}

WHERE {
  ?case a sphn:AdministrativeCase .
  BIND( IF( BOUND(?admissionDateTime) || BOUND(?originLocation) ,
      UUID(), # then: generate a UUID
       ""     # else: return a fallback string
    )
    AS ?uuid

   OPTIONAL {
    FILTER(BOUND(?admissionDateTime))
    BIND(IRI(CONCAT(STR(?case), "admission",STR(?admissionDateTime))) AS ?admission)
    #BIND(IRI(CONCAT(STR(?admission), "admission_time")) AS ?admission_time)
  }

  OPTIONAL { ?case sphn:hasAdmissionDateTime ?admissionDateTime. }
  OPTIONAL { ?case sphn:hasOriginLocation ?originLocation. }

  # OPTIONAL { ?case sphn:hasDischargeDateTime ?dischargeDateTime. }
  # OPTIONAL { ?case sphn:hasLocation ?location. }

  # OPTIONAL { ?case sphn:hasDischargeLocation ?dischargeLocation. }
  # OPTIONAL { ?case sphn:hasSubjectPseudoIdentifier ?subjectPseudoIdentifier. }
}
"""

# Create a new graph for transformed data
transformed_g = g.query(transformation_query)
sulo_graph = transformed_g.graph
sulo_graph.bind("sphn", sphn)
sulo_graph.bind("sulo", sulo)
sulo_graph.bind("snomed", snomed)
sulo_graph.bind("aidava", aidava)

sulo_graph.serialize(destination="transformed_sulo.ttl", format='turtle')

<Graph identifier=N2cf4672d6bb644948a1c7e5ccd167332 (<class 'rdflib.graph.Graph'>)>

In [None]:
# Define SPARQL query
query_x = """
PREFIX sphn: <https://biomedit.ch/rdf/sphn-ontology/sphn#>
PREFIX sulo: <https://w3id.org/sulo/>
PREFIX snomed: <http://snomed.info/id/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>

SELECT ?case ?admissionDateTime
WHERE {
   ?case a sphn:AdministrativeCase .
   ?case sulo:hasPart ?admission_process .
   ?admission_process a snomed:305056002 .
   ?admission_process sulo:occursIn ?admissionDateTime .
}
"""

In [None]:
transformed_g = Graph()
transformed_g.parse("transformed_sulo.ttl", format="turtle")

<Graph identifier=Na7b418e0d0de408aac5a3089a8e7b9f6 (<class 'rdflib.graph.Graph'>)>

In [None]:
# Run query on transformed graph
results = transformed_g.query(query_x)

# Print results
for row in results:
    print(f"Visit: {row}")

Visit: (rdflib.term.URIRef('https://rdf.aidava.eu/resource/AdministrativeCase/1503'), rdflib.term.BNode('nf0dfeac93a4844e3b7becd5d9562289fb4'))
Visit: (rdflib.term.URIRef('https://rdf.aidava.eu/resource/AdministrativeCase/2604'), rdflib.term.BNode('nf0dfeac93a4844e3b7becd5d9562289fb9'))


In [None]:
sulo2sphnv2025 = """
PREFIX sphn: <https://biomedit.ch/rdf/sphn-ontology/sphn#>
PREFIX sulo: <https://w3id.org/sulo/>
PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
PREFIX snomed: <http://snomed.info/id/>
PREFIX : <https://rdf.aidava.eu/resource/>

CONSTRUCT {
  ?case a sphn:AdministrativeCase .

  ?case sphn:admission ?admission .
  ?admission a sphn:Admission.
  ?admission sphn:hasDateTime ?admissionDateTime .
  ?admission sphn:hasOriginLocation ?originLocation .

  ?case sphn:discharge ?discarge .
  ?discarge a sphn:Discharge .
  ?discarge sphn:hasDateTime ?dischargeDateTime .
  ?discarge sphn:hasTargetLocation ?targetLocation .
}
WHERE {
    ?case a sphn:AdministrativeCase .
    OPTIONAL {
      ?case sulo:hasPart ?admission .
      ?admission a snomed:305056002 .
      OPTIONAL {
        ?admission sulo:occursIn ?time .
        ?time a sulo:Time .
        ?time sulo:hasValue ?admissionDateTime .
      }
      OPTIONAL {
        ?admission sulo:isLocatedIn ?originLocation .
      }
    }
    OPTIONAL {
      ?case sulo:hasPart ?discharge .
      ?discharge a snomed:58000006 .
      OPTIONAL {
        ?discharge sulo:occursIn ?time .
        ?time a sulo:Time .
        ?time sulo:hasValue ?dischargeDateTime .
      }
      OPTIONAL {
        ?discharge sulo:isLocatedIn ?targetLocation .
      }
    }


}
"""

results = transformed_g.query(sulo2sphnv2025)
graph = results.graph
graph.bind("sphn", sphn)
graph.bind("sulo", sulo)
graph.bind("snomed", snomed)
graph.bind("aidava", aidava)
graph.serialize(destination="sulo2sphnv2025.ttl", format='turtle')


<Graph identifier=Nf416864f927749c2bfd967629b15b32c (<class 'rdflib.graph.Graph'>)>