In [110]:
!pip -q install openai rdflib faiss-cpu pandas tqdm google-generativeai
# faiss-cpu(fast vector similarity search for GraphRAG retrieval)
# tqdm (progress bars for long operations (e.g. embedding))


### Using Google Gemini instead of OpenAPI

To use the Gemini API, you'll need an API key. If you don't already have one, create a key in Google AI Studio. In Colab, add the key to the secrets manager under the "ðŸ”‘" in the left panel. Give it the name `GOOGLE_API_KEY`.

Then, we'll configure the Gemini API and define a new embedding function.

In [111]:
# Import the Python SDK
import google.generativeai as genai
# Used to securely store your API key
from google.colab import userdata

# Retrieve the API key from Colab secrets
GOOGLE_API_KEY=userdata.get('MOAFDiT2')
genai.configure(api_key=GOOGLE_API_KEY)

In [112]:
from tqdm import tqdm
import numpy as np

# Embedding model used for semantic retrieval from Gemini
GEMINI_EMBED_MODEL = "models/text-embedding-004" # You can choose another suitable Gemini embedding model if needed

def embed_texts_gemini(texts, batch_size=64):
    """
    Convert text into embedding vectors using Google Gemini.
    """
    vectors = []
    for i in tqdm(range(0, len(texts), batch_size)):
        chunk = texts[i:i+batch_size]
        try:
            resp = genai.embed_content(
                model=GEMINI_EMBED_MODEL,
                content=chunk
            )
            # Gemini's embed_content returns a dict with 'embedding' key for the list of embeddings
            vectors.extend(resp['embedding'])
        except Exception as e:
            print(f"Error embedding chunk: {e}")
            # Handle error or skip this chunk
            pass # Or raise the exception if you want to stop on error
    return np.array(vectors, dtype="float32")

# Generate embeddings using the Gemini API
embeddings = embed_texts_gemini(entity_cards)
print("Embeddings shape:", embeddings.shape)

100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 3/3 [00:06<00:00,  2.06s/it]

Embeddings shape: (150, 768)





In [113]:
markdown_content = """### Using Google Gemini instead of OpenAPI

To use the Gemini API, you'll need an API key. If you don't already have one, create a key in Google AI Studio.

**Important Setup Steps:**
1. **Obtain your API Key**: Go to [Google AI Studio](https://aistudio.google.com/app/apikey) and create an API key.
2. **Store in Colab Secrets**: In your Colab notebook, click on the "ðŸ”‘" icon (Secrets) in the left panel. Add a new secret named `GOOGLE_API_KEY` and paste your API key as its value. Make sure to enable "Notebook access" for this secret.

Then, we'll configure the Gemini API and define a new embedding function.
"""

print(markdown_content)
print("\nNote: This code prints the markdown content as a Python string. To actually update cell 'c7c2099b' as a markdown cell, you would need to manually copy this content into that cell and set its type to Markdown.")

### Using Google Gemini instead of OpenAPI

To use the Gemini API, you'll need an API key. If you don't already have one, create a key in Google AI Studio.

