# SelectQAOA: Regression Test Case Selection Using QAOA
Regression testing is an important part of the software development process in software engineering. It is a practice aimed at identifying any regression, which are the emergence of new defects or issues in a software application following changes, enhancements, or updates made to the source code. In other words, regression testing focuses on how changes made to the software can affect the correct behavior of existing features. Regression testing is particularly important in agile software development environments, where changes are made frequently and rapidly. This practice helps ensure that the software remains stable and reliable as it evolves over time. Ideal regression testing would re-run all the available test cases of a given software system. However, in addition to being potentially very costly, this could even be impractical in some case. In this scenario, test case selection is one of the most widely investigated test suite optimization approaches.
Test case selection focuses on selecting a subset from an initial test suite to test software changes, i.e., to test whether unmodified parts of a program continue to work correctly after changes involving other parts. Various techniques, such as Integer Programming, symbolic execution, data flow analysis, dependence graph-based methods, and flow graph-based approaches, can be employed to identify the modified portions of the software. Once test cases covering the unchanged program segments are pinpointed using a specific technique, an optimization algorithm (e.g., additional greedy, DIV-GA,
SelectQA, BootQA or SelectQAOA) can select a minimal set of these test cases based on certain testing criteria (e.g., branch coverage). The ultimate aim is to reduce the expenses associated with regression testing.

In [1]:
#this cell contains all the imports needed by the pipeline
#to run it on the browser: jupyter notebook --NotebookApp.iopub_data_rate_limit=1.0e10
import json
import time

import matplotlib.pyplot as plt
import numpy as np
import statistics
from qiskit_optimization import QuadraticProgram
from qiskit_optimization.algorithms import MinimumEigenOptimizer
from qiskit_algorithms.optimizers import COBYLA
from qiskit.primitives import BackendSampler
from qiskit_algorithms import QAOA
from qiskit_algorithms.utils import algorithm_globals
from qiskit_aer import AerSimulator
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from collections import defaultdict
from scipy.spatial.distance import cdist

In [2]:
#this cell contains all variable definitions that will be useful throughout the entire project
sir_programs = ["flex","grep","gzip","sed"]
sir_programs_tests_number = {"flex":567,"grep":806,"gzip":214,"sed":360}
sir_programs_end_lines = {"flex":14192,"grep":13281,"gzip":6701,"sed":7118}
alpha = 0.5
experiments = 30

In [4]:
def json_keys_to_int(d):
    if isinstance(d, dict):
        return {int(k) if k.isdigit() else k: json_keys_to_int(v) for k, v in d.items()}
    elif isinstance(d, list):
        return [json_keys_to_int(i) for i in d]
    else:
        return d


with open("datasets/sir_programs/executed_lines_test_by_test.json", "r") as file:
    #dictionary that, for each sir program, associates at each LINE of that program the LIST of TESTS COVERING it
    executed_lines_test_by_test = json_keys_to_int(json.load(file)) #{program1:{line:[tci,tcj,...,tck],line2:...}
with open("datasets/sir_programs/faults_dictionary.json", "r") as file:
    #dictionary that associates at each SIR PROGRAM the LIST of PAST FAULT COVERAGE VALUES ORDERED BY TEST 
    faults_dictionary = json.load(file) #{program1:[fault_cov_tc1,fault_cov_tc2,...,fault_cov_tcn],program2:...}
with open("datasets/sir_programs/test_coverage_line_by_line.json", "r") as file:
    #dictionary that, for each sir program, associates at each TEST of that program the LIST of LINES COVERED by it
    test_coverage_line_by_line = json_keys_to_int(json.load(file)) #{program1:{tc1:[linei,linej,...,linek],tc2:...}
with open("datasets/sir_programs/test_cases_costs.json", "r") as file:
    #dictionary that, for each sir program, associates at each TEST its EXECUTION COST
    test_cases_costs = json_keys_to_int(json.load(file)) #{program1:{tc1:ex_cost1,tc2:ex_cost2,...,tcn:ex_costn},program2:...}
