## Baseline 1 for clustering similar test steps in natural language 

Assign test steps whose preprocessed text (the list of tokens left after cleaning) is exactly the same to the same cluster.

In [1]:
# import libraries
import os
import gc
import pandas as pd
import numpy as np
import math
import statistics as st
import re
import string
import time
import spacy
from wordcloud import WordCloud
from textwrap import wrap
import matplotlib.pyplot as plt
from collections import defaultdict  # For word frequency

from sklearn.cluster import AgglomerativeClustering
from sklearn.cluster import KMeans
import scipy.cluster.hierarchy as sch
from sklearn.metrics import silhouette_score
from tqdm import tqdm
from joblib import Parallel, delayed
import multiprocessing as mp
from multiprocessing import Queue

from nltk.corpus import stopwords 
from nltk.tokenize import RegexpTokenizer, word_tokenize, TweetTokenizer
from nltk.stem import WordNetLemmatizer 
import nltk

import gensim
from gensim.test.utils import datapath
from gensim.models import Word2Vec, Phrases, KeyedVectors
from sklearn.decomposition import PCA

from sklearn.feature_extraction.text import CountVectorizer
from IPython.display import HTML
from IPython.display import display

In [None]:
# Fetch the NLTK corpora the preprocessing code relies on:
# 'stopwords' backs stopwords.words('english') and 'wordnet' backs WordNetLemmatizer.
nltk.download('stopwords')
nltk.download('wordnet')

### Data preprocessing functions

In [6]:
# Function to compute number of unique words in df
def get_number_unique_words(df):
    """Return how many distinct words appear across all entries of ``df["Steps"]``.

    Every entry of the column is iterated item by item, so entries are
    expected to be token lists (a plain string would be counted character
    by character).
    """
    vocabulary = {word for step in df["Steps"] for word in step}
    return len(vocabulary)

In [7]:
# Function to compute number of unique words in df ('test case name' field)
def get_number_unique_words_name(df):
    """Return how many distinct words appear across all entries of ``df["Case_Name"]``.

    Every entry of the column is iterated item by item, so entries are
    expected to be token lists (a plain string would be counted character
    by character).
    """
    vocabulary = {word for name in df["Case_Name"] for word in name}
    return len(vocabulary)

In [8]:
# Function to get list of words that occur less than a certain number of times
def get_word_frequency(df, min_count=2):
    """Return the words of ``df["Steps"]`` that occur fewer than ``min_count`` times.

    Parameters
    ----------
    df : mapping or DataFrame
        Object whose ``"Steps"`` entry is an iterable of token lists.
    min_count : int, optional
        Words whose total number of occurrences is strictly below this value
        are returned. The default of 2 selects the words that occur only once.

    Returns
    -------
    list
        The rare words, ordered by their first occurrence in the data.
    """
    # Single counting pass over all tokens
    word_occurrence_dict = defaultdict(int)
    for step in df["Steps"]:
        for word in step:
            word_occurrence_dict[word] += 1

    return [word for word, occurrence in word_occurrence_dict.items() if occurrence < min_count]

In [9]:
# Function to get list of words that occur less than a certain number of times ('test case name' field)
def get_word_frequency_name(df, min_count=2):
    """Return the words of ``df["Case_Name"]`` that occur fewer than ``min_count`` times.

    Parameters
    ----------
    df : mapping or DataFrame
        Object whose ``"Case_Name"`` entry is an iterable of token lists.
    min_count : int, optional
        Words whose total number of occurrences is strictly below this value
        are returned. The default of 2 selects the words that occur only once.

    Returns
    -------
    list
        The rare words, ordered by their first occurrence in the data.
    """
    # Single counting pass over all tokens
    word_occurrence_dict = defaultdict(int)
    for name in df["Case_Name"]:
        for word in name:
            word_occurrence_dict[word] += 1

    return [word for word, occurrence in word_occurrence_dict.items() if occurrence < min_count]

