In [2]:
import rdflib
from rdflib import Graph, Namespace
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
from openai import OpenAI

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
TTL_PATH = "cache/sysml_model_477e1397-b094-4b94-a097-2cf8218d3110.ttl"
FAISS_INDEX_PATH = "cache/faiss.index"
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
OPENAI_MODEL = "gpt-4.1"

client = OpenAI()

# Namespaces
SYSML = Namespace("http://example.org/sysml#")

In [5]:
def load_graph(path: str) -> Graph:
    g = Graph()
    g.parse(path, format="ttl")
    return g

In [6]:
def extract_snippets(g: Graph) -> dict:
    snippets = {}
    for s in set(g.subjects()):
        label = g.value(subject=s, predicate=SYSML.label)
        comment = g.value(subject=s, predicate=SYSML.comment)
        type  = g.value(subject=s, predicate=SYSML.type)
        package = g.value(subject=s, predicate=SYSML.package)

        
        # neighbors
        neigh = [str(o) for o in g.objects(subject=s)] + [str(s) for s,_,_ in g.triples((None, None, s))]
        text = ""

        if label:
            label = label.replace('"', '').replace("'", "")
            text += f"Label: {label}, "

        if package:
            package_label = g.value(subject=package, predicate=SYSML.label)
            if package_label:
                text += f"Package: {package_label}, "
        
        if type: 
            type = type.replace('"', '').replace("'", "")
            text += f"Type: {type}, "

        if comment:
            comment = comment.replace('"', '').replace("'", "")
            text += f"Comment: {comment}, "
        
        if len(neigh) > 0:
            # edge to neighbors and their labels
            neigh_text = ", ".join([f"{n} ({g.value(subject=n, predicate=SYSML.label)})" for n in neigh])
            text += f"Neighbors: [{neigh_text}, ]"
            
        text.replace("(None)", "")
        text.replace("http://example.org/sysml#", "")
        key = str(s).replace("http://example.org/sysml#", "").replace("http://example.org/", "")

        snippets[key] = text
    return snippets

In [7]:
def build_vector_index(snippets: dict, model_name: str, index_path: str):
    model = SentenceTransformer(model_name)
    texts = list(snippets.values())
    embeddings = model.encode(texts, convert_to_numpy=True)

    norms = np.linalg.norm(embeddings, axis=1, keepdims=True)
    embeddings = embeddings / norms

    dim = embeddings.shape[1]
    index = faiss.IndexFlatIP(dim)
    index.add(embeddings)

    faiss.write_index(index, index_path)

    return model, index, list(snippets.keys())

In [8]:
def retrieve_nodes(query: str, model: SentenceTransformer, index: faiss.IndexFlatL2, uris: list, threshold: float = 0.8, sort = False) -> list:
    q_emb = model.encode([query], convert_to_numpy=True)
    q_emb = q_emb / np.linalg.norm(q_emb, axis=1, keepdims=True)

    lims, D, I = index.range_search(q_emb, threshold)

    start, end = lims[0], lims[1]
    matched_idxs = I[start:end]

    if sort:
        # Sort by distance
        distances = D[start:end]
        sorted_idxs = np.argsort(distances)[::-1]

    return [uris[i] for i in matched_idxs]

In [9]:
def fetch_subgraph(s_parsed: list, g: Graph) -> Graph:
    sub = Graph()
    for uri in s_parsed:
        # outgoing
        for p,o in g.predicate_objects(subject=rdflib.URIRef(uri)):
            sub.add((rdflib.URIRef(uri), p, o))
        # incoming
        for s,p in g.subject_predicates(object=rdflib.URIRef(uri)):
            sub.add((s, p, rdflib.URIRef(uri)))
    return sub

In [10]:
def serialize_subgraph(sub: Graph) -> str:
    lines = []
    for s,p,o in sub:
        lines.append(f"- {s.split('#')[-1]} {p.split('#')[-1]} {o.split('#')[-1]}")
    return "\n".join(lines)

In [11]:
def answer_query(context: str, question: str, model: str = OPENAI_MODEL) -> str:
    instruction = f"You are a SysML v2 RAG assistant and you only answer model specific questions by the context."
    input = f"Given the following model context:\n{context}\nAnswer the question: {question}"
    resp = client.responses.create(
        model=model,
        instructions=instruction,
        input=input,
    )
    
    return resp.output_text

In [12]:
g = load_graph(TTL_PATH)
snippets = extract_snippets(g)

snippets

