<a href="https://colab.research.google.com/github/Supun1234/Thesis/blob/main/End2Endv2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ==============================================================================
# I. SETUP: INSTALL NECESSARY LIBRARIES
# ==============================================================================
!pip install transformers torch spacy pandas -q
!pip install spacy-transformers -q
!python -m spacy download en_core_web_trf -q

# ==============================================================================
# II. IMPORT LIBRARIES AND LOAD MODELS
# ==============================================================================
import spacy
import torch
import pandas as pd
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification

# --- Hugging Face Model (dslim/bert-base-NER) ---
# The tokenizer and the token-classification weights come from the same checkpoint.
hf_tokenizer = AutoTokenizer.from_pretrained("dslim/bert-base-NER")
hf_model = AutoModelForTokenClassification.from_pretrained("dslim/bert-base-NER")
# NER pipeline called by extract_agr_from_huggingface(), which reads the
# 'entity_group', 'word' and 'score' keys of every result it returns.
hf_ner_pipeline = pipeline("ner", model=hf_model, tokenizer=hf_tokenizer, aggregation_strategy="simple")

# --- spaCy Model (with Transformer Pipeline) ---
# Called by extract_agr_from_spacy(), which reads doc.ents and each token's dep_/subtree.
spacy_nlp = spacy.load("en_core_web_trf")

# ==============================================================================
# III. CORE FUNCTIONS OF THE REQUIREMENT EXTRACTION PIPELINE
# ==============================================================================

def preprocess_text(text):
    """Return ``text`` with leading and trailing whitespace removed."""
    trimmed = text.strip()
    return trimmed

def extract_agr_from_huggingface(sentence):
    """Extract an Actor/Goal/Rationale (AGR) triplet via Hugging Face NER plus keyword heuristics.

    Actor: the first PER/ORG entity returned by ``hf_ner_pipeline`` (its score
    becomes the confidence). When no such entity exists, the generic role word
    that occurs earliest in the sentence is used with a fixed confidence of 0.80.
    Goal/Rationale: the sentence is split at the first rationale keyword that is
    present, trying keywords in list order; without one the whole sentence is
    the Goal and the Rationale stays empty.

    Args:
        sentence: The requirement sentence to analyse.

    Returns:
        A dict with the keys "Actor", "Goal" and "Rationale", each mapping to
        ``{"text": str or None, "confidence": float}``.
    """
    actor = {"text": None, "confidence": 0.0}
    goal = {"text": None, "confidence": 0.0}
    rationale = {"text": None, "confidence": 0.0}

    # Heuristic: the first PER or ORG entity is the Actor
    for entity in hf_ner_pipeline(sentence):
        if entity['entity_group'] in ('PER', 'ORG'):
            actor["text"] = entity['word']
            actor["confidence"] = float(entity['score'])
            break

    # Fallback for generic actors if NER found nothing
    if not actor["text"]:
        generic_actors = ['user', 'users', 'admin', 'customer', 'system', 'application']
        lowered = sentence.lower()
        # Rank hits by where they occur in the sentence (longest word wins a
        # tie) instead of by list order: "The system shall notify the user"
        # now yields "system", and "Users must ..." yields "users" rather than
        # always "user". Matching is still substring-based, as before.
        candidates = [
            (lowered.find(name), -len(name), name)
            for name in generic_actors
            if name in lowered
        ]
        if candidates:
            actor["text"] = min(candidates)[2]
            actor["confidence"] = 0.80

    # Rationale & Goal extraction: split at the first keyword (in list order)
    # that appears surrounded by single spaces.
    rationale_keywords = ['so that', 'in order to', 'to', 'without']
    for keyword in rationale_keywords:
        before, separator, after = sentence.partition(f" {keyword} ")
        if separator:
            goal["text"] = before.strip()
            # The keyword is kept at the front of the rationale text.
            rationale["text"] = (keyword + " " + after).strip()
            goal["confidence"] = 0.90
            rationale["confidence"] = 0.90
            break
    else:
        # No keyword present: the whole sentence is treated as the goal.
        goal["text"] = sentence
        goal["confidence"] = 0.85

    return {"Actor": actor, "Goal": goal, "Rationale": rationale}

def extract_agr_from_spacy(sentence):
    """Extract an Actor/Goal/Rationale (AGR) triplet using spaCy entities and dependency labels.

    Actor: the first PERSON/ORG entity (confidence 0.95), otherwise the phrase
    headed by the first token whose dependency label contains "nsubj"
    (confidence 0.90).
    Goal/Rationale: the sentence is split at the first rationale keyword that is
    present (keywords tried in list order); the keyword stays with the rationale.
    Finally the actor and one leading modal verb are removed from the goal.

    Args:
        sentence: The requirement sentence to analyse.

    Returns:
        A dict with the keys "Actor", "Goal" and "Rationale", each mapping to
        ``{"text": str or None, "confidence": float}``.
    """
    doc = spacy_nlp(sentence)
    actor = {"text": None, "confidence": 0.0}
    goal = {"text": None, "confidence": 0.0}
    rationale = {"text": None, "confidence": 0.0}

    # 1. Actor Extraction (NER with grammatical subject fallback)
    for ent in doc.ents:
        if ent.label_ in ["PERSON", "ORG"]:
            actor["text"] = ent.text
            actor["confidence"] = 0.95
            break
    if not actor["text"]:
        for token in doc:
            if "nsubj" in token.dep_:
                # Slice the Doc between the subtree edges instead of joining
                # tokens with spaces: the joined form ("user 's") differs from
                # the sentence text, so step 3 could never find and remove it.
                subject_span = doc[token.left_edge.i:token.right_edge.i + 1]
                actor["text"] = subject_span.text
                actor["confidence"] = 0.90
                break

    # 2. Goal & Rationale Extraction
    rationale_keywords = ['so that', 'in order to', 'to', 'without']
    rationale_start_index = -1
    for keyword in rationale_keywords:
        if f" {keyword} " in sentence:
            rationale_start_index = sentence.find(f" {keyword} ")
            break

    if rationale_start_index != -1:
        goal["text"] = sentence[:rationale_start_index].strip()
        rationale["text"] = sentence[rationale_start_index:].strip()
        goal["confidence"] = 0.95
        rationale["confidence"] = 0.95
    else:
        goal["text"] = sentence
        goal["confidence"] = 0.90

    # 3. Refine Goal text: keep only what follows the actor, then drop one
    #    leading modal verb ("shall", "must", ...).
    if actor["text"] and goal["text"] and actor["text"] in goal["text"]:
        actor_end_index = goal["text"].find(actor["text"]) + len(actor["text"])
        refined_goal_text = goal["text"][actor_end_index:].strip()
        filler_words = ["shall", "should", "must", "will", "can"]
        first_word = refined_goal_text.split(' ')[0] if refined_goal_text else ""
        if first_word in filler_words:
            refined_goal_text = refined_goal_text.replace(first_word, "", 1).strip()
        goal["text"] = refined_goal_text

    return {"Actor": actor, "Goal": goal, "Rationale": rationale}