In [10]:
# Function to remove problematic/misspelled words from vocabulary
def remove_problematic_words(df):
    """Drop every token listed in ``word2vec_vocab_problematic.txt`` from ``df["Steps"]``.

    The word list is read from the current working directory, one word per
    line, with surrounding whitespace ignored. ``df`` is modified in place:
    its ``"Steps"`` column, which must hold lists of tokens, is replaced by
    the filtered lists. The vocabulary size is printed before and after.
    """
    number_unique_words = get_number_unique_words(df)
    print("Number of unique words across all test steps: ", number_unique_words)

    # load file with problematic words that exist in the test data;
    # the context manager closes the file, and a set gives O(1) membership tests
    with open('word2vec_vocab_problematic.txt', 'r') as problematic_words:
        problematic_words_set = {word.strip() for word in problematic_words}

    # Assign the whole column: writing through df.loc[index]["Steps"] is chained
    # assignment and may silently update a temporary copy instead of df
    df["Steps"] = df["Steps"].apply(
        lambda step: [elem for elem in step if elem not in problematic_words_set]
    )

    number_unique_words = get_number_unique_words(df)
    print("Number of unique words across all test steps after removing problematic words: ", number_unique_words)

In [11]:
# Function to fix problematic/misspelled words from vocabulary
def fix_problematic_words(df):
    """Replace the tokens listed in ``word2vec_vocab_to_fix.txt`` inside ``df["Steps"]``.

    Each line of the file (read from the current working directory) has the
    form ``word:replacement1,replacement2,...``; every occurrence of ``word``
    in a step is replaced by the listed replacement tokens. Lines without a
    ``:`` separator (blank lines, for instance) are skipped. ``df`` is modified
    in place: its ``"Steps"`` column, which must hold lists of tokens, is
    replaced by the fixed lists. The vocabulary size is printed before and after.
    """
    number_unique_words = get_number_unique_words(df)
    print("Number of unique words across all test steps: ", number_unique_words)

    # load file with problematic words that exist in the test data
    problematic_words_dict = {}
    with open('word2vec_vocab_to_fix.txt', 'r') as problematic_words:
        for line in problematic_words:
            full_line = line.split(':')
            if len(full_line) < 2:
                # No separator on this line: there is nothing to map. The previous
                # bare `except` fallback indexed full_line[1] again and therefore
                # raised the very IndexError it was meant to handle.
                continue
            # NOTE(review): only the newline is removed, so spaces around ':' or ','
            # in the file end up inside the keys/replacements - confirm the file format.
            problematic_words_dict[full_line[0]] = [x.replace('\n', '') for x in full_line[1].split(',')]

    def _fix_step(step):
        # Rebuild one step, expanding every problematic word into its replacements
        modified_step = list()
        for word in step:
            if word in problematic_words_dict:
                modified_step.extend(problematic_words_dict[word])
            else:
                modified_step.append(word)
        return modified_step

    # Assign the whole column: writing through df.loc[index]["Steps"] is chained
    # assignment and may silently update a temporary copy instead of df
    df["Steps"] = df["Steps"].apply(_fix_step)

    number_unique_words = get_number_unique_words(df)
    print("Number of unique words across all test steps after fixing problematic words: ", number_unique_words)