**Important Setup Steps:**
1. **Obtain your API Key**: Go to [Google AI Studio](https://aistudio.google.com/app/apikey) and create an API key.
2. **Store in Colab Secrets**: In your Colab notebook, click on the "ðŸ”‘" icon (Secrets) in the left panel. Add a new secret named `GOOGLE_API_KEY` and paste your API key as its value. Make sure to enable "Notebook access" for this secret.

Then, we'll configure the Gemini API and define a new embedding function.


Note: This code prints the markdown content as a Python string. To actually update cell 'c7c2099b' as a markdown cell, you would need to manually copy this content into that cell and set its type to Markdown.


In [None]:
# from google.colab import files

# # This opens a file picker so you can upload your ontology.
# # Supported formats include Turtle (.ttl), RDF/XML (.rdf, .owl), N-Triples (.nt).

# uploaded = files.upload()

# # We assume you upload exactly one ontology file
# ontology_path = next(iter(uploaded.keys()))
# ontology_path


Saving MOAFDITO.rdf to MOAFDITO (1).rdf


'MOAFDITO (1).rdf'

In [114]:
from rdflib import Graph

ONTOLOGY_CANDIDATES = [
    "https://z-arghavan.github.io/MOAF-DiT/ontology.ttl",
    "https://z-arghavan.github.io/MOAF-DiT/ontology.rdf",
    "https://z-arghavan.github.io/MOAF-DiT/ontology.nt",
    "https://z-arghavan.github.io/MOAF-DiT/ontology.jsonld",
]

def load_remote_ontology() -> Graph:
    g = Graph()
    last_err = None

    for url in ONTOLOGY_CANDIDATES:
        try:
            g.parse(url)  # rdflib will often infer the format
            print("Loaded ontology from:", url)
            return g
        except Exception as e:
            last_err = e

    raise RuntimeError(f"Could not load ontology from any candidate URL. Last error: {last_err}")

g = load_remote_ontology()
print("Triples:", len(g))


Loaded ontology from: https://z-arghavan.github.io/MOAF-DiT/ontology.ttl
Triples: 484


In [None]:
# import os

# SUPPORTED_EXTENSIONS = (".ttl", ".rdf", ".owl", ".nt")

# files = [
#     f for f in os.listdir(".")
#     if f.lower().endswith(SUPPORTED_EXTENSIONS)
# ]

# if not files:
#     raise FileNotFoundError("No ontology file found in the current environment.")

# ontology_path = files[0]
# print("Using ontology file:", ontology_path)


Using ontology file: MOAFDITO.rdf


In [116]:
# We need to convert graph nodes (IRIs) into short texts that the LLM can reason over. This is crucial for GraphRAG: LLMs do not understand raw IRIs well. We turn graph structure into textual surrogates

from rdflib.namespace import RDF, RDFS, OWL, SKOS
from collections import defaultdict

# Properties commonly used for labels and descriptions in ontologies
LABEL_PROPS = [RDFS.label, SKOS.prefLabel, SKOS.altLabel]
DESC_PROPS = [RDFS.comment, SKOS.definition]

#Extracting human-readable labels and comments. Fetches labels like rdfs:label, skos:prefLabel. Falls back to compact IRI names if labels are missing. LLMs work on text, not IRIs. This bridges semantic web data and language models.
def first_literal(graph, subj, props):
    """
    Return the first literal value for a subject and a list of properties.
    Used to extract labels or comments.
    """
    for p in props:
        for o in graph.objects(subj, p):
            return str(o)
    return None

def all_literals(graph, subj, props, limit=10):
    """
    Collect up to 'limit' literal values for description properties.
    """
    out = []
    for p in props:
        for o in graph.objects(subj, p):
            out.append(str(o))
            if len(out) >= limit:
                return out
    return out

def compact_iri(iri: str) -> str:
    """
    Create a human-readable short name from an IRI.
    This is only for display, not for logic.
    """
    s = str(iri)
    if "#" in s:
        return s.split("#")[-1]
    return s.rstrip("/").split("/")[-1]


In [120]:
from rdflib import Graph

# Load the ontology into a graph object
ontology_path = "https://z-arghavan.github.io/MOAF-DiT/ontology.ttl"
g = Graph()
g.parse(ontology_path)

# We collect entities that are meaningful in an ontology:
# classes, properties, and individuals.

entities = set()

for s in g.subjects(RDF.type, OWL.Class):
    entities.add(s)

for s in g.subjects(RDF.type, RDF.Property):
    entities.add(s)

for s in g.subjects(RDF.type, OWL.ObjectProperty):
    entities.add(s)

for s in g.subjects(RDF.type, OWL.DatatypeProperty):
    entities.add(s)

for s in g.subjects(RDF.type, OWL.NamedIndividual):
    entities.add(s)

# Fallback: anything that has a label
for s in g.subjects(RDFS.label, None):
    entities.add(s)

print("Number of candidate entities:", len(entities))

Number of candidate entities: 150


In [121]:
def get_types(graph, s, limit=6):
    """
    Get RDF types of an entity, truncated to a small number.
    """
    types = []
    for t in graph.objects(s, RDF.type):
        types.append(compact_iri(t))
        if len(types) >= limit:
            break
    return types

def get_neighbour_triples(graph, s, max_triples=25):
    """
    Retrieve a small local neighbourhood of triples
    around an entity (incoming and outgoing).
    """
    triples = []

    # Outgoing triples
    for p, o in graph.predicate_objects(s):
        triples.append((s, p, o))
        if len(triples) >= max_triples:
            return triples

    # Incoming triples
    for subj, pred in graph.subject_predicates(s):
        triples.append((subj, pred, s))
        if len(triples) >= max_triples:
            return triples

    return triples

def make_entity_card(graph, s):
    """
    Create a short textual 'entity card' for an IRI.
    This is what we embed and retrieve in GraphRAG.
    """
    label = first_literal(graph, s, LABEL_PROPS) or compact_iri(s)
    descs = all_literals(graph, s, DESC_PROPS, limit=3)
    types = get_types(graph, s)

    neigh = []
    for (ss, pp, oo) in get_neighbour_triples(graph, s, max_triples=18):
        ss_txt = first_literal(graph, ss, LABEL_PROPS) or compact_iri(ss)
        pp_txt = compact_iri(pp)
        oo_txt = (
            first_literal(graph, oo, LABEL_PROPS)
            if str(oo).startswith("http")
            else str(oo)
        ) or compact_iri(oo)
        neigh.append(f"{ss_txt} {pp_txt} {oo_txt}")

    lines = [
        f"IRI: {str(s)}",
        f"Label: {label}"
    ]

    if types:
        lines.append("Types: " + ", ".join(types))
    if descs:
        lines.append("Notes: " + " | ".join(descs))
    if neigh:
        lines.append("Neighbour facts:")
        lines.extend([f"- {x}" for x in neigh[:12]])

    return "\n".join(lines)


In [122]:
# Generate all entity cards
#This is where the ontology becomes LLM-readable knowledge.
entity_cards = []
entity_iris = []

for e in entities:
    try:
        card = make_entity_card(g, e)
        if card and len(card) > 30:
            entity_cards.append(card)
            entity_iris.append(str(e))
    except Exception:
        pass

print("Entity cards created:", len(entity_cards))
print(entity_cards[0][:600])

Entity cards created: 150
IRI: http://www.semanticweb.org/MOAFDITO#hasName
Label: has Name
Types: DatatypeProperty
Neighbour facts:
- has Name type DatatypeProperty
- has Name domain Department
- has Name range string
- has Name label has Name


In [123]:
#Build a FAISS vector index (retrieval layer)
import faiss

# Dimensionality of embeddings
dim = embeddings.shape[1]

# Inner product index (used as cosine similarity after normalisation)
index = faiss.IndexFlatIP(dim)

# Normalise vectors so inner product equals cosine similarity
faiss.normalize_L2(embeddings)

# Add all entity embeddings to the index
index.add(embeddings)

print("FAISS index size:", index.ntotal)

FAISS index size: 150


#### What does 'FAISS index size' mean?

After extracting all meaningful entities (classes, properties, and individuals) from your uploaded ontology, we create a textual representation for each of them, called an 'entity card'. These entity cards are then converted into numerical vectors (embeddings) using a language model.

The **FAISS index size** refers to the number of these entity embeddings that have been added to the FAISS vector database. Essentially, it tells you how many unique entities from your ontology are now indexed and searchable by semantic similarity. When you see 'FAISS index size: 162', it means that 162 unique entities from your ontology have been vectorized and stored in the index.

In [124]:
#Retrieve relevant entities for a question
def retrieve_entities(question, top_k=8):
    """
    Retrieve the most relevant ontology entities for a natural language question.
    """
    # Use the Gemini embedding function
    q_emb = embed_texts_gemini([question])

    q_vec = np.array(q_emb, dtype="float32")
    faiss.normalize_L2(q_vec)

    scores, idxs = index.search(q_vec, top_k)

    results = []
    for score, idx in zip(scores[0], idxs[0]):
        results.append({
            "score": float(score),
            "iri": entity_iris[idx],
            "card": entity_cards[idx]
        })
    return results

question = "how can we know if a board is in which process?"
hits = retrieve_entities(question, top_k=6)

for h in hits:
    print(h["score"], h["iri"])

100%|â–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆâ–ˆ| 1/1 [00:00<00:00,  1.91it/s]

0.5565840005874634 http://www.semanticweb.org/MOAFDITO#BoardType
0.536327064037323 http://www.semanticweb.org/MOAFDITO#hasBoardID
0.5309848785400391 http://www.semanticweb.org/MOAFDITO#processedInDepartment
0.5277382135391235 http://www.semanticweb.org/MOAFDITO#belongsToBoardFamily
0.5109294652938843 http://www.semanticweb.org/MOAFDITO#BoardFamily
0.5048397779464722 http://www.semanticweb.org/MOAFDITO#refersToProcessStep





In [125]:
#Expand a local subgraph
from rdflib import URIRef

def triple_to_text(ss, pp, oo):
    """
    Convert a triple into a short readable sentence.
    """
    ss_txt = first_literal(g, ss, LABEL_PROPS) or compact_iri(ss)
    pp_txt = compact_iri(pp)
    oo_txt = (
        first_literal(g, oo, LABEL_PROPS)
        if str(oo).startswith("http")
        else str(oo)
    ) or compact_iri(oo)
    return f"{ss_txt} {pp_txt} {oo_txt}"

def expand_subgraph(seed_iris, max_triples_per_seed=30):
    """
    Retrieve a small neighbourhood of triples
    around an entity (incoming and outgoing).
    """
    seen = set()
    out = []

    for iri in seed_iris:
        s = URIRef(iri)

        for p, o in g.predicate_objects(s):
            t = (str(s), str(p), str(o))
            if t not in seen:
                seen.add(t)
                out.append((s, p, o))

        for subj, pred in g.subject_predicates(s):
            t = (str(subj), str(pred), str(s))
            if t not in seen:
                seen.add(t)
                out.append((subj, pred, s))

    return out

seed_iris = [h["iri"] for h in hits]
triples = expand_subgraph(seed_iris)

triple_texts = [
    f"T{i+1}: {triple_to_text(ss, pp, oo)}"
    for i, (ss, pp, oo) in enumerate(triples[:120])
]

print(triple_texts[:10])

['T1: Board Type type Class', 'T2: Board Type subClassOf nac560eac68294fb0929240f169686ef7b8', 'T3: Board Type comment Should we create 37 boards here?Update: according to the new report, there are 65 unique board types', 'T4: Board Type label Board Type', 'T5: belongs To Board Family domain Board Type', 'T6: has Board Type range Board Type', 'T7: has Route domain Board Type', 'T8: has Work Order For Board Type range Board Type', 'T9: processed In Department domain Board Type', 'T10: nac560eac68294fb0929240f169686ef7b5 first Board Type']


In [128]:
# Define your question here
question = input("Please enter your question: ")

print(f"Question set: {question}")

KeyboardInterrupt: Interrupted by user

In [127]:
#Ask the LLM using GraphRAG
import google.generativeai as genai # Ensure genai is imported if not already

MODEL = "models/gemini-2.5-flash" # Using models/gemini-2.0-flash as requested

def answer_with_graphrag(question, hits, triple_texts, max_triples=80):
    """
    Ask the LLM to answer using only the retrieved graph evidence.
    """
    entity_list = "\n".join(
        [f"- {h['iri']} (score={h['score']:.3f})" for h in hits]
    )

    context = [
        "Resolved entities:",
        entity_list,
        "",
        "Knowledge graph evidence (triples):",
    ]
    context.extend(triple_texts[:max_triples])

    prompt = (
        "Answer the question based only on the provided context.\n"
        "If the answer is not available in the context, state that explicitly.\n\n"
        f"Question: {question}\n\n"
        "Context:\n"
        + "\n".join(context)
    )

    model = genai.GenerativeModel(MODEL)
    resp = model.generate_content(prompt)

    return resp.text

# We have 'question', 'hits', and 'triple_texts' from previous steps
# question = input("Please enter your question: ") # Use the 'question' variable already set
answer = answer_with_graphrag(question, hits, triple_texts)
print(f"Question: {question}\n")
print(f"Answer: {answer}")

Question: what is departmen

Answer: Based on the provided context:

The term "Department" appears as the range of the `processed In Department` property (T21: `processed In Department range Department`). This means that a `BoardType` (which is the domain of `processed In Department` as per T9) can be processed *in* a `Department`.

However, the context does not provide a direct definition, comment, or type (e.g., whether it is a Class, DatatypeProperty, etc.) specifically for "Department" itself.


**Subgraph expansion end here.**

**NOW, the code below helps to ask questions in natural language and turn them into SPARQL queries**

In [None]:
#Extract a small schema hint for the prompt
def schema_hint(graph, max_items=80):
    """
    Build a compact hint about predicates and classes present in the graph.
    This helps the LLM write valid SPARQL for your ontology.
    """
    preds = set()
    for s, p, o in graph:
        preds.add(p)
        if len(preds) >= max_items:
            break

    pred_lines = [f"- {str(p)}" for p in list(preds)[:max_items]]
    return "Common predicates you can use:\n" + "\n".join(pred_lines)

SCHEMA = schema_hint(g, max_items=60)
print(SCHEMA[:1200])


Common predicates you can use:
- http://www.w3.org/2002/07/owl#onProperty
- http://www.w3.org/2002/07/owl#minQualifiedCardinality
- http://www.w3.org/2000/01/rdf-schema#subClassOf
- http://www.w3.org/2000/01/rdf-schema#range
- http://www.w3.org/2000/01/rdf-schema#seeAlso
- http://www.w3.org/2000/01/rdf-schema#label
- http://www.w3.org/1999/02/22-rdf-syntax-ns#type
- http://www.w3.org/1999/02/22-rdf-syntax-ns#rest
- http://www.w3.org/2002/07/owl#onClass
- http://purl.org/dc/terms/description
- http://www.w3.org/2000/01/rdf-schema#subPropertyOf
- http://www.w3.org/2002/07/owl#unionOf
- http://www.w3.org/2000/01/rdf-schema#comment
- http://www.w3.org/2002/07/owl#inverseOf
- http://purl.org/dc/terms/creator
- http://www.w3.org/1999/02/22-rdf-syntax-ns#first
- http://purl.org/dc/terms/descriptions
- http://www.w3.org/2000/01/rdf-schema#domain
- http://www.w3.org/2002/07/owl#versionInfo
- http://purl.org/dc/terms/contributor
- http://purl.org/dc/terms/title
- http://purl.org/dc/terms/license

In [None]:
#Build a vocabulary index from the uploaded ontology
import re
from rdflib import RDF, RDFS, OWL, URIRef, Literal
from collections import defaultdict

def build_vocab_index(g):
    """
    Returns:
      vocab: dict with keys "classes", "obj_props", "data_props", "props"
      label_index: list of (iri, label_text, kind)
    """
    classes = set(g.subjects(RDF.type, OWL.Class)) | set(g.subjects(RDF.type, RDFS.Class))
    obj_props = set(g.subjects(RDF.type, OWL.ObjectProperty))
    data_props = set(g.subjects(RDF.type, OWL.DatatypeProperty))
    props = set(g.subjects(RDF.type, RDF.Property)) | obj_props | data_props

    def texts(s):
        out = []
        for p in (RDFS.label, RDFS.comment):
            for o in g.objects(s, p):
                if isinstance(o, Literal):
                    out.append(str(o))
        return out

    vocab = {
        "classes": sorted(map(str, classes)),
        "obj_props": sorted(map(str, obj_props)),
        "data_props": sorted(map(str, data_props)),
        "props": sorted(map(str, props)),
    }

    label_index = []
    def add_kind(iris, kind):
        for iri in iris:
            t = " ".join(texts(URIRef(iri))).strip()
            if t:
                label_index.append((iri, t.lower(), kind))
            else:
                label_index.append((iri, iri.lower(), kind))

    add_kind(vocab["classes"], "class")
    add_kind(vocab["obj_props"], "object_property")
    add_kind(vocab["data_props"], "datatype_property")
    add_kind(vocab["props"], "property")

    return vocab, label_index

vocab, label_index = build_vocab_index(g)
len(vocab["classes"]), len(vocab["props"])


(88, 51)

In [None]:
#Retrieve only the terms that look relevant to the question
import math

def normalise(text: str) -> str:
    return re.sub(r"[^a-z0-9]+", " ", text.lower()).strip()

def retrieve_terms(question: str, label_index, top_k=40):
    q = normalise(question)
    q_tokens = set(q.split())

    scored = []
    for iri, label_text, kind in label_index:
        lt = normalise(label_text)
        lt_tokens = set(lt.split())
        overlap = len(q_tokens & lt_tokens)

        if overlap == 0:
            continue

        # light bonus if direct substring match exists
        bonus = 2 if any(tok in lt for tok in q_tokens) else 0
        score = overlap + bonus
        scored.append((score, iri, kind, label_text))

    scored.sort(reverse=True, key=lambda x: x[0])
    return scored[:top_k]

def format_allow_list(term_hits):
    lines = []
    for score, iri, kind, label_text in term_hits:
        snippet = label_text[:160].replace("\n", " ")
        lines.append(f"- <{iri}>  ({kind})  label_or_desc: {snippet}")
    return "\n".join(lines)

term_hits = retrieve_terms("List the properties and definitions for the concept Deconstruction, if it exists.", label_index)
print(format_allow_list(term_hits[:15]))


- <http://www.semanticweb.org/MOAFDITO#Material_preparation_personnel>  (class)  label_or_desc: personnel who checks the materials required for assembly and delivers them to the line workers.
- <http://www.semanticweb.org/MOAFDITO#Wave_soldering_exit_personnel>  (class)  label_or_desc: aybar has inspector entity. should this person be inspector or not? the personnel who visually inspects the boards exiting the soldering machine and intervenes 
- <http://www.semanticweb.org/MOAFDITO#processedInDepartment>  (object_property)  label_or_desc: processed in department a bit explanatin: if a boardtype hasroute some route, and that route routehasstep some step, and that step indepartment some department,
- <http://www.semanticweb.org/MOAFDITO#processedInDepartment>  (property)  label_or_desc: processed in department a bit explanatin: if a boardtype hasroute some route, and that route routehasstep some step, and that step indepartment some department,
- <http://www.semanticweb.org/MOAFDITO#Boar

In [None]:
 # Define your question for SPARQL here
sparql_question = input("Please enter your question for SPARQL generation: ")

# print(f"SPARQL Question set: {sparql_question}")

Please enter your question for SPARQL generation: What changes in route selection could reduce load in Testing?


In [None]:
#Gemini generates SPARQL
def question_to_sparql(question: str) -> str:
    """
    Use Gemini to write a SPARQL query for the given question.
    We instruct it to only output SPARQL, no explanations. Use the uploaded ontology only.
    """
    model = genai.GenerativeModel(MODEL)
    term_hits = retrieve_terms(question, label_index, top_k=50)
    allow_list = format_allow_list(term_hits)

    prompt = (
        "You write SPARQL for an RDF graph.\n"
        "Return only SPARQL, no prose.\n"
        "Closed world constraint:\n"
        "1) You MUST use only IRIs that appear in the allow list below and the available ontologies online.\n"
        "2) If the allow list is insufficient, write a query that discovers candidates using rdfs:label and rdfs:comment within this graph.\n"
        "3) Use LIMIT 50.\n\n"
        "Allow list of IRIs (use only these):\n"
        f"{allow_list}\n\n"
        "Question:\n"
        f"{question}\n"
        f"{SCHEMA}\n\n"
        f"Question: {question}\n"
    )

    resp = model.generate_content(prompt)
    sparql_query = resp.text.strip()

    # Remove markdown code block fences if present
    if sparql_query.startswith('```sparql'):
        sparql_query = sparql_query.lstrip('```sparql').rstrip('```').strip()
    elif sparql_query.startswith('```'):
        sparql_query = sparql_query.lstrip('```').rstrip('```').strip()

    return sparql_query

def run_sparql(query: str):
    """
    Execute SPARQL on the local rdflib graph.
    """
    return g.query(query)

# q2 = "List the properties and definitions for the concept Deconstruction, if it exists." # Old hardcoded question
sparql = question_to_sparql(sparql_question) # Using the new input variable
print(sparql)

results = run_sparql(sparql)
print("Rows:", len(results))
for row in list(results)[:10]:
    print(row)

PREFIX moafdito: <http://www.semanticweb.org/MOAFDITO#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

SELECT DISTINCT ?entity ?route ?testingStep
WHERE {
  # Find route steps that are identified as Testing or EOL Testing
  ?testingStep rdf:type ?testingType .
  FILTER (?testingType = moafdito:Testing || ?testingType = moafdito:EOL_Testing) .

  # Find the route that contains this specific testing step
  ?route moafdito:hasRouteStep ?testingStep .

  # Find entities (e.g., BoardMaterialRequirements or other items) that are associated with this route
  ?entity moafdito:hasRoute ?route .
}
LIMIT 50
Rows: 0


In [None]:
#validator that rejects hallucinated IRIs, then auto repairz
from rdflib.plugins.sparql.parser import parseQuery

IRI_RE = re.compile(r"<([^>]+)>")

def extract_iris_from_sparql(q: str):
    return set(IRI_RE.findall(q))

def iri_exists_in_graph(g, iri: str) -> bool:
    u = URIRef(iri)
    return (u, None, None) in g or (None, u, None) in g or (None, None, u) in g

def validate_sparql_against_graph(g, q: str):
    # syntax check
    try:
        parseQuery(q)
    except Exception as e:
        return False, f"SPARQL parse error: {e}"

    # vocabulary check
    iris = extract_iris_from_sparql(q)
    unknown = [iri for iri in iris if not iri_exists_in_graph(g, iri)]
    if unknown:
        return False, "Unknown IRIs not found in graph:\n" + "\n".join(f"- {u}" for u in sorted(set(unknown)))
    return True, "OK"

def repair_sparql(question: str, bad_query: str, error_msg: str) -> str:
    model = genai.GenerativeModel(MODEL)

    term_hits = retrieve_terms(question, label_index, top_k=80)
    allow_list = format_allow_list(term_hits)

    prompt = (
        "Fix the SPARQL query.\n"
        "Return only SPARQL, no prose.\n"
        "You MUST use only IRIs from the allow list.\n\n"
        f"Error:\n{error_msg}\n\n"
        f"Bad query:\n{bad_query}\n\n"
        "Allow list:\n"
        f"{allow_list}\n\n"
        f"Question:\n{question}\n"
        "Constraints: LIMIT 50.\n"
    )
    resp = model.generate_content(prompt)
    q = resp.text.strip()
    if q.startswith("```"):
        q = re.sub(r"^```[a-zA-Z]*\s*", "", q)
        q = re.sub(r"\s*```$", "", q).strip()
    return q

def question_to_valid_sparql(question: str, max_fixes: int = 2) -> str:
    q = question_to_sparql(question)
    ok, msg = validate_sparql_against_graph(g, q)
    fixes = 0
    while not ok and fixes < max_fixes:
        q = repair_sparql(question, q, msg)
        ok, msg = validate_sparql_against_graph(g, q)
        fixes += 1
    return q
