In [None]:
import datetime
import fileinput
import glob
import json
import os
import re
import subprocess
import sys

## Data

In [None]:
def read_mut_data(path):
    """Load the project targets stored in a JSON file.

    Args:
        path: Filesystem path of the JSON file to read.

    Returns:
        The deserialized JSON content, in whatever structure the file holds.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    # The context manager closes the handle even when parsing fails.
    with open(path, "r") as input_file:
        return json.load(input_file)

## Test runner

In [None]:
def install_parent_module(target):
    """Rebuild the parent module with Maven and report whether it compiled.

    Changes into ``target["parent-module-path"]``, runs
    ``mvn clean install -DskipTests``, writes the captured stdout and stderr
    to the log, and then changes into ``experiment_directory``.

    Args:
        target: Mapping providing "parent-module-path" and "parent-file".

    Returns:
        False when Maven's stdout contains "COMPILATION ERROR", True otherwise.
    """
    os.chdir(target["parent-module-path"])

    maven_command = ['mvn', 'clean', 'install', '-DskipTests']
    process = subprocess.Popen(maven_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output_to_log("[LOG] Invoked a clean install of the parent module with {}".format(process.args))

    raw_stdout, raw_stderr = process.communicate()
    build_output = raw_stdout.decode("utf-8")
    output_to_log(build_output)
    output_to_log(raw_stderr.decode("utf-8"))
    output_to_log("---------------------------------------------------")

    os.chdir(experiment_directory)

    # A compilation error in the build output means this mutant cannot be tested.
    compiled = "COMPILATION ERROR" not in build_output
    if not compiled:
        output_to_log("[LOG] Skipping this mutant, restoring original " + target["parent-file"])
    return compiled

In [None]:
def run_mvn_test(test, mode):
    """Run one Maven test selection and return its "Tests run: ..." summary.

    Args:
        test: Mapping describing the test. Mode "original" reads
            "original-test" and "original-test-module-path"; any other mode
            reads "param-test-class" and "param-test-module-path".
        mode: "original" runs the original test; every other value runs the
            parameterized test class.

    Returns:
        The first line of Maven's stdout that starts a "Tests run:" summary,
        or the string "No test summary found in Maven output" when Maven
        printed no such line (previously this case raised AttributeError).
    """
    separator = "---------------------------------------------------"

    if mode == "original":
        output_to_log("[LOG] Running original test " + test["original-test"])
        module_path = test["original-test-module-path"]
        test_selector = test["original-test"]
    else:
        output_to_log("[LOG] Running parameterized test " + test["param-test-class"])
        module_path = test["param-test-module-path"]
        test_selector = test["param-test-class"]

    try:
        os.chdir(module_path)
        output_to_log("[LOG] In test module directory " + os.getcwd())
        process = subprocess.Popen(['mvn', 'test', "-Dtest=" + test_selector],
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
        output_to_log("[LOG] Invoked tests with {}".format(process.args))
        stdout, stderr = process.communicate()
        maven_output = stdout.decode("utf-8")
        output_to_log(maven_output)
        output_to_log(stderr.decode("utf-8"))
    finally:
        # Always return to the experiment directory, even if Maven could not
        # be started, so later relative paths keep resolving correctly.
        os.chdir(experiment_directory)

    output_to_log(separator)
    output_to_log("[LOG] In experiment directory " + os.getcwd())
    output_to_log(separator)

    # Maven prints no summary when the build fails before any test executes;
    # report that explicitly instead of crashing on a missing match.
    summary = re.search(r"(Tests run:.*)\n", maven_output)
    if summary is None:
        return "No test summary found in Maven output"
    return summary.group(1)

## Util

In [None]:
def output_to_log(message):
    """Append ``message`` and a trailing newline to the current log file.

    The log location is read from the module-level ``mut_mutation_log_path``.
    """
    with open(mut_mutation_log_path, 'a+') as log_handle:
        log_handle.write(message + "\n")

In [None]:
def cleanup_existing_results():
    """Delete leftovers of a previous run before new mutants are generated.

    Removes every ``*.java/`` entry from the current working directory and
    the directory named by ``littledarwin_results_path``, logging each step
    before it is executed.
    """
    output_to_log("[LOG] Removing pre-existing mutants from current working directory")
    remove_local_mutants = 'rm -rf *.java/'
    os.system(remove_local_mutants)

    output_to_log("[LOG] Cleaning up LittleDarwin results")
    remove_generated_results = 'rm -rf ' + littledarwin_results_path
    os.system(remove_generated_results)

In [None]:
def get_mutant_files(parent):
    """Return the sorted paths of the numbered mutant files in ``./<parent>/``.

    Only entries whose name starts with a digit and ends in ``.java`` match.
    The paths are sorted as strings, so "10.java" sorts before "2.java".
    """
    pattern = "".join(('./', parent, '/[0-9]*.java'))
    matches = glob.glob(pattern)
    matches.sort()
    return matches

In [None]:
def copy_littledarwin_results(parent):
    """Copy the LittleDarwin results for ``parent`` into the working directory.

    The source is ``littledarwin_results_path`` concatenated with ``parent``;
    it is copied recursively to ``./`` through a shell ``cp -r``.
    """
    output_to_log("[LOG] Copying mutants of " + parent)
    copy_command = 'cp -r ' + littledarwin_results_path + parent + " ./"
    os.system(copy_command)

In [None]:
def run_littledarwin(path):
    """Invoke LittleDarwin on ``path`` and log everything it prints.

    Runs ``python3 -m littledarwin -m -v -p <path> --all`` as a subprocess,
    waits for it to finish, and writes its stdout followed by its stderr to
    the log.
    """
    output_to_log("[LOG] Running littledarwin on " + path)

    command = ['python3', '-m', 'littledarwin', '-m', '-v', '-p', path, '--all']
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output_to_log("[LOG] Invoked LitteDarwin with {}".format(process.args))

    # communicate() yields (stdout, stderr); log them in that order.
    for stream in process.communicate():
        output_to_log(stream.decode("utf-8"))

In [None]:
def find_if_mutant_is_in_mut(target, mutant_file):
    """Tell whether a mutant file records a line inside the target's MUT range.

    Scans ``mutant_file`` for the first line containing
    "line number in original file: " and compares the number that follows
    with ``target["mut-start-line"]`` and ``target["mut-end-line"]``.

    Args:
        target: Mapping providing "mut-start-line" and "mut-end-line".
        mutant_file: Path of the mutant file to inspect.

    Returns:
        True when the recorded line number lies within the inclusive range,
        False when it lies outside or when the file carries no such marker
        (previously the missing-marker case raised AttributeError).
    """
    search_string = "line number in original file: "
    found = ""
    with open(mutant_file, 'r') as fp:
        for line in fp:
            if search_string in line:
                found = line
                print("Processing mutant file", mutant_file)
                print("mutant found:", found)
                break

    marker = re.search(r"(line number in original file:\s)(\d+)", found)
    if marker is None:
        # No usable line number: the mutant cannot be attributed to the MUT.
        return False

    line_number = int(marker.group(2))
    return target["mut-start-line"] <= line_number <= target["mut-end-line"]

In [None]:
def copy_mutant_files_to_output_directory(target):
    """Copy the target's mutants from the experiment directory to the output.

    The source is ``experiment_directory`` + ``target["parent-file"]`` and the
    destination is ``output_directory`` + ``target["mut"]`` + "/". The copy
    runs as ``cp -r`` and its stdout and stderr are written to the log.
    """
    source_dir = experiment_directory + target["parent-file"]
    destination_dir = output_directory + target["mut"] + "/"
    output_to_log("[LOG] Copying littledarwin mutants to " + destination_dir)

    copy_command = ['cp', '-r', source_dir, destination_dir]
    process = subprocess.Popen(copy_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    output_to_log("[LOG] Invoked copying with {}".format(process.args))

    # communicate() yields (stdout, stderr); log them in that order.
    for stream in process.communicate():
        output_to_log(stream.decode("utf-8"))

## Mutation analysis

In [None]:
def run_experiment_with_mutant_file(target, mutant_file, original_file):
    """Run every test of ``target`` against one mutant and record the results.

    The parent source file is overwritten with ``mutant_file``, the parent
    module is rebuilt, and, if it compiles, each entry of
    ``target["tests"]`` is run in both its original and parameterized form.
    The results are appended to ``mutation-report.json`` in the target's
    output directory. The original source file is always copied back
    afterwards, even when the build or a test run raises.

    Args:
        target: Mapping providing "mut", "parent-fqn", "parent-file",
            "parent-path" and "tests".
        mutant_file: Path of the mutant that replaces the parent file.
        original_file: Path of the unmodified copy used for restoration.
    """
    parent_source = target["parent-path"] + target["parent-file"]

    output_to_log("[LOG] Replacing original " + target["parent-file"] + " with " + mutant_file)
    replace_original_with_mutant = "cp " + mutant_file + " " + parent_source
    output_to_log("[LOG] Invoking " + replace_original_with_mutant)
    os.system(replace_original_with_mutant)

    try:
        # cd into parent module, make a clean install, cd back into experiments directory
        mutant_compilable = install_parent_module(target)

        if mutant_compilable:
            target_report = {
                "mut": target["mut"],
                "CUT": target["parent-fqn"],
                "mutant-file": mutant_file,
                "mutant-within-mut": True,
                "tests": []
            }

            for test_spec in target["tests"]:
                test = {}
                test["test"] = test_spec["original-test"]
                # The two result keys use different separators ("_" vs "-");
                # they are kept as-is because the report format depends on them.
                test["result_original"] = run_mvn_test(test_spec, "original")
                test["result-parameterized"] = run_mvn_test(test_spec, "parameterized")
                target_report["tests"].append(test)

            mutation_report_file = output_directory + target["mut"] + "/mutation-report.json"

            # Reports are appended one after another, each followed by ",".
            with open(mutation_report_file, 'a+') as outfile:
                json.dump(target_report, outfile, indent=4)
                outfile.write(",\n")
    finally:
        # Restore the original source no matter what happened above, so a
        # failure cannot leave the mutant in place for later runs. The file
        # paths may be relative to the experiment directory, so return there
        # first (a no-op when nothing failed).
        os.chdir(experiment_directory)
        output_to_log("[LOG] Resetting " + target["parent-file"] + " to original")
        replace_mutant_with_original = "cp " + original_file + " " + parent_source
        output_to_log("[LOG] Invoking " + replace_mutant_with_original)
        os.system(replace_mutant_with_original)

In [None]:
def run_mutation_analysis(target):
    """Generate the mutants for one target and test each one inside its MUT.

    Sets the module-level ``mut_mutation_log_path`` to the target's log file,
    clears earlier results, runs LittleDarwin on ``target["parent-path"]``,
    copies its results into the working directory, and runs the experiment
    for every mutant that ``find_if_mutant_is_in_mut`` accepts. Finally all
    mutants are copied to the output directory.

    Args:
        target: Mapping providing "mut", "parent-file" and "parent-path",
            plus whatever the called helpers read from it.
    """
    global mut_mutation_log_path
    mut_mutation_log_path = output_directory + target["mut"] + "/mutation-analysis.log"
    os.makedirs(os.path.dirname(mut_mutation_log_path), exist_ok=True)

    def log_rule():
        # Visual separator between the phases recorded in the log.
        output_to_log("---------------------------------------------------")

    output_to_log("[LOG] Initiating analysis at " + str(datetime.datetime.now()))
    log_rule()

    # Housekeeping: drop whatever a previous run left behind.
    cleanup_existing_results()
    log_rule()

    output_to_log("[LOG] Working with " + target["mut"])
    output_to_log("[LOG] Generating mutants for its parent " + target["parent-file"])
    log_rule()

    # Generate the mutants.
    run_littledarwin(target["parent-path"])
    log_rule()

    # Bring the generated mutants into the current directory.
    copy_littledarwin_results(target["parent-file"])
    log_rule()

    # Collect the numbered mutant files.
    mutant_files = get_mutant_files(target["parent-file"])
    output_to_log("[LOG] Generated " + str(len(mutant_files)) + " mutants")
    log_rule()

    # The unmodified copy stored next to the mutants, used for restoration.
    original_candidates = glob.glob(r'./' + target["parent-file"] + '/original.java')
    original_file = original_candidates[0]

    for mutant_file in mutant_files:
        if find_if_mutant_is_in_mut(target, mutant_file):
            run_experiment_with_mutant_file(target, mutant_file, original_file)
            log_rule()

    # Keep a copy of every mutant alongside the reports.
    copy_mutant_files_to_output_directory(target)

    log_rule()
    output_to_log("[LOG] Finishing analysis at " + str(datetime.datetime.now()))

    output_to_log("====== END OF THE LINE ======")

In [None]:
# for each mut:
## invoke LittleDarwin to generate mutants
## for each mutant:
### replace original parent class with mutated
### run original test > save log
### run parameterized test > save log
### restore original parent class

def main(experiment_dir="/home/dee/dev/fakir/proze-experiments/pdfbox/mutation-analysis/",
         littledarwin_results_dir="/home/dee/anaconda3/lib/python3.8/site-packages/littledarwin/LittleDarwinResults/",
         targets_file=None):
    """Run the mutation analysis for every PDFBox target.

    Sets the module-level ``experiment_directory``, ``output_directory`` and
    ``littledarwin_results_path`` that the other functions read, changes into
    the experiment directory, loads the targets and analyses them one by one.

    Args:
        experiment_dir: Directory the experiment runs in. Defaults to the
            location that was previously hard-coded.
        littledarwin_results_dir: Directory where LittleDarwin writes its
            results. Defaults to the location that was previously hard-coded.
        targets_file: JSON file listing the targets. Defaults to
            "muts-pdfbox.json" inside ``experiment_dir``.
    """
    global experiment_directory
    global output_directory
    global littledarwin_results_path

    # The other functions build paths by plain string concatenation, so both
    # directories must end with a separator; join with "" adds one if missing.
    experiment_directory = os.path.join(experiment_dir, "")
    output_directory = experiment_directory + "output/"
    littledarwin_results_path = os.path.join(littledarwin_results_dir, "")

    if targets_file is None:
        targets_file = experiment_directory + "muts-pdfbox.json"

    os.chdir(experiment_directory)
    targets_pdfbox = read_mut_data(targets_file)

    print("Found", len(targets_pdfbox), "MUTs for PDFBox")

    print("Working with PDFBox MUTs")
    for target in targets_pdfbox:
        print("Currently processing", target["mut"])
        print("Details:", target)
        run_mutation_analysis(target)

In [None]:
# Entry point: load the targets and run the mutation analysis for each of them.
main()