In [None]:
import pandas as pd
import numpy as np
import os, sys
from pathlib import Path

import nltk
from nltk import word_tokenize
import contractions
import spacy

# nltk.download("wordnet")

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from sentence_transformers import SentenceTransformer
from sklearn.cluster import AgglomerativeClustering

sys.path.append(os.path.abspath(".."))

from config import DATASET_NAME, EXPERIMENT_NAME, GENERATION_TECHNIQUE

# embed_model = SentenceTransformer("all-MiniLM-L6-v2")

In [None]:
# Resolve the experiment's output root and the two sub-folders this notebook uses
experiment_path = Path(f"../../data/{DATASET_NAME}/experiment_outputs/{EXPERIMENT_NAME}/{GENERATION_TECHNIQUE}/")

us_scen_sim_path = experiment_path / 'similarity_scores'  # similarity-score CSVs are written here
embeddings_path = experiment_path / 'embeddings'  # pickled embedding frames are read from here

# Create both folders up front (no error if they already exist)
for output_dir in (us_scen_sim_path, embeddings_path):
    os.makedirs(output_dir, exist_ok=True)

In [None]:
# Embedding models to evaluate, keyed by the model name that is also used in output file names
embed_model_names = ["all-MiniLM-L6-v2"]

embed_model_dict = {
    model_name: SentenceTransformer(model_name)
    for model_name in embed_model_names
}

#### Embed scenario data for scenario--user story traceability experiments.

In [None]:
# Load the parsed scenario data produced for this experiment
parsed_scenario_csv = experiment_path / 'parsed_scenario_data.csv'
df = pd.read_csv(parsed_scenario_csv)

df.head()

In [None]:
# (rows, columns) of the parsed scenario data
df.shape

In [None]:
# Get unique LLMs used, for looping later
# (the values of df["model"]; they also prefix the per-LLM embedding file names read below)
llms = df["model"].unique().tolist()

llms

In [None]:
# One row per user story: the first text seen for each us_id is kept
us_columns = ["us_id", "us_text"]

us_df = (
    df[us_columns]
    .drop_duplicates(subset=["us_id"])
    .reset_index(drop=True)
)

us_df.head()

In [None]:
# Parallel lists in us_df row order: the inputs for generating the US embeddings
us_ids, us_texts = (us_df[column].tolist() for column in ("us_id", "us_text"))

In [None]:
# Function to generate and save embeddings (us or scenario) to pickle file
def generate_and_save_embeddings(ids, texts, embedding_model, filename):
    """Encode `texts` and pickle an id/embedding table under the experiment's embeddings folder.

    Args:
        ids: Identifiers, one per text, stored in the "id" column.
        texts: Texts handed to `embedding_model.encode`.
        embedding_model: Object exposing `encode(texts, show_progress_bar=...)` whose result
            supports `.tolist()` (presumably a SentenceTransformer -- confirm against callers).
        filename: Name of the pickle file created inside `experiment_path / 'embeddings'`.

    Returns:
        None. The only effect is the pickle written to disk.
    """
    encoded = embedding_model.encode(texts, show_progress_bar=True)

    # One row per input; each embedding is stored as a plain Python list
    table = pd.DataFrame({
        "id": ids,
        "embedding": encoded.tolist()
    })

    # Relies on the notebook-level `experiment_path`; the folder is created on demand
    output_dir = experiment_path / 'embeddings'
    os.makedirs(output_dir, exist_ok=True)
    table.to_pickle(output_dir / filename)


In [None]:
# Generate and save US embeddings using each embedding model.
#
# Encoding only runs when the pickle is missing, so a fresh checkout can be run top to bottom
# (the similarity cells below read these files) while existing embeddings are left untouched.
# NOTE: set REGENERATE_US_EMBEDDINGS = True (or delete the pickle) to force re-encoding,
# e.g. after the user story texts have changed.
REGENERATE_US_EMBEDDINGS = False

for embed_model_name, embed_model in embed_model_dict.items():
    us_embeddings_filename = f'us_embeddings_{embed_model_name}.pkl'

    if REGENERATE_US_EMBEDDINGS or not (embeddings_path / us_embeddings_filename).exists():
        generate_and_save_embeddings(
            us_ids,
            us_texts,
            embed_model,
            us_embeddings_filename
        )

In [None]:
# Show the first row of the parsed scenario data
df.head(1)

In [None]:
# Generate and save scenario (each LLM generated set) embeddings using each embedding model.
#
# Encoding only runs when the pickle is missing, so a fresh checkout can be run top to bottom
# (the similarity cell below reads these files) while existing embeddings are left untouched.
# NOTE: set REGENERATE_SCENARIO_EMBEDDINGS = True (or delete the pickles) to force re-encoding,
# e.g. after the scenario texts have changed.
REGENERATE_SCENARIO_EMBEDDINGS = False

