Split the original data to train and test sets

In [2]:
import numpy as np

# Step 1: Load data from the input text file
file_path = "../data/data.txt"  # Replace with your file name
data = np.genfromtxt(file_path, delimiter=",", dtype=str)

# Extract labels (assuming labels are in the last column)
labels = data[:, -1]

# Step 2: Split the data into train and test sets weighted by label
unique_labels, counts = np.unique(labels, return_counts=True)
proportions = counts / counts.sum()

train_indices = []
test_indices = []

for label, proportion in zip(unique_labels, proportions):
    label_indices = np.where(labels == label)[0]
    np.random.shuffle(label_indices)
    split_point = int(len(label_indices) * 0.8)  # 80% train, 20% test
    train_indices.extend(label_indices[:split_point])
    test_indices.extend(label_indices[split_point:])

train_indices = np.array(train_indices)
test_indices = np.array(test_indices)

train_data = data[train_indices]
test_data = data[test_indices]

# Step 3: Save the split data into separate .txt files
np.savetxt("../data/train_data.txt", train_data, fmt="%s", delimiter=",")
np.savetxt("../data/test_data.txt", test_data, fmt="%s", delimiter=",")

print("Data successfully split and saved to 'train_data.txt' and 'test_data.txt'.")


Data successfully split and saved to 'train_data.txt' and 'test_data.txt'.


In [None]:
from langchain_google_genai import ChatGoogleGenerativeAI

llm = ChatGoogleGenerativeAI(model = 'gemini-pro', temperature = 0.5)


In [None]:
from langchain_openai import OpenAI

llm=OpenAI(model='gpt-3.5-turbo-instruct', temperature=0.1)

In [15]:
from langchain_mistralai import ChatMistralAI

llm = ChatMistralAI(model="mistral-large-latest", temperature=0.1)

In [None]:
!aws configure
from langchain_aws import ChatBedrock

llm = ChatBedrock(model="amazon.titan-text-express-v1" ,
    beta_use_converse_api=True)

In [None]:
from langchain_fireworks import ChatFireworks

llm = ChatFireworks(model="accounts/fireworks/models/llama-v3p1-70b-instruct")

In [166]:
llm

ChatFireworks(client=<fireworks.client.chat_completion.ChatCompletionV2 object at 0x7449ea4fb310>, async_client=<fireworks.client.chat_completion.ChatCompletionV2 object at 0x7449ea4fbe10>, model_name='accounts/fireworks/models/llama-v3p1-70b-instruct', model_kwargs={}, fireworks_api_key=SecretStr('**********'))

In [167]:
llm.invoke("Hello how are you")

AIMessage(content="Hello! I'm just a computer program, so I don't have feelings, but thanks for asking! How can I assist you today?", additional_kwargs={}, response_metadata={'token_usage': {'prompt_tokens': 19, 'total_tokens': 48, 'completion_tokens': 29}, 'model_name': 'accounts/fireworks/models/llama-v3p1-70b-instruct', 'system_fingerprint': '', 'finish_reason': 'stop', 'logprobs': None}, id='run-513d6b13-5b1a-44f7-a349-08660f76095a-0', usage_metadata={'input_tokens': 19, 'output_tokens': 29, 'total_tokens': 48})

In [168]:
from langchain.prompts import PromptTemplate

prompt_template = PromptTemplate.from_template(
    '''
    Classify the requirement text into one of the following categories by answering only with the category code (F, A, FT, L, LF, MN, O, PE, PO, SC, SE, or US):
    
-   F (Functional)  : Requirements detailing specific system functionalities or actions.
-   A (Availability)  : Requirements related to system uptime, accessibility, or continuous operation.
-   FT (Fault Tolerance)  : Requirements ensuring the system can handle errors or unexpected failures.
-   L (Legal)  : Requirements concerning compliance with laws, regulations, or industry standards.
-   LF (Look & Feel)  : Requirements about the appearance, design, color schemes, or visual style.
-   MN (Maintainability)  : Requirements on ease of updates, maintenance, or adjustments.
-   O (Operational)  : Requirements on system operations, such as supported platforms or environments.
-   PE (Performance)  : Requirements focused on system speed, response time, or resource efficiency.
-   PO (Portability)  : Requirements related to compatibility across various devices or platforms.
-   SC (Scalability)  : Requirements on the system's ability to handle growth in users or workload.
-   SE (Security)  : Requirements ensuring data security, authorization, or protection from threats.
-   US (Usability)  : Requirements focused on user-friendliness, ease of use, or intuitive design.

---

Now classify this new requirement:{requirement}

Your Answer: just category code (F, A, FT, L, LF, MN, O, PE, PO, SC, SE, or US)
    '''
)

