In [27]:
%pip install scikit-learn
%pip install pandas

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [28]:
import pandas as pd

def _join_text_columns(frame, columns):
    """Join the given columns of ``frame`` row by row, separated by single spaces.

    Missing cells (NaN/None) contribute an empty string instead of the literal
    text "nan" that ``astype(str)`` would produce.
    """
    return [
        ' '.join('' if pd.isna(value) else str(value) for value in row)
        for row in frame[columns].itertuples(index=False, name=None)
    ]


def load_and_process(req_path, test_path):
    """Load the requirement and test CSV files and build one text per row.

    Parameters
    ----------
    req_path : str
        Path to the requirements CSV. Must contain the columns ``Feature``
        and ``Description``.
    test_path : str
        Path to the test-case CSV. Must contain the columns ``Purpose`` and
        ``Test steps``.

    Returns
    -------
    tuple
        ``(req_df, test_df, req_texts, test_texts)`` where both frames carry
        an added ``full_text`` column and the two lists hold those texts in
        row order.

    Notes
    -----
    Malformed CSV lines are skipped silently (``on_bad_lines='skip'``), so the
    frames may contain fewer rows than the files have lines.
    """
    req_df = pd.read_csv(req_path, sep=',', on_bad_lines='skip')
    test_df = pd.read_csv(test_path, sep=',', on_bad_lines='skip')

    # Some Purpose columns are intentionally left blank for now; populate them with empty strings
    test_df['Purpose'] = test_df['Purpose'].fillna('')

    req_text_fields = ['Feature', 'Description']
    test_text_fields = ['Purpose', 'Test steps']

    # Combine text columns for similarity matching. Blank cells in ANY of the
    # text columns are treated as empty text; otherwise the token "nan" would
    # leak into the TF-IDF vocabulary and create spurious similarity.
    req_df['full_text'] = _join_text_columns(req_df, req_text_fields)
    test_df['full_text'] = _join_text_columns(test_df, test_text_fields)

    # Convert to lists
    req_texts = req_df['full_text'].tolist()
    test_texts = test_df['full_text'].tolist()

    return req_df, test_df, req_texts, test_texts



