In [1]:
import tiktoken
import os
from openai import OpenAI
import re
import pandas as pd
import util
import prompt_template
import api

In [2]:
#Report utils
num_of_LLM_calls = 0
num_of_comp_violations = 0
num_of_part_violations = 0
num_of_compliances = 0
num_of_self_redundencies = 0
num_of_redundencies = 0
num_of_self_conflicts = 0
num_of_conflicts = 0

In [3]:
# find the largest requirement in SRS
path_FR = "documents/requirements/FR"
max_token = 0
for filename in os.listdir(path_FR):
    if filename.endswith('.txt'):
        file_path_FR = os.path.join(path_FR, filename)
        with open(file_path_FR, 'r', encoding='utf-8') as file:
            max_token = max(0, util.count_tokens(file.read(), 'gpt-4o'))
print("number of tokens in the largest requirement text: ", max_token)

number of tokens in the largest requirement text:  243


In [4]:
# chunk articles based on the largest requirement, and leave some space for user prompt
with open("input/articles.txt", 'r', encoding='utf-8') as file:
    reference_text = file.read()

article_chunks = util.chunk_text(reference_text, max_tokens = 3072 - max_token)
system_prompts = []
for chunk in article_chunks:
    system_prompt = prompt_template.prompt_cross_reference_check(reference_text=chunk, is_prompt_strict=True)
    system_prompts.append(system_prompt)

for i, prompt in enumerate(system_prompts):
    print(f"System prompt {i+1} token count: {util.count_tokens(prompt, 'gpt-4o')}")

for i, chunk in enumerate(article_chunks):
    with open(f"documents/article_chunks/article_chunk_{i}.txt", 'w', encoding='utf-8') as f:
        f.write(f"Chunk {i+1}:\n{chunk}\n\n")

System prompt 1 token count: 3125
System prompt 2 token count: 3125
System prompt 3 token count: 3126
System prompt 4 token count: 3125
System prompt 5 token count: 3126
System prompt 6 token count: 3126
System prompt 7 token count: 3125
System prompt 8 token count: 2575


In [5]:
# call GPT-4o
def call_LLM(user_prompt, system_prompt):
    client = OpenAI(api_key=api.API_KEY)
    response = client.chat.completions.create(
        model="gpt-4o-mini-2024-07-18", 
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt}
        ],
        #max_tokens = 4096,  
        temperature = 0.2
    )
    global num_of_LLM_calls
    num_of_LLM_calls += 1
    return response



