### Multi-objective optimization using NSGA-II for the prioritization of manual test cases in natural language

We performed two main experiments:
* **Without feature usage**: our objectives functions are *number of covered features* (maximize) and *test execution time* (minimize)
* **With feature usage**: our objectives functions are *number of covered highly-used game features* (maximize) and *test execution time* (minimize)

In [1]:
import re
import os
import sys  
import math
import time
import pandas as pd
import numpy as np
from pathlib import Path
from statistics import mean, median
from scipy import stats
from scipy.stats import mannwhitneyu
from cliffs_delta import cliffs_delta
from sklearn.metrics import ndcg_score
from sklearn.preprocessing import MinMaxScaler
from pymoo.optimize import minimize
from pymoo.problems.functional import FunctionalProblem
from pymoo.algorithms.moo.nsga2 import NSGA2
from pymoo.operators.sampling.rnd import PermutationRandomSampling
from pymoo.operators.crossover.ox import OrderCrossover
from pymoo.operators.mutation.inversion import InversionMutation
from pymoo.core.problem import Problem
from pymoo.core.population import Population
from pymoo.core.callback import Callback
from pymoo.core.termination import Termination
from matplotlib import cm
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import plotly.express as px
import seaborn as sns

In [2]:
# import modules (py scripts) with needed functions and classes
sys.path.insert(0, '/utils')
sys.path.insert(0, '/optimization')

from utils import utils
from optimization import optimization

#### Load data: test cases with test labels (covered game features) and execution time

In [None]:
test_cases_complete = utils.load_data()

In [None]:
# Extract information from test cases
[feature_coverage_test_suite, execution_time_test_suite, mapping_test_key_test_suite,
 feature_test_id_relation, total_execution_time_test_suite, number_test_cases] = utils.get_test_data_info(test_cases_complete)

In [None]:
features_full_suite = []

for key,value in feature_coverage_test_suite.items():
    for elem in value:
        if elem not in features_full_suite:
            features_full_suite.append(elem)

num_features_full_suite = len(features_full_suite)
print("There are {num} features being covered in test cases".format(num=num_features_full_suite))

In [None]:
# Average number of features tested in each test case
num_feat_tested = []

for testcase in feature_coverage_test_suite:
    num_feat_tested.append(len(feature_coverage_test_suite[testcase]))
    
print("There are {count} features in each test case, on average.".format(count=mean(num_feat_tested)))
print("There is a median {count} features in each test case.".format(count=median(num_feat_tested)))

#### Get feature usage and basic usage stats

In [None]:
# Read feature usage data (this data was collected from Snowflake at Prodigy and saved with the pickle format)
num_uses_features = pd.read_pickle('FEATURE_USAGE_DATA_PATH')
print("Number of features: {num}".format(num=len(num_uses_features)))
num_uses_features.head()

In [None]:
# Build mapping (from Snowflake data to testing data)
mapping_dict = utils.build_feature_mapping()

# Check features that are not in the mapping (e.g., corrupted data from game data events). Use this to check if there is corrupted usage data that needs to be fixed
utils.remove_features_mapping(mapping_dict, num_uses_features)

In [None]:
# Sort by usage and get some basic stats
num_used_features = num_uses_features.sort_values(by='Number of uses', ascending=False)
display(num_used_features.head())

most_used_features_test_suite = num_uses_features['Feature'].to_list()
print("Number of features: {num}".format(num=len(most_used_features_test_suite)))

# Generate boxplot
px.box(num_uses_features['Number of uses'].to_list())

In [None]:
# Get median usage and set median_num_features
feat_usage_list = num_uses_features['Number of uses'].to_list()

median_usage = median(feat_usage_list)
counter_feat = 1
cum_usage = 0
for current_usage in feat_usage_list:
    cum_usage += current_usage
    if current_usage <= median_usage:
        break
    counter_feat += 1

print("Cumulative usage up to median usage: {cum_usage}".format(cum_usage=cum_usage))

cum_median_usage_perc = cum_usage/(sum(feat_usage_list))
print("Percentage of cumulative usage up to median usage: {cum_median_usage_perc}".format(cum_median_usage_perc=cum_median_usage_perc))
print("Corresponding number of features up to median usage: {median_num_features}".format(median_num_features=counter_feat))

