In [2]:
import os
import json
# Relation keywords that separate an axiom's subject from its expression.
axiom_relations = [
    "subClassOf", "equivalentClass", "propertyRestrictions",
    "disjointWith", "subPropertyOf", "domain", "range",
    "characteristics", "inverseOf"
]
directories = ['Axiom_per_entity', 'Generated CQ', 'generated description']

# Load the list of files in each directory
files_in_directories = {directory: os.listdir(directory) for directory in directories}

# Axioms: one JSON document per file; the ontology key is the part of the
# file name before the first underscore.
total_data = {}
for file_name in files_in_directories["Axiom_per_entity"]:
    with open(os.path.join("Axiom_per_entity", file_name), "r", encoding="utf-8") as file:
        total_data[file_name.split("_")[0]] = json.load(file)

# Descriptions: JSONL, one {"class": ..., "description": ...} record per line.
description_data = {}
for file_name in files_in_directories["generated description"]:
    descriptions = {}
    with open(os.path.join("generated description", file_name), "r", encoding="utf-8") as file:
        for line in file:
            if not line.strip():
                continue  # tolerate blank / trailing lines
            record = json.loads(line)  # parse each line once
            descriptions[record["class"]] = record["description"]
    description_data[file_name.split("_")[0]] = descriptions

# Competency questions: JSONL, one {"axiom": ..., "CQ": ...} record per line,
# grouped by the entity named on the left-hand side of the axiom.
CQ_data = {}
for file_name in files_in_directories["Generated CQ"]:
    cq_by_entity = {}
    with open(os.path.join("Generated CQ", file_name), "r", encoding="utf-8") as file:
        for line in file:
            if not line.strip():
                continue
            record = json.loads(line)
            axiom = record["axiom"]
            for rela in axiom_relations:
                if rela in axiom:
                    cls = axiom.split(rela)[0].strip()
                    break
            else:
                # Without this branch the record would be filed under the
                # entity of the previous line (or raise NameError on the
                # first line), because `cls` would keep its old value.
                print(f"No known relation in axiom '{axiom}' ({file_name}); skipped.")
                continue
            cq_by_entity.setdefault(cls, []).append({"axiom": axiom, "CQ": record["CQ"]})
    CQ_data[file_name.split("_")[0]] = cq_by_entity


# Replace every entity's raw axiom dict by a record that also carries its
# description and competency questions. Missing descriptions/CQs still raise
# KeyError so that incomplete inputs are noticed.
for ontology in total_data:
    for classorproperty in total_data[ontology]:
        for cp in total_data[ontology][classorproperty]:
            total_data[ontology][classorproperty][cp] = {
                "axiom": total_data[ontology][classorproperty][cp],
                "description": description_data[ontology][cp],
                "CQ": CQ_data[ontology][cp],
            }

# Written as UTF-8 explicitly: the file contains raw non-ASCII characters
# (ensure_ascii=False) and is read back with encoding="utf-8" later on.
with open("total_data.json", "w", encoding="utf-8") as json_file:
    json.dump(total_data, json_file, indent=4, ensure_ascii=False)


In [None]:
import json
import re

with open("total_data.json", "r", encoding="utf-8") as json_file:
    total_data = json.load(json_file)

# initialize types: four independent buckets, each pre-populated with every
# ontology and an empty "classes" / "properties" section
type1, type2, type3, type4 = (
    {ontology: {"classes": {}, "properties": {}} for ontology in total_data}
    for _ in range(4)
)
types = [type1, type2, type3, type4]

# Per-bucket running totals, keyed by the identity of the bucket dict
class_counts = dict.fromkeys(map(id, types), 0)
prop_counts = dict.fromkeys(map(id, types), 0)

def has_and_or_some_only_in_axiom(ax):
    """Tell whether any listed axiom expression uses and/or/some/only.

    Only values of ``ax`` that are lists are inspected; each element of such a
    list is searched for one of the four keywords as a whole word.

    Returns:
        True as soon as one expression matches, otherwise False.
    """
    keyword = re.compile(r'\b(and|or|some|only)\b')
    return any(
        keyword.search(expression)
        for expressions in ax.values()
        if isinstance(expressions, list)
        for expression in expressions
    )

