# Alignment Ontology Generator
## DPPO to CEON

In [3]:
# Python 3.11
from pathlib import Path
import re
import openpyxl
from rdflib import Graph, URIRef, BNode, Literal, Namespace
from rdflib.namespace import RDF, RDFS, OWL, DC, DCTERMS, XSD
from collections import defaultdict
import xml.etree.ElementTree as ET
import os

# One dict per mapping; appended to by _add_axiom_with_sssom_annotations and
# later passed to generate_simple_rdf_file by the script entry point.
mappings=[]
# Workbook with the mapping sheets, resolved relative to the working directory.
EXCEL_PATH = Path(r"./DPPO-CEON.xlsx")
# Public location of the workbook; written as dcterms:source in the SSSOM
# ontology header.
online_path = ("https://github.com/RichZele/DPP-DPPO-CEON-Ontology-Alignment/blob/main/documentations/DPPO-CEON.xlsx")

# Header metadata: owl:versionInfo, dcterms:created and rdfs:comment respectively.
version = 0.1
date = "2026-01-23"
onto_comment = "This ontology represents the alignment between the Digital Product Passport Ontology (DPPO) and Circular Economy Ontology Network (CEON)."

#EXCEL_PATH = Path(r"C:/Users/kebreh/OneDrive - Jonkoping University/PostDoc/DPP-And-DPPO/Generators/DPPO-CEON.xlsx")

# Alignment ontology IRI and namespace.
ONTO_IRI = URIRef("https://w3id.org/dpp/alignment/dppo-ceon/")
# IRIs of the two aligned ontologies; stored on every mapping record as
# subject_source / object_source.
source_onto_IRI = URIRef("https://w3id.org/dppo/")
target_onto_IRI = URIRef("https://w3id.org/ceon/")
ALN = Namespace(str(ONTO_IRI) + "#")


# XML namespace declarations placed on the root element by generate_simple_rdf_file.
# NOTE(review): the default namespace ends in "dpp-dppo/" (and uses http) while
# ONTO_IRI ends in "dppo-ceon/" (https) -- looks copied from another generator; confirm.
namespaces = {'xmlns': 'http://w3id.org/dpp/alignment/dpp-dppo/',
              'xmlns:rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#',
              'xmlns:xsd': 'http://www.w3.org/2001/XMLSchema#'}

# Every entity that _add_entity has typed; entries are appended on each call,
# so the list may contain duplicates.
added_entity = []

def _clean_sheet_name(name: str) -> str:
    s = name.strip()
    s = re.sub(r"\s+", "_", s)
    s = re.sub(r"[^A-Za-z0-9_]", "", s)
    return s or "Sheet"


def _is_blank(value) -> bool:
    return value is None or (isinstance(value, str) and value.strip() == "")


def _as_uriref(value: str) -> URIRef:
    """Wrap *value*, with surrounding whitespace removed, in an rdflib ``URIRef``."""
    return URIRef(value.strip())


def _add_axiom_with_annotations(g: Graph,
                                source: URIRef,
                                predicate: URIRef,
                                target: URIRef,
                                mapping_id: str | None,
                                notes: str | None,
                                sheet_tag: str) -> None:
    """
    Asserts ``(source, predicate, target)`` in ``g`` and reifies it.

    The reification is a fresh blank node typed ``owl:Axiom`` that points at
    the triple's three parts. It carries ``dc:identifier`` (when
    ``mapping_id`` is non-blank), ``rdfs:comment`` (when ``notes`` is
    non-blank) and always ``dcterms:source`` holding ``sheet_tag``.
    """
    g.add((source, predicate, target))

    axiom = BNode()
    reification = (
        (RDF.type, OWL.Axiom),
        (OWL.annotatedSource, source),
        (OWL.annotatedProperty, predicate),
        (OWL.annotatedTarget, target),
    )
    for prop, value in reification:
        g.add((axiom, prop, value))

    identifier = mapping_id.strip() if mapping_id else ""
    if identifier:
        g.add((axiom, DC.identifier, Literal(identifier)))

    comment = notes.strip() if notes else ""
    if comment:
        g.add((axiom, RDFS.comment, Literal(comment)))

    # The worksheet tag lets a reader trace the axiom back to its sheet.
    g.add((axiom, DCTERMS.source, Literal(sheet_tag)))


def _add_entity(g:Graph,
               entity: URIRef,
               entity_type: str | None) -> None:
    if entity_type.strip() == "Class":
        g.add((entity, RDF.type, OWL.Class))
        added_entity.append(entity)
    if entity_type.strip() == "ObjectProperty":
        g.add((entity, RDF.type, OWL.ObjectProperty))
        added_entity.append(entity)
    if entity_type.strip() == "DataProperty":
        g.add((entity, RDF.type, OWL.DatetypeProperty))
        added_entity.append(entity)