In [12]:
# Convert feature names from game data in Snowflake to standard names in the testing side
most_used_features_test_suite_testing_std = [mapping_dict[x] for x in most_used_features_test_suite] 

In [None]:
# Some game data events lead to the same testing feature (i.e., there are duplicates in most_used_features_test_suite_testing_std). Below, we remove them keeping the first
most_used_features_test_suite_testing_std = list(dict.fromkeys(most_used_features_test_suite_testing_std))
print("Now there are {num} features in the set of most used features".format(num=len(most_used_features_test_suite_testing_std)))

In [17]:
# List with ranking position of the most used features
most_used_features_rank = {}
counter = 1
for feat in most_used_features_test_suite_testing_std:
    most_used_features_rank[feat] = counter
    counter += 1

In [19]:
# Get num. of times each feature is covered (i.e., number of test cases where each feature appears)
feature_counter = {}
for key,value in feature_coverage_test_suite.items():
    for feat in value:
        if feat not in feature_counter:
            feature_counter[feat] = 1
        else:
            feature_counter[feat] += 1

#### Define objective/fitness functions

In [4]:
# Compute cumulative feature coverage for a specific test execution order and feature per-feature coverage threshold
def get_cum_feature_coverage_test_suite(order, feature_counter, thresh):
    cum_feature_count = 0
    cum_feature_count_array = []
    features_covered = []
    feature_counter_order = dict.fromkeys(feature_counter.keys(),0)

    for test_case_id in order:
        features = feature_coverage_test_suite[test_case_id]

        # Update dict with counter
        for feat in features:
            feature_counter_order[feat] = feature_counter_order[feat] + 1
            
            # Check if feature meets threshold to be considered covered
            if feature_counter_order[feat] >= math.floor(feature_counter[feat]*thresh):
                if feat not in features_covered:
                    features_covered.append(feat)
                    cum_feature_count = cum_feature_count + 1

        cum_feature_count_array.append(cum_feature_count)

    # Define arrays for test case (X axis) versus cumulative feature count (Y axis)
    x = np.array([i for i in range(len(order))])
    y = cum_feature_count_array
    
    # Compute area using trapezoidal rule
    area = np.trapz(y=y, x=x)
    
    # Compute optimal area
    optimal_area = np.trapz(y=np.array([num_features_full_suite]*len(x)), x=x)

    # Normalize area
    norm_area = area/optimal_area

    return norm_area

In [3]:
# Compute cumulative execution time for a specific test execution order
def get_cum_execution_time_test_suite(order):
    cum_execution_time = 0
    cum_execution_time_array = []

    for test_case_id in order:
        exec_time = execution_time_test_suite[test_case_id]
        cum_execution_time = cum_execution_time + exec_time
        cum_execution_time_array.append(cum_execution_time)
        
    x = np.array([i for i in range(len(order))])
    y = cum_execution_time_array
    
    # Compute area using trapezoidal rule
    area = np.trapz(y=y, x=x)

    # Compute optimal area
    optimal_area = np.trapz(y=np.array([total_execution_time_test_suite]*len(x)), x=x)
    
    # Normalize area
    norm_area = area/optimal_area

    return norm_area

In [2]:
# Compute ranking similarity score for a specific test execution order using 'feature usage' as relevance for features
def get_ranking_score_full_suite(order, true_relev, scaler_ndcg):
    global most_used_features_rank
    
    list_tested_features = []
    
    for test_case_id in order:
        features = feature_coverage_test_suite_reduced[test_case_id] 
        for feat in features:
                
            if feat not in list_tested_features:
                list_tested_features.append(feat)
        
    tested_feat_relev = {}
    for feat in list_tested_features:
        # Use usage ranking to find relevance for the currently tested feature
        tested_feat_relev[feat] = most_used_features_rank[feat]

    score_relev = list(tested_feat_relev.values())
    
    # Compute NDCG
    score = ndcg_score(np.asarray([true_relev]),np.asarray([score_relev]))
    scaled_score = scaler_ndcg.transform(np.asarray(score).reshape(-1, 1))[0][0]
    return scaled_score

#### Modify NSGA-II to use T-test and mutual dominance rate as stopping criteria