for embed_model_name, embed_model in embed_model_dict.items():
    for llm, group in df.groupby("model"):
        scenario_embeddings_filename = f'{llm}_scenario_embeddings_{embed_model_name}.pkl'

        # Skip LLM / embedding-model pairs whose embeddings are already on disk
        if not REGENERATE_SCENARIO_EMBEDDINGS and (embeddings_path / scenario_embeddings_filename).exists():
            continue

        scenario_ids = group["scenario_id"].tolist()
        scenario_texts = group["scenario_text"].tolist()

        generate_and_save_embeddings(
            scenario_ids,
            scenario_texts,
            embed_model,
            scenario_embeddings_filename
        )

In [None]:
# Cosine similarity between every user story and every scenario: one record per (US, scenario) pair
results = []

for embed_model_name in embed_model_dict.keys():
    metric_name = f"{embed_model_name}_embedding_cosine-sim"

    # User story embeddings do not depend on the LLM, so they are loaded once per embedding model
    # NOTE(review): pd.read_pickle can execute arbitrary code -- only load pickles written by this project
    us_embeddings_df = pd.read_pickle(experiment_path / f"embeddings/us_embeddings_{embed_model_name}.pkl")

    us_ids = us_embeddings_df["id"].tolist()
    # Stack the stored embeddings into one numpy array (one row per stored embedding)
    us_embeddings = np.stack(us_embeddings_df["embedding"].values)

    for llm in llms:
        # Scenario embeddings are stored per LLM
        scenario_embeddings_df = pd.read_pickle(embeddings_path / f"{llm}_scenario_embeddings_{embed_model_name}.pkl")

        scenario_ids = scenario_embeddings_df["id"].tolist()
        scenario_embeddings = np.stack(scenario_embeddings_df["embedding"].values)

        # Rows follow us_ids, columns follow scenario_ids
        cosine_sim_matrix = cosine_similarity(us_embeddings, scenario_embeddings)

        # NOTE: we don't preprocess texts for embeddings so preprocessed fields won't be included here
        current_results = [
            {
                "model": llm,
                "us_id": us_id,
                "scenario_id": scenario_id,
                "metric": metric_name,
                "similarity_score": score,
            }
            for us_id, similarity_row in zip(us_ids, cosine_sim_matrix)
            for scenario_id, score in zip(scenario_ids, similarity_row)
        ]

        results.extend(current_results)


In [None]:
# Build the long-format score table from the collected records
# (columns: model, us_id, scenario_id, metric, similarity_score)
sim_df = pd.DataFrame(results)

sim_df.head()

In [None]:
# Add us_text and scenario_text to sim_df.
#
# Scenario texts are joined on (model, scenario_id) rather than scenario_id alone: `df` holds the
# scenarios of every LLM, so if two models ever share a scenario_id a scenario_id-only join would
# multiply the score rows and attach another model's text. The lookup is de-duplicated on the join
# key for the same reason (mirrors how us_df is de-duplicated on us_id).
text_columns = ["scenario_text", "us_text"]

scenario_lookup = (
    df[["model", "scenario_id", "scenario_text"]]
    .drop_duplicates(subset=["model", "scenario_id"])
)

sim_df = (
    # Drop text columns left over from a previous run of this cell, so re-running it is
    # idempotent instead of producing scenario_text_x / scenario_text_y columns
    sim_df[[column for column in sim_df.columns if column not in text_columns]]
    .merge(scenario_lookup, on=["model", "scenario_id"], how="left")
    .merge(us_df[["us_id", "us_text"]], on="us_id", how="left")
)

In [None]:
# Preview the score table after the text columns were merged in
sim_df.head()

In [None]:
# Check for any missing us_texts after merge
# (lists the rows whose us_text is NaN; an empty frame means every row has a user story text)
sim_df[sim_df["us_text"].isna()]

In [None]:
# Check for any missing scenario_texts after merge
# (lists the rows whose scenario_text is NaN; an empty frame means every row has a scenario text)
sim_df[sim_df["scenario_text"].isna()]

In [None]:
# Save the scenario-level similarity scores to the similarity_scores folder (row index not written)
sim_df.to_csv(us_scen_sim_path / 'embedding_cosine_similarity_scores.csv', index=False)

#### Embed step data for weighted step embedding traceability experiments. 

In [None]:
# Load the per-step data; the next cell reads its "model", "step_id" and "flat_step" columns
step_df = pd.read_csv(experiment_path / 'processed_step_data.csv')

step_df.head()

