In [None]:
# Install required libraries (ensure they are installed only when running the script for the first time)
!pip install spacy
!pip install rdflib
!pip install owlrl
!pip3 install pydotplus graphviz
!pip install scikit-learn
!pip install rdflib
!pip install sentence_transformers

In [14]:
import csv
import json
import os
import re

from rdflib import Graph
from sentence_transformers import SentenceTransformer, util

def load_ontology(file_path):
    """Parse the ontology file at ``file_path`` into a new graph.

    Args:
        file_path: Location of the ontology file handed to ``Graph.parse``.

    Returns:
        The freshly created ``Graph`` after parsing the file into it.
    """
    ontology_graph = Graph()
    ontology_graph.parse(file_path)
    return ontology_graph

def extract_classes_properties(g):
    """Collect the textual annotations (labels, definitions, comments) of a graph.

    Only triples whose predicate is ``rdfs:label``, ``skos:definition``,
    ``rdfs:comment`` or ``pro:definition`` are considered. Objects tagged with
    a non-English language are skipped; objects that are untagged, or tagged
    ``en`` / ``en-*`` (case-insensitive), are kept.

    Args:
        g: An iterable of ``(subject, predicate, object)`` triples, e.g. an
            rdflib ``Graph``.

    Returns:
        A list of dicts with the string-valued keys ``"iri"``, ``"predicate"``
        and ``"value"``, one per accepted triple, in iteration order.
    """
    annotation_predicates = frozenset((
        'http://www.w3.org/2000/01/rdf-schema#label',
        'http://www.w3.org/2004/02/skos/core#definition',
        'http://www.w3.org/2000/01/rdf-schema#comment',
        'http://purl.org/spar/pro/definition',
    ))

    classes_properties = []
    for s, p, o in g:
        if str(p) not in annotation_predicates:
            continue

        # rdflib literals expose `.language` even when they carry no tag (the
        # value is then None), so a bare hasattr() check would discard every
        # untagged annotation. Treat "no tag" as acceptable instead.
        language = getattr(o, 'language', None)
        if language:
            normalized = str(language).lower()
            # Accept plain "en" as well as regional variants such as "en-GB".
            if normalized != 'en' and not normalized.startswith('en-'):
                continue

        classes_properties.append({"iri": str(s), "predicate": str(p), "value": str(o)})
    return classes_properties

def combine_metadata(classes_properties):
    """Merge per-triple annotation records into one description string per IRI.

    Args:
        classes_properties: Iterable of dicts with the keys ``"iri"``,
            ``"predicate"`` and ``"value"``.

    Returns:
        A dict mapping each IRI (in first-seen order) to a string shaped like
        ``"label1 / label2: definition (comment)"``. Parts that are missing are
        left out, and surrounding whitespace is stripped. Values whose
        predicate ends in none of ``label`` / ``definition`` / ``comment`` are
        collected separately and do not appear in the string.
    """
    known_kinds = ("label", "definition", "comment")

    grouped = {}
    for record in classes_properties:
        subject, pred, text = record['iri'], record['predicate'], record['value']
        bucket = grouped.setdefault(
            subject,
            {"label": [], "definition": [], "comment": [], "other": []},
        )
        # First matching suffix wins; anything else lands in "other".
        kind = next((k for k in known_kinds if pred.endswith(k)), "other")
        bucket[kind].append(text)

    def render(bucket):
        # Build "label: definition (comment)" from whichever parts exist.
        label_text = " / ".join(bucket["label"])
        definition_text = " ".join(bucket["definition"])
        comment_text = " ".join(bucket["comment"])

        pieces = []
        if label_text:
            pieces.append(f"{label_text}:")
        if definition_text:
            pieces.append(f" {definition_text}")
        if comment_text:
            pieces.append(f" ({comment_text})")
        return "".join(pieces).strip()

    return {subject: render(bucket) for subject, bucket in grouped.items()}