In [29]:
def calculate_accuracy(pred_dict, ground_truth_path, test_path, debug=False):
    """Score predicted requirement-to-test links against a ground-truth mapping.

    Every (requirement, test) pair is one binary decision: "linked" or "not
    linked". The universe of tests is the ``ID`` column of ``test_path``.

    Parameters
    ----------
    pred_dict : dict
        Maps a requirement ID to an iterable of predicted test IDs (strings).
    ground_truth_path : str
        CSV with the columns ``Req ID`` and ``Test ID``; ``Test ID`` holds a
        comma-separated list of test IDs and may be blank.
    test_path : str
        CSV with an ``ID`` column listing every test case.
    debug : bool, default False
        Print the per-requirement sets and a final summary.

    Returns
    -------
    tuple
        ``(tp, fp, tn, fn, accuracy, recall, precision, balanced_accuracy, f1)``.
        Ratios are fractions in ``[0, 1]``; a ratio with a zero denominator is
        reported as ``0.0``.
    """
    # Read every column as text. With type inference, a Test ID column that
    # contains blanks is parsed as float, so "406" would become "406.0" and
    # never match a predicted ID.
    gt_df = pd.read_csv(ground_truth_path, dtype=str).dropna(subset=['Req ID'])
    gt_dict = {}
    for _, row in gt_df.iterrows():
        raw_tests = row['Test ID']
        if pd.notna(raw_tests):
            # Drop empty tokens produced by trailing or doubled commas
            tests = [token.strip() for token in raw_tests.split(',') if token.strip()]
        else:
            tests = []
        gt_dict[row['Req ID']] = tests

    ### CONFUSION MATRIX CALCULATION ###

    # Values for confusion matrix
    n: int = 0
    tp: int = 0
    tn: int = 0
    fp: int = 0
    fn: int = 0

    test_test = pd.read_csv(test_path, sep=',', on_bad_lines='skip')
    # All known test IDs; stringified the same way the predictions are
    curr_tests: set[str] = set(test_test["ID"].astype(str))

    # Sorted only to make the debug output deterministic; totals do not depend on order
    for req in sorted(set(pred_dict.keys()) | set(gt_dict.keys()), key=str):
        actual_tests: set[str] = set(pred_dict.get(req, []))
        expected_tests: set[str] = set(gt_dict.get(req, []))

        # Positives
        curr_tp_set: set[str] = actual_tests & expected_tests
        curr_tp_count: int = len(curr_tp_set)
        if debug:
            print(f"Info - \t\t({curr_tp_count}) {curr_tp_set = }")

        curr_fp_set: set[str] = actual_tests - expected_tests
        curr_fp_count: int = len(curr_fp_set)
        if debug:
            print(f"Info - \t\t({curr_fp_count}) {curr_fp_set = }")

        # Negatives
        expected_ns: set[str] = curr_tests - expected_tests
        actual_ns: set[str] = curr_tests - actual_tests

        curr_tn_set: set[str] = actual_ns & expected_ns
        curr_tn_count: int = len(curr_tn_set)
        if debug:
            print(f"Info - \t\t({curr_tn_count}) {curr_tn_set = }")

        curr_fn_set: set[str] = actual_ns - expected_ns
        curr_fn_count: int = len(curr_fn_set)
        if debug:
            print(f"Info - \t\t({curr_fn_count}) {curr_fn_set = }")

        curr_n: int = curr_tp_count + curr_fp_count + curr_tn_count + curr_fn_count

        # Check so only the right amount of trace links were detected.
        # A mismatch means a predicted or expected test ID is not in the test file.
        expected_curr_n: int = len(curr_tests)
        if curr_n != expected_curr_n:
            print(f"Error - \t\tExpected curr_n = {expected_curr_n}, got {curr_n = }")
        if debug:
            print(f"Info - \t\t{curr_n = }")

        n += curr_n
        tp += curr_tp_count
        tn += curr_tn_count
        fp += curr_fp_count
        fn += curr_fn_count

    accuracy: float = (tp + tn) / n if n != 0 else 0.0
    recall: float = tp / (tp + fn) if tp + fn != 0 else 0.0
    precision: float = tp / (tp + fp) if tp + fp != 0 else 0.0
    # Specificity (true-negative rate) is measured over the actual negatives: TN + FP
    specificity: float = tn / (tn + fp) if tn + fp != 0 else 0.0
    # Balanced accuracy is the mean of sensitivity (recall) and specificity
    balanced_accuracy: float = (recall + specificity) / 2
    f1: float = 2 * (recall * precision) / (recall + precision) if recall + precision != 0 else 0.0

    if debug:
        print("*********************************************")
        print(ground_truth_path)
        print("*********************************************")
        # The value is a fraction, so let the format spec convert it to a percentage
        print(f"Balanced accuracy: {balanced_accuracy:.2%}")
        print("*********************************************")

    return tp, fp, tn, fn, accuracy, recall, precision, balanced_accuracy, f1

In [30]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import re
import numpy as np
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# English stop-word list from NLTK, held in a set for the membership test in custom_tokenizer.
# NOTE(review): presumably needs the NLTK 'stopwords' and 'wordnet' data to be downloaded
# beforehand; nothing in this notebook downloads them — confirm.
stop_words = set(stopwords.words('english'))
# Single lemmatizer instance reused for every token by custom_tokenizer.
lemmatizer = WordNetLemmatizer()

def custom_preprocessor(text):
    """Normalise raw text before tokenisation.

    The text is lower-cased and every character that is neither an ASCII
    letter nor whitespace is deleted (not replaced by a space), so
    "log-in" becomes "login".
    """
    lowered = text.lower()
    return re.sub(r'[^A-Za-z\s]', '', lowered)

def custom_tokenizer(text):
    """Split ``text`` on whitespace, drop stop words and lemmatize what remains.

    Uses the module-level ``stop_words`` set and ``lemmatizer`` instance.
    Returns the lemmatized tokens in their original order.
    """
    tokens = []
    for word in text.split():
        if word in stop_words:
            continue
        tokens.append(lemmatizer.lemmatize(word))
    return tokens

