In [None]:
import os

# --- Credentials -----------------------------------------------------------
# Read from the environment so secrets never have to be typed into (and saved
# with) the notebook. Each one falls back to "" when the variable is unset,
# which is the value the notebook previously hard-coded.
GROQ_API_KEY = os.environ.get("GROQ_API_KEY", "")
HF_TOKEN = os.environ.get("HF_TOKEN", "")
NEO4J_USERNAME = os.environ.get("NEO4J_USERNAME", "")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "")

# --- Inputs, storage and model ---------------------------------------------
# NOTE: despite the name this is a *file* path; it is passed straight to open().
REQUIREMENTS_FOLDER = "../projects/Aloha/requirements.txt"
NEO4J_URL = "bolt://localhost:7687"
# Directory handed to the storage context as persist_dir when READ is True.
KG_FOLDER = "../kg/aloha"
# Index id passed to load_index_from_storage when READ is True.
INDEX_ID = ""
MODEL = "llama-3.1-8b-instant"

# --- Switches --------------------------------------------------------------
# READ=True loads an existing index instead of building a new one.
READ = False
# USE_BACKEND=True stores the graph in Neo4j; False uses SimpleGraphStore.
USE_BACKEND = True

In [None]:
import nest_asyncio

# Patch asyncio so an already-running event loop can be re-entered.
# Presumably needed because Jupyter runs its own loop while the LLM/index
# calls below start another one — TODO confirm which call requires it.
nest_asyncio.apply()

In [None]:
from llama_index.llms.groq import Groq
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.core import Settings
from llama_index.llms.groq import Groq

# Build the Groq-hosted chat model once; `llm` is also called directly by the
# prompt-driven cells further down.
llm = Groq(
    api_key=GROQ_API_KEY,
    model=MODEL,
    base_url="https://api.groq.com/openai/v1",
)

# Register the LLM and the embedding model as the llama_index-wide defaults.
Settings.llm = llm
Settings.embed_model = HuggingFaceEmbedding(model_name="BAAI/bge-large-en-v1.5")

In [None]:
from llama_index.core.graph_stores import SimpleGraphStore
from llama_index.graph_stores.neo4j import Neo4jGraphStore
from llama_index.core import StorageContext

# Choose the triple store: Neo4j when a backend is requested, otherwise the
# in-memory SimpleGraphStore.
graph_store = (
    Neo4jGraphStore(
        username=NEO4J_USERNAME,
        password=NEO4J_PASSWORD,
        url=NEO4J_URL,
    )
    if USE_BACKEND
    else SimpleGraphStore()
)

# persist_dir is only supplied when an existing graph is being read back.
storage_context = StorageContext.from_defaults(
    graph_store=graph_store,
    persist_dir=KG_FOLDER if READ else None,
)

# Preprocess requirements file

In [None]:
import nltk

# sent_tokenize needs the Punkt sentence models. Recent NLTK releases look
# them up under "punkt_tab" while older ones use "punkt"; fetch both so the
# cell works regardless of the installed NLTK version.
nltk.download('punkt')
nltk.download('punkt_tab')
from nltk.tokenize import sent_tokenize

# Explicit UTF-8 so the result does not depend on the platform's default
# encoding (which would garble non-ASCII characters on some systems).
with open(REQUIREMENTS_FOLDER, 'r', encoding='utf-8') as f:
    requirements = f.read()

# One entry per sentence; each is sent to the LLM separately in the next cell.
sentences = sent_tokenize(requirements)

Reference for the requirement formats used in the next cell: Verma, K., & Kass, A. (2008). Requirements Analysis Tool: A Tool for Automatically Analyzing Software Requirements Documents. The Semantic Web - ISWC 2008, 751–763. doi:10.1007/978-3-540-88564-1_48

In [None]:
formatted_requirements = []

# One LLM call per tokenized sentence: ask the model to rewrite the sentence
# into one of the three requirement formats and keep the raw reply for parsing.
for sentence in sentences:
    prompt = f"""
    You are a software analyst. Rewrite the following sentence into the appropriate requirement format, and **only return**:
    
    1. The rewritten requirement line.
    2. If it is a StandardRequirement, return a second line: Agent: <agent>
    
    Do not include any explanations, reasoning, or extra commentary. Return exactly one or two lines only.
    
    Sentence: "{sentence}"
    
    Requirement Format Rules:
    
    1. **Standard Requirement**
       Format: StandardRequirement: <agent> <modal word> <action> <rest>
       Example: StandardRequirement: The system shall generate profit reports.
       → Also return: Agent: The system
    
    2. **Conditional Requirement**
       Format: ConditionalRequirement: if <condition> then <StandardRequirement>
       Example: ConditionalRequirement: if the user enters the wrong password then the system shall send an error message.
       → Do not return the agent separately.
    
    3. **Business Rule**
       Format: BusinessRule: <rule>
       Example: BusinessRule: Only administrators can access the payroll database.
       → Do not return the agent separately.
       - Treat all requirements that start with “all”, “only” and “exactly” as business rules
    
    Begin now:"""
    completion = llm.complete(prompt)
    output = completion.text
    # Echo input and reply so the rewrite can be eyeballed while the loop runs.
    print(f"Original: {sentence}", f"Response: {output}", "----", sep="\n")
    formatted_requirements.append(output)