def merge_agr_triplets(hf_agr, spacy_agr):
    """Combine two AGR triplets slot by slot, preferring the spaCy slot.

    A slot is taken from ``hf_agr`` only when the spaCy slot has no text and
    the Hugging Face slot does; otherwise the spaCy slot is used as is.
    """
    merged = {}
    for slot_name in ("Actor", "Goal", "Rationale"):
        hf_slot = hf_agr[slot_name]
        spacy_slot = spacy_agr[slot_name]
        use_hf = (not spacy_slot["text"]) and bool(hf_slot["text"])
        merged[slot_name] = hf_slot if use_hf else spacy_slot
    return merged

def evaluate_completeness_and_confidence(merged_agr):
    """Return ``(completeness, confidence)`` for an AGR triplet.

    Completeness is the number of slots with text divided by 3; confidence is
    the sum of the Actor, Goal and Rationale confidences (empty slots count as
    0) divided by 3.
    """
    filled = [slot for slot in merged_agr.values() if slot["text"]]
    completeness = len(filled) / 3.0

    confidence_total = 0
    for slot_name in ("Actor", "Goal", "Rationale"):
        slot = merged_agr[slot_name]
        if slot["text"]:
            confidence_total = confidence_total + slot["confidence"]

    return completeness, confidence_total / 3.0

# ==============================================================================
# IV. OUTPUT AND VISUALIZATION FUNCTIONS
# ==============================================================================

def display_results(final_agr, completeness, confidence):
    """Print the triplet and both scores as a one-row table without an index."""
    row = {slot: [final_agr[slot]["text"]] for slot in ("Actor", "Goal", "Rationale")}
    row['Completeness'] = [f"{completeness:.2%}"]
    row['Confidence'] = [f"{confidence:.2%}"]
    table = pd.DataFrame(row)
    print("\n--- Structured Requirement ---")
    print(table.to_string(index=False))

def print_comma_separated_agr(final_agr):
    """Print the Actor, Goal and Rationale texts on one comma-separated line.

    A slot without text is printed as the literal string "None".
    """
    labelled_parts = []
    for slot in ("Actor", "Goal", "Rationale"):
        slot_text = final_agr[slot]['text'] or "None"
        labelled_parts.append(f"{slot}: {slot_text}")

    print("\n--- Comma-Separated AGR Result ---")
    print(", ".join(labelled_parts))

def print_semantic_graph(final_agr):
    """Print the triplet as a Cypher-like chain of nodes and relationships.

    Missing actor/goal texts are replaced by placeholder labels; the rationale
    node and its relationship are emitted only when a rationale text exists.
    """
    actor_label = final_agr['Actor']['text'] or 'UnspecifiedActor'
    goal_label = final_agr['Goal']['text'] or 'UnspecifiedGoal'
    segments = [f"({actor_label})", f" -[:PERFORMS_GOAL]-> ({goal_label})"]

    rationale_label = final_agr['Rationale']['text']
    if rationale_label:
        segments.append(f" -[:WITH_CONSTRAINT_OR_PURPOSE]-> ({rationale_label})")

    print("\n--- Semantic Graph (Cypher-like text) ---")
    print("".join(segments))

# ==============================================================================
# V. MAIN EXECUTION PIPELINE
# ==============================================================================

def main():
    """Prompt for one requirement, run both extractors, merge them and print every report."""
    raw_requirement = input("What is your requirement? ")
    sentence = preprocess_text(raw_requirement)

    print("\nRunning NER Model 1 (Hugging Face with expanded keywords)...")
    hf_triplet = extract_agr_from_huggingface(sentence)

    print("Running NER Model 2 (spaCy with dependency parsing)...")
    spacy_triplet = extract_agr_from_spacy(sentence)

    print("Merging results...")
    merged_triplet = merge_agr_triplets(hf_triplet, spacy_triplet)
    completeness, confidence = evaluate_completeness_and_confidence(merged_triplet)

    # Reports, in order: table, comma-separated line, graph text.
    display_results(merged_triplet, completeness, confidence)
    print_comma_separated_agr(merged_triplet)
    print_semantic_graph(merged_triplet)