In [8]:
def preprocess_clean_data(df):
    """Clean, tokenize and filter the ``"Steps"`` column of ``df`` in place.

    The column must contain strings on entry and contains lists of tokens on
    exit. Steps, in order: replace URLs by ``URL``, strip ``/word`` fragments
    and ``{...}`` spans, lowercase, drop words containing digits, turn
    punctuation into spaces, collapse repeated spaces, tokenize with
    ``TweetTokenizer``, apply ``remove_problematic_words`` and
    ``fix_problematic_words``, remove English stopwords, lemmatize with
    ``WordNetLemmatizer`` and finally drop the rare words returned by
    ``get_word_frequency``. Progress information is printed; nothing is returned.
    """

    # Preprocessing and clean test steps
    print("Cleaning test step field...")

    # The patterns are written as raw strings: same regular expressions as before,
    # without relying on invalid escape sequences in normal string literals
    df["Steps"] = df["Steps"].apply(lambda x: re.sub(r'http\S+', 'URL', x))
    df["Steps"] = df["Steps"].apply(lambda x: re.sub(r'\/[\w-]*', '', x))
    # NOTE(review): the class [^)] excludes ')' rather than '}', so one match can run
    # from the first '{' to the last '}' of a step - confirm this is intended.
    df["Steps"] = df["Steps"].apply(lambda x: re.sub(r'\{[^)]*\}', '', x))

    # lowercase the step descriptions
    df["Steps"] = df["Steps"].apply(lambda x: x.lower())

    # remove digits and words with digits
    df["Steps"] = df["Steps"].apply(lambda x: re.sub(r'\w*\d\w*', '', x))

    # remove punctuations
    df["Steps"] = df["Steps"].apply(lambda x: re.sub('[%s]' % re.escape(string.punctuation), ' ', x))

    # remove extra spaces
    df["Steps"] = df["Steps"].apply(lambda x: re.sub(' +', ' ', x))

    # tokenization (one tokenizer instance is reused for every row)
    tokenizer = TweetTokenizer()
    df["Steps"] = df["Steps"].apply(tokenizer.tokenize)
    number_unique_words = get_number_unique_words(df)
    print("Number of unique words across all test steps: ", number_unique_words)

    remove_problematic_words(df)
    fix_problematic_words(df)

    # stopword removal
    stop_words = set(stopwords.words('english'))
    df["Steps"] = df["Steps"].apply(lambda x: [w for w in x if w not in stop_words])
    number_unique_words = get_number_unique_words(df)
    print("Number of unique words in test steps after stopword removal: ", number_unique_words)

    # lemmatization
    lemmatizer = WordNetLemmatizer()
    df["Steps"] = df["Steps"].apply(lambda x: [lemmatizer.lemmatize(w) for w in x])
    number_unique_words = get_number_unique_words(df)
    print("Number of unique words in test steps after lemmatization: ", number_unique_words)

    # remove rare words
    # NOTE(review): the message below says "less than 10 times", but the threshold
    # is whatever get_word_frequency applies - keep the two in sync.
    ten_times_occurrence_words = get_word_frequency(df)
    print("Number of words that occurred less than 10 times in test steps: ", len(ten_times_occurrence_words))

    # Filter the frame that was passed in (the previous code wrote to the global
    # test_steps_df regardless of the argument); a set keeps the lookup O(1)
    rare_words = set(ten_times_occurrence_words)
    df["Steps"] = df["Steps"].apply(lambda x: [elem for elem in x if elem not in rare_words])

    print("Dataset size after preprocessing: " , df.shape)

### Read and preprocess files with test cases and build dataframe

In [20]:
# Get data directory and list of xlsx files
current_dir = os.getcwd()
# Build the path with os.path.join so it works on every OS, not only with Windows
# backslashes; the trailing "" keeps the trailing separator the path used to have
parent_dir = os.path.join(os.path.dirname(current_dir), "filtered_data", "")
# Walk the folder recursively; sorting makes the load order (and therefore the row
# order of the resulting dataframe) reproducible across machines and runs
xlsxfiles = sorted(
    os.path.join(root, name)
    for root, dirs, files in os.walk(parent_dir)
    for name in files
    if name.endswith(".xlsx")
)

In [21]:
# Columns kept from every input spreadsheet
column_names = ["Type", "Key", "Case_Name", "Step_ID", "Steps"]