In [None]:
# Embed every step: one pickle per (LLM, embedding model) pair
for embed_model_name, embed_model in embed_model_dict.items():
    steps_by_llm = step_df.groupby("model")

    for llm, group in steps_by_llm:
        step_ids, step_texts = group["step_id"].tolist(), group["flat_step"].tolist()
        step_embeddings_filename = f'{llm}_step_embeddings_{embed_model_name}.pkl'

        generate_and_save_embeddings(step_ids, step_texts, embed_model, step_embeddings_filename)

In [None]:
def _mean_embedding(step_embeddings):
    """Stack one scenario's step embeddings into an array and average them column-wise."""
    return np.mean(np.stack(step_embeddings.values), axis=0)


def _unit_length(embedding):
    """Scale an embedding to unit L2 norm; a zero-norm embedding is returned unchanged."""
    norm = np.linalg.norm(embedding)
    return embedding / norm if norm != 0 else embedding


# Cosine similarity between every user story and every step-averaged scenario embedding
step_averaged_results = []

for embed_model_name in embed_model_dict.keys():
    metric_name = f"{embed_model_name}_step_averaged_embedding_cosine-sim"

    # NOTE(review): pd.read_pickle can execute arbitrary code -- only load pickles written by this project
    us_embeddings_df = pd.read_pickle(experiment_path / f"embeddings/us_embeddings_{embed_model_name}.pkl")

    us_ids = us_embeddings_df["id"].tolist()
    # Stack the stored embeddings into one numpy array for the cosine similarity computation
    us_embeddings = np.stack(us_embeddings_df["embedding"].values)

    for llm in llms:
        step_embeddings_df = pd.read_pickle(embeddings_path / f"{llm}_step_embeddings_{embed_model_name}.pkl")

        # The scenario id is the step id with everything from the last "_" onwards removed
        # NOTE(review): assumes step ids look like "<scenario_id>_<suffix>" -- confirm against the step data
        step_embeddings_df["scenario_id"] = step_embeddings_df["id"].str.rsplit("_", n=1).str[0]

        # One embedding per scenario: the mean of its step embeddings, then normalised
        scenario_embeddings_df = (
            step_embeddings_df
            .groupby("scenario_id")["embedding"]
            .apply(_mean_embedding)
            .reset_index()
        )
        scenario_embeddings_df["embedding"] = scenario_embeddings_df["embedding"].apply(_unit_length)

        scenario_ids = scenario_embeddings_df["scenario_id"].tolist()
        scenario_embeddings = np.stack(scenario_embeddings_df["embedding"].values)

        # Rows follow us_ids, columns follow scenario_ids
        cosine_sim_matrix = cosine_similarity(us_embeddings, scenario_embeddings)

        current_results = [
            {
                "model": llm,
                "us_id": us_id,
                "scenario_id": scenario_id,
                "metric": metric_name,
                "similarity_score": score,
            }
            for us_id, similarity_row in zip(us_ids, cosine_sim_matrix)
            for scenario_id, score in zip(scenario_ids, similarity_row)
        ]

        step_averaged_results.extend(current_results)


In [None]:
# Build the long-format score table for the step-averaged scenario embeddings
# (columns: model, us_id, scenario_id, metric, similarity_score)
step_averaged_sim_df = pd.DataFrame(step_averaged_results)
    
step_averaged_sim_df.head()

In [None]:
# Add us_text and scenario_text to step_averaged_sim_df.
#
# Scenario texts are joined on (model, scenario_id) rather than scenario_id alone: `df` holds the
# scenarios of every LLM, so if two models ever share a scenario_id a scenario_id-only join would
# multiply the score rows and attach another model's text. The lookup is de-duplicated on the join
# key for the same reason (mirrors how us_df is de-duplicated on us_id).
text_columns = ["scenario_text", "us_text"]

scenario_lookup = (
    df[["model", "scenario_id", "scenario_text"]]
    .drop_duplicates(subset=["model", "scenario_id"])
)

step_averaged_sim_df = (
    # Drop text columns left over from a previous run of this cell, so re-running it is
    # idempotent instead of producing scenario_text_x / scenario_text_y columns
    step_averaged_sim_df[
        [column for column in step_averaged_sim_df.columns if column not in text_columns]
    ]
    .merge(scenario_lookup, on=["model", "scenario_id"], how="left")
    .merge(us_df[["us_id", "us_text"]], on="us_id", how="left")
)

In [None]:
# Preview the step-averaged score table after the text columns were merged in
step_averaged_sim_df.head()

In [None]:
# Count missing values per column; non-zero counts in the text columns mean the merge left rows without a text
step_averaged_sim_df.isna().sum()

In [None]:
# Save the step-averaged similarity scores to the similarity_scores folder (row index not written)
step_averaged_sim_df.to_csv(us_scen_sim_path / 'step_averaged_embedding_cosine_similarity_scores.csv', index=False)