# --- Run the main program ---
if __name__ == "__main__":
    main()

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m795.8/795.8 kB[0m [31m34.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.4/313.4 kB[0m [31m22.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.0/10.0 MB[0m [31m81.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m52.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m457.4/457.4 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m237.9/237.9 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m734.0/734.0 kB[0m [31m38.8 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installat

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/59.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/829 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

added_tokens.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/433M [00:00<?, ?B/s]

Some weights of the model checkpoint at dslim/bert-base-NER were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cpu


What is your requirement? System must notify the user when login fails due to incorrect credentials.”

Running NER Model 1 (Hugging Face with expanded keywords)...
Running NER Model 2 (spaCy with dependency parsing)...
Merging results...

--- Structured Requirement ---
 Actor                                 Goal                  Rationale Completeness Confidence
System notify the user when login fails due to incorrect credentials.”      100.00%     93.33%

--- Comma-Separated AGR Result ---
Actor: System, Goal: notify the user when login fails due, Rationale: to incorrect credentials.”

--- Semantic Graph (Cypher-like text) ---
(System) -[:PERFORMS_GOAL]-> (notify the user when login fails due) -[:WITH_CONSTRAINT_OR_PURPOSE]-> (to incorrect credentials.”)


In [None]:
# Run the pipeline again for a new requirement; main() is defined in an earlier cell of this notebook.
if __name__ == "__main__":
    main()

What is your requirement? As an admin, I need to access the dashboard to monitor system performance in real time.

Running NER Model 1 (Hugging Face with expanded keywords)...
Running NER Model 2 (spaCy with dependency parsing)...
Merging results...

--- Structured Requirement ---
Actor Goal                                                           Rationale Completeness Confidence
    I need to access the dashboard to monitor system performance in real time.      100.00%     93.33%

--- Comma-Separated AGR Result ---
Actor: I, Goal: need, Rationale: to access the dashboard to monitor system performance in real time.

--- Semantic Graph (Cypher-like text) ---
(I) -[:PERFORMS_GOAL]-> (need) -[:WITH_CONSTRAINT_OR_PURPOSE]-> (to access the dashboard to monitor system performance in real time.)


In [None]:
# ==============================================================================
# I. SETUP: INSTALL NECESSARY LIBRARIES
# ==============================================================================
# This cell installs all the required Python packages for the pipeline.
# The `-q` flag is used for a quieter installation.
!pip install transformers torch spacy pandas -q
!pip install spacy-transformers -q
!python -m spacy download en_core_web_trf -q

# ==============================================================================
# II. IMPORT LIBRARIES AND LOAD MODELS
# ==============================================================================
import spacy
import torch
import pandas as pd
import re
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification

# --- Hugging Face Model (dslim/bert-base-NER) ---
# The tokenizer and the token-classification weights come from the same checkpoint.
hf_tokenizer = AutoTokenizer.from_pretrained("dslim/bert-base-NER")
hf_model = AutoModelForTokenClassification.from_pretrained("dslim/bert-base-NER")
# NER pipeline called by extract_agr_from_huggingface(), which reads the
# 'entity_group', 'word' and 'score' keys of every result it returns.
hf_ner_pipeline = pipeline("ner", model=hf_model, tokenizer=hf_tokenizer, aggregation_strategy="simple")

# --- spaCy Model (with Transformer Pipeline) ---
# Called by extract_agr_from_spacy(), which reads doc.ents and each token's dep_/subtree.
spacy_nlp = spacy.load("en_core_web_trf")

# ==============================================================================
# III. CORE FUNCTIONS OF THE REQUIREMENT EXTRACTION PIPELINE
# ==============================================================================

def preprocess_text(text):
    """Strip surrounding whitespace from ``text`` and return the result."""
    cleaned = text.strip()
    return cleaned

def extract_agr_from_huggingface(sentence):
    """Build an AGR triplet from Hugging Face NER output.

    The first PER/ORG entity (if any) becomes the Actor with the model score as
    confidence; the whole sentence is the Goal (confidence 0.75); the Rationale
    is always left empty.
    """
    actor = {"text": None, "confidence": 0.0}
    rationale = {"text": None, "confidence": 0.0}

    entities = hf_ner_pipeline(sentence)
    first_named = next(
        (entity for entity in entities if entity['entity_group'] in ['PER', 'ORG']),
        None,
    )
    if first_named is not None:
        actor["text"] = first_named['word']
        actor["confidence"] = float(first_named['score'])

    goal = {"text": sentence, "confidence": 0.75}
    return {"Actor": actor, "Goal": goal, "Rationale": rationale}

def extract_agr_from_spacy(sentence):
    """Build an AGR triplet from spaCy entities and dependency labels.

    Actor: the first PERSON/ORG entity (confidence 0.95), otherwise the phrase
    headed by the first token whose dependency label contains "nsubj"
    (confidence 0.90). The whole sentence is the Goal (confidence 0.85) and the
    Rationale is always left empty.

    Args:
        sentence: The requirement sentence to analyse.

    Returns:
        A dict with the keys "Actor", "Goal" and "Rationale", each mapping to
        ``{"text": str or None, "confidence": float}``.
    """
    doc = spacy_nlp(sentence)
    actor = {"text": None, "confidence": 0.0}
    goal = {"text": None, "confidence": 0.0}
    rationale = {"text": None, "confidence": 0.0}

    for ent in doc.ents:
        if ent.label_ in ["PERSON", "ORG"]:
            actor["text"], actor["confidence"] = ent.text, 0.95
            break

    if not actor["text"]:
        for token in doc:
            if "nsubj" in token.dep_:
                # Slice the Doc between the subtree edges rather than joining
                # tokens with spaces, which would turn "user's" into "user 's"
                # and report an actor text that is not in the sentence.
                subject_span = doc[token.left_edge.i:token.right_edge.i + 1]
                actor["text"], actor["confidence"] = subject_span.text, 0.90
                break

    goal["text"], goal["confidence"] = sentence, 0.85
    return {"Actor": actor, "Goal": goal, "Rationale": rationale}

def apply_post_processing_rules(sentence):
    """Extract an AGR triplet from two common requirement templates.

    Templates, matched case-insensitively from the start of ``sentence``:
      1. "As a/an <actor>, I need to <goal> in order to|to <rationale>"
      2. "The <actor> shall <goal> so that|to <rationale>"

    The explicit connector ("in order to" / "so that") is tried before the bare
    "to". Previously the earliest " to " always won, so an infinitive inside the
    goal ("allow users to log in so that ...") cut the goal short and pushed the
    rest of it into the rationale.

    Args:
        sentence: The requirement sentence to match.

    Returns:
        A dict with the keys "Actor", "Goal" and "Rationale", each mapping to
        ``{"text": str, "confidence": 1.0}``, or ``None`` when no template
        matches.
    """
    # Ordered by priority: for each template the explicit connector comes first.
    templates = (
        r"As an? (.*?), I need to (.*?) in order to (.*)",
        r"As an? (.*?), I need to (.*?) to (.*)",
        r"The (.*?) shall (.*?) so that (.*)",
        r"The (.*?) shall (.*?) to (.*)",
    )
    for template in templates:
        found = re.match(template, sentence, re.IGNORECASE)
        if found:
            actor_text, goal_text, rationale_text = (part.strip() for part in found.groups())
            return {
                "Actor": {"text": actor_text, "confidence": 1.0},
                "Goal": {"text": goal_text, "confidence": 1.0},
                "Rationale": {"text": rationale_text, "confidence": 1.0},
            }
    return None

def merge_agr_triplets(hf_agr, spacy_agr, rule_agr):
    """Return the rule-based triplet when present, otherwise merge the model triplets.

    Without a rule result each slot is taken from spaCy if it has text, else
    from Hugging Face if it has text, else it becomes a fresh empty slot.
    """
    if rule_agr:
        print("INFO: A post-processing rule was successfully applied.")
        return rule_agr

    def pick(slot):
        preferred = spacy_agr[slot]
        if preferred["text"]:
            return preferred
        fallback = hf_agr[slot]
        if fallback["text"]:
            return fallback
        return {"text": None, "confidence": 0.0}

    return {slot: pick(slot) for slot in ["Actor", "Goal", "Rationale"]}

def evaluate_completeness_and_confidence(merged_agr):
    """Return ``(completeness, confidence)`` for an AGR triplet.

    Completeness is the share of slots that carry text (count / 3); confidence
    is the mean of the Actor, Goal and Rationale confidences, where a slot
    without text contributes 0.
    """
    def effective_confidence(slot_name):
        slot = merged_agr[slot_name]
        return slot["confidence"] if slot["text"] else 0

    populated = 0
    for slot in merged_agr.values():
        if slot["text"]:
            populated += 1

    confidence_sum = (
        effective_confidence("Actor")
        + effective_confidence("Goal")
        + effective_confidence("Rationale")
    )
    return populated / 3.0, confidence_sum / 3.0

# ==============================================================================
# IV. OUTPUT AND VISUALIZATION FUNCTIONS
# ==============================================================================

def display_results(final_agr, completeness, confidence):
    """Print the triplet and scores as a one-row table, then repeat both scores on their own lines."""
    completeness_pct = f"{completeness:.2%}"
    confidence_pct = f"{confidence:.2%}"
    table = pd.DataFrame({
        'Actor': [final_agr["Actor"]["text"]],
        'Goal': [final_agr["Goal"]["text"]],
        'Rationale': [final_agr["Rationale"]["text"]],
        'Completeness': [completeness_pct],
        'Confidence': [confidence_pct],
    })
    print("\n--- Structured Requirement ---")
    print(table.to_string(index=False))
    print(f"\nCompleteness Score: {completeness_pct}")
    print(f"Confidence Score: {confidence_pct}")

def print_comma_separated_agr(final_agr):
    """Print the Actor, Goal and Rationale texts on a single comma-separated line.

    A slot without text is shown as the literal string "None".
    """
    shown = {
        slot: final_agr[slot]['text'] or "None"
        for slot in ("Actor", "Goal", "Rationale")
    }
    print("\n--- Comma-Separated AGR Result ---")
    print(f"Actor: {shown['Actor']}, Goal: {shown['Goal']}, Rationale: {shown['Rationale']}")

def print_semantic_graph(final_agr):
    """Print the triplet as Cypher-like text: (actor) -[:GOAL]-> (goal) [-[:RATIONALE]-> (rationale)].

    Placeholder labels stand in for a missing actor or goal; the rationale part
    is printed only when a rationale text exists.
    """
    actor_label = final_agr['Actor']['text'] or 'UnspecifiedActor'
    goal_label = final_agr['Goal']['text'] or 'UnspecifiedGoal'
    rationale_label = final_agr['Rationale']['text']

    graph_text = f"({actor_label}) -[:GOAL]-> ({goal_label})"
    if rationale_label:
        graph_text = graph_text + f" -[:RATIONALE]-> ({rationale_label})"

    print("\n--- Semantic Graph (Cypher-like text) ---")
    print(graph_text)

# ==============================================================================
# V. MAIN EXECUTION PIPELINE
# ==============================================================================

def main():
    """Prompt for one requirement, run the rule-based and model-based extractors, and print every report."""
    sentence = preprocess_text(input("What is your requirement? "))

    # The template rules run first; their result (or None) is handed to the merge step.
    rule_triplet = apply_post_processing_rules(sentence)

    print("\nRunning NER Model 1 (Hugging Face)...")
    hf_triplet = extract_agr_from_huggingface(sentence)

    print("Running NER Model 2 (spaCy Transformer)...")
    spacy_triplet = extract_agr_from_spacy(sentence)

    print("Merging results...")
    merged_triplet = merge_agr_triplets(hf_triplet, spacy_triplet, rule_triplet)
    completeness, confidence = evaluate_completeness_and_confidence(merged_triplet)

    # Reports, in order: table with scores, comma-separated line, graph text.
    display_results(merged_triplet, completeness, confidence)
    print_comma_separated_agr(merged_triplet)
    print_semantic_graph(merged_triplet)


# --- Run the main program ---
if __name__ == "__main__":
    main()

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m457.4/457.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('en_core_web_trf')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.


Some weights of the model checkpoint at dslim/bert-base-NER were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cpu


What is your requirement? System must notify the user when login fails due to incorrect credentials.

Running NER Model 1 (Hugging Face)...
Running NER Model 2 (spaCy Transformer)...
Merging results...

--- Structured Requirement ---
 Actor                                                                       Goal Rationale Completeness Confidence
System System must notify the user when login fails due to incorrect credentials.      None       66.67%     58.33%

Completeness Score: 66.67%
Confidence Score: 58.33%

--- Comma-Separated AGR Result ---
Actor: System, Goal: System must notify the user when login fails due to incorrect credentials., Rationale: None

--- Semantic Graph (Cypher-like text) ---
(System) -[:GOAL]-> (System must notify the user when login fails due to incorrect credentials.)


In [None]:
# Run the pipeline again for a new requirement; main() is defined in an earlier cell of this notebook.
if __name__ == "__main__":
    main()

What is your requirement? The system shall allow the customer to view their order history to track past purchases.

Running NER Model 1 (Hugging Face)...
Running NER Model 2 (spaCy Transformer)...
Merging results...
INFO: A post-processing rule was successfully applied.

--- Structured Requirement ---
 Actor               Goal                                         Rationale Completeness Confidence
system allow the customer view their order history to track past purchases.      100.00%    100.00%

Completeness Score: 100.00%
Confidence Score: 100.00%

--- Comma-Separated AGR Result ---
Actor: system, Goal: allow the customer, Rationale: view their order history to track past purchases.

--- Semantic Graph (Cypher-like text) ---
(system) -[:GOAL]-> (allow the customer) -[:RATIONALE]-> (view their order history to track past purchases.)


In [None]:
# ==============================================================================
# I. SETUP: INSTALL NECESSARY LIBRARIES
# ==============================================================================
!pip install transformers torch spacy pandas -q
!pip install spacy-transformers -q
!python -m spacy download en_core_web_trf -q

# ==============================================================================
# II. IMPORT LIBRARIES AND LOAD MODELS
# ==============================================================================
import spacy
import torch
import pandas as pd
import re
from transformers import pipeline, AutoTokenizer, AutoModelForTokenClassification

# --- Hugging Face Model (dslim/bert-base-NER) ---
# The tokenizer and the token-classification weights come from the same checkpoint.
# NOTE(review): none of the functions defined in this cell call hf_ner_pipeline
# (extract_agr_with_heuristics() uses spaCy only) - confirm whether this load is still needed.
hf_tokenizer = AutoTokenizer.from_pretrained("dslim/bert-base-NER")
hf_model = AutoModelForTokenClassification.from_pretrained("dslim/bert-base-NER")
hf_ner_pipeline = pipeline("ner", model=hf_model, tokenizer=hf_tokenizer, aggregation_strategy="simple")

# --- spaCy Model (with Transformer Pipeline) ---
# Called by extract_agr_with_heuristics(), which reads doc.ents and each token's dep_/subtree.
spacy_nlp = spacy.load("en_core_web_trf")

# ==============================================================================
# III. CORE FUNCTIONS OF THE REQUIREMENT EXTRACTION PIPELINE
# ==============================================================================

def preprocess_text(text):
    """Remove whitespace from both ends of ``text``; the interior is left untouched."""
    without_padding = text.strip()
    return without_padding

def apply_post_processing_rules(sentence):
    """Extract an AGR triplet from three common requirement templates.

    Templates, matched case-insensitively from the start of ``sentence``:
      1. "As a/an <actor>, I need to <goal> in order to|to <rationale>"
      2. "The <actor> shall <goal> so that|to <rationale>"
      3. "The <actor> should|shall|must|will <goal> without <constraint>"
         (the rationale keeps the leading "without")

    For templates 1 and 2 the explicit connector ("in order to" / "so that") is
    tried before the bare "to". Previously the earliest " to " always won, so an
    infinitive inside the goal ("allow users to log in so that ...") cut the
    goal short and pushed the rest of it into the rationale.

    Args:
        sentence: The requirement sentence to match.

    Returns:
        A dict with the keys "Actor", "Goal" and "Rationale", each mapping to
        ``{"text": str, "confidence": 1.0}``, or ``None`` when no template
        matches.
    """
    # Ordered by priority; every template captures (actor, goal, rationale).
    templates = (
        r"As an? (.*?), I need to (.*?) in order to (.*)",
        r"As an? (.*?), I need to (.*?) to (.*)",
        r"The (.*?) shall (.*?) so that (.*)",
        r"The (.*?) shall (.*?) to (.*)",
        r"The (.*?) (?:should|shall|must|will) (.*?)( without .*)",
    )
    for template in templates:
        found = re.match(template, sentence, re.IGNORECASE)
        if found:
            actor_text, goal_text, rationale_text = (part.strip() for part in found.groups())
            return {
                "Actor": {"text": actor_text, "confidence": 1.0},
                "Goal": {"text": goal_text, "confidence": 1.0},
                "Rationale": {"text": rationale_text, "confidence": 1.0},
            }

    return None  # No pattern matched

def _drop_first_actor_mention(goal_text, actor_text):
    """Remove the first whole-word occurrence of ``actor_text`` from ``goal_text``."""
    if not (actor_text and goal_text):
        return goal_text
    # Lookarounds keep the match from starting or ending inside another word,
    # and count=1 leaves later mentions of the actor in place.
    whole_word = r"(?<!\w)" + re.escape(actor_text) + r"(?!\w)"
    remainder, removed = re.subn(whole_word, "", goal_text, count=1)
    return remainder.strip() if removed else goal_text


def extract_agr_with_heuristics(sentence):
    """Extract an AGR triplet with spaCy parsing and keyword splitting (fallback extractor).

    Actor: the first PERSON/ORG entity (confidence 0.95), otherwise the phrase
    headed by the first token whose dependency label contains "nsubj"
    (confidence 0.90).
    Goal/Rationale: the sentence is split at the first rationale keyword found
    (keywords tried in list order, case-insensitively); the keyword stays with
    the rationale. Without a keyword the whole sentence is the Goal.
    The actor is then removed from the Goal - only its first whole-word mention,
    so an actor such as "I" no longer strips letters out of words like "IT".

    Args:
        sentence: The requirement sentence to analyse.

    Returns:
        A dict with the keys "Actor", "Goal" and "Rationale", each mapping to
        ``{"text": str or None, "confidence": float}``.
    """
    doc = spacy_nlp(sentence)
    actor = {"text": None, "confidence": 0.0}
    goal = {"text": None, "confidence": 0.0}
    rationale = {"text": None, "confidence": 0.0}

    # 1. Actor Extraction (NER with grammatical subject fallback)
    for ent in doc.ents:
        if ent.label_ in ["PERSON", "ORG"]:
            actor["text"], actor["confidence"] = ent.text, 0.95
            break
    if not actor["text"]:
        for token in doc:
            if "nsubj" in token.dep_:
                # Slice the Doc between the subtree edges rather than joining
                # tokens with spaces, so the actor text matches the sentence
                # verbatim ("user's", not "user 's") and can be found in the goal.
                subject_span = doc[token.left_edge.i:token.right_edge.i + 1]
                actor["text"], actor["confidence"] = subject_span.text, 0.90
                break

    # 2. Goal & Rationale Extraction using an expanded keyword list
    rationale_keywords = ['so that', 'in order to', 'to', 'without']
    for keyword in rationale_keywords:
        # The keyword must be delimited by whitespace on both sides.
        match = re.search(r'\s' + re.escape(keyword) + r'\s', sentence, re.IGNORECASE)
        if match:
            split_point = match.start()
            goal["text"] = _drop_first_actor_mention(sentence[:split_point].strip(), actor["text"])
            rationale["text"] = sentence[split_point:].strip()
            goal["confidence"], rationale["confidence"] = 0.85, 0.85
            return {"Actor": actor, "Goal": goal, "Rationale": rationale}

    # Fallback if no rationale keyword is found
    goal["text"] = _drop_first_actor_mention(sentence, actor["text"])
    goal["confidence"] = 0.80

    return {"Actor": actor, "Goal": goal, "Rationale": rationale}


def merge_agr_triplets(heuristic_agr, rule_agr):
    """Return ``rule_agr`` when it is truthy, otherwise ``heuristic_agr``; report the choice on stdout."""
    if not rule_agr:
        print("INFO: No specific rule matched. Using heuristic-based extraction.")
        return heuristic_agr

    print("INFO: A post-processing rule was successfully applied.")
    return rule_agr

def evaluate_completeness_and_confidence(merged_agr):
    """Return ``(completeness, confidence)`` for an AGR triplet.

    Completeness is the number of slots with text divided by 3. Confidence is
    the Actor, Goal and Rationale confidences added up (0 for a slot without
    text) and divided by 3.
    """
    slots_with_text = len([slot for slot in merged_agr.values() if slot["text"]])

    per_slot = [
        merged_agr[name]["confidence"] if merged_agr[name]["text"] else 0
        for name in ("Actor", "Goal", "Rationale")
    ]
    mean_confidence = (per_slot[0] + per_slot[1] + per_slot[2]) / 3.0

    return slots_with_text / 3.0, mean_confidence

# ==============================================================================
# IV. OUTPUT AND VISUALIZATION FUNCTIONS
# ==============================================================================

def display_results(final_agr, completeness, confidence):
    """Print the triplet and scores as a one-row table, followed by the two scores on separate lines."""
    columns = {}
    for slot in ("Actor", "Goal", "Rationale"):
        columns[slot] = [final_agr[slot]["text"]]
    columns['Completeness'] = [f"{completeness:.2%}"]
    columns['Confidence'] = [f"{confidence:.2%}"]

    rendered_table = pd.DataFrame(columns).to_string(index=False)
    print("\n--- Structured Requirement ---")
    print(rendered_table)
    print(f"\nCompleteness Score: {completeness:.2%}")
    print(f"Confidence Score: {confidence:.2%}")

def print_comma_separated_agr(final_agr):
    """Print the three AGR texts on one comma-separated line, using "None" for an empty slot."""
    actor_text, goal_text, rationale_text = (
        final_agr[slot]['text'] or "None"
        for slot in ("Actor", "Goal", "Rationale")
    )
    print("\n--- Comma-Separated AGR Result ---")
    print(f"Actor: {actor_text}, Goal: {goal_text}, Rationale: {rationale_text}")

def print_semantic_graph(final_agr):
    """Prints a Cypher-like text representation of the semantic graph.

    The Actor and Goal nodes are always printed; a missing text is replaced by
    the placeholder "UnspecifiedActor" / "UnspecifiedGoal". The Rationale node
    and its edge are appended only when the rationale text is truthy, so no
    placeholder is needed for it.

    Args:
        final_agr: Mapping with "Actor", "Goal" and "Rationale" entries, each a
            dict exposing a "text" value.
    """
    actor_label = final_agr['Actor']['text'] or 'UnspecifiedActor'
    goal_label = final_agr['Goal']['text'] or 'UnspecifiedGoal'
    rationale_label = final_agr['Rationale']['text']

    segments = [f"({actor_label})", f"-[:GOAL]-> ({goal_label})"]
    if rationale_label:
        segments.append(f"-[:RATIONALE]-> ({rationale_label})")

    print("\n--- Semantic Graph (Cypher-like text) ---")
    print(" ".join(segments))

# ==============================================================================
# V. MAIN EXECUTION PIPELINE
# ==============================================================================

def main():
    """Runs the requirement extraction pipeline once for a typed sentence.

    Prompts for a requirement, produces a rule-based and a heuristic AGR
    candidate, merges them with ``merge_agr_triplets``, scores the merged
    triplet and prints the table, comma-separated and graph views.
    """
    sentence = preprocess_text(input("What is your requirement? "))

    # Build both candidates up front: the rule-based one first, then the
    # heuristic one that serves as the fallback.
    rule_candidate = apply_post_processing_rules(sentence)
    heuristic_candidate = extract_agr_with_heuristics(sentence)

    # Merge the two candidates (the original note states rules get priority;
    # the merge logic itself lives in merge_agr_triplets) and score the result.
    print("\nMerging results...")
    merged_agr = merge_agr_triplets(heuristic_candidate, rule_candidate)
    completeness_score, confidence_score = evaluate_completeness_and_confidence(merged_agr)

    # Emit every output view of the merged triplet.
    display_results(merged_agr, completeness_score, confidence_score)
    print_comma_separated_agr(merged_agr)
    print_semantic_graph(merged_agr)

# --- Run the main program ---
# The guard starts the pipeline only when this code is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.0/44.0 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m795.8/795.8 kB[0m [31m13.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m313.4/313.4 kB[0m [31m22.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.0/10.0 MB[0m [31m73.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.1/3.1 MB[0m [31m59.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m457.4/457.4 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m237.9/237.9 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m734.0/734.0 kB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installati

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/59.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/829 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

added_tokens.json:   0%|          | 0.00/2.00 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/433M [00:00<?, ?B/s]

Some weights of the model checkpoint at dslim/bert-base-NER were not used when initializing BertForTokenClassification: ['bert.pooler.dense.bias', 'bert.pooler.dense.weight']
- This IS expected if you are initializing BertForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Device set to use cpu


What is your requirement? It should prevent unauthorized access to sensitive configuration files.

Merging results...
INFO: No specific rule matched. Using heuristic-based extraction.

--- Structured Requirement ---
Actor                               Goal                         Rationale Completeness Confidence
   It should prevent unauthorized access to sensitive configuration files.      100.00%     86.67%

Completeness Score: 100.00%
Confidence Score: 86.67%

--- Comma-Separated AGR Result ---
Actor: It, Goal: should prevent unauthorized access, Rationale: to sensitive configuration files.

--- Semantic Graph (Cypher-like text) ---
(It) -[:GOAL]-> (should prevent unauthorized access) -[:RATIONALE]-> (to sensitive configuration files.)


In [None]:
# Run the pipeline again; main() prompts for a new requirement sentence.
if __name__ == "__main__":
    main()

What is your requirement? I want to receive a daily report of activity logs sent to my email.

Merging results...
INFO: No specific rule matched. Using heuristic-based extraction.

--- Structured Requirement ---
Actor Goal                                                    Rationale Completeness Confidence
    I want to receive a daily report of activity logs sent to my email.      100.00%     86.67%

Completeness Score: 100.00%
Confidence Score: 86.67%

--- Comma-Separated AGR Result ---
Actor: I, Goal: want, Rationale: to receive a daily report of activity logs sent to my email.

--- Semantic Graph (Cypher-like text) ---
(I) -[:GOAL]-> (want) -[:RATIONALE]-> (to receive a daily report of activity logs sent to my email.)


In [None]:
# ==============================================================================
# I. SETUP: INSTALL NECESSARY LIBRARIES
# ==============================================================================
# This cell installs all required packages with quiet output for a clean notebook.
!pip install transformers torch spacy pandas shap -q
!pip install spacy-transformers -q
!python -m spacy download en_core_web_trf -q

# ==============================================================================
# II. IMPORT LIBRARIES AND INITIALIZE MODELS
# ==============================================================================
import spacy
import pandas as pd
import re
from transformers import pipeline

# --- Step 2 Models: AGR Extraction ---
# Load the spaCy "en_core_web_trf" pipeline. AGRExtractor reads its dependency
# parse (dep_ labels, children, subtrees) in the fallback extraction path.
spacy_nlp = spacy.load("en_core_web_trf")

# --- Step 4 Model: Zero-Shot Classification ---
# Build a transformers "zero-shot-classification" pipeline backed by
# facebook/bart-large-mnli. classify_requirement_zero_shot calls it with
# candidate labels for requirement type and quality attribute.
zsc_classifier = pipeline("zero-shot-classification", model="facebook/bart-large-mnli")

# ==============================================================================
# III. STEP 2: ADVANCED REQUIREMENT EXTRACTION (AGR MODEL)
# ==============================================================================

class AGRExtractor:
    """
    Extracts Actor, Goal and Rationale (AGR) from a requirement sentence.

    Two strategies are tried in order:

    1. Regex rules for the "As a ..., I want to ..." user-story form and the
       "The <system> shall ..." form (confidence 1.0 per slot).
    2. A fallback that reads the dependency parse produced by ``nlp_model``.

    Every result is a dict with "Actor", "Goal" and "Rationale" entries (each a
    dict with "text" and "confidence") plus a "method" label.
    """

    # Compiled once at class creation instead of on every call.
    # Pattern 1: User Story format "As a..."
    _USER_STORY_RE = re.compile(
        r"As an? (.*?), I (?:want to|need to|can) (.*?)(?: so that | to | in order to )(.*)",
        re.IGNORECASE,
    )
    # Pattern 2: System-level format "The system shall..."
    _SYSTEM_REQ_RE = re.compile(
        r"The (.*?) (?:shall|should|must|will) (.*?)(?: to | without | so that )(.*)",
        re.IGNORECASE,
    )
    # Prepositions attached to the root verb that are taken to open a rationale.
    _RATIONALE_PREPOSITIONS = frozenset({"for", "without", "to"})

    def __init__(self, nlp_model):
        """Stores the callable (a loaded spaCy pipeline) used for parsing."""
        self.nlp = nlp_model

    @staticmethod
    def _rule_result(match, method):
        """Builds a full-confidence result from a three-group regex match."""
        actor, goal, rationale = (group.strip() for group in match.groups())
        return {
            "Actor": {"text": actor, "confidence": 1.0},
            "Goal": {"text": goal, "confidence": 1.0},
            "Rationale": {"text": rationale, "confidence": 1.0},
            "method": method,
        }

    @staticmethod
    def _parse_result(actor, goal, rationale):
        """Builds a dependency-parsing result; empty slots get confidence 0.0."""
        return {
            "Actor": {"text": actor, "confidence": 0.95 if actor else 0.0},
            "Goal": {"text": goal, "confidence": 0.90 if goal else 0.0},
            "Rationale": {"text": rationale, "confidence": 0.90 if rationale else 0.0},
            "method": "Dependency Parsing",
        }

    @staticmethod
    def _subtree_span(token):
        """Returns the span running from the token's left edge to its right edge.

        Using a span (rather than joining token texts with spaces) keeps the
        original spacing and punctuation, and exposes character offsets into
        the parsed text.
        """
        return token.doc[token.left_edge.i:token.right_edge.i + 1]

    def _apply_rules(self, sentence):
        """Applies the regex rules; returns a result dict or None if none match.

        Both patterns are anchored at the start of ``sentence`` (``match``).
        """
        user_story = self._USER_STORY_RE.match(sentence)
        if user_story:
            return self._rule_result(user_story, "Rule-Based (User Story)")

        system_req = self._SYSTEM_REQ_RE.match(sentence)
        if system_req:
            return self._rule_result(system_req, "Rule-Based (System Req)")

        return None

    def _extract_with_dependency_parsing(self, sentence):
        """Fallback extraction based on the dependency parse of ``sentence``.

        * Actor: the subtree of the first ``nsubj`` child of the ROOT token.
        * Rationale: the subtree of the first token (in document order) that is
          either an ``advcl`` headed by the ROOT or one of its children, or a
          ``prep`` headed by the ROOT whose text is "for", "without" or "to".
        * Goal: the text preceding the rationale, with a leading actor removed.

        Returns:
            A result dict. When the parse has no ROOT token (e.g. empty input)
            all three slots are empty instead of the method returning None, so
            callers can index the result unconditionally.
        """
        doc = self.nlp(sentence)

        root = next((token for token in doc if token.dep_ == "ROOT"), None)
        if root is None:
            return self._parse_result(None, None, None)

        # Character offsets below refer to the text of the parsed document.
        text = doc.text

        # 1. Extract Actor: the nominal subject (nsubj) of the root verb.
        subject = next((child for child in root.children if child.dep_ == "nsubj"), None)
        actor = self._subtree_span(subject).text if subject is not None else None

        # 2. Extract Rationale: purpose clause (advcl) or selected prepositions.
        # Indices of the root and its direct children, computed once.
        root_family = {root.i} | {child.i for child in root.children}
        rationale_span = None
        for token in doc:
            is_purpose_clause = token.dep_ == "advcl" and token.head.i in root_family
            is_rationale_prep = (
                token.dep_ == "prep"
                and token.head.i == root.i
                and token.text.lower() in self._RATIONALE_PREPOSITIONS
            )
            if is_purpose_clause or is_rationale_prep:
                rationale_span = self._subtree_span(token)
                break
        rationale = rationale_span.text if rationale_span is not None else None

        # 3. Extract Goal: everything before the rationale, minus the actor.
        # The span's own start offset is used; searching the sentence for a
        # re-joined token string fails whenever spacing differs (e.g. before
        # punctuation) and would make the goal swallow the rationale.
        goal_end = rationale_span.start_char if rationale_span is not None else len(text)
        goal_region = text[:goal_end]
        if actor and goal_region.lower().startswith(actor.lower()):
            goal_region = goal_region[len(actor):]
        goal = goal_region.strip() or None

        return self._parse_result(actor, goal, rationale)

    def extract(self, sentence):
        """Returns the rule-based result if a rule matches, else the parse-based one."""
        rule_result = self._apply_rules(sentence)
        if rule_result is not None:
            return rule_result
        return self._extract_with_dependency_parsing(sentence)

def calculate_metrics(final_agr, weights=None):
    """Calculates completeness, weighted confidence and a manual-review flag.

    Args:
        final_agr: Mapping with "Actor", "Goal" and "Rationale" entries, each a
            dict with "text" and "confidence". Other keys (e.g. "method") are
            ignored.
        weights: Optional mapping of slot name to weight. Defaults to a weight
            of 1.0 for each of the three slots.

    Returns:
        Dict with "Completeness" and "Confidence" formatted as percentages with
        two decimals, and "Manual Review" (bool), which is True when any slot
        is unfilled or the weighted confidence is below 0.90.

    Raises:
        ValueError: If the weights of the three slots do not sum to a positive
            value.
        KeyError: If a slot is missing from ``final_agr`` or ``weights``.
    """
    slot_names = ("Actor", "Goal", "Rationale")
    if weights is None:
        weights = dict.fromkeys(slot_names, 1.0)

    total_weight = sum(weights[slot] for slot in slot_names)
    if total_weight <= 0:
        raise ValueError("weights must sum to a positive value")

    filled = [slot for slot in slot_names if final_agr[slot].get("text")]
    completeness = len(filled) / len(slot_names)

    # A slot without text contributes no confidence, even if a stale score is
    # attached to it; this keeps confidence consistent with completeness.
    weighted_sum = sum(weights[slot] * final_agr[slot]["confidence"] for slot in filled)
    weighted_confidence = weighted_sum / total_weight

    manual_review = completeness < 1.0 or weighted_confidence < 0.90

    return {
        "Completeness": f"{completeness:.2%}",
        "Confidence": f"{weighted_confidence:.2%}",
        "Manual Review": manual_review
    }

# ==============================================================================
# IV. STEP 3 & 4: SEMANTIC GRAPH AND CLASSIFICATION
# ==============================================================================

def print_semantic_graph(final_agr):
    """Prints the semantic graph in Cypher-like text format.

    The Actor and Goal nodes are always printed; a missing text is replaced by
    the placeholder "UnspecifiedActor" / "UnspecifiedGoal". The Rationale node
    and its edge are appended only when the rationale text is truthy, so no
    placeholder is needed for it.

    Args:
        final_agr: Mapping with "Actor", "Goal" and "Rationale" entries, each a
            dict exposing a "text" value.
    """
    actor_label = final_agr['Actor']['text'] or 'UnspecifiedActor'
    goal_label = final_agr['Goal']['text'] or 'UnspecifiedGoal'
    rationale_label = final_agr['Rationale']['text']

    segments = [f"({actor_label})", f"-[:PERFORMS_GOAL]-> ({goal_label})"]
    if rationale_label:
        segments.append(f"-[:JUSTIFIES]-> ({rationale_label})")
    print(" ".join(segments))

def classify_requirement_zero_shot(sentence):
    """Classifies a requirement's type and, if non-functional, its quality attribute.

    The module-level ``zsc_classifier`` is called once with the two type labels
    and, only when the first returned label contains "Non-Functional", a second
    time with the quality-attribute labels. In both cases the first entry of
    the returned "labels" / "scores" lists is used.

    Args:
        sentence: The requirement text to classify.

    Returns:
        Dict with "Predicted Type", "Predicted Quality" ("N/A" for functional
        requirements), "Confidence" (the larger of the two scores, formatted as
        a percentage) and "Manual Review" (True when the type score is below
        0.80, or the requirement is non-functional and the quality score is
        below 0.60).
    """
    type_prediction = zsc_classifier(
        sentence,
        candidate_labels=["Functional requirement", "Non-Functional requirement"],
    )
    predicted_type = type_prediction['labels'][0]
    type_score = type_prediction['scores'][0]
    is_non_functional = "Non-Functional" in predicted_type

    if is_non_functional:
        # Second pass: pick the quality attribute for non-functional requirements.
        quality_prediction = zsc_classifier(
            sentence,
            candidate_labels=["Usability", "Performance", "Security", "Reliability", "Maintainability"],
        )
        predicted_quality = quality_prediction['labels'][0]
        quality_score = quality_prediction['scores'][0]
    else:
        predicted_quality = "N/A"
        quality_score = 0.0

    needs_review = type_score < 0.80 or (is_non_functional and quality_score < 0.60)
    reported_confidence = max(type_score, quality_score)

    return {
        "Predicted Type": predicted_type,
        "Predicted Quality": predicted_quality,
        "Confidence": f"{reported_confidence:.2%}",
        "Manual Review": needs_review,
    }

# Keywords whose presence is highlighted by the attribution simulation.
_ATTRIBUTION_KEYWORDS = (
    'fast', 'performance', 'speed', 'secure', 'encrypt', 'password',
    'user-friendly', 'easy', 'intuitive', 'robust', 'availability',
)
# One case-insensitive pattern matching any keyword as a whole word. Keywords
# are escaped so characters such as "-" are always treated literally.
_ATTRIBUTION_KEYWORD_RE = re.compile(
    r"\b(" + "|".join(re.escape(kw) for kw in _ATTRIBUTION_KEYWORDS) + r")\b",
    re.IGNORECASE,
)


def generate_attribution_map_simulation(sentence):
    """Simulates SHAP/LIME by highlighting keywords for explainability.

    Every whole-word, case-insensitive occurrence of a known keyword is wrapped
    in ``**``. Detection and highlighting use the same word-bounded pattern, so
    a keyword embedded in a longer word (e.g. "fast" in "breakfast") is never
    highlighted.

    Args:
        sentence: The requirement text to annotate.

    Returns:
        A message containing the highlighted sentence, or a fixed fallback
        message when the sentence contains none of the keywords.
    """
    # This simulation remains rule-based for clear textual output.
    highlighted_text, hit_count = _ATTRIBUTION_KEYWORD_RE.subn(r"**\1**", sentence)
    if not hit_count:
        return "Attribution: Classification based on overall sentence semantics."
    return f"Attribution Simulation: Classification likely driven by keywords: {highlighted_text}"

# ==============================================================================
# V. STEP 5: OUTPUT SUMMARY & MAIN EXECUTION
# ==============================================================================

def main():
    """
    Runs the end-to-end requirement pipeline once for a sentence typed by the user.

    Steps: read the sentence, extract Actor/Goal/Rationale with AGRExtractor,
    compute the metrics, classify the requirement with the zero-shot model,
    build the keyword attribution text, then print the three result sections.

    Any exception raised along the way (including blank input or an extractor
    that yields no result) is reported as "An error occurred: ..." instead of
    propagating, and the closing banner is printed in every case.
    """
    banner = "=" * 60
    try:
        # --- Step 1: User Input ---
        requirement_sentence = input("What is your requirement? ")
        # Fail early with a clear message rather than letting blank text
        # reach the extraction and classification models.
        if not requirement_sentence.strip():
            raise ValueError("No requirement text was entered.")

        # --- Step 2: Requirement Extraction ---
        print("\nINFO: Analyzing requirement to extract Actor, Goal, and Rationale...")
        extractor = AGRExtractor(spacy_nlp)
        final_agr = extractor.extract(requirement_sentence)
        # The extractor may return None when it cannot analyse the sentence;
        # report that explicitly instead of failing on the first subscript.
        if final_agr is None:
            raise ValueError("Could not extract Actor, Goal, and Rationale from the requirement.")
        metrics = calculate_metrics(final_agr)

        # --- Step 4: Requirement Classification ---
        print("INFO: Classifying requirement type and quality attributes...")
        classification_results = classify_requirement_zero_shot(requirement_sentence)
        attribution_map = generate_attribution_map_simulation(requirement_sentence)

        # --- Step 5: Output Summary ---
        print("\n" + banner)
        print("      Automated Requirement Processing Results (Fine-Tuned)")
        print(banner + "\n")

        # Section 1: Structured Requirement
        print("--- 1. Structured Requirement (AGR) ---")
        print(f"(Extraction Method: {final_agr.get('method', 'N/A')})")
        agr_data = {
            'Actor': [final_agr['Actor']['text']],
            'Goal': [final_agr['Goal']['text']],
            'Rationale': [final_agr['Rationale']['text']],
            'Completeness': [metrics['Completeness']],
            'Confidence': [metrics['Confidence']],
            'Manual Review': [metrics['Manual Review']],
        }
        print(pd.DataFrame(agr_data).to_string(index=False))

        # Section 2: Semantic Graph (Step 3)
        print("\n\n--- 2. Semantic Graph (Text Format) ---")
        print_semantic_graph(final_agr)

        # Section 3: Classification Results
        print("\n\n--- 3. Classification Results ---")
        class_data = {
            'Predicted Type': [classification_results['Predicted Type']],
            'Predicted Quality': [classification_results['Predicted Quality']],
            'Confidence': [classification_results['Confidence']],
            'Manual Review': [classification_results['Manual Review']],
        }
        print(pd.DataFrame(class_data).to_string(index=False))
        print(f"\n{attribution_map}")

    except Exception as e:
        # Top-level boundary of an interactive cell: report the failure
        # instead of showing a traceback.
        print(f"\nAn error occurred: {e}")
    finally:
        # This message will always print at the end of the execution.
        print("\n" + banner)
        print("                Pipeline Execution Finished.")
        print(banner)

# --- Entry point: call main() only when this code is executed directly, not when it is imported ---
if __name__ == "__main__":
    main()