with open("datasets/sir_programs/total_program_lines.json", "r") as file:
    #dictionary wich associates at each SIR PROGRAM its size in terms of NUMBER OF ITS LINES
    total_program_lines = json.load(file) #{program1:tot_lines_program1,program2:tot_lines_program2,program3:...}

In [None]:
def num_of_covered_lines(sir_program,test_cases):
    covered_lines = set()
    
    for test_case in test_cases:
        try:
            for covered_line in test_coverage_line_by_line[sir_program][test_case]:
                covered_lines.add(covered_line)
        except:
            continue
    
    return len(covered_lines)

clusters_dictionary = dict()

for sir_program in sir_programs:
    tot_test_cases = sir_programs_tests_number[sir_program]
    
    # from {..., test_case_i : [cov_stmts], ...} to [..., #_stmt_cov_i, ...]
    test_cases_stmt_cov = []
    for test_case in test_coverage_line_by_line[sir_program].keys():
        test_cases_stmt_cov.append(len(test_coverage_line_by_line[sir_program][test_case]))
    suite_stmt_cov = sum(test_cases_stmt_cov)
    
    # Normalize data
    data = np.column_stack((list(test_cases_costs[sir_program].values()),faults_dictionary[sir_program],test_cases_stmt_cov))
    scaler = StandardScaler()
    normalized_data = scaler.fit_transform(data)

    num_clusters = 0
    
    #ATTENTION: this number also depends on the QAOA simulator/machine you will use
    if sir_program == "flex":
        num_clusters = 55
    elif sir_program == "grep":
        num_clusters = 105
    elif sir_program == "gzip":
        num_clusters = 195
    elif sir_program == "sed":
        num_clusters = 195
    
    # Step 2: Perform K-Means Clustering
    kmeans = KMeans(n_clusters=num_clusters, random_state=43)
    start = time.time()
    clusters = kmeans.fit_predict(normalized_data)
    
    # Organize test cases by cluster
    clustered_data = defaultdict(list)
    for idx, cluster_id in enumerate(clusters):
        clustered_data[cluster_id].append(idx)
        
    # Step 4: Reassign points to ensure each cluster has at most 30 elements
    max_elements_per_cluster = 30
    for cluster_id, points in list(clustered_data.items()):
        if len(points) > max_elements_per_cluster:
            excess_points = points[max_elements_per_cluster:]
            clustered_data[cluster_id] = points[:max_elements_per_cluster]
            
            # Reassign excess points
            excess_data = normalized_data[excess_points]
            remaining_clusters = [k for k in clustered_data if len(clustered_data[k]) < max_elements_per_cluster]
            
            # Find nearest cluster with space for each excess point
            distances = cdist(excess_data, kmeans.cluster_centers_[remaining_clusters])
            nearest_clusters = np.argmin(distances, axis=1)
            
            for i, excess_idx in enumerate(excess_points):
                new_cluster = remaining_clusters[nearest_clusters[i]]
                clustered_data[new_cluster].append(excess_idx)
    
    end = time.time()
    print("SelectQAOA Decomposition Time (ms): " + str((end-start)*1000))
    
    clusters_dictionary[sir_program] = clustered_data
        
    # Step 3: Calculate the metrics for each cluster and validate
    cluster_metrics = {}
    for cluster_id in clustered_data.keys():
        tot_cluster_exec_cost = 0
        tot_cluster_past_fault_cov = 0
        tot_cluster_stmt_cov = 0
        for test_case in clustered_data[cluster_id]:
            tot_cluster_exec_cost += test_cases_costs[sir_program][test_case]
            tot_cluster_past_fault_cov += faults_dictionary[sir_program][test_case]
        tot_cluster_past_fault_cov = tot_cluster_past_fault_cov/tot_test_cases
        tot_cluster_stmt_cov = num_of_covered_lines(sir_program,clustered_data[cluster_id])/total_program_lines[sir_program]
        cluster_metrics[cluster_id] = {
            "tot_exec_cost": tot_cluster_exec_cost,
            "tot_past_fault_cov": tot_cluster_past_fault_cov,
            "tot_stmt_cov": tot_cluster_stmt_cov  # Avg stmt coverage per test case in cluster
        }
        print(f"Cluster {cluster_id + 1} metrics:")
        print(f"Test Cases: {clustered_data[cluster_id]}")
        print(f" - Num. Test Cases: {len(clustered_data[cluster_id]):.2f}")
        print(f" - Execution Cost: {tot_cluster_exec_cost:.2f}")
        print(f" - Past Fault Coverage (%): {tot_cluster_past_fault_cov}")
        print(f" - Statement Coverage (%): {tot_cluster_stmt_cov:.2f}\n")
    
    for cluster_id in clustered_data.keys():
        print("Test cases of cluster " + str(cluster_id) + ": " + str(len(clustered_data[cluster_id])))
    
    print("======================================================================================")
    
    print("Program Name: " + sir_program)
    
    for cluster_id in clustered_data.keys():
        if len(clustered_data[cluster_id]) > max_elements_per_cluster:
            print("Test cases of cluster " + str(cluster_id) + ": " + str(len(clustered_data[cluster_id])))
    
    # Plotting the clusters in 3D space
    fig = plt.figure(figsize=(10, 7))
    ax = fig.add_subplot(111, projection='3d')
    
    # Extracting data for plotting
    exec_costs = np.array(list(test_cases_costs[sir_program].values()))
    fault_covs = np.array(faults_dictionary[sir_program])
    stmt_covs = np.array(test_cases_stmt_cov)
    
    # Plot each cluster with a different color
    colors = plt.cm.get_cmap("tab10", num_clusters)  # A colormap with as many colors as clusters
    for cluster_id in clustered_data.keys():
        cluster_indices = clustered_data[cluster_id]
        
        # Plot each cluster's points
        ax.scatter(
            exec_costs[cluster_indices], 
            fault_covs[cluster_indices], 
            stmt_covs[cluster_indices], 
            color=colors(cluster_id), 
            label=f"Cluster {cluster_id + 1}"
        )
    
    # Label the axes
    ax.set_xlabel("Execution Cost")
    ax.set_ylabel("Past Fault Coverage")
    ax.set_zlabel("Statement Coverage")
    ax.legend()
    ax.set_title("Test Case Clustering Visualization")
    
    # Display the plot
    plt.show()
    