print("Reading input data...")
# Collect the rows in a plain list and build the dataframe once at the end:
# enlarging a DataFrame one row at a time with .loc re-allocates on every insertion
collected_rows = []
for test_file in xlsxfiles:
    # load data and keep only the columns of interest
    test_data_df = pd.read_excel(test_file)
    collected_rows.extend(test_data_df[column_names].astype(object).values.tolist())

# dtype=object keeps every column generic, like the empty
# pd.DataFrame(columns=column_names) that used to be filled row by row
test_steps_df = pd.DataFrame(collected_rows, columns=column_names, dtype=object)

# Number of rows loaded (name kept from the row-by-row version of this cell)
index_to_add = len(test_steps_df)

print("Done!")
print("Shape of data => ", test_steps_df.shape)

Reading input data...
Done!
Shape of data =>  (15668, 5)


In [22]:
# Call preprocessing function: it rewrites the "Steps" column of test_steps_df
# (raw strings -> lists of cleaned tokens), prints its progress and returns nothing
preprocess_clean_data(test_steps_df)

Cleaning test case name field...
Dataset size before preprocessing:  (15668, 5)
Number of unique words across all test names:  1519
Number of unique words in test names after stopword removal:  1447
Number of words that occurred only once in test case names:  164
Number of unique words in test names in the end:  1138
Dataset size after preprocessing:  (15668, 5)


In [25]:
# Walk the dataframe once and build two parallel lists:
#   step_id_text_tuple_list    -> (step_id, step_text) pairs, used to recover the step ID after clustering
#   test_steps_clustering_list -> one list of tokens (words) per step, the input of the clustering
step_id_text_tuple_list = []
test_steps_clustering_list = []
for step_id, step_text in zip(test_steps_df["Step_ID"], test_steps_df["Steps"]):
    step_id_text_tuple_list.append((step_id, step_text))

    if isinstance(step_text, list):
        # already tokenized: keep a copy of the tokens
        tokens = list(step_text)
    elif isinstance(step_text, str):
        # a bare string becomes a single-token step
        tokens = [step_text]
    else:
        # anything else yields an empty step
        tokens = []
    test_steps_clustering_list.append(tokens)

print("Length of list of tuples:" , len(step_id_text_tuple_list))
print("Length of list with test steps: " , len(test_steps_clustering_list))

Length of list of tuples: 15668
Length of list with test steps:  15668


In [26]:
# Remove empty steps (steps left without any token) from both parallel lists
steps_to_remove = [position for position, step in enumerate(test_steps_clustering_list) if len(step) == 0]
# Set copy for O(1) membership tests; testing against the list made the filtering quadratic
positions_to_remove = set(steps_to_remove)

# Both lists are filtered with the positions computed above, so they stay aligned
step_id_text_tuple_list = [item for position, item in enumerate(step_id_text_tuple_list) if position not in positions_to_remove]
test_steps_clustering_list = [step for position, step in enumerate(test_steps_clustering_list) if position not in positions_to_remove]
print("Length of list of tuples:" , len(step_id_text_tuple_list))
print("Length of list with test steps: " , len(test_steps_clustering_list))

Length of list of tuples: 15644
Length of list with test steps:  15644


### Basic stats for the test steps

In [19]:
# Total number of test steps
# (uses test_steps_clustering_list, the token lists built above; the name
# test_steps_training_list used here before is not defined anywhere in this notebook)
print("Total number of steps : ", len(test_steps_clustering_list))

# Average number of words per test step
total_number_words_steps = sum(len(steps) for steps in test_steps_clustering_list)
# Guard against an empty list so the cell does not fail with ZeroDivisionError
avg_words_per_step = total_number_words_steps/len(test_steps_clustering_list) if test_steps_clustering_list else 0.0
print("Average number of words per test step: ", avg_words_per_step)

# Most frequent words
word_freq = defaultdict(int)
for step in test_steps_clustering_list:
    for word in step:
        word_freq[str(word)] += 1
# Last expression of the cell: the five most frequent words, most frequent first
sorted(word_freq, key=word_freq.get, reverse=True)[:5]