def find_relevant_classes_properties(requirement, combined_data, model, threshold=0.0):
    """Rank ontology entries by cosine similarity to a requirement text.

    All descriptions are embedded in one batched ``model.encode`` call rather
    than one call per entry, and scored against the requirement in a single
    similarity computation.

    Args:
        requirement: Text to match against the ontology descriptions.
        combined_data: Dict mapping IRI -> description string.
        model: Object providing ``encode(..., convert_to_tensor=True)``
            (a ``SentenceTransformer`` in this notebook).
        threshold: Minimum similarity (inclusive) an entry needs to be kept.

    Returns:
        A list of ``(iri, similarity)`` tuples with ``similarity >= threshold``,
        sorted by similarity in descending order. Empty when ``combined_data``
        is empty.
    """
    if not combined_data:
        # Nothing to compare against; skip the encoding work entirely.
        return []

    iris = list(combined_data.keys())
    descriptions = [combined_data[iri] for iri in iris]

    requirement_embedding = model.encode(requirement, convert_to_tensor=True)
    description_embeddings = model.encode(descriptions, convert_to_tensor=True)

    # One requirement vs. n descriptions -> a single row of n scores.
    similarities = util.pytorch_cos_sim(requirement_embedding, description_embeddings)[0].tolist()

    results = [
        (iri, similarity)
        for iri, similarity in zip(iris, similarities)
        if similarity >= threshold
    ]
    results.sort(key=lambda x: x[1], reverse=True)
    return results

def load_ground_truth(file_path):
    """Read the ground-truth IRIs (one per line) from a text file.

    The file is decoded as UTF-8 independent of the platform default, and a
    leading byte-order mark is dropped so it cannot end up inside the first IRI.

    Args:
        file_path: Path of the ground-truth file.

    Returns:
        A set of the stripped, non-blank lines; an empty set when the file does
        not exist.
    """
    if not os.path.exists(file_path):
        return set()
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        stripped_lines = (line.strip() for line in f)
        return {line for line in stripped_lines if line}

def save_results(output_path_txt, output_path_csv, results):
    """Write the matched IRIs to a plain-text file and a CSV file.

    Args:
        output_path_txt: Destination of the text file (one IRI per line).
        output_path_csv: Destination of the CSV file with the columns
            ``IRI`` and ``Similarity`` (similarity formatted to 4 decimals).
        results: Sequence of ``(iri, similarity)`` tuples; it is iterated
            twice, once per output file.

    Missing parent directories of both paths are created. Both files are
    written as UTF-8.
    """
    for path in (output_path_txt, output_path_csv):
        parent = os.path.dirname(path)
        # A bare file name has an empty dirname, and os.makedirs('') raises.
        if parent:
            os.makedirs(parent, exist_ok=True)

    with open(output_path_txt, 'w', encoding='utf-8') as f:
        f.writelines(f"{iri}\n" for iri, _ in results)

    with open(output_path_csv, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['IRI', 'Similarity'])
        writer.writerows([iri, f"{sim:.4f}"] for iri, sim in results)

def save_evaluation(output_path, results, ground_truth, top_n=20):
    """Score the top-ranked predictions against the ground truth and save the metrics.

    Args:
        output_path: File that receives the precision / recall / F1 report.
        results: Ranked sequence of ``(iri, similarity)`` tuples; only the
            first ``top_n`` entries count as predictions.
        ground_truth: Set of expected IRIs.
        top_n: Number of leading results treated as predictions.

    Precision is 0 when there are no predictions, recall is 0 when the ground
    truth is empty, and F1 is 0 when both are 0.
    """
    predicted_set = {iri for iri, _ in results[:top_n]}

    hits = predicted_set & ground_truth
    false_alarms = predicted_set - ground_truth
    misses = ground_truth - predicted_set
    n_hits = len(hits)

    if predicted_set:
        precision = n_hits / (n_hits + len(false_alarms))
    else:
        precision = 0

    if ground_truth:
        recall = n_hits / (n_hits + len(misses))
    else:
        recall = 0

    pr_sum = precision + recall
    if pr_sum > 0:
        f1 = 2 * precision * recall / pr_sum
    else:
        f1 = 0

    with open(output_path, 'w') as f:
        f.writelines([
            "Evaluation Metrics:\n",
            f"Precision: {precision:.4f}\n",
            f"Recall: {recall:.4f}\n",
            f"F1-score: {f1:.4f}\n",
        ])

def parse_structured_requirements(requirements_dict):
    """Split each requirement text into its ``term: definition`` entries.

    An entry ends at a period that is followed by whitespace and then the head
    of the next entry (text free of ``.`` and ``:`` that ends in ``:``).
    Periods inside a definition, such as the ones in "e.g.", therefore no
    longer cut the definition short, and follow-up sentences without a colon
    stay attached to the entry they belong to.

    Args:
        requirements_dict: Dict mapping requirement id -> requirement text.

    Returns:
        A dict mapping each requirement id to the list of its entries, each
        stripped of surrounding whitespace and of the trailing period. Pieces
        that contain no ``:`` are dropped.
    """
    entry_boundary = re.compile(r'\.\s+(?=[^.:]*:)')

    parsed = {}
    for req_id, text in requirements_dict.items():
        entries = (
            piece.strip().rstrip('.').rstrip()
            for piece in entry_boundary.split(text)
        )
        parsed[req_id] = [entry for entry in entries if ':' in entry]
    return parsed