print(clusters_dictionary)

In [6]:
def make_linear_terms(sir_program, cluster_test_cases, alpha):
    max_cost = max(test_cases_costs[sir_program].values())
    
    estimated_costs = []

    #linear coefficients, that are the diagonal of the matrix encoding the QUBO
    for test_case in cluster_test_cases:
        estimated_costs.append((alpha * (test_cases_costs[sir_program][test_case]/max_cost)) - (1 - alpha) * faults_dictionary[sir_program][test_case])
    
    return np.array(estimated_costs)

def make_quadratic_terms(sir_program, variables, cluster_test_cases, linear_terms, penalty):
    quadratic_terms = {}
    
    #k is a stmt
    for k in executed_lines_test_by_test[sir_program].keys():
        #k_test_cases is the list of test cases covering k
        k_test_cases = executed_lines_test_by_test[sir_program][k]
        for i in k_test_cases:
            if i not in cluster_test_cases or i not in variables:
                continue
            for j in k_test_cases:
                if j not in cluster_test_cases or j not in variables:
                    continue
                if i < j:
                    linear_terms[variables.index(i)] -= penalty
                    try:
                        quadratic_terms[variables.index(i),variables.index(j)] += 2 * penalty
                    except:
                        quadratic_terms[variables.index(i),variables.index(j)] = 2 * penalty
    
    return quadratic_terms