def compute_cosine(req_df, test_df, req_texts, test_texts, threshold):
    """Link requirements to tests whose TF-IDF cosine similarity reaches ``threshold``.

    Parameters
    ----------
    req_df, test_df : pandas.DataFrame
        Frames providing the ``ID`` column for requirements and tests. Row
        ``i`` of each frame must correspond to element ``i`` of the matching
        text list.
    req_texts, test_texts : list of str
        Texts to compare, one per requirement / test.
    threshold : float
        Minimum cosine similarity (inclusive) for a link to be reported.

    Returns
    -------
    dict
        ``{str(requirement ID): [str(test ID), ...]}`` with one entry per
        requirement; the list is empty when no test reaches the threshold.
    """
    vectorizer = TfidfVectorizer(
        preprocessor=custom_preprocessor,
        tokenizer=custom_tokenizer,
        token_pattern=None
    )

    # Fit on requirements and tests together so both share one vocabulary and IDF weighting
    documents = req_texts + test_texts
    tfidf_matrix = vectorizer.fit_transform(documents)

    n_reqs = len(req_texts)
    # Rows: requirements, columns: tests
    similarity_matrix = cosine_similarity(tfidf_matrix[:n_reqs], tfidf_matrix[n_reqs:])

    # Stringify the test IDs once, straight from the column. Looking them up
    # with test_df.iloc[j]['ID'] inside the loop builds a full row Series per
    # similarity cell and lets pandas upcast an integer ID to float when every
    # column of the frame is numeric.
    test_ids = [str(test_id) for test_id in test_df['ID'].tolist()]

    matches = {}
    for i, req_id in enumerate(req_df['ID']):
        matches[str(req_id)] = [
            test_ids[j]
            for j, similarity in enumerate(similarity_matrix[i])
            if similarity >= threshold
        ]

    # Note: len(matches) is the number of requirements, not the number of links
    print(f"Matches for threshold {threshold:.2f}: {len(matches)}")
    print(f"Matches: {matches}")

    return matches


### Evaluation loop over all dataset subsets


In [None]:
import json 
import os
import time
import matplotlib.pyplot as plt

# ---------------------------------------------------------------------------
# Experiment configuration
# ---------------------------------------------------------------------------
# Dataset folders
datasets = ["AMINA"]
subsets = [f"{i:02}" for i in range(1, 11)]  # 01 to 10
thresholds = [0.34]
# thresholds = np.arange(0.01, 1.01, 0.01)

# One timestamp for the whole run, so that every subset is logged under the
# same ./MIS_COSINE_<dataset>/<date>/<time>/ directory instead of a new
# directory whenever the wall clock moves on.
run_started = time.localtime()
current_time = time.strftime("%H_%M_%S", run_started)
current_date = time.strftime("%Y-%m-%d", run_started)

# One record per (dataset, subset, threshold); the DataFrame is built once
# after the loops instead of being re-concatenated on every iteration.
summary_records = []

# For every dataset find the following:
for dataset in datasets:
    for subset in subsets:
        req_path = f'./data/{dataset}/{subset}/RE.csv'
        test_path = f'./data/{dataset}/{subset}/ST.csv'
        # NOTE(review): the mapping is read from ../data while RE/ST come from ./data — confirm this is intentional.
        ground_truth_mapping_path = f'../data/{dataset}/{subset}/mapping.csv'

        print(f"Dataset: {dataset}. Subset: {subset}")

        # Load and process data
        req_df, test_df, req_texts, test_texts = load_and_process(req_path, test_path)

        for threshold in thresholds:
            threshold_str = f"{threshold:.2f}"
            print(f"Threshold: {threshold_str}")

            # Time only the similarity computation of THIS threshold; a timer
            # started outside the loop would also include earlier thresholds,
            # their scoring and their file writes.
            start = time.perf_counter()
            predicted_matches = compute_cosine(req_df, test_df, req_texts, test_texts, threshold)
            t = time.perf_counter() - start

            tp, fp, tn, fn, accuracy, recall, precision, balanced_accuracy, f1 = calculate_accuracy(predicted_matches, ground_truth_mapping_path, test_path, debug=False)
            summary_records.append({"Dataset": dataset, "Subset": subset, "Threshold": threshold_str, "TP": tp, "FP": fp, "TN": tn, "FN": fn, "Accuracy": accuracy, "Recall": recall, "Precision": precision, "Balanced Accuracy": balanced_accuracy, "F1 score": f1})
            print("*********************************************")

            payload: dict[str, dict] = {
                "meta": {
                    "req_path": req_path,
                    "test_path": test_path,
                    "mapping_path": ground_truth_mapping_path,
                    "threshold": float(threshold)
                },
                "data": {
                    "links": predicted_matches,
                    "time_to_analyze": t,
                    "err": []
                }
            }

            log_dir: str = f"./MIS_COSINE_{dataset}/{current_date}/{current_time}/{subset}"
            if len(thresholds) > 1:
                # Keep one result file per threshold; without this, a sweep
                # would overwrite the same res.json again and again.
                log_dir = f"{log_dir}/{threshold_str}"
            os.makedirs(log_dir, exist_ok=True)

            with open(f"{log_dir}/res.json", "w") as out:
                json.dump(payload, out, indent=2)

