This notebook measures how long Clustered Mutation Testing (CMT) takes for projects whose pom.xml is configured correctly.

More in-depth discussion is in Adam Abdalla's thesis.

In [None]:
import csv
import math
import subprocess
import time
from os import path
from typing import TextIO

import numpy as np
import pandas as pd
from sklearn.cluster import AgglomerativeClustering
from sklearn.cluster import MeanShift
from sklearn.preprocessing import LabelEncoder

In [None]:
def compile_tests(project: str) -> subprocess.CompletedProcess:
    """Compile the test sources of a project with ``mvn test-compile``.

    No mutation testing is run here; Maven is only invoked with the
    ``test-compile`` phase inside ``projectsTA/<project>``.

    Args:
        project: Directory name of the project below ``projectsTA/``.

    Returns:
        The ``CompletedProcess`` returned by ``subprocess.run`` with stdout
        and stderr captured as text. The exit code is not checked.
    """
    return subprocess.run(
        ["mvn", "test-compile"],
        capture_output=True,
        cwd="projectsTA/" + project,
        text=True,
    )

def execute_normal(project: str) -> subprocess.CompletedProcess:
    """Run the pitest ``mutationCoverage`` goal for a project.

    Maven is started inside ``projectsTA/<project>`` with ``-Drat.skip=true``
    and no ``-Dfeatures`` flag.

    Args:
        project: Directory name of the project below ``projectsTA/``.

    Returns:
        The ``CompletedProcess`` returned by ``subprocess.run`` with stdout
        and stderr captured as text. The exit code is not checked.
    """
    return subprocess.run(
        ["mvn", "-Drat.skip=true", "org.pitest:pitest-maven:mutationCoverage"],
        capture_output=True,
        cwd="projectsTA/" + project,
        text=True,
    )

def execute_verbose(project: str, addition: str = "") -> list:
    """Run pitest verbosely with the cluster feature and keep the log.

    The captured stdout is written to
    ``verboseOutputs/<project><addition>-pitestOutput.txt`` and also returned
    split on newlines.

    Args:
        project: Directory name of the project below ``projectsTA/``.
        addition: Optional text inserted into the log file name after the
            project name.

    Returns:
        The captured stdout as a list of lines (without line terminators).
    """
    command = [
        "mvn",
        "-Dverbose=true",
        "-Drat.skip=true",
        "-Dfeatures=+cluster",
        "org.pitest:pitest-maven:mutationCoverage",
    ]
    completed = subprocess.run(command, capture_output=True, cwd="projectsTA/" + project, text=True)
    captured = completed.stdout

    log_path = "verboseOutputs/" + project + addition + "-pitestOutput.txt"
    with open(log_path, "w") as log_file:
        log_file.write(captured)

    return captured.split("\n")

def characteristics_extraction(project: str) -> subprocess.CompletedProcess:
    """Run pitest with the clustering plugin's ``+characteristics`` feature.

    Args:
        project: Directory name of the project below ``projectsTA/``.

    Returns:
        The ``CompletedProcess`` returned by ``subprocess.run`` with stdout
        and stderr captured as text. The exit code is not checked.
    """
    command = [
        "mvn",
        "-Drat.skip=true",
        # The feature flag and the goal must be separate argv entries; without
        # the comma Python concatenates the two adjacent string literals and
        # Maven never receives the mutationCoverage goal.
        "-Dfeatures=+characteristics",
        "org.pitest:pitest-maven:mutationCoverage",
    ]
    return subprocess.run(command, capture_output=True, cwd="projectsTA/" + project, text=True)

def execute_cluster(project: str) -> subprocess.CompletedProcess:
    """Run pitest with the clustering plugin's ``+cluster`` feature enabled.

    Args:
        project: Directory name of the project below ``projectsTA/``.

    Returns:
        The ``CompletedProcess`` returned by ``subprocess.run`` with stdout
        and stderr captured as text. The exit code is not checked.
    """
    return subprocess.run(
        ["mvn", "-Drat.skip=true", "-Dfeatures=+cluster", "org.pitest:pitest-maven:mutationCoverage"],
        capture_output=True,
        cwd="projectsTA/" + project,
        text=True,
    )


def read_verbose(project: str) -> list:
    """Load the verbose pitest log stored for a project.

    Reads ``verboseOutputs/<project>-pitestOutput.txt`` (the file name that
    execute_verbose() writes when called without an ``addition``).

    Args:
        project: Project name used in the log file name.

    Returns:
        The lines of the file, line terminators included, followed by one
        extra trailing element with the literal value ``"filler"``.
    """
    log_path = "verboseOutputs/" + project + "-pitestOutput.txt"
    with open(log_path) as log_file:
        lines = list(log_file)
    lines.append("filler")
    return lines