In [7]:
def create_QUBO_problem(linear_terms, quadratic_terms):
    """This function is the one that has to encode the QUBO problem that QAOA will have to solve. The QUBO problem specifies the optimization to solve and a quadratic binary unconstrained problem"""
    qubo = QuadraticProgram()
    
    for i in range(0,len(linear_terms)):
        qubo.binary_var('x%s' % (i))

    qubo.minimize(linear=linear_terms,quadratic=quadratic_terms)

    return qubo


In [8]:
penalties_dictionary = {"flex":None,"grep":None,"gzip":None,"sed":None}

#to obtain a QUBO problem from a quadratic problem with constraints, we have to insert those constraints into the Hamiltonian to solve (which is the one encoded by the QUBO problem). When we insert constraint into the Hamiltonian, we have to specify also penalties
for sir_program in sir_programs:
    max_penalty = 0
    max_cost = max(test_cases_costs[sir_program].values())
    for i in range(sir_programs_tests_number[sir_program]):
        cost = (alpha * (test_cases_costs[sir_program][i]/max_cost)) - ((1 - alpha) * faults_dictionary[sir_program][i])
        if cost > max_penalty:
            max_penalty = cost
    penalties_dictionary[sir_program] = max_penalty + 1

In [9]:
qubos_dictionary = {"flex":[],"grep":[],"gzip":[],"sed":[]}
#make a dictionary that saves, for each program, the correspondent QUBO
for sir_program in sir_programs:
    print("SIR Program:\n")
    for cluster_id in clusters_dictionary[sir_program]:
        print("Cluster ID: " + str(cluster_id))
        variables = []
        for idx in range(0, len(clusters_dictionary[sir_program][cluster_id])):
            variables.append(idx)
        linear_terms = make_linear_terms(sir_program, clusters_dictionary[sir_program][cluster_id], alpha)
        quadratic_terms = make_quadratic_terms(sir_program, variables, clusters_dictionary[sir_program][cluster_id], linear_terms, penalties_dictionary[sir_program])
        qubo = create_QUBO_problem(linear_terms, quadratic_terms)
        qubos_dictionary[sir_program].append(qubo)
        print(qubo)
        print("/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/")
    print("======================================================================================")

SIR Program:

Cluster ID: 7
minimize 70.38563106303513*x0*x2 - 35.1928149508474*x0 + 4.6791867798879533e-07*x1 + 2.311405517775977e-07*x2 + 3.157041682815968e-07*x3 + 4.735562524223953e-07*x4 + 3.833550614847962e-07*x5 + 3.833550614847962e-07*x6 (7 variables, 0 constraints, '')
/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/
Cluster ID: 50
minimize -0.4986518924632677*x0 - 0.49865199393960746*x1 - 0.49865195447658645*x2 - 0.49863967583947005*x3 - 0.49863963637644904*x4 - 0.49863970402734226*x5 - 0.4986396645643212*x6 - 0.498639895704873*x7 (8 variables, 0 constraints, '')
/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/--/
Cluster ID: 24
minimize 2821.458296612482*x3*x10 - 0.4991933871251894*x0 - 0.49925980902716605*x1 - 0.4991294457559635*x10 - 0.49858233043233147*x11 - 0.49927110108875655*x12 - 0.49912940629294245*x13 - 0.49866111553504106*x14 - 0.4991725844755294*x15 - 0.49917269158944366*x16 - 0.498666459955604

In [10]:
def covered_lines(sir_program,test_cases_list):
    covered_lines = set()
    
    for test_case in test_cases_list:
        try:
            for covered_line in test_coverage_line_by_line[sir_program][test_case]:
                covered_lines.add(covered_line)
        except:
            continue
    
    return len(covered_lines)