for ontology, cps in total_data.items():
    for section in ("classes", "properties"):
        # Classes and properties are balanced with separate counters.
        counts = class_counts if section == "classes" else prop_counts

        for cp, value in cps[section].items():
            ax = value.get("axiom", {}) if isinstance(value, dict) else {}

            # 1) eligible types: a class with exactly one CQ entry, or a
            #    property whose axiom holds nothing but "None" placeholders,
            #    may only go to type3/type4
            if section == "classes":
                cq = value["CQ"] if "CQ" in value else None
                restricted = isinstance(cq, list) and len(cq) == 1
            else:
                restricted = not (
                    ax.get("characteristics")
                    or ax.get("domain") != ["None"]
                    or ax.get("range") != ["None"]
                    or ax.get("subPropertyOf")
                    or ax.get("inverseOf")
                )
            candidates = [type3, type4] if restricted else list(types)

            # 2) type3 needs an and/or/some/only keyword in the axiom
            if not has_and_or_some_only_in_axiom(ax):
                candidates = [bucket for bucket in candidates if bucket is not type3]

            # 3) classification: the least-filled candidate wins; ties go to
            #    the candidate listed first
            target = min(candidates, key=lambda bucket: counts[id(bucket)])
            counts[id(target)] += 1

            target[ontology][section][cp] = value

# results: number of classes / properties that ended up in each bucket
for idx, bucket in enumerate(types, start=1):
    class_total = sum(len(sections["classes"]) for sections in bucket.values())
    property_total = sum(len(sections["properties"]) for sections in bucket.values())
    print(f"type{idx} ➔ {class_total} classes, {property_total} properties")

# save to json files, one file per bucket
for file_name, bucket in (
    ("type1.json", type1),
    ("type2.json", type2),
    ("type3.json", type3),
    ("type4.json", type4),
):
    with open(file_name, "w", encoding="utf-8") as f:
        json.dump(bucket, f, ensure_ascii=False, indent=2)


type1 ➔ 207 classes, 59 properties
type2 ➔ 207 classes, 58 properties
type3 ➔ 208 classes, 12 properties
type4 ➔ 712 classes, 100 properties


Type 1: Missing axiom, Type 2: Missing definition

In [None]:
import json
import random

# List of possible axiom predicates
axiom_relations = [
    "subClassOf", "equivalentClass", "propertyRestrictions",
    "disjointWith", "subPropertyOf", "domain", "range",
    "characteristics", "inverseOf"
]