def _get_entity_type(entity_type: str | None) -> None:
    if entity_type.strip() == "Class":
        return OWL.Class
    if entity_type.strip() == "ObjectProperty":
        return OWL.ObjectProperty
        added_entity.append(entity)
    if entity_type.strip() == "DataProperty":
        return OWL.DatetypeProperty
    

def _add_axiom_with_sssom_annotations(g: Graph,
                                source: URIRef,
                                predicate: URIRef,
                                target: URIRef,
                                mapping_id: str | None,
                                notes: str | None,
                                sheet_tag: str,
                                source_label: str | None,
                                target_label: str | None,
                                source_type: str | None,
                                target_type: str | None,) -> None:
    """
    Adds the mapping triple plus an ``sssom:Mapping`` resource describing it.

    Steps:
      * declares ``source`` / ``target`` with their OWL type when a type name
        is given;
      * asserts ``(source, predicate, target)``;
      * creates the mapping node, named ``ONTO_IRI + mapping_id`` when a
        mapping id is given and a blank node otherwise, and attaches the
        reification triples, the mapping justification, and the optional
        identifier, comment, labels and entity types;
      * appends a plain-dict record of the mapping to the module-level
        ``mappings`` list.

    Every optional argument may be ``None`` or blank; the corresponding
    annotation is then skipped and the record stores an empty label.
    ``sheet_tag`` is accepted for interface compatibility but not used.
    """
    def _text(value) -> str:
        # Normalise an optional cell value to stripped text ('' when missing).
        return "" if value is None else str(value).strip()

    mapping_key = _text(mapping_id)
    comment = _text(notes)
    subject_label = _text(source_label)
    object_label = _text(target_label)
    subject_kind = _text(source_type)
    object_kind = _text(target_type)

    # Only declare entities whose type is known; a blank type has nothing to assert.
    if subject_kind:
        _add_entity(g, source, subject_kind)
    if object_kind:
        _add_entity(g, target, object_kind)
    g.add((source, predicate, target))

    sssom = Namespace("https://w3id.org/sssom/")
    semapv = Namespace("https://w3id.org/semapv/vocab/")

    g.bind("sssom", sssom)
    g.bind("semapv", semapv)

    # Without a mapping id there is no stable name for the node, so fall back
    # to a blank node instead of failing.
    ax = URIRef(str(ONTO_IRI) + mapping_key) if mapping_key else BNode()
    g.add((ax, RDF.type, sssom.Mapping))
    g.add((ax, OWL.annotatedSource, source))
    g.add((ax, OWL.annotatedProperty, predicate))
    g.add((ax, OWL.annotatedTarget, target))
    # SSSOM slot name is "mapping_justification" (was misspelled, which
    # silently produced an unknown predicate).
    g.add((ax, sssom.mapping_justification, semapv.ManualMappingCuration))

    if mapping_key:
        g.add((ax, DC.identifier, Literal(mapping_key)))

    if comment:
        g.add((ax, RDFS.comment, Literal(comment)))

    if subject_label:
        g.add((ax, sssom.subject_label, Literal(subject_label)))

    if object_label:
        g.add((ax, sssom.object_label, Literal(object_label)))

    # _get_entity_type yields None for unrecognised names; None is not a
    # valid triple object, so skip the annotation in that case.
    subject_owl_type = _get_entity_type(subject_kind) if subject_kind else None
    if subject_owl_type is not None:
        g.add((ax, sssom.subject_type, subject_owl_type))

    object_owl_type = _get_entity_type(object_kind) if object_kind else None
    if object_owl_type is not None:
        g.add((ax, sssom.object_type, object_owl_type))

    # Provenance: the mapping is attributed to the alignment ontology itself.
    # NOTE(review): sheet_tag is not recorded here -- confirm whether
    # per-worksheet provenance is wanted.
    g.add((ax, DCTERMS.source, Literal(ONTO_IRI)))

    # Flat record of the mapping for the simple RDF export.
    mappings.append({
        'subject_id': source,
        'object_id': target,
        'subject_source': source_onto_IRI,
        'object_source': target_onto_IRI,
        'cardinality': '=',
        'subject_label': subject_label,
        'object_label': object_label,
    })
    