In [None]:
import re

def parse_standard_requirements(llm_outputs):
    """Extract standard and conditional requirements from raw LLM replies.

    Each reply is expected to start with a line tagged ``StandardRequirement:``
    or ``ConditionalRequirement:``, optionally followed by an ``Agent:`` line.
    Replies whose first non-blank line carries neither tag (for example
    ``BusinessRule:`` lines or free-form text) are skipped.

    Args:
        llm_outputs: Iterable of reply strings; ``None`` or empty entries are
            ignored.

    Returns:
        A list of ``{"requirement": str, "agent": str | None}`` dicts, in
        input order. ``agent`` is ``None`` when no ``Agent:`` line follows the
        requirement line.
    """
    prefixes = ("StandardRequirement:", "ConditionalRequirement:")
    agent_tag = "Agent:"
    results = []

    for output in llm_outputs:
        # Strip every line and drop blank ones: models often indent the
        # "Agent:" line or separate it from the requirement with an empty
        # line, which previously caused the agent to be lost.
        lines = [ln.strip() for ln in (output or "").splitlines() if ln.strip()]
        if not lines:
            continue

        first = lines[0]
        prefix = next((p for p in prefixes if first.startswith(p)), None)
        if prefix is None:
            continue

        agent = None
        if len(lines) > 1 and lines[1].startswith(agent_tag):
            agent = lines[1][len(agent_tag):].strip()

        results.append({
            # Slice off only the leading tag; str.replace would also delete
            # the tag if it happened to reappear inside the sentence.
            "requirement": first[len(prefix):].strip(),
            "agent": agent,
        })

    return results
# Parsed requirements (dicts with "requirement" and "agent"), consumed by the
# indexing and graph-extraction cells below.
reqs = parse_standard_requirements(formatted_requirements)

In [None]:
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.core import VectorStoreIndex
from llama_index.core.schema import TextNode
from llama_index.core import Document

# One TextNode per parsed requirement, tagged with a 1-based "REQ-n" id and
# the agent (None when the parser found no agent line).
nodes = [
    TextNode(
        text=req.get("requirement"),
        metadata={"id": f"REQ-{number}", "agent": req.get("agent")},
    )
    for number, req in enumerate(reqs, start=1)
]

# Embedding index over the requirement nodes, used for similarity lookups.
vector_index = VectorStoreIndex(nodes)

# Same requirement texts wrapped as plain Documents.
documents = [Document(text=req.get("requirement")) for req in reqs]

#### Alt-1: Model the graph as relationships between requirements
Friedenthal, S., Moore, A., & Steiner, R. (2012). Modeling Text-Based Requirements and Their Relationship to Design. doi:10.1016/B978-0-12-385206-9.00013-2

In [None]:
def classify_relationship(req_a: str, req_b: str, llm_client=None):
    """Ask the LLM how requirement A relates to requirement B.

    Args:
        req_a: Text of requirement A (the candidate source of the relation).
        req_b: Text of requirement B (the candidate target of the relation).
        llm_client: Optional object exposing ``complete(prompt)`` whose result
            has a ``.text`` attribute. Defaults to the notebook-level ``llm``.

    Returns:
        ``"derived_from"``, ``"depends_on"`` or ``"unrelated"`` when the reply
        contains a ``Relationship: <type>`` line with one of those labels;
        ``None`` when the reply cannot be parsed or names any other label.
    """
    prompt = f"""
            You are a software engineer analyzing software requirements. 
            Given the two requirements below, determine their relationship.
            Choose one of: 
            - derived_from: Requirement A derives from Requirement B, meaning Requirement A is detailing Requirement B. 
            - depends_on: Requirement A depends on Requirement B 
            - unrelated: There is no relation between Requirement A and Requirement B
            
            Requirement A: {req_a}
            Requirement B: {req_b}

            Output format:
            Relationship: <relationship_type>
            """

    client = llm if llm_client is None else llm_client
    response = client.complete(prompt).text.strip().lower()

    # The prompt asks for "Relationship: <type>". The previous pattern only
    # matched "relationship type: `x`" (with backticks), so compliant replies
    # were never recognised. Accept both spellings, tolerate markdown/quote
    # decoration after the colon (\W*), and only accept the allowed labels.
    match = re.search(
        r"relationship(?:[ _]type)?\s*:\W*(derived[ _-]from|depends[ _-]on|unrelated)\b",
        response,
    )
    if match is None:
        return None
    # Normalise "derived from" / "depends-on" to the underscore form.
    return re.sub(r"[ -]", "_", match.group(1))