Total number of steps :  15644
Average number of words per test step:  3.925722321656865
1742


['verify', 'validate', 'check', 'button', 'item']

### Compare test step texts for similarity

In [71]:
# Group the positions of the steps whose token lists are exactly equal.
# A dict keyed by the token tuple does this in one pass instead of comparing every
# pair of steps, and it also gives the last step its own cluster when it has no
# duplicate (the pairwise loop stopped one element early and dropped it).
# Tokens must be hashable, which holds for the string tokens built above.
positions_by_step = {}
for position, step in enumerate(test_steps_clustering_list):
    positions_by_step.setdefault(tuple(step), set()).add(position)

# One set of positions per cluster, ordered by the first occurrence of each distinct step
same_steps_list = list(positions_by_step.values())

In [72]:
# Each element of same_steps_list is one cluster (a set of step positions)
print("Final number of clusters : ", len(same_steps_list))

Final number of clusters :  4407


In [82]:
# Example of cluster with more than 10 items: print the position (cluster ID) of the
# first such cluster in same_steps_list.
# NOTE(review): this cell used cluster_labels_dict, which is not defined anywhere in
# this notebook; same_steps_list is assumed to be the intended source - confirm.
for key, cluster_members in enumerate(same_steps_list):
    if len(cluster_members) > 10:
        print(key)
        break

12


In [120]:
# Save clustered data
path_save_data = "baseline_1/baseline_1_clustered_data.txt"
# Create the output folder if it does not exist yet
os.makedirs(os.path.dirname(path_save_data), exist_ok=True)

# The positions stored in same_steps_list refer to the lists left after the empty
# steps were removed, so they no longer match the row labels of test_steps_df.
# Look the test case key up through the step ID instead of test_steps_df.loc[position].
# NOTE(review): assumes Step_ID values are unique in test_steps_df - confirm.
key_by_step_id = dict(zip(test_steps_df["Step_ID"], test_steps_df["Key"]))

# "w" instead of "a": re-running the cell replaces the file rather than appending a
# second copy of every cluster; the with-block closes the file even on errors
with open(path_save_data, "w") as out_cluster_file:
    for cluster_id, elem in enumerate(same_steps_list):
        # sorted() makes the order of the lines inside a cluster reproducible
        for index in sorted(elem):
            step_id = step_id_text_tuple_list[index][0]
            str_to_save = "[" + str(cluster_id) + "]:\t\t" + str(key_by_step_id[step_id]) + "\t\t" + str(step_id) + "\t\t" + str(test_steps_clustering_list[index]) + "\n"
            out_cluster_file.write(str_to_save)

In [121]:
# Save cluster labels (step IDs): one line per cluster, "[cluster_id]: id1,id2,..."
path_save_labels = "baseline_1/baseline_1_cluster_labels.txt"
# Create the output folder if it does not exist yet
os.makedirs(os.path.dirname(path_save_labels), exist_ok=True)

# "w" instead of "a": re-running the cell replaces the file rather than appending a
# second copy of every cluster; the with-block closes the file even on errors
with open(path_save_labels, "w") as out_cluster_file:
    for cluster_id, elem in enumerate(same_steps_list):
        # sorted() makes the order of the step IDs inside a line reproducible
        str_to_save = "[" + str(cluster_id) + "]: " + ','.join(str(step_id_text_tuple_list[x][0]) for x in sorted(elem)) + "\n"
        out_cluster_file.write(str_to_save)

### Compute F-score

#### Load ground truth of similar test steps (to compute F-score)

In [6]:
# Folder with the manually clustered samples (ground truth); the path is relative to
# the current working directory and ends with '/' because file names are appended to it
manual_sample_dir = 'sample_manual_ground_truth/clusters/'
# Every entry of the folder is listed (no filtering by extension);
# each one is later read with pd.read_excel
sample_files = os.listdir(manual_sample_dir)