In [23]:
# Using callback and custom termination criteria
class MyCallback(Callback):
    
    def __init__(self, mdr_limit) -> None:
        super().__init__()
        self.data["terminate"] = False
        self.data["index"] = 0
        self.data["last_gen_solution"] = []
        self.data["current_gen_solution"] = []
        self.data["last_gen_objectives"] = []
        self.data["current_gen_objectives"] = []
        self.data["last_gen_t_test"] = False
        self.data["current_gen_t_test"] = False
        self.data["count_last_mdr"] = 0
        self.data["count_last_t_test"] = 0
        self.data["MDR_LIMIT"] = mdr_limit
        
    def get_mutual_dom_rate(self, gen_1_obj, gen_2_obj):
        counter = 0
        found = False
        for obj_values_gen_1 in gen_1_obj:
            for obj_values_gen_2 in gen_2_obj:
                if not found:
                    if all(obj_values_gen_2[index] >= obj_values_gen_1[index] for index in range(len(obj_values_gen_2))):
                        if any(obj_values_gen_2[index] > obj_values_gen_1[index] for index in range(len(obj_values_gen_2))):
                            counter += 1
                            found = True
            found = False
            
        return counter

    def notify(self, algorithm):

        if self.data["index"] == 0:
            self.data["last_gen_solution"] = algorithm.opt
            self.data["index"] = 1
            obj_results = algorithm.opt.get("F").T
            self.data["last_gen_objectives"] = obj_results

        else:
            self.data["current_gen_solution"] = algorithm.opt
            
            # Transpose objective results and iterate through results for all objectives
            obj_results = algorithm.opt.get("F").T
            self.data["current_gen_objectives"] = obj_results

            # Compute t-test for each objective
            t_test_res_list = []
            for index in range(len(self.data["current_gen_objectives"])):
                single_obj_values_current = self.data["current_gen_objectives"][index]
                single_obj_values_last = self.data["last_gen_objectives"][index]
                t_test_res = stats.ttest_ind(single_obj_values_current, single_obj_values_last)[1] # get only p-value
                t_test_res_list.append(t_test_res)
            
            # Get mutual dominance rates
            dom_last_current = self.get_mutual_dom_rate(self.data["last_gen_objectives"].T, self.data["current_gen_objectives"].T)
            dom_current_last = self.get_mutual_dom_rate(self.data["current_gen_objectives"].T, self.data["last_gen_objectives"].T)
            mdr = (dom_last_current/len(self.data["last_gen_objectives"].T)) - (dom_current_last/len(self.data["current_gen_objectives"].T))
            
            # Check is mdr is within range and increase counter; otherwise, reset counter
            if abs(mdr) < self.data["MDR_LIMIT"]:
                self.data["count_last_mdr"] += 1
            else:
                self.data["count_last_mdr"] = 0
                
            # Check is t-test for all objectives is above 0.05 and increase counter; otherwise, reset counter
            if all(i > 0.05 for i in t_test_res_list):
                self.data["count_last_t_test"] += 1
            else:
                self.data["count_last_t_test"] = 0
                
            # Check if mdr and t-test meet criteria for 5 generations
            if (self.data["count_last_mdr"] >= 5) and (self.data["count_last_t_test"] >= 5):
                self.data["terminate"] = True

            # Update last generation
            self.data["last_gen_solution"] = self.data["current_gen_solution"]
            self.data["last_gen_objectives"] = self.data["current_gen_objectives"]
            self.data["last_gen_t_test"] = self.data["current_gen_t_test"]
            self.data["last_gen_mdr"] = mdr


class MyTermination(Termination):

    def __init__(self) -> None:
        super().__init__()
        
    def terminate(self):
        self.force_termination = True
   
    def update(self, algorithm):
        """
        Provide the termination criterion a current status of the algorithm to update the perc.
        Parameters
        ----------
        algorithm : object
            The algorithm object which is used to determine whether a run has terminated.
        """
        
        if my_callback.data["terminate"]:
            self.terminate()
        
        if self.force_termination:
            progress = 1.0
        else:
            progress = self._update(algorithm)
            assert progress >= 0.0

        self.perc = progress
        return self.perc
    
    def _update(self, algorithm):
        return 0.1

---