examples = [{
        "question": "The system shall display real-time data updates in a dashboard format accessible to all users.",
        "answer": "F"
    },
    {
        "question": "The product shall maintain 99.5% uptime during business hours, ensuring minimal service interruptions.",
        "answer": "A",
    },
    {
        "question": "The system shall continue to operate normally in case of a single hardware failure.",
        "answer": "FT",
    },
    {
        "question": "The application shall comply with GDPR regulations to ensure data privacy for all EU users.",
        "answer": "L",
    },
    {
        "question": "The product interface shall adhere to corporate branding guidelines, using the approved color palette and typography.",
        "answer": "LF",
    },
    {
        "question": "The system shall allow software updates to be applied without downtime for 95% of maintenance events.",
        "answer": "MN",
    },
    {
        "question": "The product shall support all versions of Windows and Mac operating systems released in the last five years.",
        "answer": "O",
    },
    {
        "question": "The system shall process and return search results within 3 seconds for 90% of user queries.",
        "answer": "PE",
    },
    {
        "question": "The application must be usable on both desktop and mobile devices without a loss of functionality.",
        "answer": "PO",
    },
    {
        "question": "The system shall handle up to 10,000 concurrent users without degradation of performance.",
        "answer": "SC",
    },
    {
        "question": "All user data shall be encrypted in transit and at rest to prevent unauthorized access.",
        "answer": "SE",
    },
    {
        "question": "The product shall allow 90% of first-time users to complete their initial setup within 5 minutes.,",
        "answer": "US",
    },
]

In [169]:
from langchain.chains import LLMChain
from langchain_core.prompts import FewShotPromptTemplate

chain = LLMChain(
    llm= llm, 
    prompt = prompt_template,
    verbose = False
    )

requirement = "The user interface shall have standard menus  buttons for navigation"



prompt = FewShotPromptTemplate(
    examples=examples,
    example_prompt=prompt_template,
    suffix="Requirement: {input}",
    input_variables=["requirement"],
)

response = chain.invoke(input = requirement)

print(response['text'])


LF


In [170]:
# File path
file_path = '../data/test_data.txt'

# Read the file
with open(file_path, 'r') as file:
    lines = file.readlines()

# Initialize lists to store separated data
ids = []
texts = []
classes = []

# Process each line
for line in lines:
    # Split the line using a comma as the primary delimiter
    parts = line.split(",", 2)  # Split into 3 parts: ID, text, and class
    if len(parts) == 3:
        ids.append(parts[0].strip())  # First part is the ID
        texts.append(parts[1].strip().strip("'"))  # Second part is the text (strip single quotes)
        classes.append(parts[2].strip())  # Third part is the class

# Combine the lists into a structured format (list of dictionaries)
test = [{"id": id_, "text": text, "class": class_} for id_, text, class_ in zip(ids, texts, classes)]

# Display the first few rows to verify
for row in test[:5]:  # Print only the first 5 rows
    print(row)


{'id': '12', 'text': 'The product shall be available 99% of the time. Rationale: To avoid service interruption during busiest customer service response periods. The product shall be available 99.99% of the time for regular business days.', 'class': 'A'}
{'id': '37', 'text': 'The software is available for use from the supermarket opening time to the closing time.', 'class': 'A'}
{'id': '8', 'text': 'The website shall be available for use 24 hours per day  365 days per year.', 'class': 'A'}
{'id': '8', 'text': 'All movies shall be streamed on demand  at any time of the day.', 'class': 'A'}
{'id': '3', 'text': 'The system shall be available for use between the hours of 8am and 6pm.', 'class': 'A'}


In [171]:
import time

output_of_prompt = []

i = 0
while i != len(test):
    try:
        output = chain.invoke(f"{test[i]['text']}")
        output_of_prompt.append({'i':i,
                                'text':test[i]['text'],
                                'labels':test[i]['class'],
                                'prompt_output': output})

        time.sleep(2)
        i += 1
        print(f'{i}/{len(test)} done :)')
    except Exception as e:
        print(f'\n sleep time by 10 seconds for {i}')
        print("Error: ",e)
        time.sleep(10)

1/198 done :)
2/198 done :)
3/198 done :)
4/198 done :)
5/198 done :)
6/198 done :)
7/198 done :)
8/198 done :)
9/198 done :)
10/198 done :)
11/198 done :)
12/198 done :)
13/198 done :)
14/198 done :)
15/198 done :)
16/198 done :)
17/198 done :)
18/198 done :)
19/198 done :)
20/198 done :)
21/198 done :)
22/198 done :)
23/198 done :)
24/198 done :)
25/198 done :)
26/198 done :)
27/198 done :)
28/198 done :)
29/198 done :)
30/198 done :)
31/198 done :)
32/198 done :)
33/198 done :)
34/198 done :)
35/198 done :)
36/198 done :)
37/198 done :)
38/198 done :)
39/198 done :)
40/198 done :)
41/198 done :)
42/198 done :)
43/198 done :)
44/198 done :)
45/198 done :)
46/198 done :)
47/198 done :)
48/198 done :)
49/198 done :)
50/198 done :)
51/198 done :)
52/198 done :)
53/198 done :)
54/198 done :)
55/198 done :)
56/198 done :)
57/198 done :)
58/198 done :)
59/198 done :)
60/198 done :)
61/198 done :)
62/198 done :)
63/198 done :)
64/198 done :)
65/198 done :)
66/198 done :)
67/198 done :)
68/1