def export_clusters(labels: np.ndarray, csv_data: pd.DataFrame, export_dir: str) -> pd.DataFrame:
    """Write a mutant-id to cluster-id CSV for the pitest-clustering-plugin.

    The table is built in one step instead of appending row by row:
    ``DataFrame.append`` was removed in pandas 2.0 and copied the whole frame
    on every call.

    Args:
        labels: Cluster label per mutant; ``labels[i]`` belongs to the i-th
            row of ``csv_data``.
        csv_data: Frame with an ``"id"`` column holding the mutant ids.
        export_dir: Directory that already contains a ``clustering``
            sub-directory; the file is written to
            ``<export_dir>/clustering/clusters.csv``.

    Returns:
        The exported frame with the columns ``id`` and ``cluster_id``.
    """
    # Rows are paired with labels by position, which matches the original
    # label lookup for the default RangeIndex.
    mutant_ids = csv_data["id"].iloc[:len(labels)].to_numpy()
    df = pd.DataFrame(
        {"id": mutant_ids, "cluster_id": np.asarray(labels)},
        columns=["id", "cluster_id"],
    )

    df.to_csv(export_dir + "/clustering/clusters.csv", sep=",", index=False)
    return df


def getProjectDf(project: str) -> pd.DataFrame:
    """Load the characteristics CSV written by the pitest clustering plugin.

    The report is looked up at
    ``projectsTA/<project>/target/pit-reports/clustering/characteristics.csv``;
    if that file does not exist, the same relative location below the
    project's ``processor`` directory is read instead.

    Args:
        project: Directory name of the project below ``projectsTA/``.

    Returns:
        The CSV content with its first line skipped and fixed column names.
    """
    column_names = [
        "id", "mutOperator", "opcode", "returnType",
        "localVarsCount", "isInTryCatch", "isInFinalBlock",
        "className", "methodName", "blockNumber", "lineNumber",
        "numTests",
    ]
    report_suffix = "/target/pit-reports/clustering/characteristics.csv"
    project_root = "projectsTA/" + project

    report_file = project_root + report_suffix
    if not path.exists(report_file):
        report_file = project_root + "/processor" + report_suffix

    return pd.read_csv(report_file, names=column_names, skiprows=1)


def execute_mutants(project: str, labels: np.ndarray, data: pd.DataFrame, seeds: list) -> list:
    """Time one clustered pitest run per seed.

    For every seed, one mutant is sampled from each cluster (seeded with that
    seed), the selection is written to the project's ``cluster.csv`` and
    execute_cluster() is called.

    The id/cluster table is built with a single DataFrame construction
    instead of a row-by-row ``DataFrame.append`` loop; ``append`` was removed
    in pandas 2.0 and copied the whole frame on every call.

    Args:
        project: Directory name of the project below ``projectsTA/``.
        labels: Cluster label per mutant; ``labels[i]`` belongs to the i-th
            row of ``data``.
        data: Frame with an ``"id"`` column holding the mutant ids.
        seeds: Random states, one timed run each.

    Returns:
        Wall-clock seconds per seed. Each value covers building the table,
        sampling, writing the CSV and the Maven run.
    """
    timings = []
    csv_path = "projectsTA/" + project

    for seed in seeds:
        start_time = time.time()

        # Deliberately rebuilt inside the timed region so every measurement
        # covers the same steps as before (table build + sampling + run).
        df = pd.DataFrame(
            {"id": data["id"].iloc[:len(labels)].to_numpy(), "cluster_id": np.asarray(labels)},
            columns=["id", "cluster_id"],
        )

        # One representative per cluster, in order of first appearance.
        mutants = [
            df[df["cluster_id"] == cluster_id].sample(random_state=seed).iloc[0]["id"]
            for cluster_id in df["cluster_id"].unique()
        ]

        selection = pd.DataFrame({"id": mutants, "filler": range(len(mutants))})

        # Projects without a top-level target/ directory keep their reports
        # below processor/.
        if path.exists(csv_path + "/target/"):
            selection.to_csv(csv_path + "/target/pit-reports/clustering/cluster.csv", mode="w")
        else:
            selection.to_csv(csv_path + "/processor/target/pit-reports/clustering/cluster.csv", mode="w")

        execute_cluster(project)
        timings.append(time.time() - start_time)
        print(project + ": " + str(timings))

    return timings