{'ElectricVehicleModel/VehicleInstances/highPerformanceEV/battery': 'Label: battery, Type: http://example.org/sysml#PartUsage, Neighbors: [battery (None), http://example.org/sysml#ElectricVehicleModel/VehicleInstances/highPerformanceEV/battery/capacity (None), http://example.org/sysml#ElectricVehicleModel/VehicleInstances/highPerformanceEV/battery/voltage (None), http://example.org/sysml#PartUsage (None), http://example.org/sysml#ElectricVehicleModel/VehicleInstances/highPerformanceEV (None), ]',
 'ElectricVehicleModel/VehicleInstances/cityCompactEV': 'Label: cityCompactEV, Type: http://example.org/sysml#PartUsage, Comment: Instance representing a compact city vehicle optimized for efficiency and lower power.\r\n, Neighbors: [Instance representing a compact city vehicle optimized for efficiency and lower power.\r\n (None), http://example.org/sysml#ElectricVehicleModel/PartDefinitions/ElectricVehicle (None), cityCompactEV (None), http://example.org/sysml#ElectricVehicleModel/PartDefinit

In [13]:
g = load_graph(TTL_PATH)

# Extract snippets
snippets = extract_snippets(g)

# Build or load vector index
try:
    model, index, uris = build_vector_index(snippets, EMBEDDING_MODEL, FAISS_INDEX_PATH)
except Exception:
    # load existing
    model = SentenceTransformer(EMBEDDING_MODEL)
    index = faiss.read_index(FAISS_INDEX_PATH)
    uris = list(snippets.keys())

In [14]:
# count all embeddings, and map them back to a string to count the byte usage
embeddings = model.encode(list(snippets.values()), convert_to_numpy=True)
embeddings = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)

len(uris)


43

In [15]:
question = "What is the the high performance electric vehicle for?"
top_uris = retrieve_nodes(question, model, index, uris, threshold=0.35, sort=True)
# Fetch subgraph
sub = fetch_subgraph(top_uris, g)
context = serialize_subgraph(sub)

print(f"Context:\n{context}\n")
print(f"Total bytes of context: {len(context.encode('utf-8'))} bytes\n")
print(f"Number of retrieved elements: {len(top_uris)}\n")

Context:


Total bytes of context: 0 bytes

Number of retrieved elements: 14



In [179]:
len(context)

10888

In [180]:
answer = answer_query(context, question)
print("Answer:\n", answer)

Answer:
 Based on the model context, the high performance electric vehicle is represented by the instance **ElectricVehicleModel/VehicleInstances/highPerformanceEV**. This instance is described with the following comment:

> Instance representing a high-performance electric vehicle with upgraded battery and motor.

Therefore, the **high performance electric vehicle** in this model is intended for scenarios where higher battery capacity and higher motor performance (as indicated by attributes such as increased efficiency and potentially higher torque) are required compared to other vehicle instances like the city compact EV. It is specifically modeled to represent a vehicle variant focused on delivering enhanced performance, which may include faster acceleration, longer range, or greater power output.


### Evaluation
- Precision
- Recall
- F1 Score
- Coverage of Available Context
- Coverage of Sys

In [170]:
retrieved_important_elements = 3
missed_important_elements = 0

In [None]:
sysml_model_bytes = 3354
total_context_bytes = uris_bytes = sum(len(uri.encode('utf-8')) for uri in uris)
total_context_elements = len(uris)

total_retrieved_bytes = len(context.encode('utf-8'))
total_retrieved_elements = len(top_uris)

# --- Metric definitions ---
def precision(relevant_retrieved: int, total_retrieved: int) -> float:
    """Precision: fraction of retrieved entries that are relevant."""
    if total_retrieved == 0:
        return 0.0
    return relevant_retrieved / total_retrieved

def recall(relevant_retrieved: int, false_negatives: int) -> float:
    """Recall: fraction of truly relevant entries that were retrieved.
    total_relevant = retrieved_relevant + missed_important (false negatives)
    """
    total_relevant = relevant_retrieved + false_negatives
    if total_relevant == 0:
        return 0.0
    return relevant_retrieved / total_relevant

def f1_score(p: float, r: float) -> float:
    """F1-score: harmonic mean of precision and recall."""
    if p + r == 0:
        return 0.0
    return 2 * (p * r) / (p + r)

def coverage_available(retrieved_bytes: int, total_bytes: int) -> float:
    """Cₐ: fraction of the total corpus (bytes) retrieved."""
    if total_bytes == 0:
        return 0.0
    return retrieved_bytes / total_bytes

def coverage_model(retrieved_bytes: int, model_bytes: int) -> float:
    """Cₘ: fraction of the SysML v2 model (bytes) retrieved."""
    if model_bytes == 0:
        return 0.0
    return retrieved_bytes / model_bytes

P  = precision(retrieved_important_elements, total_retrieved_elements)
R  = recall   (retrieved_important_elements, missed_important_elements)
F1 = f1_score (P, R)
Ca = coverage_available(total_retrieved_bytes, total_context_bytes)
Cm = coverage_model   (total_retrieved_bytes, sysml_model_bytes)

print("Precision,Recall,F1,Coverage_Available,Coverage_Model")
print(f"{P:.3f},{R:.3f},{F1:.3f},{Ca:.3f},{Cm:.3f}")


Precision,Recall,F1,Coverage_Available,Coverage_Model
0.103,1.000,0.188,4.319,4.603