def _relation_to_predicate(alignment_relation: str,
                           source_entity_type: str,
                           target_entity_type: str) -> URIRef:
    """
    Maps the spreadsheet relation token to an RDF predicate.
    Extend this mapping as your template grows.
    """
    rel = alignment_relation.strip().lower()

    if rel in {"equivalent_class"}:
        return OWL.equivalentClass
    if rel in {"subclass_of"}:
        return RDFS.subClassOf
    if rel in {"equivalent_property", "equivalent_object_property", "equivalent_data_property"}:
        return OWL.equivalentProperty
    if rel in {"subproperty_of", "sub_property_of"}:
        return RDFS.subPropertyOf
    if rel in {"same_as"}:
        return OWL.sameAs

    # Conservative default. You should not silently invent semantics.
    raise ValueError(f"Unsupported alignment_relation '{alignment_relation}' for types "
                     f"'{source_entity_type}' -> '{target_entity_type}'.")


def _apply_direction(source: URIRef,
                     predicate: URIRef,
                     target: URIRef,
                     direction: str) -> list[tuple[URIRef, URIRef, URIRef]]:
    """
    Returns one or two triples depending on direction.
    """
    d = (direction or "").strip().lower()

    if d == "source_to_target" or d == "":
        return [(source, predicate, target)]

    if d == "target_to_source":
        return [(target, predicate, source)]

    if d == "bidirectional":
        # For symmetric predicates, this will produce redundant statements.
        # That redundancy is acceptable, but you can optimize if you care.
        return [(source, predicate, target), (target, predicate, source)]

    raise ValueError(f"Unsupported direction '{direction}'.")


def build_alignment_graph(excel_path: Path) -> Graph:
    """Build the OWL alignment graph from every worksheet of the workbook.

    Each sheet must have a fully populated header row containing all required
    columns. Row 2 is treated as guidance and skipped; data starts at row 3.
    Completely empty rows are ignored. Every remaining row is turned into one
    or two axioms (depending on its direction), each reified with its mapping
    id, notes and originating sheet.

    Raises ``ValueError`` for a blank header cell, a missing required column,
    a row lacking source IRI, target IRI or relation, or an unsupported
    relation or direction.
    """
    workbook = openpyxl.load_workbook(excel_path, data_only=True)

    graph = Graph()
    prefixes = (("owl", OWL), ("rdfs", RDFS), ("dc", DC), ("dcterms", DCTERMS), ("aln", ALN))
    for prefix, namespace in prefixes:
        graph.bind(prefix, namespace)

    # Ontology header
    graph.add((ONTO_IRI, RDF.type, OWL.Ontology))
    graph.add((ONTO_IRI, RDFS.label, Literal("Alignment ontology: DPPO to CEON")))
    graph.add((ONTO_IRI, DCTERMS.source, Literal(str(excel_path))))

    # Column names every sheet must provide; the order matches the unpacking below.
    required_columns = (
        "mapping_id",
        "source_entity_type",
        "source_iri",
        "target_entity_type",
        "target_iri",
        "alignment_relation",
        "direction",
        "mapping_notes & justification",
    )

    for sheet_name in workbook.sheetnames:
        worksheet = workbook[sheet_name]
        sheet_tag = _clean_sheet_name(sheet_name)

        # Row 1 is header, row 2 is guidance, data starts at row 3.
        header_cells = [cell.value for cell in worksheet[1]]
        for header in header_cells:
            if _is_blank(header):
                raise ValueError(f"Header row has blank columns in sheet '{sheet_name}'.")

        column_of = {}
        for position, header in enumerate(header_cells):
            column_of[str(header).strip()] = position

        missing = [name for name in required_columns if name not in column_of]
        if missing:
            raise ValueError(f"Missing required columns in sheet '{sheet_name}': {missing}")

        for row in worksheet.iter_rows(min_row=3, values_only=True):
            # Skip empty rows
            if all(_is_blank(cell_value) for cell_value in row):
                continue

            (mapping_id,
             source_entity_type,
             source_iri,
             target_entity_type,
             target_iri,
             alignment_relation,
             direction,
             notes) = (row[column_of[name]] for name in required_columns)

            # A row without both IRIs and a relation is rejected, not skipped.
            if any(_is_blank(field) for field in (source_iri, target_iri, alignment_relation)):
                raise ValueError(f"Malformed mapping row in sheet '{sheet_name}'. "
                                 f"mapping_id='{mapping_id}' source_iri='{source_iri}' target_iri='{target_iri}' "
                                 f"alignment_relation='{alignment_relation}'")

            subject = _as_uriref(str(source_iri))
            obj = _as_uriref(str(target_iri))
            predicate = _relation_to_predicate(
                str(alignment_relation),
                str(source_entity_type or ""),
                str(target_entity_type or ""),
            )

            mapping_key = None if _is_blank(mapping_id) else str(mapping_id)
            note_text = None if _is_blank(notes) else str(notes)

            for s, p, o in _apply_direction(subject, predicate, obj, str(direction or "")):
                _add_axiom_with_annotations(
                    g=graph,
                    source=s,
                    predicate=p,
                    target=o,
                    mapping_id=mapping_key,
                    notes=note_text,
                    sheet_tag=sheet_tag,
                )

    return graph