In [172]:
output_of_prompt

[{'i': 0,
  'text': 'The product shall be available 99% of the time. Rationale: To avoid service interruption during busiest customer service response periods. The product shall be available 99.99% of the time for regular business days.',
  'labels': 'A',
  'prompt_output': {'requirement': 'The product shall be available 99% of the time. Rationale: To avoid service interruption during busiest customer service response periods. The product shall be available 99.99% of the time for regular business days.',
   'text': 'A'}},
 {'i': 1,
  'text': 'The software is available for use from the supermarket opening time to the closing time.',
  'labels': 'A',
  'prompt_output': {'requirement': 'The software is available for use from the supermarket opening time to the closing time.',
   'text': 'A'}},
 {'i': 2,
  'text': 'The website shall be available for use 24 hours per day  365 days per year.',
  'labels': 'A',
  'prompt_output': {'requirement': 'The website shall be available for use 24 hour

In [173]:
pred_bad = []
label = []
data  = output_of_prompt
for d in data:
    pred_bad.append(d['prompt_output']['text'])
    label.append(d['labels'])

In [174]:
import re
pred = []

# Process each line
for line in pred_bad:
    # Remove \n and content inside parentheses using regular expressions
    cleaned_line = re.sub(r"\n", "", line)  # Remove newline characters
    cleaned_line = re.sub(r"\s*\(.*?\)", "", cleaned_line)  # Remove content in parentheses
    pred.append(cleaned_line.strip())  # Strip leading and trailing spaces


In [175]:
import re

def extract_categories(pred, categories):
    """
    Extract valid categories from a list of strings.

    :param pred: List of strings containing text and categories
    :param categories: Set of valid categories
    :return: List of extracted valid categories, with 'Invalid' for unmatched items
    """
    filtered_data = []
    for item in pred:
        # Match any valid category as a standalone word
        match = re.findall(r'\b(' + '|'.join(categories) + r')\b', item)
        if match:
            # Add the first matched category (or all matches if needed)
            filtered_data.append(match[0])
        else:
            filtered_data.append("Invalid")  # Add 'Invalid' if no category is found
    return filtered_data


# List of valid categories
categories = {'F', 'A', 'FT', 'L', 'LF', 'MN', 'O', 'PE', 'PO', 'SC', 'SE', 'US'}

# Extract valid categories
filtered_data = extract_categories(pred, categories)

# Verify the lengths
print(f"Length of pred: {len(pred)}")
print(f"Length of filtered_data: {len(filtered_data)}")

# Output the result
pred = filtered_data


Length of pred: 198
Length of filtered_data: 198


In [176]:
import pandas as pd

comparison_df = pd.DataFrame({
    'Pred': pred,
    'Labels': label
})

# Configure pandas to display all rows
pd.set_option('display.max_rows', None)  # Show all rows
pd.set_option('display.max_columns', None)  # Show all columns

# Display the DataFrame
comparison_df

Unnamed: 0,Pred,Labels
0,A,A
1,A,A
2,A,A
3,F,A
4,A,A
5,A,A
6,A,A
7,F,F
8,F,F
9,F,F


In [177]:
import numpy as np
from collections import defaultdict

def perf_measure(y_true, y_pred):
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    if len(y_true) != len(y_pred):
        raise ValueError("The lengths of y_true and y_pred must match.")

    # Get the unique classes
    classes = np.unique(np.concatenate((y_true, y_pred)))

    # Initialize dictionaries to store results
    metrics = {cls: {"TP": 0, "FP": 0, "TN": 0, "FN": 0} for cls in classes}

    for cls in classes:
        metrics[cls]["TP"] = np.sum((y_true == cls) & (y_pred == cls))
        metrics[cls]["FP"] = np.sum((y_true != cls) & (y_pred == cls))
        metrics[cls]["TN"] = np.sum((y_true != cls) & (y_pred != cls))
        metrics[cls]["FN"] = np.sum((y_true == cls) & (y_pred != cls))

    TP = sum(values["TP"] for values in metrics.values())
    FP = sum(values["FP"] for values in metrics.values())
    TN = sum(values["TN"] for values in metrics.values())
    FN = sum(values["FN"] for values in metrics.values())
    return TP, FP, TN, FN

TP, FP, TN, FN = perf_measure(label, pred)

accuracy = (TP + TN) / (TP + FP + TN + FN)
recall = TP / (TP + FN) if (TP + FN) > 0 else 0
precision = TP / (TP + FP) if (TP + FP) > 0 else 0
F1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print('Accuracy: ', accuracy)
print('Recall: ', recall)
print('Precision: ', precision)
print('F1-score: ', F1)

Accuracy:  0.9663299663299664
Recall:  0.797979797979798
Precision:  0.797979797979798
F1-score:  0.7979797979797979