In [None]:
from llama_index.core import PromptTemplate
from llama_index.core.query_engine import RetrieverQueryEngine

retriever = vector_index.as_retriever(similarity_top_k=10)
relationships = []

# For every requirement, classify its relation to each of its nearest
# neighbours in the vector index (the requirement itself is skipped).
for node in nodes:
    similar = retriever.retrieve(node.text)
    for sim in similar:
        src_id = node.metadata["id"]
        tgt_id = sim.node.metadata["id"]
        if src_id == tgt_id:
            continue

        relation = classify_relationship(node.text, sim.node.text)
        print(relation)
        if not relation:
            # Unparseable LLM reply: nothing to record for this pair.
            continue

        relationships.append(
            dict(
                source=src_id,
                source_requirement=node.text,
                target=tgt_id,
                target_requirement=sim.node.text,
                relation=relation,
            )
        )


#### Alt-2: Model requirements as a conceptual graph
Jaramillo, C. M. Z., Gelbukh, A., & Isaza, F. A. (2006). Pre-conceptual Schema: A Conceptual-Graph-Like Knowledge Representation for Requirements Elicitation. MICAI 2006: Advances in Artificial Intelligence, 27–37. doi:10.1007/11925231_3 

In [None]:
from llama_index.core.query_engine import CustomQueryEngine
from llama_index.core.base.response.schema import Response
import json
from pydantic import BaseModel
from typing import List, Optional, Literal

# Unlabelled edge of the Conceptual Requirement Graph; per the extraction
# prompt it links a concept to a relationship or vice versa.
# NOTE(review): documented with comments rather than a docstring because
# pydantic copies class docstrings into the JSON schema, which is presumably
# what the structured LLM is shown — confirm before adding docstrings.
class ConnectionArc(BaseModel):
    from_: str  # label of the node the arc starts at (`from` is a keyword)
    to: str  # label of the node the arc ends at

# Labelled edge of the Conceptual Requirement Graph; per the extraction prompt
# it links a dynamic relationship or conditional to a dynamic relationship.
class ImplicationArc(BaseModel):
    from_: str  # label of the node the arc starts at (`from` is a keyword)
    to: str  # label of the node the arc ends at
    # "yes" or "no"; defaults to "yes" when omitted, None is also accepted.
    label: Optional[Literal["yes", "no"]] = "yes"

# Structured output schema for one requirement's Conceptual Requirement Graph.
# Node labels are kept as plain strings in four lists (one per node type named
# in the extraction prompt); edges are kept in the two arc lists.
class CRGModel(BaseModel):
    requirement_id: str
    requirement_text: str
    concepts: List[str]
    dynamic_relationships: List[str]
    structural_relationships: List[str]
    conditionals: List[str]
    connection_arcs: List[ConnectionArc]
    implication_arcs: List[ImplicationArc]

# Extraction prompt for the Conceptual Requirement Graph. It is filled with
# `requirement_id` and `requirement_text` via prompt_template.format(...) and
# sent to the structured LLM. PromptTemplate is imported in an earlier cell.
prompt_template = PromptTemplate("""
    You are a requirements modeling assistant. Your task is to extract a Conceptual Requirement Graph (CRG) from a natural language software requirement. Use the following definitions and rules to structure your output. Return your results as JSON.
    
    Definitions:
    
    Node Types
    - Concept: A noun representing an entity, person, thing, or property (e.g., user, profile, post).
    - Dynamic Relationship: An action verb representing a user/system operation (e.g., display, login, register).
    - Structural Relationship: A relationship with the label "is" or "has" (e.g., user _has_ profile).
    - Conditional: A logical condition that must be true before an action can occur (e.g., user is logged in).
    
    Edge Types
    - Connection Arc: Links a concept to a relationship or vice versa (no label).
    - Implication Arc: Links a dynamic relationship or conditional to a dynamic relationship. Label it "yes" or "no" (default: "yes").
    
    Topology Rules
    - Each concept must be connected via a connection arc.
    - A dynamic relationship must have exactly one incoming and one outgoing connection arc (to concepts).
    - A structural relationship must have one incoming and one or more outgoing connection arcs (to concepts).
    - A conditional has no incoming arcs and one or more outgoing implication arcs (to dynamic relationships).
    
    Labeling Rules
    - Concepts are labeled with nouns.
    - Dynamic relationships use action verbs.
    - Structural relationships are labeled "is" or "has".
    - Conditionals are logical expressions.
    - Implication arcs are labeled "yes" or "no".
    
    Guidelines
    - Use consistent labels for repeating concepts.
    - Include implicit conditionals if implied.
    - Do not infer extra relationships not clearly expressed in the requirement.
    - Every node and arc must follow the defined topological and labeling rules.
    
    
    Requirement ID: {requirement_id}
    Requirement Text: {requirement_text}
    
    Your response:
    """)

