# Domain Document Partitioning - Coverage Index Calculation

In [None]:
!pip install python-docx pdfminer.six beautifulsoup4 nltk ipywidgets --quiet


[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m244.3/244.3 kB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m20.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m30.3 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Import libraries
import os
import re
import requests
from bs4 import BeautifulSoup
from docx import Document
from pdfminer.high_level import extract_text
from io import BytesIO
from nltk.stem.porter import PorterStemmer

In [None]:
# Shared Porter stemmer used to reduce tokens to their stems
stemmer = PorterStemmer()

# Vocabulary of known English words: one stripped, lower-cased entry per line
# of the local word list file
english_words = set()
with open("words_alpha.txt") as f:
    english_words.update(line.strip().lower() for line in f)

# Function to clean and filter text
def clean_text(text):
    """Tokenise *text*, drop likely proper nouns and unknown words, and stem the rest.

    A token is a maximal run of ASCII letters. A capitalised token is treated
    as a likely proper noun and skipped unless it opens a sentence, i.e. it is
    the first token or the text between it and the previous token ends with
    '.', '?' or '!' (ignoring whitespace, quotes and opening brackets).
    Remaining tokens are kept only if their lower-cased form is in
    ``english_words``; kept tokens are lower-cased and Porter-stemmed.

    Args:
        text: Raw text to process.

    Returns:
        List of stemmed, lower-cased tokens in document order.
    """
    # Characters allowed between a sentence terminator and the next word
    skippable = " \t\r\n\"'([{"

    filtered_tokens = []
    prev_end = 0
    is_first = True
    for match in re.finditer(r'\b[a-zA-Z]+\b', text):
        token = match.group()
        # The punctuation lives in the gap between tokens, not in the tokens
        # themselves (they are letters only), so inspect the gap.
        gap = text[prev_end:match.start()].rstrip(skippable)
        starts_sentence = is_first or gap.endswith(('.', '?', '!'))
        prev_end = match.end()
        is_first = False

        if token[0].isupper() and not starts_sentence:
            continue  # Likely a proper noun: skip it
        if token.lower() in english_words:  # Keep valid English words only
            filtered_tokens.append(token.lower())

    # Stem each surviving token
    return [stemmer.stem(token) for token in filtered_tokens]

# Function to extract text from a file (DOCX or PDF)
def extract_text_from_file(file_path):
    """Read a document from disk and return its cleaned text as one string.

    The file type is taken from the text after the last '.' in *file_path*
    (case-insensitive). 'docx'/'doc' files are parsed with ``Document`` and
    their paragraphs joined with newlines; 'pdf' files go through
    ``extract_text``; any other extension yields empty text. The file is
    always opened and read, so a missing path raises regardless of extension.

    Returns:
        The tokens produced by ``clean_text`` joined with single spaces.
    """
    # NOTE(review): 'doc' is routed to the same parser as 'docx' — confirm that
    # legacy .doc files are actually readable this way.
    suffix = file_path.rsplit('.', 1)[-1].lower()

    with open(file_path, 'rb') as handle:
        raw_bytes = handle.read()

    if suffix == 'pdf':
        raw_text = extract_text(BytesIO(raw_bytes))
    elif suffix in ('docx', 'doc'):
        parsed = Document(BytesIO(raw_bytes))
        raw_text = "\n".join(paragraph.text for paragraph in parsed.paragraphs)
    else:
        raw_text = ""

    cleaned_tokens = clean_text(raw_text)
    return " ".join(cleaned_tokens)

# Function to scrape text from a URL
def scrape_text_from_url(url, timeout=10):
    """Fetch a web page and return the cleaned text of its ``<p>`` elements.

    The paragraph text is passed through ``clean_text`` so that the result is
    in the same normalised form (lower-cased, stemmed tokens joined by spaces)
    as the output of ``extract_text_from_file``; otherwise coverage would be
    computed between stemmed requirement terms and raw web text.

    Args:
        url: Page to download.
        timeout: Seconds to wait for the server before giving up, so a
            stalled host cannot hang the notebook.

    Returns:
        The cleaned text, or "" if the page could not be fetched or parsed
        (the error is printed; this function is deliberately best-effort).
    """
    try:
        response = requests.get(url, timeout=timeout)
        # Treat 4xx/5xx responses as failures instead of parsing error pages
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        text = "\n".join(para.get_text() for para in soup.find_all('p'))
        return " ".join(clean_text(text))
    except Exception as e:
        print(f"Error fetching {url}: {str(e)}")
        return ""

# Function to calculate the coverage index
def calculate_coverage(requirement_text, domain_texts):
    """Measure how much of the requirement vocabulary appears in the domain texts.

    Both the requirement text and every domain text are split on whitespace
    into sets of terms.

    Returns:
        A ``(coverage, uncovered_words)`` tuple: the fraction of distinct
        requirement terms found in any domain text (0 when the requirement
        has no terms) and the set of requirement terms found in none.
    """
    req_terms = set(requirement_text.split())
    domain_terms = {term for document in domain_texts for term in document.split()}

    missing = req_terms - domain_terms
    if not req_terms:
        return 0, missing

    shared = req_terms & domain_terms
    return len(shared) / len(req_terms), missing

In [None]:
# Input requirement document
req_file_path = ""
# Default to empty so the coverage step below reports a clean error instead of
# raising NameError when the requirement file cannot be read.
req_content = ""
try:
    req_content = extract_text_from_file(req_file_path)
    print(f"Requirement document '{req_file_path}' loaded successfully.")
except Exception as e:
    print(f"Error reading requirement document: {e}")

# Input domain files folder
domain_folder = ""
domain_texts_from_files = []

if os.path.isdir(domain_folder):
    print("\nLoading domain texts from files...")
    for file_name in os.listdir(domain_folder):
        file_path = os.path.join(domain_folder, file_name)
        # Only DOCX and PDF files are considered domain documents
        if file_name.lower().endswith(('.docx', '.pdf')):
            try:
                text = extract_text_from_file(file_path)
                domain_texts_from_files.append(text)
                print(f"- Successfully loaded text from: {file_name}")
            except Exception as e:
                print(f"- Failed to read file '{file_name}': {e}")
else:
    print(f"\nError: The folder '{domain_folder}' does not exist or is not accessible.")

# Input domain URLs
domain_urls = [
    "https://example.com/page1",
    "https://example.com/page2"
]

# Scrape domain texts from URLs
domain_texts_from_urls = [scrape_text_from_url(url) for url in domain_urls]

# Combine all domain texts, dropping empty results (failed downloads or
# documents with no usable text) so the check below is meaningful
domain_texts = [
    domain_text
    for domain_text in domain_texts_from_urls + domain_texts_from_files
    if domain_text
]

# Calculate coverage index and uncovered words
if req_content and domain_texts:
    coverage, uncovered_words = calculate_coverage(req_content, domain_texts)
    print(f"\nCoverage Index: {coverage:.2f}")
    print("\nWords not covered:")
    # Sorted so repeated runs list the words in the same order
    for word in sorted(uncovered_words):
        print(f"- {word}")
else:
    print("\nError: No valid requirement text or domain texts found for coverage calculation.")


# Knowledge Base Creation

In [None]:
!pip install pinecone[grpc] PyPDF2 openai --quiet

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/232.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m232.6/232.6 kB[0m [31m15.1 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.3 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m47.4 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/319.7 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m319.7/319.7 kB[0m [31m22.1 MB/s[0m eta [36m0:00:00[0m
[?25h[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/421.4 kB[0m [31m?[0m eta [36m-:--:--[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m421.4/421.4 kB[0m [31m26.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Import libraries
import openai
import pinecone
import numpy as np
import os
from sklearn.metrics.pairwise import cosine_similarity
from pinecone import ServerlessSpec
from pinecone.grpc import PineconeGRPC as Pinecone
import PyPDF2

# Set API keys
# Read the keys from the environment so secrets do not have to be written into
# the notebook; the "" fallback keeps the previous placeholder behaviour.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "")
openai.api_key = OPENAI_API_KEY

# Initialize Pinecone client
pc = Pinecone(api_key=PINECONE_API_KEY)

# File paths (placeholders)
REQUIREMENT_DOC_PATH = ""
DOMAIN_DOC_FOLDERS = ["dd_1", "dd_2", "dd_3"]

# Embedding model name
EMBEDDING_MODEL = "text-embedding-ada-002"

# Maximum chunk size, in characters (chunk_text slices by character count);
# keep it small enough that a chunk stays within the embedding model's limits
MAX_CHUNK_SIZE = 1000

from openai import OpenAI

# Initialize the OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

In [None]:
# Read PDF function
def read_document(file_path):
    """Return the concatenated text of every page in a PDF file.

    Page texts are appended back to back with no separator. A warning is
    printed for each page that yields no text, and once more if the whole
    document yields none. Any error while opening or parsing the file is
    printed and an empty string is returned instead of raising.
    """
    try:
        with open(file_path, "rb") as pdf_file:
            pdf_pages = PyPDF2.PdfReader(pdf_file).pages
            collected = []
            for number, pdf_page in enumerate(pdf_pages, start=1):
                extracted = pdf_page.extract_text()
                if not extracted:
                    print(f"Warning: No text extracted from page {number}")
                    continue
                collected.append(extracted)

            text = "".join(collected)
            if not text:
                print("Warning: No text extracted from the PDF.")
            return text
    except Exception as e:
        print(f"Error reading PDF file: {e}")
        return ""


def read_all_documents_in_folder(folder_path):
    """Concatenate the text of every regular file directly inside *folder_path*.

    Entries are visited in ``os.listdir`` order; sub-directories are ignored.
    Each file's text (as returned by ``read_document``) is followed by a
    newline. An empty folder yields an empty string.
    """
    pieces = []
    for entry in os.listdir(folder_path):
        candidate = os.path.join(folder_path, entry)
        if not os.path.isfile(candidate):
            continue
        pieces.append(read_document(candidate))
        pieces.append("\n")
    return "".join(pieces)


def chunk_text(text, max_chunk_size=MAX_CHUNK_SIZE):
    """Cut *text* into consecutive slices of at most *max_chunk_size* characters.

    Slicing is purely positional (no attention to word or sentence
    boundaries); the final slice may be shorter. Empty text gives an empty list.
    """
    pieces = []
    for start in range(0, len(text), max_chunk_size):
        pieces.append(text[start:start + max_chunk_size])
    return pieces


def get_embedding(text):
    """Request an embedding for *text* and return it as a float32 NumPy array.

    Uses the module-level ``openai`` client with the model named by
    ``EMBEDDING_MODEL``; only the first item of the response is used.
    """
    response = openai.embeddings.create(model=EMBEDDING_MODEL, input=[text])
    vector = response.data[0].embedding
    return np.array(vector, dtype=np.float32)

def create_pinecone_index(index_name, embeddings, chunks, namespace):
    """Upsert embeddings and their source text into an existing Pinecone index.

    Despite the name, no index is created here: the index called *index_name*
    is looked up on the module-level ``pc`` client and is expected to exist.

    Args:
        index_name: Name of the Pinecone index to write to.
        embeddings: Iterable of NumPy vectors, one per chunk.
        chunks: Text chunks; ``chunks[i]`` is stored as the ``"text"``
            metadata of the i-th embedding.
        namespace: Namespace inside the index that receives the vectors.

    Returns:
        The Pinecone index handle that was written to.
    """
    batch_size = 100

    # Interact with the Pinecone index requested by the caller
    index = pc.Index(index_name)

    # Prepare data to upsert (embedding + text as metadata); ids are the
    # chunk positions, so they are only unique within one namespace
    upsert_data = [(str(i), embedding.tolist(), {"text": chunks[i]}) for i, embedding in enumerate(embeddings)]

    # Ceiling division: an exact multiple of batch_size must not report an
    # extra, non-existent batch
    total_batches = (len(upsert_data) + batch_size - 1) // batch_size

    # Upsert the data in batches
    for i in range(0, len(upsert_data), batch_size):
        batch = upsert_data[i:i + batch_size]
        index.upsert(vectors=batch, namespace=namespace)
        print(f"Upserted batch {i // batch_size + 1} of {total_batches} batches.")

    return index

def build_knowledge_base(text, index_name, namespace):
    """Chunk *text*, embed every chunk and store the result in Pinecone.

    Returns:
        ``(index, chunks)``: the value returned by ``create_pinecone_index``
        and the list of text chunks that were embedded.
    """
    chunks = chunk_text(text)
    vectors = [get_embedding(piece) for piece in chunks]
    embeddings = np.array(vectors)
    return create_pinecone_index(index_name, embeddings, chunks, namespace), chunks

## Build Requirement Knowledge Base

In [None]:
# Build requirement knowledge base: extract the raw text of the requirement
# PDF (read_document returns "" if the file cannot be read)
req_text = read_document(REQUIREMENT_DOC_PATH)

In [None]:
# Chunk, embed and upsert the requirement text under namespace 'kb_r';
# keeps the index handle and the list of text chunks
req_index_name = "requirement-index"
req_index, req_chunks = build_knowledge_base(req_text, req_index_name, 'kb_r')

Upserted batch 1 of 1 batches.


In [None]:
# List the indexes visible to the Pinecone client and show them
index_list = pc.list_indexes()

print(index_list)

[{
    "name": "requirement-index",
    "dimension": 1536,
    "metric": "cosine",
    "host": "requirement-index-41400d9.svc.aped-4627-b74a.pinecone.io",
    "spec": {
        "serverless": {
            "cloud": "aws",
            "region": "us-east-1"
        }
    },
    "status": {
        "ready": true,
        "state": "Ready"
    },
    "deletion_protection": "disabled"
}]


## Build Domain Knowledge Bases

In [None]:
# Build domain knowledge bases: one per folder in DOMAIN_DOC_FOLDERS
domain_indices = []       # index handle returned for each folder
domain_chunks_list = []   # text chunks produced for each folder
for i, folder in enumerate(DOMAIN_DOC_FOLDERS):
    print(folder)
    domain_text = read_all_documents_in_folder(folder)
    # Despite its name, this value is passed below as the *namespace* argument;
    # the index name argument is always 'requirement-index'
    domain_index_name = f"domain-index-{i}"
    index, chunks = build_knowledge_base(domain_text, 'requirement-index', domain_index_name)
    domain_indices.append(index)
    domain_chunks_list.append(chunks)

dd_1
Upserted batch 1 of 16 batches.
Upserted batch 2 of 16 batches.
Upserted batch 3 of 16 batches.
Upserted batch 4 of 16 batches.
Upserted batch 5 of 16 batches.
Upserted batch 6 of 16 batches.
Upserted batch 7 of 16 batches.
Upserted batch 8 of 16 batches.
Upserted batch 9 of 16 batches.
Upserted batch 10 of 16 batches.
Upserted batch 11 of 16 batches.
Upserted batch 12 of 16 batches.
Upserted batch 13 of 16 batches.
Upserted batch 14 of 16 batches.
Upserted batch 15 of 16 batches.
Upserted batch 16 of 16 batches.
dd_2
Upserted batch 1 of 11 batches.
Upserted batch 2 of 11 batches.
Upserted batch 3 of 11 batches.
Upserted batch 4 of 11 batches.
Upserted batch 5 of 11 batches.
Upserted batch 6 of 11 batches.
Upserted batch 7 of 11 batches.
Upserted batch 8 of 11 batches.
Upserted batch 9 of 11 batches.
Upserted batch 10 of 11 batches.
Upserted batch 11 of 11 batches.
dd_3
Upserted batch 1 of 14 batches.
Upserted batch 2 of 14 batches.
Upserted batch 3 of 14 batches.
Upserted batch 4

# Generate Elucidation Questions

In [None]:
import pandas as pd
import json
from openai import OpenAI

# Initialize the OpenAI client used by the elucidation-question functions below
client = OpenAI(api_key=OPENAI_API_KEY)

import json

def _strip_code_fence(text):
    """Return *text* without a surrounding Markdown code fence, if it has one."""
    text = text.strip()
    if text.startswith("```"):
        # Drop the opening fence line, which may carry a language tag (```json)
        first_newline = text.find("\n")
        text = text[first_newline + 1:] if first_newline != -1 else text[3:]
        text = text.rstrip()
        if text.endswith("```"):
            text = text[:-3]
    return text.strip()


def generate_elucidation_questions(requirement_text, comment_text):
    """Ask the chat model for ambiguous terms and clarifying questions.

    A few-shot prompt is built from *requirement_text* and *comment_text* and
    sent through the module-level ``client``. The reply is expected to be a
    JSON object with an ``"Elucidations"`` list.

    Args:
        requirement_text: The requirement statement to analyse.
        comment_text: Additional context inserted into the prompt.

    Returns:
        The list stored under ``"Elucidations"`` in the reply, or an empty
        list when the request fails or the reply is not a JSON object of the
        expected shape (a message is printed in those cases).
    """

    # System message to enforce strict response format and task adherence
    system_prompt = """
    The assistant strictly adheres to the user's instructions and tasks. The tasks given by the user will be challenging, so the assistant should pay close attention while solving the provided complex tasks. The assistant's response is always a JSON object and does not include any additional details.
    """

    # User message providing input requirement and context
    user_prompt = f"""
    **Task:** Identify ambiguous terms within the given requirement statement. Generate elucidation questions to clarify these ambiguities and make the requirement more precise.

    **Start of examples:**

    **Example 1**
    **Given requirement statement:** "The system shall process user requests quickly."
    **Given additional context:** "Users should not experience noticeable delays."
    **Expected Output (JSON):**
    {{
        "Elucidations": [
            {{
                "Ambiguous Term": "quickly",
                "Elucidation Question": "What specific response time is expected for processing user requests?"
            }},
            {{
                "Ambiguous Term": "user requests",
                "Elucidation Question": "What types of user requests need to be processed?"
            }}
        ]
    }}

    ---

    **Example 2**
    **Given requirement statement:** "The application shall support high-resolution images."
    **Given additional context:** "Users will upload various image formats."
    **Expected Output (JSON):**
    {{
        "Elucidations": [
            {{
                "Ambiguous Term": "high-resolution",
                "Elucidation Question": "What minimum resolution (in pixels) qualifies as 'high-resolution'?"
            }},
            {{
                "Ambiguous Term": "various image formats",
                "Elucidation Question": "Which specific image formats should be supported?"
            }}
        ]
    }}

    ---

    **Example 3**
    **Given requirement statement:** "The system shall restrict access to sensitive data."
    **Given additional context:** "Only authorized users should view confidential information."
    **Expected Output (JSON):**
    {{
        "Elucidations": [
            {{
                "Ambiguous Term": "restrict access",
                "Elucidation Question": "What specific mechanisms will be used to enforce access restrictions?"
            }},
            {{
                "Ambiguous Term": "sensitive data",
                "Elucidation Question": "Which categories of data are considered sensitive?"
            }},
            {{
                "Ambiguous Term": "authorized users",
                "Elucidation Question": "What criteria determine if a user is authorized?"
            }}
        ]
    }}

    ---

    **End of examples.**

    Now provide your analysis in JSON with keys `"Elucidations"`, `"Ambiguous Term"`, and `"Elucidation Question"` for the following given requirement statement and context.

    **Given requirement statement:** "{requirement_text}"

    **Given additional context:** "{comment_text}"
    """

    try:
        # Call the model
        response = client.chat.completions.create(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            model="gpt-3.5-turbo"
        )

        # Chat models often wrap JSON in a Markdown code fence; remove it so a
        # well-formed answer is not discarded as unparseable
        raw_reply = response.choices[0].message.content or ""
        elucidation_text = _strip_code_fence(raw_reply)

        try:
            elucidation_data = json.loads(elucidation_text)
        except json.JSONDecodeError:
            print(f"Warning: Could not parse JSON for requirement: {requirement_text}")
            return []  # Return empty list if JSON parsing fails

        # Valid JSON of the wrong shape (e.g. a bare list) is treated like an
        # unparseable reply
        if not isinstance(elucidation_data, dict):
            print(f"Warning: Could not parse JSON for requirement: {requirement_text}")
            return []

        elucidation_items = elucidation_data.get("Elucidations", [])
        return elucidation_items if isinstance(elucidation_items, list) else []

    except Exception as e:
        print(f"Error processing requirement: {requirement_text}\n{e}")
        return []


def process_requirements(input_excel_path, output_excel_path):
    """Generate elucidation questions for every requirement in an Excel sheet.

    The input sheet must have a ``Requirement`` column and may have a
    ``Comment`` column. The output sheet has one row per (requirement,
    ambiguous term) pair with columns ``Requirement``, ``Comment``,
    ``Ambiguous Term`` and ``Elucidation Question``; a requirement for which
    nothing usable was returned gets a single row whose last two columns hold
    the string "None".

    Args:
        input_excel_path: Path of the Excel file to read.
        output_excel_path: Path of the Excel file to write.
    """
    # Read the Excel file
    df = pd.read_excel(input_excel_path)

    elucidations = []

    for _, row in df.iterrows():
        requirement_text = row['Requirement']
        comment_text = row.get('Comment', '')  # Default to empty string if the column is missing
        # An empty Excel cell is read as NaN; without this the literal text
        # "nan" would be sent to the model as context
        if pd.isna(comment_text):
            comment_text = ''

        # Generate structured elucidation questions
        elucidation_pairs = generate_elucidation_questions(requirement_text, comment_text)

        # Tolerate malformed model output: ignore non-dict items and fill in
        # "None" for a missing key instead of raising KeyError mid-run
        rows = [
            {
                'Requirement': requirement_text,
                'Comment': comment_text,
                'Ambiguous Term': item.get('Ambiguous Term', "None"),
                'Elucidation Question': item.get('Elucidation Question', "None")
            }
            for item in elucidation_pairs
            if isinstance(item, dict)
        ]

        # If no elucidations were found, still include the requirement with "None" values
        if not rows:
            rows = [{
                'Requirement': requirement_text,
                'Comment': comment_text,
                'Ambiguous Term': "None",
                'Elucidation Question': "None"
            }]

        elucidations.extend(rows)

    # Create a new DataFrame with structured results
    result_df = pd.DataFrame(elucidations)

    # Save to a new Excel file
    result_df.to_excel(output_excel_path, index=False)

# Example usage: read requirements from the input workbook and write the
# generated elucidation questions to "EQs.xlsx"
input_excel_path = "/content/Requirements.xlsx"
output_excel_path = "EQs.xlsx"
process_requirements(input_excel_path, output_excel_path)

# Interpretation Comparison and Pragmatic Ambiguity Detection

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from openai import OpenAI
import pinecone
import regex as re

# Initialize OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

# Pinecone index that holds both the requirement and the domain namespaces
INDEX_NAME = "requirement-index"
index = pc.Index(INDEX_NAME)

# Namespaces queried during detection, and the similarity cut-offs applied
REQUIREMENT_NAMESPACE = "kb_r"
DOMAIN_NAMESPACES = ["domain-index-0", "domain-index-1", "domain-index-2"]
SIMILARITY_THRESHOLD = 0.85  # Pragmatic ambiguity detection threshold (between domain knowledge bases)
REQ_SIMILARITY_THRESHOLD = 0.75  # Requirement KB similarity threshold

def get_embedding(text):
    """Embed *text* with the "text-embedding-ada-002" model via ``client``.

    Returns the first embedding in the response as a float32 NumPy array.
    """
    response = client.embeddings.create(model="text-embedding-ada-002", input=[text])
    vector = response.data[0].embedding
    return np.array(vector, dtype=np.float32)

def search_pinecone(query_embedding, namespace, top_k=3):
    """Query one Pinecone namespace and return matching texts and vectors.

    Args:
        query_embedding: NumPy vector to search with.
        namespace: Namespace of the module-level ``index`` to query.
        top_k: Maximum number of matches to request.

    Returns:
        ``(retrieved_texts, retrieved_vectors)``: two lists of equal length
        where position i of each refers to the same match. Matches without a
        ``"text"`` metadata entry are left out of both lists.
    """
    results = index.query(
        namespace=namespace,
        vector=query_embedding.tolist(),
        top_k=top_k,
        include_metadata=True,
        include_values=True
    )

    # Filter once and derive both lists from the same matches; filtering only
    # the texts would let the two lists drift out of step, and callers pair
    # them up by position. Matches with no metadata at all are skipped too.
    usable_matches = [
        match for match in results.matches
        if match.metadata and "text" in match.metadata
    ]

    retrieved_texts = [match.metadata.get("text", "").strip() for match in usable_matches]
    retrieved_vectors = [match.values for match in usable_matches]

    return retrieved_texts, retrieved_vectors

def calculate_similarity(vec1, vec2):
    """Return the cosine similarity of two vectors as a scalar.

    Each input is converted to a NumPy array and reshaped into a single row
    before being handed to ``cosine_similarity``.
    """
    row_a = np.array(vec1).reshape(1, -1)
    row_b = np.array(vec2).reshape(1, -1)
    similarity_matrix = cosine_similarity(row_a, row_b)
    return similarity_matrix[0][0]

def verify_answer_with_gpt(requirement_text, elucidation_question):
    """Ask the chat model whether *requirement_text* answers the question.

    Args:
        requirement_text: Text retrieved from the requirement knowledge base.
        elucidation_question: The clarification question being checked.

    Returns:
        True when the model's reply is "YES" (case-insensitive, ignoring
        surrounding whitespace, quotes, Markdown emphasis and trailing
        punctuation); False for anything else, including an empty reply.
    """
    prompt = f"""
    You are an expert in requirements analysis. Given text from a requirement document and an elucidation question,
    determine if the requirement text contains the answer to the question.

    Requirement Text: "{requirement_text}"
    Clarification Question: "{elucidation_question}"

    Answer with ONLY "YES" or "NO". Do not provide any explanations.
    """

    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": prompt}]
    )

    # The model does not always follow the format to the letter ("Yes.",
    # '"YES"', "**YES**"); normalise before comparing so a positive answer is
    # not misread as NO. Content may also be None.
    reply = response.choices[0].message.content or ""
    verdict = reply.strip().strip("\"'`*.!").strip().upper()
    return verdict == "YES"

def _domain_kbs_agree(domain_vectors_dict):
    """Return True when all three domain namespaces retrieved similar content.

    Vectors from the first namespace that reach SIMILARITY_THRESHOLD against
    any of the second namespace's top vectors become reference vectors (the
    search stops at the first such vector of the first namespace). The
    namespaces agree if any reference vector also reaches the threshold
    against one of the third namespace's top vectors.
    """
    ns1, ns2, ns3 = DOMAIN_NAMESPACES  # Assuming exactly 3 namespaces

    reference_vectors = []
    for vec1 in domain_vectors_dict[ns1][:3]:
        for vec2 in domain_vectors_dict[ns2][:3]:
            if calculate_similarity(vec1, vec2) >= SIMILARITY_THRESHOLD:
                reference_vectors.append(vec1)
        if reference_vectors:
            break

    # With no reference vectors this is False, i.e. the namespaces disagree
    return any(
        calculate_similarity(ref_vec, vec3) >= SIMILARITY_THRESHOLD
        for vec3 in domain_vectors_dict[ns3][:3]
        for ref_vec in reference_vectors
    )


def process_clarifications(input_excel_path, output_excel_path):
    """Decide, per requirement, whether its elucidation questions reveal pragmatic ambiguity.

    Rows of the input sheet (columns ``Requirement`` and
    ``Elucidation Question``) are grouped by requirement. For each non-empty
    question:

    1. the question is embedded and the requirement namespace is searched;
    2. retrieved texts whose similarity to the question reaches
       REQ_SIMILARITY_THRESHOLD are shown to the chat model, and if any of
       them answers the question the question is not ambiguous
       (source "Requirement KB");
    3. otherwise the three domain namespaces are searched and the question is
       ambiguous unless they agree (source "Domain KB").

    Processing of a requirement stops at its first ambiguous question, and
    every row already recorded for that requirement is then marked "Yes".

    Args:
        input_excel_path: Excel file with the questions to evaluate.
        output_excel_path: Excel file that receives the columns
            ``Requirement``, ``Elucidation Question``,
            ``Pragmatic Ambiguity`` and ``Source``.
    """
    df = pd.read_excel(input_excel_path)
    grouped_reqs = df.groupby("Requirement")
    results = []

    for requirement_text, group in grouped_reqs:
        is_ambiguous = False  # Default assumption: No ambiguity
        answers_list = []
        print(requirement_text)

        for _, row in group.iterrows():
            elucidation_question = row["Elucidation Question"]

            # Skip empty elucidation questions (str() guards against cells
            # that pandas parsed as a non-string value)
            if pd.isna(elucidation_question) or str(elucidation_question).strip() == "":
                continue
            elucidation_question = str(elucidation_question)

            # Step 1: Generate embedding for the question
            query_embedding = get_embedding(elucidation_question)

            # Step 2: Search in Requirement KB
            req_texts, requirement_vectors = search_pinecone(query_embedding, REQUIREMENT_NAMESPACE, top_k=3)

            # Collapse whitespace in the retrieved requirement texts
            req_texts = [re.sub(r"\s+", " ", text).strip() for text in req_texts]

            # Similarity of each retrieved vector to the question
            requirement_scores = [calculate_similarity(query_embedding, vec) for vec in requirement_vectors]

            # Keep only the texts that meet the similarity threshold
            above_threshold_texts = [
                text for text, score in zip(req_texts, requirement_scores)
                if score >= REQ_SIMILARITY_THRESHOLD
            ]

            # Step 3: Verify with GPT if any requirement text contains the answer
            gpt_says_answered = any(
                verify_answer_with_gpt(text, elucidation_question) for text in above_threshold_texts
            )

            if gpt_says_answered:
                source = "Requirement KB"
                is_ambiguous = False  # No ambiguity if the requirement KB has the answer
            else:
                # Step 4: Fall back to the domain KBs. Each namespace is
                # queried once; only the vectors are needed for the comparison.
                domain_vectors_dict = {
                    ns: search_pinecone(query_embedding, ns, top_k=3)[1]
                    for ns in DOMAIN_NAMESPACES
                }
                is_ambiguous = not _domain_kbs_agree(domain_vectors_dict)
                # Always record where the verdict came from, so this row never
                # reuses the previous row's source (or an unbound name)
                source = "Domain KB"

            answers_list.append({
                "Requirement": requirement_text,
                "Elucidation Question": elucidation_question,
                "Pragmatic Ambiguity": "Yes" if is_ambiguous else "No",
                "Source": source,
            })

            if is_ambiguous:
                break  # One ambiguous question is enough; skip the remaining ones

        # Mark all questions of the same requirement as "Yes" if any one is "Yes"
        if is_ambiguous:
            for ans in answers_list:
                ans["Pragmatic Ambiguity"] = "Yes"

        results.extend(answers_list)

    # Save results to an Excel file
    result_df = pd.DataFrame(results)
    result_df.to_excel(output_excel_path, index=False)

# Example usage: evaluate the questions in "EQs.xlsx" and write the
# per-question ambiguity verdicts to "Answers.xlsx"
input_excel_path = "EQs.xlsx"
output_excel_path = "Answers.xlsx"
process_clarifications(input_excel_path, output_excel_path)


The Clarus system shall be able to access in-situ environmental observations from data collectors.
The Clarus system shall be able to access remotely sensed environmental observations from data collectors.
The Clarus system shall be able to receive roadway weather measurements derived from VII data.
The Clarus system shall calculate derived environmental data from observations.
The Clarus system shall collect, quality control, and disseminate environmental data.


# Evaluation - Pragmatic Ambiguity Detection

In [None]:
# Load ground-truth labels and the predicted answers
# NOTE(review): the clarification step in this notebook writes "Answers.xlsx"
# (capital A); on a case-sensitive filesystem the lowercase name below would
# not resolve to that file — confirm the intended path.
ground_truth_path = "ground_truth.xlsx"
predicted_path = "answers.xlsx"

# Read Excel files
ground_truth_df = pd.read_excel(ground_truth_path)
predicted_df = pd.read_excel(predicted_path)

# Ground-truth requirements that have no row at all in the predictions
missing_requirements = ground_truth_df[~ground_truth_df["Requirement"].isin(predicted_df["Requirement"])]

# If there are missing requirements, add them with "No" label
if not missing_requirements.empty:
    print(f"Adding {len(missing_requirements)} missing requirements to Answer Excel.")

    # Create new rows with Pragmatic Ambiguity set to "No"
    missing_requirements = missing_requirements[["Requirement"]].copy()
    missing_requirements["Elucidation Question"] = "N/A"
    missing_requirements["Pragmatic Ambiguity"] = "No"
    missing_requirements["Source"] = "Not Evaluated"

    # Append missing requirements to predicted_df
    predicted_df = pd.concat([predicted_df, missing_requirements], ignore_index=True)

# Save the updated predictions (original rows plus the added "No" rows)
updated_path = "updated_answers.xlsx"
predicted_df.to_excel(updated_path, index=False)

print(f"Updated Answer Excel saved as: {updated_path}")


Adding 31 missing requirements to Answer Excel.
Updated Answer Excel saved as: Updated_Answer_ChatGPT.xlsx


In [None]:
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix

# Convert "Yes"/"No" labels to binary format (1 = Ambiguous, 0 = Not Ambiguous)
# NOTE: this overwrites the label columns, so re-running the cell without
# reloading the data frames maps the already-numeric values to NaN.
ground_truth_df["Pragmatic Ambiguity"] = ground_truth_df["Pragmatic Ambiguity"].map({"Yes": 1, "No": 0})
predicted_df["Pragmatic Ambiguity"] = predicted_df["Pragmatic Ambiguity"].map({"Yes": 1, "No": 0})

# Aggregate predictions per requirement (if any question is "Yes", the requirement is "Yes")
predicted_agg = predicted_df.groupby("Requirement", as_index=False)["Pragmatic Ambiguity"].max()

# Merge ground truth with aggregated predictions
merged_df = ground_truth_df.merge(predicted_agg, on="Requirement", how="left", suffixes=("_true", "_pred"))

# Requirements without a prediction count as "No" (0). Assign the result back
# instead of calling fillna(inplace=True) on the selected column: that chained
# form operates on a temporary copy and is flagged by pandas as not updating
# merged_df from pandas 3.0 on.
merged_df["Pragmatic Ambiguity_pred"] = merged_df["Pragmatic Ambiguity_pred"].fillna(0)

# Convert to integers (raises if any ground-truth label was not "Yes"/"No")
merged_df["Pragmatic Ambiguity_true"] = merged_df["Pragmatic Ambiguity_true"].astype(int)
merged_df["Pragmatic Ambiguity_pred"] = merged_df["Pragmatic Ambiguity_pred"].astype(int)

# Confusion matrix -> TN, FP, FN, TP. labels=[0, 1] forces a 2x2 matrix even
# when only one class occurs, so the four-way unpacking cannot fail.
tn, fp, fn, tp = confusion_matrix(
    merged_df["Pragmatic Ambiguity_true"],
    merged_df["Pragmatic Ambiguity_pred"],
    labels=[0, 1]
).ravel()

# Sanity check: every merged row is counted exactly once
total_count = tn + fp + fn + tp
assert total_count == len(merged_df), f"Error: Total count {total_count} does not match {len(merged_df)}!"

# Compute Precision, Recall, and F2-score using manual formulas
precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f2 = (5 * precision * recall) / ((4 * precision) + recall) if (precision + recall) > 0 else 0  # F2-score formula

# Print results
print(f"Total Requirements: {len(merged_df)} (Should be 140)")
print(f"True Positives (TP): {tp}")
print(f"True Negatives (TN): {tn}")
print(f"False Positives (FP): {fp} (Model incorrectly marked ambiguous)")
print(f"False Negatives (FN): {fn} (Model failed to detect ambiguity)")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F2 Score: {f2:.4f}")

# Extract FP and FN Requirements
fp_requirements = merged_df[(merged_df["Pragmatic Ambiguity_true"] == 0) & (merged_df["Pragmatic Ambiguity_pred"] == 1)]
fn_requirements = merged_df[(merged_df["Pragmatic Ambiguity_true"] == 1) & (merged_df["Pragmatic Ambiguity_pred"] == 0)]

print("\n🔴 False Positives (FP) - Model Incorrectly Marked Ambiguous:")
print(fp_requirements[["Requirement", "Pragmatic Ambiguity_true", "Pragmatic Ambiguity_pred"]])

print("\n🟠 False Negatives (FN) - Model Failed to Detect Ambiguity:")
print(fn_requirements[["Requirement", "Pragmatic Ambiguity_true", "Pragmatic Ambiguity_pred"]])


Total Requirements: 140 (Should be 140)
True Positives (TP): 41
True Negatives (TN): 64
False Positives (FP): 17 (Model incorrectly marked ambiguous)
False Negatives (FN): 18 (Model failed to detect ambiguity)
Precision: 0.7069
Recall: 0.6949
F2 Score: 0.6973

🔴 False Positives (FP) - Model Incorrectly Marked Ambiguous:
                                           Requirement  \
2    The Clarus system shall be able to access remo...   
43   The Clarus system shall not require approval t...   
48   The Clarus system shall accept data through a ...   
49   The Clarus system shall be able to communicate...   
52   The Clarus system shall be able to collect env...   
61   The Clarus system shall provide a user interfa...   
67   The Clarus system shall be able to operate on ...   
73   All HTML coding shall meet FHWA requirements f...   
75   The Clarus system shall be able to use latitud...   
81   The Clarus system shall be able to publish env...   
90   The Clarus system shall be able to 

The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  merged_df["Pragmatic Ambiguity_pred"].fillna(0, inplace=True)


# Pragmatic Ambiguity Resolution

In [None]:
import pandas as pd
import numpy as np
from openai import OpenAI
import pinecone
import regex as re
from sklearn.metrics.pairwise import cosine_similarity

# Initialize OpenAI client
client = OpenAI(api_key=OPENAI_API_KEY)

# Pinecone index that holds both the requirement and the domain namespaces
INDEX_NAME = "requirement-index"
index = pc.Index(INDEX_NAME)

# Namespaces searched when resolving an ambiguous requirement
REQUIREMENT_NAMESPACE = "kb_r"
DOMAIN_NAMESPACES = ["domain-index-0", "domain-index-1", "domain-index-2"]
TOP_K = 3  # Retrieve top 3 relevant chunks

import numpy as np

def get_embedding(text):
    """Embed *text* with "text-embedding-ada-002" and validate both ends.

    Raises:
        ValueError: if *text* is not a string or is blank, or if the returned
            embedding contains NaN or infinite values.

    Returns:
        The embedding as a float32 NumPy array.
    """
    is_usable = isinstance(text, str) and text.strip() != ""
    if not is_usable:
        raise ValueError("Invalid input: Text for embedding must be a non-empty string.")

    response = client.embeddings.create(model="text-embedding-ada-002", input=[text])
    embedding = np.array(response.data[0].embedding, dtype=np.float32)

    # Only hand back vectors made entirely of finite numbers
    if np.isfinite(embedding).all():
        return embedding
    raise ValueError(f"Embedding contains invalid values: {embedding}")

def calculate_similarity(vec1, vec2):
    """Return the cosine similarity between ``vec1`` and ``vec2`` as a scalar.

    Each input is converted to a NumPy array and reshaped into a single row,
    because ``cosine_similarity`` operates on 2-D inputs; the only entry of the
    resulting 1x1 matrix is returned.
    """
    row_a = np.array(vec1).reshape(1, -1)
    row_b = np.array(vec2).reshape(1, -1)
    similarity_matrix = cosine_similarity(row_a, row_b)
    return similarity_matrix[0][0]

def search_pinecone(query_embedding, namespace, top_k=3):
    """Query one namespace of the module-level ``index`` and return matched text chunks.

    Args:
        query_embedding: Query vector; must provide ``.tolist()`` (e.g. a NumPy array).
        namespace: Name of the namespace to search.
        top_k: Maximum number of matches to request.

    Returns:
        A list with the whitespace-stripped ``"text"`` metadata value of every
        match whose metadata contains a ``"text"`` key, in the order returned.
    """
    response = index.query(
        namespace=namespace,
        vector=query_embedding.tolist(),
        top_k=top_k,
        include_metadata=True,
        include_values=False,  # Vector values are not needed, only the metadata
    )

    chunk_texts = []
    for hit in response.matches:
        # Matches without a "text" entry carry nothing usable and are skipped.
        if "text" in hit.metadata:
            chunk_texts.append(hit.metadata.get("text", "").strip())
    return chunk_texts

def generate_resolution_with_gpt(requirement_text, elucidation_questions, req_chunks, domain_chunks):
    """Ask the chat model to rewrite a requirement so that it is unambiguous.

    A few-shot prompt is built from the original requirement, its elucidation
    questions and the retrieved requirement/domain knowledge, and sent to
    ``gpt-3.5-turbo`` through the module-level ``client``.

    Args:
        requirement_text: The original requirement statement.
        elucidation_questions: A list of question strings, or a single string.
            Entries that are not strings (e.g. the NaN pandas produces for blank
            cells) or that are blank are ignored.
        req_chunks: Text inserted as "Relevant Requirement Knowledge".
        domain_chunks: Text inserted as "Relevant Domain Knowledge".

    Returns:
        The model's reply with surrounding whitespace removed; an empty string
        if the reply carries no text content.
    """
    # A bare string must not be joined character by character, and non-string
    # entries would make str.join raise TypeError.
    if isinstance(elucidation_questions, str):
        question_list = [elucidation_questions]
    else:
        question_list = [
            q for q in elucidation_questions if isinstance(q, str) and q.strip()
        ]
    questions_text = ", ".join(question_list)

    # System prompt to enforce strict adherence to instructions
    system_prompt = """
    The assistant strictly adheres to the user's instructions and tasks. The tasks given by the user will be challenging,
    so the assistant should pay close attention while solving the provided complex tasks. The assistant's response will
    directly address the user's request without including any additional details.
    """

    # User prompt with structured input
    user_prompt = f"""
    **Task**: Given the original requirement statement, elucidation questions, relevant requirement knowledge,
    and relevant domain knowledge, rewrite the requirement to remove any ambiguities while preserving the original intent.

    **Start of examples:**

    **Example 1**
    **Original Requirement**: "The system shall process user requests quickly."
    **Elucidation Questions**: "What specific response time is expected for processing user requests?"
    **Relevant Requirement Knowledge**: "Industry standards suggest response time under 2 seconds."
    **Relevant Domain Knowledge**: "User feedback indicates delays over 3 seconds cause frustration."
    **Rewritten Requirement**: "The system shall process user requests within 2 seconds to align with industry standards and user expectations."

    **Example 2**
    **Original Requirement**: "The application shall support high-resolution images."
    **Elucidation Questions**: "What minimum resolution (in pixels) qualifies as 'high-resolution'?"
    **Relevant Requirement Knowledge**: "Previous versions supported up to 1080p resolution."
    **Relevant Domain Knowledge**: "Competitor applications support resolutions up to 4K."
    **Rewritten Requirement**: "The application shall support images up to 4K resolution while maintaining compatibility with 1080p formats."

    **Example 3**
    **Original Requirement**: "The system shall restrict access to sensitive data."
    **Elucidation Questions**: "What specific mechanisms will be used to enforce access restrictions?"
    **Relevant Requirement Knowledge**: "Role-based access control (RBAC) is currently implemented."
    **Relevant Domain Knowledge**: "Regulatory guidelines require multi-factor authentication (MFA) for sensitive data."
    **Rewritten Requirement**: "The system shall restrict access to sensitive data using role-based access control (RBAC) and enforce multi-factor authentication (MFA) for compliance with regulatory guidelines."

    **End of examples.**

    Now provide your rewritten requirement for the following input:

    **Original Requirement**: "{requirement_text}"

    **Elucidation Questions**: "{questions_text}"

    **Relevant Requirement Knowledge**: "{req_chunks}"

    **Relevant Domain Knowledge**: "{domain_chunks}"
    """

    # Call the model
    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        max_tokens=300
    )

    # The message content is optional in the API response; treat a missing
    # value as an empty resolution instead of failing on None.strip().
    reply_text = response.choices[0].message.content
    return (reply_text or "").strip()

def process_resolutions(input_excel_path, output_excel_path, similarity_threshold=0.77):
    """Generate candidate ambiguity resolutions per requirement and save them to Excel.

    The input workbook is read with pandas and must contain the columns
    "Requirement" and "Elucidation Question". All questions of a requirement
    are combined and embedded; every namespace in ``DOMAIN_NAMESPACES`` is then
    searched with that embedding. A resolution is generated for a namespace only
    when the cosine similarity between the question embedding and the embedding
    of the retrieved domain text reaches ``similarity_threshold``; otherwise the
    resolution for that namespace is an empty string.

    Args:
        input_excel_path: Path of the Excel file holding the elucidation questions.
        output_excel_path: Path of the Excel file the results are written to.
        similarity_threshold: Minimum cosine similarity required to generate a
            resolution for a domain knowledge base.

    Returns:
        None. One row per requirement with at least one usable question is
        written to ``output_excel_path``.
    """
    df = pd.read_excel(input_excel_path)

    results = []

    # Group all elucidation questions per requirement
    grouped_reqs = df.groupby("Requirement")["Elucidation Question"].apply(list).reset_index()

    for _, row in grouped_reqs.iterrows():
        requirement_text = row["Requirement"]

        # Blank cells arrive as NaN (a float). Keep only non-blank strings and
        # use this filtered list everywhere, so neither the embedding input,
        # the prompt, nor the output column can fail on a non-string entry.
        valid_questions = [
            q for q in row["Elucidation Question"] if isinstance(q, str) and q.strip()
        ]
        if not valid_questions:
            continue

        # Generate embedding for the concatenated elucidation questions
        combined_question_text = " | ".join(valid_questions)
        question_embedding = get_embedding(combined_question_text)

        # Requirement-knowledge context for the prompt, taken from
        # REQUIREMENT_NAMESPACE. Fetched lazily so no query is issued when no
        # domain knowledge base passes the similarity gate.
        req_chunks_str = None

        # Retrieve chunks & generate resolution for each Domain KB (only if similarity exceeds threshold)
        domain_resolutions = {}
        for namespace in DOMAIN_NAMESPACES:
            domain_chunks = search_pinecone(question_embedding, namespace, top_k=TOP_K)
            domain_chunks_str = "\n".join(domain_chunks)

            resolution = ""
            # Whitespace-only text cannot be embedded (get_embedding rejects it),
            # so it is treated the same as "nothing retrieved".
            if domain_chunks_str.strip():
                domain_embedding = get_embedding(domain_chunks_str)
                similarity = calculate_similarity(question_embedding, domain_embedding)

                if similarity >= similarity_threshold:
                    if req_chunks_str is None:
                        req_chunks_str = "\n".join(
                            search_pinecone(question_embedding, REQUIREMENT_NAMESPACE, top_k=TOP_K)
                        )
                    # Keyword arguments keep requirement and domain knowledge in
                    # their own prompt slots; the function requires both.
                    resolution = generate_resolution_with_gpt(
                        requirement_text,
                        valid_questions,
                        req_chunks=req_chunks_str,
                        domain_chunks=domain_chunks_str,
                    )

            domain_resolutions[namespace] = resolution

        # Store results
        results.append({
            "🔹 Requirement": requirement_text,
            "🔹 Elucidation Questions": combined_question_text,
            "✅ Resolution (Domain KB 1)": domain_resolutions.get("domain-index-0", ""),
            "✅ Resolution (Domain KB 2)": domain_resolutions.get("domain-index-1", ""),
            "✅ Resolution (Domain KB 3)": domain_resolutions.get("domain-index-2", "")
        })

    # Save results to an Excel file
    result_df = pd.DataFrame(results)
    result_df.to_excel(output_excel_path, index=False)

# Example usage: read the elucidation questions from EQs.xlsx and write the
# candidate resolutions to Candidate_resolutions.xlsx. The call runs as soon as
# this cell executes and uses the same 0.77 threshold as the function default.
input_excel_path = "EQs.xlsx"
output_excel_path = "Candidate_resolutions.xlsx"
process_resolutions(input_excel_path, output_excel_path, similarity_threshold=0.77)