def build_alignment_sssom_graph(excel_path: Path) -> Graph:
    """Build the SSSOM-annotated alignment graph from every worksheet.

    The graph starts with the ontology header (title, creator, contributors,
    license, source, version, creation date, comment) and declarations of the
    three OWL annotation-pointer properties. Each data row (row 3 onwards,
    empty rows skipped) is then handed to
    ``_add_axiom_with_sssom_annotations``.

    Raises ``ValueError`` for a blank header cell, a missing required column
    (including the label columns), a row lacking source IRI, target IRI or
    relation, or an unsupported relation.
    """
    wb = openpyxl.load_workbook(excel_path, data_only=True)

    g = Graph()
    g.bind("owl", OWL)
    g.bind("rdfs", RDFS)
    g.bind("dc", DC)
    g.bind("dcterms", DCTERMS)

    # Ontology header
    g.add((ONTO_IRI, RDF.type, OWL.Ontology))
    g.add((ONTO_IRI, DCTERMS.title, Literal("The alignment ontology between DPPO and CEON")))
    g.add((ONTO_IRI, DCTERMS.creator, Literal("Rahel Kebede")))
    g.add((ONTO_IRI, DCTERMS.contributor, Literal("Rahel Kebede")))
    g.add((ONTO_IRI, DCTERMS.contributor, Literal("Huanyu Li")))
    g.add((ONTO_IRI, DCTERMS.contributor, Literal("Eva Blomqvist")))
    g.add((ONTO_IRI, DCTERMS.license, Literal("https://creativecommons.org/licenses/by/4.0/")))
    g.add((ONTO_IRI, DCTERMS.source, Literal(str(online_path))))
    g.add((ONTO_IRI, OWL.versionInfo, Literal(str(version))))
    g.add((ONTO_IRI, DCTERMS.created, Literal(date, datatype=XSD.date)))
    g.add((ONTO_IRI, RDFS.comment, Literal(onto_comment)))

    # The mapping nodes reuse the OWL reification pointers; declare them as
    # plain properties in the output graph.
    g.add((OWL.annotatedProperty, RDF.type, RDF.Property))
    g.add((OWL.annotatedSource, RDF.type, RDF.Property))
    g.add((OWL.annotatedTarget, RDF.type, RDF.Property))

    # The label columns are read below, so they are validated up front with
    # the rest instead of surfacing later as a bare KeyError.
    required = [
        "mapping_id",
        "source_entity_type",
        "source_iri",
        "source_label",
        "target_entity_type",
        "target_iri",
        "target_label",
        "alignment_relation",
        "direction",
        "mapping_notes & justification",
    ]

    def _text_or_none(value):
        # Cells may hold numbers or None; hand downstream code text or None only.
        return None if _is_blank(value) else str(value)

    for sheet_name in wb.sheetnames:
        ws = wb[sheet_name]
        sheet_tag = _clean_sheet_name(sheet_name)

        # Row 1 is header, row 2 is guidance, data starts at row 3.
        headers = [c.value for c in ws[1]]
        if any(_is_blank(h) for h in headers):
            raise ValueError(f"Header row has blank columns in sheet '{sheet_name}'.")

        header_index = {str(h).strip(): i for i, h in enumerate(headers)}

        missing = [c for c in required if c not in header_index]
        if missing:
            raise ValueError(f"Missing required columns in sheet '{sheet_name}': {missing}")

        for row in ws.iter_rows(min_row=3, values_only=True):
            # Skip empty rows
            if all(_is_blank(v) for v in row):
                continue

            mapping_id = row[header_index["mapping_id"]]
            source_entity_type = row[header_index["source_entity_type"]]
            source_iri = row[header_index["source_iri"]]
            source_label = row[header_index["source_label"]]
            target_entity_type = row[header_index["target_entity_type"]]
            target_iri = row[header_index["target_iri"]]
            target_label = row[header_index["target_label"]]
            alignment_relation = row[header_index["alignment_relation"]]
            notes = row[header_index["mapping_notes & justification"]]
            # NOTE(review): the "direction" column is required but not applied
            # here; every mapping is emitted source -> target. Confirm intent.

            # A row without both IRIs and a relation is rejected, not skipped.
            if _is_blank(source_iri) or _is_blank(target_iri) or _is_blank(alignment_relation):
                raise ValueError(f"Malformed mapping row in sheet '{sheet_name}'. "
                                 f"mapping_id='{mapping_id}' source_iri='{source_iri}' target_iri='{target_iri}' "
                                 f"alignment_relation='{alignment_relation}'")

            s = _as_uriref(str(source_iri))
            t = _as_uriref(str(target_iri))

            pred = _relation_to_predicate(
                str(alignment_relation),
                str(source_entity_type or ""),
                str(target_entity_type or ""),
            )

            _add_axiom_with_sssom_annotations(
                g=g,
                source=s,
                predicate=pred,
                target=t,
                mapping_id=_text_or_none(mapping_id),
                notes=_text_or_none(notes),
                sheet_tag=sheet_tag,
                source_label=_text_or_none(source_label),
                target_label=_text_or_none(target_label),
                source_type=_text_or_none(source_entity_type),
                target_type=_text_or_none(target_entity_type),
            )

    return g