In [3]:
# Ground truth: step_id -> list of the manual cluster IDs assigned to that step
# (a step appearing in several rows/files collects one entry per appearance)
manual_clusters_dict = {}
for sample in sample_files:
    sample_df = pd.read_excel(manual_sample_dir + sample)
    for _, sample_row in sample_df.iterrows():
        # setdefault creates the list on the first sighting of a step ID
        manual_clusters_dict.setdefault(sample_row['step_id'], []).append(sample_row['cluster_id'])

In [4]:
# manual_clusters_dict has one key per distinct step ID found in the ground-truth files
print("Number of test step samples which were manually clustered: ", len(manual_clusters_dict))

Number of test step samples which were manually clustered:  394


In [5]:
# Step IDs that have a manual (ground-truth) cluster assignment; the evaluation
# compares every pair of these steps
test_steps_to_evaluate_list = list(manual_clusters_dict.keys())

In [101]:
# Approach output: step ID (as int) -> position of its cluster in same_steps_list
appr_clusters_dict = {
    int(step_id_text_tuple_list[member_position][0]): cluster_label
    for cluster_label, cluster_members in enumerate(same_steps_list)
    for member_position in cluster_members
}

In [102]:
# appr_clusters_dict has one key per distinct step ID placed in a cluster
print("Number of test steps which were clustered by the approach: ", len(appr_clusters_dict))

Number of test steps which were clustered by the approach:  15644


In [122]:
# Declare and initialize variables to compute F-score.
# The counters are per PAIR of evaluated steps:
TP = 0  # same manual cluster, same approach cluster
FP = 0  # different manual clusters, same approach cluster
TN = 0  # different manual clusters, different approach clusters
FN = 0  # same manual cluster, different approach clusters

In [123]:
# Iterate through list of test steps to evaluate, comparing every unordered pair.
# The counters are reset here so that re-running this cell does not add the new
# counts on top of those of a previous run.
TP = FP = TN = FN = 0
# Pairs the ground truth puts together but the approach separates (false negatives)
wrong = []
number_steps_to_evaluate = len(test_steps_to_evaluate_list)
for i in range(number_steps_to_evaluate - 1):
    step_id_1 = test_steps_to_evaluate_list[i]
    for j in range(i + 1, number_steps_to_evaluate):
        step_id_2 = test_steps_to_evaluate_list[j]

        # A KeyError here means an evaluated step ID has no cluster from the approach
        same_manual_cluster = manual_clusters_dict[step_id_1] == manual_clusters_dict[step_id_2]
        same_appr_cluster = appr_clusters_dict[step_id_1] == appr_clusters_dict[step_id_2]

        if same_manual_cluster and same_appr_cluster:
            # true positive case
            TP += 1
        elif same_appr_cluster:
            # false positive case
            FP += 1
        elif same_manual_cluster:
            # false negative case
            FN += 1
            wrong.append((step_id_1,step_id_2))
        else:
            # true negative case
            TN += 1

In [124]:
# Precision = TP / (TP + FP), recall = TP / (TP + FN), computed over step pairs.
# A zero denominator (no predicted / no actual positive pair) yields 0.0 instead of
# stopping the notebook with ZeroDivisionError.
predicted_positive_pairs = TP + FP
actual_positive_pairs = TP + FN
precision = TP / predicted_positive_pairs if predicted_positive_pairs else 0.0
recall = TP / actual_positive_pairs if actual_positive_pairs else 0.0
print("Precision = ", precision)
print("Recall = ", recall)

Precision =  1.0
Recall =  0.5432098765432098


In [125]:
# F-score: harmonic mean of precision and recall; reported as 0.0 when both are zero
# instead of stopping the notebook with ZeroDivisionError
f_score = (2 * precision * recall) / (precision + recall) if (precision + recall) else 0.0
print("F-score = ", f_score)

F-score =  0.704