def clusteredTestingSimulation(project: str, seeds: list, reduction: float, timingFile: TextIO, useMeanshift: int = 0) -> None:
    """Cluster a project's mutants and time the clustered pitest runs.

    The characteristics are loaded, their string columns label-encoded, and
    the mutants clustered. The labels are then handed to execute_mutants(),
    and the resulting timing statistics are appended to ``timingFile``.

    Args:
        project: Directory name of the project below ``projectsTA/``.
        seeds: Random states forwarded to execute_mutants().
        reduction: For agglomerative clustering, the fraction of mutants to
            keep as clusters (``ceil(len(data) * reduction)`` clusters). With
            ``useMeanshift`` it is passed to MeanShift as the bandwidth.
        timingFile: Open, writable text file. ``,<avg>,<std>`` is written to
            it, where ``<avg>`` is the clustering time plus the mean run time
            and ``<std>`` is the standard deviation of the run times.
        useMeanshift: Truthy to cluster with MeanShift instead of
            AgglomerativeClustering.
    """
    start_time = time.time()

    # Copy the column selection: the columns are overwritten below and
    # assigning into a selection of another frame is a SettingWithCopy hazard.
    data = getProjectDf(project)[["id", "mutOperator", "opcode", "returnType",
                                  "localVarsCount", "isInTryCatch", "isInFinalBlock",
                                  "className", "methodName", "blockNumber",
                                  "lineNumber"]].copy()

    # Ordinal encoding. One encoder is refitted per column; "id" goes last so
    # the encoder still holds the id mapping for inverse_transform() below.
    encoder = LabelEncoder()
    for col in ["mutOperator", "returnType", "className", "methodName", "id"]:
        data[col] = encoder.fit_transform(data[col])

    if useMeanshift:
        clustering = MeanShift(bandwidth=reduction, cluster_all=True)
    else:
        clustering = AgglomerativeClustering(distance_threshold=None,
                                             n_clusters=int(math.ceil(len(data) * reduction)),
                                             linkage="ward",
                                             compute_distances=False)
    # NOTE(review): the encoded "id" column is part of the feature matrix
    # passed to fit() - confirm that this is intended.
    clusters = clustering.fit(data)

    # Restore the original ids so the mutants can be recognised again.
    data["id"] = encoder.inverse_transform(data["id"])
    simulationTime = time.time() - start_time

    resultTime = execute_mutants(project, clusters.labels_, data, seeds)
    timingFile.write("," + str(simulationTime + np.mean(resultTime)) + "," + str(np.std(resultTime)))


The parameter cell is below. Note that these are not the only parameters you might want to change.

Projects in the list should be in the projectsTA folder.

In [None]:
# Random states; execute_mutants() performs one timed clustered run per seed.
seeds = [
    66304, 16389, 14706, 91254, 49890, 86054,
    55284, 77324, 36147, 13506, 73920, 80157,
    43981, 75358, 33399, 56134, 13388, 81617,
    90957, 52113, 20428, 26482, 56340, 31018,
    32067, 13067, 8339, 49008, 125894, 68282,
]

# Project directories, relative to the projectsTA folder.
projects = [
    "commons-csv",
    "commons-cli",
    "commons-text",
    "commons-codec",
    "scribejava/scribejava-core",
    "google-auto-factory",
    "google-auto-common",
]

# Fractions of the mutant count used as cluster counts (agglomerative mode).
reductions = [0.5, 0.25, 0.1]

# 0 selects agglomerative clustering; a truthy value selects MeanShift.
useMeanshift = 0

# Bandwidths that replace `reductions` when useMeanshift is set.
bandwidths = [25, 12]

In [None]:
# Driver cell: for every project, compile tests -> measure the full mutation
# testing time -> extract features if missing -> measure the CMT time for
# every reduction. All measurements end up in the output file.
outputFilename = "timings/HierarchicalTimings.txt"
if useMeanshift:
    # In MeanShift mode the "reductions" are the bandwidths to try.
    outputFilename = "timings/MSTimings.txt"
    reductions = bandwidths

full_time = 0

# The context manager closes the file even if a project fails half-way.
with open(outputFilename, "w") as f:
    # Header row: one avg/std column pair per reduction.
    f.write("project,full")
    for reduction in reductions:
        f.write("," + str(reduction) + " time avg," + str(reduction) + " time std")
    f.write("\n")

    for project in projects:
        compile_tests(project)
        print("Starting project: " + project)
        print("Executing the full mutation testing...")
        start_time = time.time()
        execute_normal(project)
        full_time = time.time() - start_time
        print("Full mutation testing took: " + str(full_time) + " seconds.")
        f.write(project + "," + str(full_time))

        # Only extract the characteristics when neither known report
        # location already has them.
        characteristics_files = (
            "projectsTA/" + project + "/processor/target/pit-reports/clustering/characteristics.csv",
            "projectsTA/" + project + "/target/pit-reports/clustering/characteristics.csv",
        )
        if not any(path.exists(candidate) for candidate in characteristics_files):
            print("Extracting features for project: " + project)
            start_time = time.time()
            characteristics_extraction(project)
            print("Extracting features took: " + str(time.time() - start_time) + " seconds.")

        for reduction in reductions:
            print("Starting reduction: " + str(reduction))
            # Forward useMeanshift; otherwise the simulation always falls
            # back to agglomerative clustering and treats the bandwidths as
            # reduction ratios.
            clusteredTestingSimulation(project, seeds, reduction, f, useMeanshift)
            # Push finished measurements to disk so a later crash does not
            # lose them (replaces the former close/reopen-in-append cycle).
            f.flush()

        f.write("\n")