In [None]:
from typing import List, Tuple

def crg_to_triplets(crg: CRGModel) -> List[Tuple[str, str, str]]:
    """Flatten a Conceptual Requirement Graph into (subject, predicate, object) triplets.

    Emission order: one ``is_a`` triplet per node label (concepts, then dynamic
    relationships as ``Action``, then structural relationships, then
    conditionals as ``Condition``), followed by a ``connects_to`` triplet per
    connection arc and an ``implies_<label>`` triplet per implication arc.
    A missing or empty implication label is treated as ``"yes"``.
    """
    node_groups = (
        (crg.concepts, "Concept"),
        (crg.dynamic_relationships, "Action"),
        (crg.structural_relationships, "StructuralRelationship"),
        (crg.conditionals, "Condition"),
    )

    # Type every node first ...
    triplets = [
        (label, "is_a", node_type)
        for labels, node_type in node_groups
        for label in labels
    ]

    # ... then append the edges.
    triplets.extend(
        (arc.from_, "connects_to", arc.to) for arc in crg.connection_arcs
    )
    triplets.extend(
        (arc.from_, f"implies_{arc.label or 'yes'}", arc.to)
        for arc in crg.implication_arcs
    )
    return triplets


In [None]:
all_triplets = []

# Wrap the LLM so that completions are parsed into CRGModel instances.
sllm = llm.as_structured_llm(output_cls=CRGModel)

# Number from 1 so the IDs in the prompt line up with the "REQ-{i+1}" ids
# given to the vector-index nodes; previously these were off by one.
for i, req in enumerate(reqs, start=1):
    prompt = prompt_template.format(
        requirement_id=f"REQ-{i}",
        requirement_text=req.get("requirement")
    )
    # `.raw` is the parsed structured output for this requirement.
    crg = sllm.complete(prompt).raw
    all_triplets.extend(crg_to_triplets(crg))

# Create KG

In [None]:
from llama_index.core.indices.loading import load_index_from_storage
from llama_index.core import KnowledgeGraphIndex

if READ:
    # An empty INDEX_ID means "not set". Passing "" would make llama_index
    # look for an index literally named "", so pass None instead, which loads
    # the index found in the storage context.
    kg_index = load_index_from_storage(
        storage_context=storage_context,
        index_id=INDEX_ID or None,
    )
else:
    if USE_BACKEND:
        # Destructive: removes every node and relationship from the Neo4j
        # database so the graph is rebuilt from scratch.
        graph_store.query("MATCH (n) DETACH DELETE n")

    # Start from an empty index; triplets are upserted into graph_store by
    # the Alt-1 / Alt-2 cells below.
    kg_index = KnowledgeGraphIndex([], storage_context=storage_context)

Alt-1: store the requirement-to-requirement relationships in the graph

In [None]:
# Store every classified pair except those explicitly labelled "unrelated";
# the requirement texts themselves are used as the graph nodes.
for rel in relationships:
    if rel["relation"] == "unrelated":
        continue
    graph_store.upsert_triplet(
        rel["source_requirement"],
        rel["relation"],
        rel["target_requirement"],
    )


Alt-2: store the conceptual-graph triplets in the graph

In [None]:
import spacy
# Loads the spaCy pipeline named "en_core_web_sm"; it must already be
# installed in the kernel's environment.
nlp = spacy.load("en_core_web_sm")

def normalize_concept(name: str) -> str:
    """Return `name` lower-cased, with each spaCy token replaced by its lemma.

    The lemmas are joined with single spaces.
    """
    lemmas = (token.lemma_ for token in nlp(name.lower()))
    return " ".join(lemmas)

# Normalise subject and object before writing; the predicate is stored as-is.
for subject, predicate, obj in all_triplets:
    graph_store.upsert_triplet(
        normalize_concept(subject),
        predicate,
        normalize_concept(obj),
    )

In [None]:
query_engine = kg_index.as_query_engine()

# Smoke-test the knowledge graph with a single natural-language question.
question = "What actions require a user to be logged in?"
print(question)
response = query_engine.query(question)
print(response)