def build_pareto_front(sir_program,selected_tests):
    pareto_front = []
    max_fault_coverage = 0
    max_stmt_coverage = 0
    
    for index in range(1,len(selected_tests)+1):
        #exract the first index selected tests
        candidate_solution = selected_tests[:index]
        candidate_solution_fault_coverage = 0
        candidate_solution_stmt_coverage = 0
        for selected_test in candidate_solution:
            candidate_solution_fault_coverage += faults_dictionary[sir_program][selected_test]
            candidate_solution_stmt_coverage += covered_lines(sir_program,candidate_solution)
        #if the actual pareto front dominates the candidate solution, get to the next candidate
        if max_fault_coverage >= candidate_solution_fault_coverage and max_stmt_coverage >= candidate_solution_stmt_coverage:
            continue
        #eventually update the pareto front information
        if candidate_solution_stmt_coverage > max_stmt_coverage:
            max_stmt_coverage = candidate_solution_stmt_coverage
        if candidate_solution_fault_coverage > max_fault_coverage:
            max_fault_coverage = candidate_solution_fault_coverage
        #add the candidate solution to the pareto front
        pareto_front.append(candidate_solution)
    
    return pareto_front

In [None]:
#I want to run the sampler 30 times to get different results for each sir program
for sir_program in sir_programs:
    reps = [8]
    for rep in reps:
        sim_ideal = AerSimulator()
        algorithm_globals.random_seed = 10598
        qaoa_mes = QAOA(sampler=BackendSampler(backend=sim_ideal), optimizer=COBYLA(100), reps=rep)
        qaoa = MinimumEigenOptimizer(qaoa_mes)  # using QAOA
        #the fronts will be saved into files
        print("SIR Program: " + sir_program)
        file_path = "results/selectqaoa/ideal/" + sir_program + "-data-rep-" + str(rep) + ".json"
        json_data = {}
        response = None
        qpu_run_times = []
        pareto_fronts_building_times = []
        for i in range(10):
            final_selected_tests = []
            cluster_dict_index = 0
            for qubo in qubos_dictionary[sir_program]:
                print("QUBO Problem: " + str(qubo) + "\n Cluster Number: " + str(cluster_dict_index))
                print("Cluster's Test Cases: " +str(list(clusters_dictionary[sir_program].values())[cluster_dict_index]))
                #for each iteration get the result
                s = time.time()
                qaoa_result = qaoa.solve(qubo)
                print("RESULTS: " + str(qaoa_result))
                e = time.time()
                qpu_run_times.append((e - s) * 1000)
                #let's extract the selected tests
                variable_values = qaoa_result.x
                indexes_selected_tests = [index for index, value in enumerate(variable_values) if value == 1]
                print("Indexes of selected tests to convert. " + str(indexes_selected_tests))
                selected_tests = []
                for index in indexes_selected_tests:
                    selected_tests.append(list(clusters_dictionary[sir_program].values())[cluster_dict_index][index])
                print("Selected tests: " + str(selected_tests))
                print("Experiment Number: " + str(i))
                cluster_dict_index += 1
                for selected_test in selected_tests:
                    if selected_test not in final_selected_tests:
                        final_selected_tests.append(selected_test)
            i+=1
            #now we have to build the pareto front
            print("Final Selected Test Cases: " + str(final_selected_tests))
            print("Length of the final list of selected test cases: " + str(len(final_selected_tests)))
            start = time.time()
            pareto_front = build_pareto_front(sir_program, final_selected_tests)
            end = time.time()
            json_data["pareto_front_" + str(i)] = pareto_front
            pareto_front_building_time = (end - start) * 1000
            pareto_fronts_building_times.append(pareto_front_building_time)

    #compute the average time needed for the construction of a pareto frontier and run time
    mean_qpu_run_time = statistics.mean(qpu_run_times)
    mean_pareto_fronts_building_time = statistics.mean(pareto_fronts_building_times)
    json_data["mean_qpu_run_time(ms)"] = mean_qpu_run_time
    json_data["stdev_qpu_run_time(ms)"] = statistics.stdev(qpu_run_times)
    json_data["all_qpu_run_times(ms)"] = qpu_run_times
    json_data["mean_pareto_fronts_building_time(ms)"] = mean_pareto_fronts_building_time

    with open(file_path, "w") as file:
        json.dump(json_data, file)