def process_type1(input_path='type1.json', output_path='processed_type1.json', seed=None):
    """Remove one sampled axiom per entity and record its competency questions.

    For every class/property in the input file one CQ entry is sampled
    (preferring ``disjointWith`` for classes and ``inverseOf`` for properties,
    and avoiding ``domain None`` / ``range None`` entries when alternatives
    exist). The sampled axiom's expression is removed from the entity's
    ``axiom`` dict, and three fields are attached: ``removed axiom``,
    ``Target CQ`` (questions of the sampled entry) and ``Valid CQ`` (questions
    of all other entries). The original ``CQ`` list is kept.

    Entities without CQ entries, or whose sampled axiom contains none of the
    known predicates, are left completely untouched.

    Args:
        input_path: JSON file to read.
        output_path: JSON file to write the annotated data to.
        seed: optional seed passed to ``random.seed`` before sampling so that
            a run can be reproduced; ``None`` leaves the global random state
            as it is.
    """
    if seed is not None:
        random.seed(seed)

    # 1. Load the original data
    with open(input_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # 2. Traverse each ontology, processing both classes and properties
    for ontology in data.values():
        for section in ('classes', 'properties'):
            for name, info in ontology.get(section, {}).items():
                all_cq = info.get('CQ', [])
                # Prefer entries that are not "domain None" / "range None"
                # placeholders; fall back to all entries if nothing else exists.
                filtered = [
                    entry for entry in all_cq
                    if " domain None" not in entry['axiom']
                    and " range None" not in entry['axiom']
                ]
                cq_entries = filtered if filtered else all_cq
                if not cq_entries:
                    # no CQ entries at all: skip this entity
                    continue

                # a) Sample one entry, preferring the section's favoured predicate
                if section == 'classes':
                    preferred = [e for e in cq_entries if ' disjointWith ' in e['axiom']]
                else:  # section == 'properties'
                    preferred = [e for e in cq_entries if ' inverseOf ' in e['axiom']]
                sampled = random.choice(preferred if preferred else cq_entries)

                # b) Parse the sampled axiom. The predicate is the known
                #    relation that occurs FIRST in the string, so a keyword
                #    appearing later inside the expression cannot be mistaken
                #    for it.
                axiom_str = sampled['axiom']
                hits = [
                    (axiom_str.find(f" {rel} "), rel)
                    for rel in axiom_relations
                    if f" {rel} " in axiom_str
                ]
                if not hits:
                    # Nothing is recorded for an unparsable axiom, so the
                    # entity is not left with a 'removed axiom' but no
                    # 'Target CQ' / 'Valid CQ'.
                    print(f"Could not parse axiom '{axiom_str}' for {name}; entity skipped.")
                    continue
                predicate = min(hits)[1]
                expr = axiom_str.split(f" {predicate} ", 1)[1].strip()
                info['removed axiom'] = axiom_str

                # c) Remove that expression from the matching list in info['axiom'],
                #    but keep the literal "None" placeholder of domain/range
                ax = info.get('axiom', {})
                if predicate in ax and isinstance(ax[predicate], list):
                    is_placeholder = predicate in ('domain', 'range') and expr == "None"
                    if not is_placeholder:
                        try:
                            ax[predicate].remove(expr)
                        except ValueError:
                            # best effort: report and carry on
                            print(f"Expression '{expr}' not found in axiom list for {name}.")

                # d) Build Target CQ and Valid CQ without deleting the original 'CQ'
                target_cq = sampled['CQ']
                valid_cq = [
                    question
                    for entry in all_cq
                    if entry is not sampled
                    for question in entry['CQ']
                ]

                # e) Attach new fields
                info['Target CQ'] = target_cq
                info['Valid CQ'] = valid_cq

    # 3. Write out the processed file
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

# Type 1 and Type 2 share the same processing; only the paths differ
for source_path, target_path in (
    ('type classification/type1.json', 'Final_type1.json'),
    ('type classification/type2.json', 'processed_type2.json'),
):
    process_type1(input_path=source_path, output_path=target_path)


Type 3: Misusing axiom

In [None]:
import re
import random

def _swap_and_or_words(text: str) -> str:
    """Exchange every whole-word ``and`` with ``or`` (and vice versa) in ``text``."""
    return re.sub(r'\b(?:and|or)\b',
                  lambda m: 'or' if m.group(0) == 'and' else 'and',
                  text)


def _mutate_level(texts, groups, swap_some_only, swap_and_or) -> str:
    """Randomly mutate the text of ONE nesting level and splice its groups back in.

    Args:
        texts: the plain-text runs of this level, ``len(groups) + 1`` entries.
        groups: already rendered bracket groups that sit between consecutive
            runs; they are inserted verbatim and never touched here.
        swap_some_only / swap_and_or: which kinds of mutation are allowed.
    """
    texts = list(texts)

    # With probability 0.5, swap all and/or connectives of this level.
    if swap_and_or and any(re.search(r'\band\b|\bor\b', t) for t in texts):
        if random.random() < 0.5:
            texts = [_swap_and_or_words(t) for t in texts]

    # With probability 0.5, flip one randomly chosen some/only of this level.
    if swap_some_only and random.random() < 0.5:
        hits = [(i, m)
                for i, t in enumerate(texts)
                for m in re.finditer(r'\bsome\b|\bonly\b', t)]
        if hits:
            i, m = random.choice(hits)
            repl = 'only' if m.group(0) == 'some' else 'some'
            texts[i] = texts[i][:m.start()] + repl + texts[i][m.end():]

    pieces = [texts[0]]
    for group, text in zip(groups, texts[1:]):
        pieces.append(group)
        pieces.append(text)
    return ''.join(pieces)


def _mutate_axiom(expr: str,
                  swap_some_only=True,
                  swap_and_or=True) -> str:
    """Apply random keyword swaps to ``expr``, one bracket level at a time.

    Every ``(...)``, ``[...]`` or ``{...}`` group is mutated exactly once, when
    its closing bracket is reached, and is then opaque to the enclosing
    levels, so swaps made at different levels cannot undo each other.

    Unbalanced input is preserved: a closing bracket without an opener is
    kept as an ordinary character, and the content after an unclosed opener
    is folded back into the enclosing level instead of being dropped.
    """
    # Each frame is (texts, groups) for one open nesting level.
    frames = [([''], [])]
    open_stack = []
    for ch in expr:
        if ch in '([{':
            frames.append(([''], []))
            open_stack.append(ch)
        elif ch in ')]}' and open_stack:
            texts, groups = frames.pop()
            inner = _mutate_level(texts, groups, swap_some_only, swap_and_or)
            parent_texts, parent_groups = frames[-1]
            parent_groups.append(f'{open_stack.pop()}{inner}{ch}')
            parent_texts.append('')
        else:
            frames[-1][0][-1] += ch

    # Unclosed openers: merge the dangling frame into its parent verbatim.
    while open_stack:
        texts, groups = frames.pop()
        parent_texts, parent_groups = frames[-1]
        parent_texts[-1] += open_stack.pop() + texts[0]
        parent_texts.extend(texts[1:])
        parent_groups.extend(groups)

    texts, groups = frames[0]
    return _mutate_level(texts, groups, swap_some_only, swap_and_or)


def mutate_axiom(expr: str,
                 swap_some_only=True,
                 swap_and_or=True) -> str:
    """Return a mutated copy of an axiom expression.

    "some" may be swapped with "only" and "and" with "or", depending on the
    flags. If the random pass happens to change nothing, a deterministic
    fallback forces one mutation of an ALLOWED kind: all and/or connectives
    are swapped when ``swap_and_or`` is set and any exist, otherwise one
    random some/only is flipped when ``swap_some_only`` is set. An expression
    without any allowed keyword is returned unchanged.
    """
    mutated = _mutate_axiom(expr, swap_some_only, swap_and_or)
    if mutated == expr:
        if swap_and_or and re.search(r'\band\b|\bor\b', expr):
            mutated = _swap_and_or_words(expr)
        elif swap_some_only:
            matches = list(re.finditer(r'\bsome\b|\bonly\b', expr))
            if matches:
                m = random.choice(matches)
                repl = 'only' if m.group(0) == 'some' else 'some'
                mutated = expr[:m.start()] + repl + expr[m.end():]
    return mutated

# test: print five independent mutations of the same expression
orig = "[eats some (plants and animals)] and [drinks only water or milk]"
remaining = 5
while remaining:
    print(mutate_axiom(orig))
    remaining -= 1


[eats some (plants or animals)] and [drinks some water or milk]
[eats only (plants or animals)] and [drinks only water or milk]
[eats some (plants and animals)] and [drinks some water and milk]
[eats only (plants or animals)] and [drinks some water or milk]
[eats some (plants or animals)] and [drinks only water and milk]


In [None]:
import json
import random

# List of possible axiom predicates
axiom_relations = [
    "subClassOf", "equivalentClass", "propertyRestrictions",
    "disjointWith", "subPropertyOf", "domain", "range",
    "characteristics", "inverseOf"
]

def process_type3(input_path='type classification/type3.json',
                  output_path='Final_type3.json',
                  seed=None):
    """Replace one sampled axiom per entity by a mutated version of it.

    For every class/property one CQ entry whose axiom contains and/or/some/only
    is sampled. The axiom is mutated with ``mutate_axiom``; in the entity's
    ``axiom`` dict the original expression is removed and the mutated one is
    appended. Attached fields: ``editted axiom``, ``removed axiom``,
    ``Target CQ`` (questions of the sampled entry) and ``Valid CQ`` (questions
    of all other entries). The original ``CQ`` list is kept.

    Entities without a suitable entry, or whose sampled axiom contains none of
    the known predicates, are left completely untouched.

    Args:
        input_path: JSON file to read.
        output_path: JSON file to write the annotated data to.
        seed: optional seed passed to ``random.seed`` before processing so that
            a run can be reproduced; ``None`` leaves the global random state
            as it is.
    """
    if seed is not None:
        random.seed(seed)

    # 1. Load the original data
    with open(input_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    # 2. Traverse each ontology, processing both classes and properties
    for ontology in data.values():
        for section in ('classes', 'properties'):
            for name, info in ontology.get(section, {}).items():
                all_cq = info.get('CQ', [])
                # Drop "domain None" / "range None" placeholder entries. An
                # axiom string carries a single predicate, so the two checks
                # are combined with OR; requiring both never filtered anything.
                filtered = [
                    entry for entry in all_cq
                    if " domain None" not in entry['axiom']
                    and " range None" not in entry['axiom']
                ]
                cq_entries = filtered if filtered else all_cq
                if not cq_entries:
                    continue

                # a) extract CQ entries with "and" or "or" or "some" or "only"
                candidates = [
                    entry for entry in cq_entries
                    if re.search(r'\b(and|or|some|only)\b', entry['axiom'])
                ]
                if not candidates:
                    # if and/or/some/only are not present in any axiom, skip this entity
                    continue
                sampled = random.choice(candidates)

                # b) Parse the sampled axiom. The predicate is the known
                #    relation that occurs FIRST in the string.
                axiom_str = sampled['axiom']
                hits = [
                    (axiom_str.find(f" {rel} "), rel)
                    for rel in axiom_relations
                    if f" {rel} " in axiom_str
                ]
                if not hits:
                    # Nothing is recorded for an unparsable axiom, so the
                    # entity is not left with 'editted axiom'/'removed axiom'
                    # but no 'Target CQ' / 'Valid CQ'.
                    print(f"Could not parse axiom '{axiom_str}' for {name}; entity skipped.")
                    continue
                predicate = min(hits)[1]
                separator = f" {predicate} "

                # mutate the axiom and record both versions
                edited_axiom = mutate_axiom(axiom_str)
                info['editted axiom'] = edited_axiom
                info['removed axiom'] = axiom_str

                # original and edited expression (right-hand side of the predicate)
                expr = axiom_str.split(separator, 1)[1].strip()
                editted_expr = edited_axiom.split(separator, 1)[1].strip()

                # c) change the original expression to the edited expression,
                #    leaving the literal "None" placeholder of domain/range alone
                ax = info.get('axiom', {})
                if predicate in ax and isinstance(ax[predicate], list):
                    if not (predicate in ('domain', 'range') and expr == "None"):
                        try:
                            ax[predicate].remove(expr)
                            ax[predicate].append(editted_expr)
                        except ValueError:
                            # best effort: report and carry on
                            print(f"Expression '{expr}' not found in axiom list for {name}.")

                # d) Build Target CQ and Valid CQ without deleting the original 'CQ'
                target_cq = sampled['CQ']
                valid_cq = [
                    question
                    for entry in all_cq
                    if entry is not sampled
                    for question in entry['CQ']
                ]

                # e) Attach new fields
                info['Target CQ'] = target_cq
                info['Valid CQ'] = valid_cq

    # 3. Write out the processed file
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

# Run the Type 3 processing with its default input and output paths.
process_type3()


Type 4: Alignment

In [5]:
def process_type4(input_path='type classification/type4.json', output_path='Final_type4.json'):
    """Attach 'Target CQ' and 'Valid CQ' to every entity of a type-4 file.

    'Valid CQ' is the list of distinct questions over all CQ entries of the
    entity, in first-seen order; 'Target CQ' is a random sample of at most
    three of them. The number of entities with more than three distinct
    questions is printed, and the annotated data is written to ``output_path``.
    """
    # Load the original data
    with open(input_path, 'r', encoding='utf-8') as f:
        data = json.load(f)

    entities_over_three = 0

    # Process each ontology
    for ontology in data.values():
        for section in ('classes', 'properties'):
            for name, info in ontology.get(section, {}).items():
                # de-duplicate while keeping first-seen order
                unique_cqs = []
                for entry in info.get('CQ', []):
                    for question in entry['CQ']:
                        if question in unique_cqs:
                            continue
                        unique_cqs.append(question)

                sample_size = min(3, len(unique_cqs))
                info['Target CQ'] = random.sample(unique_cqs, sample_size)
                info['Valid CQ'] = unique_cqs
                entities_over_three += len(unique_cqs) > 3

    print("over2_axiom_num: ", entities_over_three)

    # Write out the processed file
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
# Run the Type 4 processing with its default input and output paths.
process_type4()

over2_axiom_num:  100


Merge and transform to training dataset

In [6]:
import json, os
import random  # used for the split below; imported here so the cell runs on its own

# Fixed seed and sorted file order make the train/test split reproducible.
SPLIT_SEED = 42
TRAIN_FRACTION = 0.9  # 9:1 train/test ratio

total_dataset = {}
train_dataset = {}
test_dataset = {}
# Load the per-type JSON files from the "processed types" directory
final_types_directory = "processed types"
output_directory = "merged dataset"
split_rng = random.Random(SPLIT_SEED)

# os.listdir returns names in arbitrary order, hence the sort.
for file_name in sorted(os.listdir(final_types_directory)):
    if not file_name.endswith(".json"):
        continue
    with open(os.path.join(final_types_directory, file_name), "r", encoding="utf-8") as json_file:
        temp_dataset = json.load(json_file)

    for ontology, sections in temp_dataset.items():
        merged_ontology = total_dataset.setdefault(ontology, {})
        for classorprop, entities in sections.items():
            # Merge the data of this file into the combined dataset
            merged_section = merged_ontology.setdefault(classorprop, {})
            for cp, record in entities.items():
                merged_section.setdefault(cp, {}).update(record)

            # Split this file's entities of the section into train and test.
            # int() truncates, so very small sections contribute mostly
            # (or only) to the test split.
            items = list(entities.items())
            split_rng.shuffle(items)
            split_index = int(len(items) * TRAIN_FRACTION)

            train_dataset.setdefault(ontology, {}).setdefault(classorprop, {}).update(items[:split_index])
            test_dataset.setdefault(ontology, {}).setdefault(classorprop, {}).update(items[split_index:])

# Save the merged data and both splits
os.makedirs(output_directory, exist_ok=True)
with open(os.path.join(output_directory, "Final_dataset.json"), "w", encoding="utf-8") as json_file:
    json.dump(total_dataset, json_file, indent=4, ensure_ascii=False)
with open(os.path.join(output_directory, "Final_train_dataset.json"), "w", encoding="utf-8") as json_file:
    json.dump(train_dataset, json_file, indent=4, ensure_ascii=False)
with open(os.path.join(output_directory, "Final_test_dataset.json"), "w", encoding="utf-8") as json_file:
    json.dump(test_dataset, json_file, indent=4, ensure_ascii=False)

In [7]:
def dataset_construct(ontology, type, class_name,description, axiom, TCQ, VCQ, Taxiom, datatype, CQ):
    """Build one training example and its metadata record.

    Args:
        ontology: ontology name, stored in the metadata.
        type: label used in the prompt line "<type> name: ...".
        class_name: name of the class or property.
        description: description text inserted into the prompt.
        axiom: axiom data inserted into the prompt and stored in the metadata.
        TCQ: target competency questions; the first three form the output.
        VCQ, Taxiom, datatype, CQ: stored unchanged in the metadata.

    Returns:
        ``{"data": {"input": <prompt>, "output": <questions>}, "metadata": {...}}``.
        The output is the first three target questions joined by " | " and
        followed by a single space; if fewer than three are available the ones
        that exist are used, and an empty ``TCQ`` yields an empty output.
    """
    # Join instead of indexing TCQ[0..2], which raised IndexError whenever an
    # entity had fewer than three target questions.
    questions = [str(question) for question in TCQ[:3]]
    target_output = (" | ".join(questions) + " ") if questions else ""

    return {"data":{"input": f"""As an ontology engineer, generate a list of competency questions based on the following description and axiom.
Definition of competency questions: the questions that outline the scope of ontology and provide an idea about the knowledge that needs to be entailed in the ontology.
Avoid using narrative questions + axioms.
Don't generate unnecessary text. Output only the three questions, separated by ` | ` (pipe with spaces). Stop generation after the third question.
{type} name: {class_name}
Description: {description}
Axiom: {axiom}
Generated CQs:""",
            "output": target_output},
            "metadata": {
            "ontology": ontology,
            "class": class_name,
            "axiom": axiom,
            "datatype": datatype,
            "TCQ": TCQ,
            "VCQ": VCQ,
            "Taxiom": Taxiom,
            "CQ" : CQ
                }}
def save_dataset(dataset, output_path):
    """Append one JSONL example and one JSONL metadata line per entity.

    Examples go to ``<output_path>.jsonl`` and metadata to
    ``<output_path>_meta.jsonl``. Both files are opened in append mode, so
    calling this twice with the same path adds the lines twice. Missing parent
    directories of ``output_path`` are created. Nothing is written (and no
    file is created) when ``dataset`` contains no entities.

    Args:
        dataset: ``{ontology: {"classes"|<other>: {name: record}}}`` where each
            record provides "axiom", "description", "Target CQ", "Valid CQ",
            "type" and "CQ", and optionally "removed axiom".
        output_path: path prefix of the two output files.

    Raises:
        KeyError: if a record lacks one of the required keys.
    """
    data_lines = []
    meta_lines = []
    for ontology in dataset:
        for classorprop in dataset[ontology]:
            kind = "Class" if classorprop == "classes" else "Property"
            for cp, record in dataset[ontology][classorprop].items():
                axiom = record["axiom"]
                description = record["description"]
                TCQ = record["Target CQ"]
                # Work on a copy so the caller's "Valid CQ" list is not modified.
                VCQ = list(record["Valid CQ"])
                Taxiom = record.get("removed axiom", "None")
                datatype = record["type"]
                CQ = record["CQ"]
                # every target question also counts as a valid question
                for cq in TCQ:
                    if cq not in VCQ:
                        VCQ.append(cq)
                # build the example once and reuse both halves
                example = dataset_construct(ontology, kind, cp, description, axiom,
                                            TCQ, VCQ, Taxiom, datatype, CQ)
                data_lines.append(json.dumps(example["data"], ensure_ascii=False) + "\n")
                meta_lines.append(json.dumps(example["metadata"], ensure_ascii=False) + "\n")

    if not data_lines:
        return

    parent_directory = os.path.dirname(output_path)
    if parent_directory:
        os.makedirs(parent_directory, exist_ok=True)

    # Each file is opened once per call instead of once per record.
    with open(f"{output_path}.jsonl", "a", encoding="utf-8") as jsonl_file:
        jsonl_file.writelines(data_lines)
    with open(f"{output_path}_meta.jsonl", "a", encoding="utf-8") as jsonl_file:
        jsonl_file.writelines(meta_lines)
# Export both splits as JSONL. save_dataset opens its output files in append
# mode, so re-running this cell adds the same lines again.
save_dataset(train_dataset, "train_dataset")
save_dataset(test_dataset, "test_dataset")

In [8]:
import copy
# Generalizability setting (unseen ontology): each group below is held out in
# turn as the test set while all remaining ontologies form the training set.
onto_list = {
    "AWO": ["AfricanWildlifeOntology1"],
    "OntoDT": ["OntoDT"],
    "SWO": ["swo"],
    "Pizza": ["pizza"],
    "Stuff": ["stuff"],
    "DEM@Care": ["lab", "time", "home", "exchangemodel", "event"],
}
with open("merged dataset/Final_dataset.json", "r", encoding="utf-8") as json_file:
    total_data = json.load(json_file)

for onto, held_out in onto_list.items():
    # independent deep copies, so saving one split never touches total_data
    temp_test = {owl: copy.deepcopy(total_data[owl]) for owl in held_out}
    temp_train = {
        name: copy.deepcopy(content)
        for name, content in total_data.items()
        if name not in held_out
    }
    save_dataset(temp_train, f"additional settings/Generalizability/unseen ontology/{onto}/train_dataset")
    save_dataset(temp_test, f"additional settings/Generalizability/unseen ontology/{onto}/test_dataset")