def generate_simple_rdf_file(mappings_lst, onto1_name, onto2_name, output_path = './'):
    """Write the mappings as a flat alignment RDF/XML file.

    The file is named ``<onto1_name>-<onto2_name>.rdf`` and is placed inside
    ``output_path``, which is created when missing. The ``onto1``/``url1`` and
    ``onto2``/``url2`` elements take their values from the first mapping's
    ``subject_source`` and ``object_source``. Each mapping contributes one
    ``map/cell`` with ``entity1``, ``entity2`` and ``relation``.

    Args:
        mappings_lst: non-empty sequence of dicts with the keys
            ``subject_source``, ``object_source``, ``subject_id``,
            ``object_id`` and ``cardinality``.
        onto1_name: short name of the source ontology (file-name part).
        onto2_name: short name of the target ontology (file-name part).
        output_path: directory for the output file; a trailing separator is
            optional.

    Raises:
        ValueError: if ``mappings_lst`` is empty, since the ontology IRIs are
            read from its first entry.
    """
    if not mappings_lst:
        raise ValueError("mappings_lst is empty; at least one mapping is required "
                         "to determine the ontology IRIs.")

    onto1_url = str(mappings_lst[0]['subject_source'])
    onto2_url = str(mappings_lst[0]['object_source'])

    # Namespace declarations are written as literal attributes on the root.
    root = ET.Element("rdf:RDF", attrib={key: namespaces[key]
                                         for key in ('xmlns', 'xmlns:rdf', 'xmlns:xsd')})
    alignment_element = ET.SubElement(root, "Alignment")
    header_fields = (
        ("xml", 'yes'),
        ("level", '0'),
        ("onto1", onto1_url),
        ("onto2", onto2_url),
        ("url1", onto1_url),
        ("url2", onto2_url),
    )
    for tag, text in header_fields:
        ET.SubElement(alignment_element, tag).text = text

    for mapping in mappings_lst:
        mapping_element = ET.SubElement(alignment_element, "map")
        cell_element = ET.SubElement(mapping_element, "cell")
        ET.SubElement(cell_element, "entity1", attrib={'rdf:resource': str(mapping['subject_id'])})
        ET.SubElement(cell_element, "entity2", attrib={'rdf:resource': str(mapping['object_id'])})
        ET.SubElement(cell_element, "relation").text = mapping['cardinality']

    tree = ET.ElementTree(root)
    ET.indent(tree, space="  ", level=0)

    # Join directory and file name properly: plain concatenation put the file
    # outside the directory whenever output_path lacked a trailing separator.
    out_dir = output_path if output_path else "."
    os.makedirs(out_dir, exist_ok=True)
    alignment_file = os.path.join(out_dir, f"{onto1_name}-{onto2_name}.rdf")
    with open(alignment_file, "wb") as file:
        tree.write(file, xml_declaration=True, method='xml', encoding='UTF-8')
    return

if __name__ == "__main__":
    # Building the graph also fills the module-level `mappings` list that the
    # flat RDF export below consumes.
    store = build_alignment_sssom_graph(EXCEL_PATH)

    # The Turtle file is written next to the workbook.
    output_path = EXCEL_PATH.with_name("DPPO-CEON-alignment.ttl")
    store.serialize(destination=str(output_path), format="turtle")

    # Flat alignment file, written with the exporter's default output directory.
    generate_simple_rdf_file(mappings,  'dppo', 'ceon')

    print(f"Alignment ontology serialized to: {output_path}")

Alignment ontology serialized to: DPPO-CEON-alignment.ttl
