In [None]:
import random
from py4j.java_gateway import JavaGateway
from itertools import combinations

In [None]:
class DirectedLabeledGraph:
    def __init__(self):
        self.nodes = {}
        self.edges = {}

    def add_node(self, node, label=None):
        if node not in self.nodes:
            self.nodes[node] = set()
        if label:
            self.nodes[node].add(label)

    def add_edge(self, source, target, label):
        if (source, target) not in self.edges:
            self.edges[(source, target)] = set()
        self.edges[(source, target)].add(label)

    def get_node_labels(self, node):
        return self.nodes.get(node, set())

    def get_edge_labels(self, source, target):
        return self.edges.get((source, target), set())


class Interpretation(DirectedLabeledGraph):
    def __init__(self):
        super().__init__()
        self.element_counter = 0

    def add_concept_to_node(self, node, concept):
        """Add a concept to a node."""
        if node not in self.nodes:
            self.add_node(node)
        self.nodes[node].add(concept)
        return concept in self.nodes[node]

    def exists_edge(self, source, role, target):
        """Check if there exists an edge with a specific role between two nodes."""
        return role in self.get_edge_labels(source, target)

    def create_new_element(self, concept):
        """Create a new element with a specified concept."""
        new_element = f"d{self.element_counter}"
        self.element_counter += 1
        self.add_node(new_element, concept)
        return new_element

In [None]:
ontology_file = 'pizza.owl'
interpretation = Interpretation()

gateway = JavaGateway()
formatter = gateway.getSimpleDLFormatter()
elFactory = gateway.getELFactory()
ontology = gateway.getOWLParser().parseFile(ontology_file)
tbox = ontology.tbox()
print(tbox.size())
axioms = tbox.getAxioms()
for axiom in axioms:
    axiomType = axiom.getClass().getSimpleName()
    if axiomType == "EquivalenceAxiom":
        sides = axiom.getConcepts()
        l2r = elFactory.getGCI(sides[0], sides[1])
        r2l = elFactory.getGCI(sides[1], sides[0])
        tbox.add(l2r)
        tbox.add(r2l)
        ontology.remove(axiom)
    elif axiomType != "GeneralConceptInclusion":
        ontology.remove(axiom)
print(tbox.size())
print(len(tbox.getAxioms()))

In [None]:
concepts_dict = {}
conjunction_dict = {}
existential_dict = {}

for concept in ontology.getSubConcepts():
    formatted_concept = formatter.format(concept)
    conceptType = concept.getClass().getSimpleName()
    concepts_dict[formatted_concept] = {
        'concept': concept,
        'conceptType': concept.getClass().getSimpleName()
    }
    if conceptType == "ConceptConjunction":
        conjunction_dict[formatted_concept] = {
            'left': formatter.format(concept.getConjuncts()[0]),
            'right': formatter.format(concept.getConjuncts()[1])
        }
    if conceptType == "ExistentialRoleRestriction":
        existential_dict[formatted_concept] = {
            'role': formatter.format(concept.role()),
            'filter': formatter.format(concept.filler())
        }

axioms_dict = {}

for axiom in tbox.getAxioms():
    lhs_formatted = formatter.format(axiom.lhs())
    rhs_formatted = formatter.format(axiom.rhs())
    
    if lhs_formatted not in axioms_dict:
        axioms_dict[lhs_formatted] = []

    axioms_dict[lhs_formatted].append(rhs_formatted)

In [None]:
def apply_rule_1():
    changed = False
    for d in interpretation.nodes:
        if '⊤' not in interpretation.get_node_labels(d):
            interpretation.add_concept_to_node(d,'⊤')
            changed = True
    return changed

In [None]:
def apply_rule_2():
    changed = False        
    for d in interpretation.nodes:
        concepts = interpretation.get_node_labels(d).copy()
        for concept in concepts:
            if concept in conjunction_dict.keys():
                if conjunction_dict[concept]['left'] not in concepts:
                    interpretation.add_concept_to_node(d, conjunction_dict[concept]['left'])
                    changed = True
                if conjunction_dict[concept]['right'] not in concepts:
                    interpretation.add_concept_to_node(d, conjunction_dict[concept]['right'])
                    changed = True
    return changed

In [None]:
def apply_rule_3():
    changed = False        
    for d in interpretation.nodes:
        concepts = interpretation.get_node_labels(d).copy()
        concepts.remove('⊤')
        pairs = list(combinations(concepts,2))
        for A, B in pairs:
            if ('('+A+' ⊓ '+B+')' not in concepts) and ('('+B+' ⊓ '+A+')' not in concepts):
                if '('+A+' ⊓ '+B+')' in concepts_dict.keys():
                    interpretation.add_concept_to_node(d, '('+A+' ⊓ '+B+')')
                    changed = True
                elif '('+B+' ⊓ '+A+')' in concepts_dict.keys():
                    interpretation.add_concept_to_node(d, '('+B+' ⊓ '+A+')')
                    changed = True
    return changed

In [None]:
def apply_rule_4():
    changed = False
    for d in list(interpretation.nodes):
        concepts = interpretation.get_node_labels(d).copy()
        for concept in concepts & existential_dict.keys():
            role = existential_dict[concept]['role']
            filter = existential_dict[concept]['filter']
            existing_successor = None
            for (source, target), roles in interpretation.edges.items():
                if source == d and role in roles and filter in interpretation.get_node_labels(target):
                    existing_successor = target
                    break
            if not existing_successor:                            
                new_node = interpretation.create_new_element(filter)
                interpretation.add_edge(d, new_node, role)                
                changed = True
    return changed

In [None]:
def apply_rule_5():
    changed = False
    for (source, target), roles in interpretation.edges.items():
        concept_s = interpretation.get_node_labels(source)
        concept_t = interpretation.get_node_labels(target)
        for role in roles:
            new_labels = set(['∃'+role+'.'+c for c in concept_t]) & concepts_dict.keys() - concept_s
            for label in new_labels:
                interpretation.add_concept_to_node(source, label)                
                changed = True
    return changed

In [None]:
def apply_rule_6():
    changed = False
    for d in interpretation.nodes:
        concepts = interpretation.get_node_labels(d).copy()        
        for concept in (concepts & axioms_dict.keys()):
            for D in axioms_dict[concept]:
                if D not in concepts:                
                    interpretation.add_concept_to_node(d, D)                    
                    changed = True
    return changed

In [None]:
class_name = random.choice(list(concepts_dict.keys()))
interpretation.create_new_element(class_name)

changed = True
while changed:
    changed = False
    if apply_rule_1():        
        changed = True
    if apply_rule_2():
        changed = True
    if apply_rule_3():
        changed = True
    if apply_rule_4():
        changed = True
    if apply_rule_5():
        changed = True
    if apply_rule_6():
        changed = True
subsumes=interpretation.get_node_labels('d0')

conceptNames = [formatter.format(c) for c in ontology.getConceptNames()]

for subsume in subsumes:
    if subsume in conceptNames:
        print(subsume)

Add a ELK reasoner to test the result.