In [6]:
# LLM result will return compliances too. -> extract violations
def extract_violations(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    global num_of_comp_violations
    global num_of_part_violations
    global num_of_compliances
    
    violations = []
    entries = content.split('\n\n') 
    for entry in entries:
        if 'Relationship: Partial Violation' in entry:
            violations.append(entry)
            num_of_part_violations += 1
        if 'Relationship: Complete Violation' in entry:
            violations.append(entry)
            num_of_comp_violations += 1
        if 'Relationship: Compliance' in entry:
            num_of_compliances += 1
    
    return violations

In [10]:
# double loop over requirements and articles
counter = 1
data_list = []
for filename in os.listdir("documents/requirements/FR"):
    if filename.endswith('.txt') and filename != 'requirements.txt':  
        if  counter > 41:
            file_path_FR = os.path.join("documents/requirements/FR", filename)
            with open(file_path_FR, 'r', encoding='utf-8') as file:
                content = f"{counter}. {file.read()}\n"    
                compliance_result = ""
                user_prompt = prompt_template.prompt_user_compliance(content)
                for article in os.listdir("documents/article_chunks"):
                    file_path_article = os.path.join("documents/article_chunks", article)
                    with open(file_path_article, 'r', encoding='utf-8') as article_file:
                        system_prompt = prompt_template.prompt_cross_reference_check(article_file.read())
                        result = call_LLM(user_prompt, system_prompt)
                        compliance_result += result.choices[0].message.content
                with open(f"documents/compliance_result/{counter}_comp_res.txt", 'w', encoding='utf-8') as compfile:
                    compfile.write(compliance_result)
                data_list.append(extract_violations(f"documents/compliance_result/{counter}_comp_res.txt"))
                
        counter += 1
  
                

In [8]:
# store extracted violations in a data frame for better analysis

data = []
def parse_item(item, counter):
    id_value = counter

    relationship_match = re.search(r'Relationship: ([^\n]+)', item)
    if relationship_match:
        relationship = relationship_match.group(1).strip()
    else:
        relationship = None

    content_reference_match = re.search(r'Content Reference: ([^\n]+)', item)
    if content_reference_match:
        content_reference = content_reference_match.group(1).strip()
    else:
        content_reference = None

    reason_match = re.search(r'Reason: ([^\n]+)', item)
    if reason_match:
        reason = reason_match.group(1).strip()
    else:
        reason = None

    suggested_solution_match = re.search(r'Suggested Solution: ([^\n]+)', item)
    if suggested_solution_match:
        suggested_solution = suggested_solution_match.group(1).strip()
    else:
        suggested_solution = None

    return {
        'ID': id_value,
        'Relationship': relationship,
        'Content Reference': content_reference,
        'Reason': reason,
        'Suggested Solution': suggested_solution
    }

counter = 1
for sublist in data_list:
    for item in sublist:
        data.append(parse_item(item, counter))
    counter += 1

df = pd.DataFrame(data, index=range(1, len(data) + 1))
df.head(10)


Unnamed: 0,ID,Relationship,Content Reference,Reason,Suggested Solution
1,1,Partial Violation,Article 6,While the requirement mentions displaying an a...,Enhance the requirement to include logging of ...
2,2,Partial Violation,Article 6,The requirement does not explicitly mention th...,Implement secure multi-factor authentication f...
3,3,Complete Violation,Article 16: Capability to apply restrictions,The requirement does not specify any restricti...,"Implement restrictions on order types, trading..."
4,3,Complete Violation,Article 18: Orders received from customers,The requirement lacks a mechanism to reject or...,Introduce a validation process that checks ord...
5,3,Partial Violation,Article 6: Secure multi-factor authentication,While the requirement mentions that the user m...,Ensure that the login process includes secure ...
6,3,Partial Violation,Article 20: Record and maintain all activities,The requirement states that each online purcha...,Implement a comprehensive logging mechanism to...
7,4,Complete Violation,Article 16: Capability to apply restrictions,The requirement does not mention any restricti...,Implement restrictions on the types of orders ...
8,4,Complete Violation,Article 18: Orders received from customers,The requirement states that the system checks ...,Ensure that the system sends a notification to...
9,4,Partial Violation,Article 6: Secure multi-factor authentication,The requirement mentions that the user must lo...,Incorporate secure multi-factor authentication...
10,4,Partial Violation,Article 20: Record and maintain all activities,The requirement states that the system automat...,Ensure that the system maintains comprehensive...


In [9]:
# Extract suggested solutions for consistency check
util.delete_all_files_in_directory("documents/suggested_solutions/")
batch_size = 10
results = ""
solutions_size = 0
counter = 1
system_prompt_size = util.count_tokens(prompt_template.prompt_consistency(""), "gpt-4o")

for start in range(0, len(df), batch_size):
    end = start + batch_size
    batch = df[start:end]
    batch_str = "\n".join(f"{row.name}. {row['Suggested Solution']}" for index, row in batch.iterrows())
    solutions_size += util.count_tokens(batch_str, "gpt-4o")
    if solutions_size >= 2048 - system_prompt_size:
        with open(f"documents/suggested_solutions/{counter}_sol.txt", 'w', encoding='utf-8') as file:
            file.write(results)
        results = ""
        solutions_size = 0
        counter += 1
    results += batch_str + "\n"
    solutions_size += util.count_tokens(batch_str, "gpt-4o")

with open(f"documents/suggested_solutions/{counter}_sol.txt", 'w', encoding='utf-8') as file:
    file.write(results)

Deleted file: documents/suggested_solutions/1_sol.txt


In [28]:
# find redundant or contradicting solutions. (self consistency check)
directory = 'documents/suggested_solutions/'
text_files = [f for f in os.listdir(directory) if f.endswith('.txt')]
result = ""
if len(text_files) > 1:
    for i in range(len(text_files)):
        for j in range(i + 1, len(text_files)):
            file1_path = os.path.join(directory, text_files[i])
            file2_path = os.path.join(directory, text_files[j])
            with open(file1_path, 'r', encoding='utf-8') as file1:
                file1_contents = file1.read()

            with open(file2_path, 'r', encoding='utf-8') as file2:
                file2_contents = file2.read()
            combined_contents = file1_contents + "\n" + file2_contents
            response = call_LLM(prompt_template.prompt_user_self_consistency(combined_contents), system_prompt = prompt_template.prompt_consistency(""))
            result += response.choices[0].message.content
else:
    file_path = os.path.join(directory, "1_sol.txt")
    with open(file_path, 'r', encoding='utf-8') as file:
        file_contents = file.read()
        response = call_LLM(prompt_template.prompt_user_self_consistency(file_contents), system_prompt = prompt_template.prompt_consistency(""))
        result = response.choices[0].message.content

with open(f"documents/consistency_result/self_con_res.txt", 'w', encoding='utf-8') as confile:
    confile.write(result)

In [30]:
# df for self consistencies
user_input = []
content_from = []
similarity_score = []
similarity_number = []

with open('documents/consistency_result/self_con_res.txt', 'r') as file:
    lines = file.readlines()

for line in lines:
    if "Similarity #" in line:
        sim_number = int(re.search(r'\d+', line).group())
        similarity_number.append(sim_number)
    elif "(suggested solution):" in line:
        user_input.append(int(re.search(r'\d+', line).group()))
    elif "Content from :" in line:
        content_from.append(int(re.search(r'\d+', line).group()))
    elif "Similarity Score:" in line:
        similarity_score.append(int(re.search(r'\d+', line).group()))

df = pd.DataFrame({
    'Similarity Number': similarity_number,
    'User Input (Suggested Solution)': user_input,
    'Content From': content_from,
    'Similarity Score': similarity_score
})


df.head(10)


Unnamed: 0,Similarity Number,User Input (Suggested Solution),Content From,Similarity Score
0,1,2,11,70
1,2,5,9,100
2,3,3,7,60
3,4,4,12,65
4,5,6,10,55
5,6,8,13,50


In [33]:
# remove redundant suggestions. conflicts are manually handled.
content_from_list = df[df['Similarity Score'] >= 70]['Content From'].tolist()
num_of_self_conflicts = len(df[df['Similarity Score'] == 0]['Content From'].tolist())
num_of_self_redundencies = len(content_from_list)

number_of_sol_files = len([f for f in os.listdir("documents/suggested_solutions") if f.endswith('.txt')])


for i in range(1,number_of_sol_files + 1):
    with open(f'documents/suggested_solutions/{i}_sol.txt', 'r') as file:
        lines = file.readlines()

        filtered_lines = [line for index, line in enumerate(lines, start=1) if index not in content_from_list]
        with open(f'documents/suggested_solutions/update/{i}_updated_sol.txt', 'w') as file:
            file.writelines(filtered_lines)


In [None]:
# consistency check between remaining solutions and existing requirement document.

req_directory = 'documents/requirements/'
sol_directory = 'documents/suggested_solutions/update'

sol_files = [f for f in os.listdir(sol_directory) if f.endswith('.txt')]
req_files = [f for f in os.listdir(req_directory) if f.endswith('.txt')]
result = ""

for i in range(len(sol_files)):
    for j in range(len(req_files)):
        sol_path = os.path.join(sol_directory, sol_files[i])
        req_path = os.path.join(req_directory, req_files[j])
        with open(sol_path, 'r', encoding='utf-8') as file1:
            sol_contents = file1.read()
        with open(req_path, 'r', encoding='utf-8') as file2:
            req_contents = file2.read()
        response = call_LLM(prompt_template.prompt_user_consistency(sol_contents), system_prompt = prompt_template.prompt_consistency(req_contents))
        result += response.choices[0].message.content

with open(f"documents/consistency_result/con_res.txt", 'w', encoding='utf-8') as confile:
    confile.write(result)


In [None]:
# df for consistencies
user_input = []
content_from = []
similarity_score = []
similarity_number = []

with open('documents/consistency_result/con_res.txt', 'r') as file:
    lines = file.readlines()

for line in lines:
    if "Similarity #" in line:
        sim_number = int(re.search(r'\d+', line).group())
        similarity_number.append(sim_number)
    elif "(suggested solution):" in line:
        user_input.append(int(re.search(r'\d+', line).group()))
    elif "Content from :" in line:
        content_from.append(int(re.search(r'\d+', line).group()))
    elif "Similarity Score:" in line:
        similarity_score.append(int(re.search(r'\d+', line).group()))

df = pd.DataFrame({
    'Similarity Number': similarity_number,
    'User Input (Suggested Solution)': user_input,
    'Content From': content_from,
    'Similarity Score': similarity_score
})

content_from_list = df[df['Similarity Score'] >= 70]['Content From'].tolist()
num_of_conflicts = len(df[df['Similarity Score'] == 0]['Content From'].tolist()) - num_of_self_conflicts
num_of_redundencies = len(content_from_list) - num_of_self_redundencies

df.head(10)


In [42]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd

violations_data = {'Category': ['Complete Violations', 'Partial Violations', 'Compliances'],
                   'Count': [50, 30, 120]}
consistency_data = {'Category': ['Self Redundancy', 'Self Conflicts', 'Redundancy', 'Conflicts'],
                    'Count': [15, 8, 12, 10]}
accuracy_data = {'Category': ['Complete Violations', 'Partial Violations', 'Compliances',
                              'Self Redundancy', 'Self Conflicts', 'Redundancy', 'Conflicts'],
                 'Accuracy': [95, 92, 98, 90, 88, 93, 91]}
llm_calls = 500 

df_violations = pd.DataFrame(violations_data)
df_consistency = pd.DataFrame(consistency_data)
df_accuracy = pd.DataFrame(accuracy_data)

fig = make_subplots(rows=2, cols=2, 
                    specs=[[{'type': 'bar'}, {'type': 'pie'}],
                           [{'type': 'bar', 'colspan': 2}, None]],
                    subplot_titles=('Violations and Compliances', 'Consistency Issues',
                                    'Accuracy by Category'))

fig.add_trace(go.Bar(x=df_violations['Category'], y=df_violations['Count'],
                     marker_color='rgba(58, 71, 80, 0.6)', name='Count'),
              row=1, col=1)

fig.add_trace(go.Pie(labels=df_consistency['Category'], values=df_consistency['Count'],
                     marker=dict(colors=['#FFA07A', '#98FB98', '#87CEFA', '#DDA0DD']),
                     textinfo='label+percent', hole=0.3),
              row=1, col=2)

fig.add_trace(go.Bar(x=df_accuracy['Category'], y=df_accuracy['Accuracy'],
                     marker_color='rgba(246, 78, 139, 0.6)', name='Accuracy'),
              row=2, col=1)

fig.update_layout(height=800, width=1000, title_text="Analysis Dashboard",
                  showlegend=False, template="plotly_white")

fig.add_annotation(text=f"Total LLM Calls: {llm_calls}",
                   xref="paper", yref="paper",
                   x=0.5, y=-0.15, showarrow=False,
                   font=dict(size=14))

fig.show()