print("\n")

summary_table = pd.DataFrame(summary_records)

# Convert Threshold back to float for sorting if necessary
summary_table["Threshold"] = summary_table["Threshold"].astype(float)
summary_table = summary_table.sort_values(by=["Threshold", "Dataset", "Subset"])
# Average the metrics over all datasets/subsets for each threshold
grouped = summary_table.groupby(["Threshold"], as_index=False).agg({"F1 score": "mean", "Recall": "mean", "Precision": "mean"})
grouped = grouped.sort_values(by=["F1 score", "Recall", "Precision"], ascending=[False, False, False])

best_row = grouped.loc[grouped["F1 score"].idxmax()]

# Print the metrics of every (subset, threshold) row; no rows are filtered by F1 here.
# The same border string is used above and below the table so the widths always agree.
table_border = "+------------------+----------+----------------+----------+---------+----------+---------+------------+--------+----------+-------------------+-------------------+"
print("\nBest Metrics per Subset (Based on F1 Score):")
print(table_border)
print("| Dataset          | Subset   | Threshold      | TP       |FP       | TN       |FN       |Accuracy    | Recall | Precision| Balanced Accuracy |      F1 score     |")
print(table_border)
for _, row in summary_table.iterrows():
    print(
        f"| {row['Dataset']:<16} "
        f"| {row['Subset']:<8} "
        f"| {row['Threshold']:<14} "
        f"| {int(row['TP']):<8} "
        f"| {int(row['FP']):<7} "
        f"| {int(row['TN']):<8} "
        f"| {int(row['FN']):<7} "
        f"| {row['Accuracy']:<10.2f} "
        f"| {row['Recall']:<6.2f} "
        f"| {row['Precision']:<8.2f} "
        f"| {row['Balanced Accuracy']:<18f}"
        f"| {row['F1 score']:<18f}|")
print(table_border)

print(grouped.to_string(index=False))

best_threshold = best_row["Threshold"]
best_avg_f1 = best_row["F1 score"]
print(f"🏆 Best global threshold: {best_threshold:.2f} with average F1 score: {best_avg_f1:.4f}")


Dataset: AMINA. Subset: 01
Threshold: 0.34
Matches for threshold 0.34: 25
Matches: {'S1': ['406'], 'S2': ['35'], 'S12': ['364'], 'S24': ['43', '44', '248', '249'], 'S56': ['245'], 'S59': ['248', '249', '250'], 'S75': ['451', '452'], 'S89': [], 'S116': [], 'S135': ['216'], 'S177': ['352'], 'S182': ['480', '486'], 'S234': ['21'], 'S236': ['23'], 'B53': ['434'], 'B85': ['238'], 'B87': ['271'], 'B93': ['281'], 'B105': ['10'], 'B106': ['642'], 'B108': ['32'], 'S549': ['90'], 'S555': ['96'], 'S557': ['98'], 'S558': ['99']}
*********************************************
Dataset: AMINA. Subset: 02
Threshold: 0.34
Matches for threshold 0.34: 25
Matches: {'S2': ['35'], 'S12': ['364'], 'S27': ['414', '415'], 'S28': ['63', '485'], 'S30': ['483', '484'], 'S38': ['71', '331', '596'], 'S75': ['451', '452'], 'S111': ['188'], 'S117': [], 'S135': ['216'], 'S137': ['220'], 'S143': ['596'], 'S148': ['216', '256'], 'S152': ['260'], 'S163': ['273'], 'S174': ['348'], 'S236': ['23'], 'B24': ['319'], 'B34': ['3