def main():
    """Match every ontology in ``Ontologies/`` against the structured requirements.

    For each ``.ttl`` / ``.owl`` / ``.rdf`` file in ``Ontologies/`` and each
    requirement, every ``term: definition`` entry of the requirement is
    compared with the ontology's annotation texts. The best-scoring hit per IRI
    (similarity >= 0.4) is written to
    ``ExtractedTerms/<ontology>/<requirement>.txt`` and ``.csv``, and an
    evaluation against ``GroundtruthTerms/<ontology>/<requirement>.txt`` is
    written to ``ExtractedTerms/<ontology>/<requirement>_evaluation.txt``.
    """
    ontologies_dir = 'Ontologies'
    terms_dir = 'GroundtruthTerms'
    output_dir = 'ExtractedTerms'
    similarity_threshold = 0.4

    requirements = {
        "requirement1": "process: A transformation unit within a system that converts input entities (products, energy, or information) into outputs using defined operators. process step: A single, identifiable action within a larger process, often part of a hierarchical decomposition. process operator: A functional unit that transforms an input state (ante) into an output state (post), typically using technical resources. sub-process: A detailed decomposition of a higher-level process operator, used to represent functional granularity. system boundary: A conceptual or physical limit within which the process and its operators are defined and executed.",
        "requirement2": "input: An entity (product, energy, or information) that flows into a process operator. output: An entity resulting from a process operator’s transformation. has input: A relation connecting an entity as input to a process operator. has output: A relation connecting an entity as output from a process operator. product: Material or physical substance involved in or resulting from a process. energy: Power or force (e.g., heat, electricity) required or emitted in a process. information: Data or knowledge controlling, monitoring, or resulting from a process. technical resource: Equipment or tools used by a process operator to perform the transformation."
    }

    # NOTE(review): the checkpoint name ends in "dot", while matching below uses
    # cosine similarity -- confirm that this pairing is intended.
    model = SentenceTransformer('multi-qa-mpnet-base-dot-v1') #all-MiniLM-L6-v2, multi-qa-mpnet-base-dot-v1

    # The requirements do not depend on the ontology, so parse them once.
    parsed_requirements = parse_structured_requirements(requirements)

    # Sorted so that ontologies are processed in a reproducible order.
    for ontology_file in sorted(os.listdir(ontologies_dir)):
        if not ontology_file.lower().endswith(('.ttl', '.owl', '.rdf')):
            continue
        ontology_name = os.path.splitext(ontology_file)[0]
        ontology_path = os.path.join(ontologies_dir, ontology_file)
        g = load_ontology(ontology_path)

        classes_properties = extract_classes_properties(g)
        combined_data = combine_metadata(classes_properties)

        output_subdir = os.path.join(output_dir, ontology_name)

        for requirement_id, term_defs in parsed_requirements.items():
            all_matches = []

            for term_def in term_defs:
                matches = find_relevant_classes_properties(term_def, combined_data, model, threshold=similarity_threshold)
                all_matches.extend(matches)  # Collect the hits of every term definition

            # An IRI can match several term definitions; walking the hits from
            # best to worst and keeping the first occurrence retains its
            # highest similarity.
            seen = set()
            results = []
            for iri, sim in sorted(all_matches, key=lambda x: x[1], reverse=True):
                if iri not in seen:
                    results.append((iri, sim))
                    seen.add(iri)

            # Save results
            iri_path_csv = os.path.join(output_subdir, f"{requirement_id}.csv")
            iri_path_txt = os.path.join(output_subdir, f"{requirement_id}.txt")
            save_results(iri_path_txt, iri_path_csv, results)

            # Evaluation
            gt_path = os.path.join(terms_dir, ontology_name, f"{requirement_id}.txt")
            ground_truth = load_ground_truth(gt_path)
            eval_path = os.path.join(output_subdir, f"{requirement_id}_evaluation.txt")
            save_evaluation(eval_path, results, ground_truth)

# Entry point: the `__main__` guard below is commented out, so the whole
# pipeline runs as soon as this cell is executed.
#if __name__ == "__main__":
main()


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/116 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/571 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/438M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/363 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

KeyboardInterrupt: 