In [None]:
import datetime
import fileinput
import glob
import json
import os
import re
import subprocess
import sys

## Data

In [None]:
def read_mut_data(path): 
    input_file = open(path, "r")
    project_targets = json.loads(input_file.read())
    input_file.close()
    return project_targets

## Test runner

In [None]:
def install_parent_module(target):
    os.chdir(target["parent-module-path"])
    process = subprocess.Popen(['mvn', 'clean', 'install', '-DskipTests'],
                               stdout=subprocess.PIPE, 
                               stderr=subprocess.PIPE)
    output_to_log("[LOG] Invoked a clean install of the parent module with {}".format(process.args))
    stdout, stderr = process.communicate()
    output_to_log(stdout.decode("utf-8"))
    output_to_log(stderr.decode("utf-8"))
    output_to_log("---------------------------------------------------")
    os.chdir(experiment_directory)

In [None]:
def run_mvn_test(target):
    os.chdir(target["test-module-path"])
    output_to_log("[LOG] In test module directory " + os.getcwd())
    # run tests
    test_class = target["test-file"].replace(".java", "")
    run_test_class = "-Dtest=" + test_class
    process = subprocess.Popen(['mvn', 'test', run_test_class],
                               stdout=subprocess.PIPE, 
                               stderr=subprocess.PIPE)
    output_to_log("[LOG] Invoked tests with {}".format(process.args))
    stdout, stderr = process.communicate()
    output_to_log(stdout.decode("utf-8"))
    output_to_log(stderr.decode("utf-8"))
    
    output_to_log("---------------------------------------------------")

    # store result here
    result = re.search(r"(Tests run:.*)\n",
                       stdout.decode("utf-8")).group(1)
        
    process = subprocess.Popen(['mvn', 'clean'],
                               stdout=subprocess.PIPE, 
                               stderr=subprocess.PIPE)
    output_to_log("[LOG] Cleaned targets with {}".format(process.args))
    stdout, stderr = process.communicate()
    output_to_log(stdout.decode("utf-8"))
    output_to_log(stderr.decode("utf-8"))
    
    output_to_log("---------------------------------------------------")

    # cd back into experiment directory
    os.chdir(experiment_directory)
    output_to_log("[LOG] In experiment directory " + os.getcwd())
    output_to_log("---------------------------------------------------")
    
    return result

In [None]:
def run_baseline_tests_with_no_oracles(target):
    output_to_log("[BASELINE] Not evaluating any oracle")
    # comment out all assertEquals and Mockito.verify statements"
    replace_original_RICK_test_with_variant(target, "baseline")
    
    # cd into test module, run tests, cd back into experiments directory
    result_baseline = run_mvn_test(target)
    
    # restore original test
    restore_original_RICK_test(target)
    output_to_log("---------------------------------------------------")
    return result_baseline

In [None]:
def run_oo(target):
    output_to_log("[OO] Evaluating only OO")
    replace_original_RICK_test_with_variant(target, "oo")

    # cd into test module, run tests, cd back into experiments directory
    result_oo = run_mvn_test(target)
    
    # restore original test
    restore_original_RICK_test(target)
    output_to_log("---------------------------------------------------")
    return result_oo

In [None]:
def run_po(target):
    output_to_log("[PO] Evaluating only PO")
    replace_original_RICK_test_with_variant(target, "po")
    
    # cd into test module, run tests, cd back into experiments directory
    result_po = run_mvn_test(target)
    
    # restore original test
    restore_original_RICK_test(target)
    output_to_log("---------------------------------------------------")
    return result_po

In [None]:
def run_co(target):
    output_to_log("[CO] Evaluating only CO")
    replace_original_RICK_test_with_variant(target, "co")
    
    # cd into test module, run tests, cd back into experiments directory
    result_co = run_mvn_test(target)
    
    # restore original test
    restore_original_RICK_test(target)
    output_to_log("---------------------------------------------------")
    return result_co

In [None]:
def run_all_oracles(target):
    output_to_log("[ALL] Evaluating (OO +) PO + CO")
    # restore original tests
    restore_original_RICK_test(target)
    
    # cd into test module, run tests, cd back into experiments directory
    result_all = run_mvn_test(target)
    
    output_to_log("---------------------------------------------------")
    return result_all

## Util

In [None]:
def output_to_log(message):
    with open(mut_mutation_log_path, 'a+') as logfile:
        logfile.write(message)
        logfile.write("\n")

In [None]:
def cleanup_existing_results():
    output_to_log("[LOG] Removing pre-existing mutants from current working directory")
    os.system('rm -rf *.java/')
    output_to_log("[LOG] Cleaning up LittleDarwin results")
    os.system('rm -rf ' + littledarwin_results_path)

In [None]:
def back_up_original_RICK_test(target):
    output_to_log("[LOG] Backing up original RICK test as ./" + target["parent-file"] + "/original-test.java")
    backup_rick_test = "cp " + target["test-path"] + target["test-file"] + " ./" + target["parent-file"] + "/original-test.java"
    output_to_log("[LOG] Invoking " + backup_rick_test)
    os.system(backup_rick_test)

In [None]:
def restore_original_RICK_test(target):
    output_to_log("[LOG] Restoring original RICK test from ./" + target["parent-file"] + "/original-test.java")
    restore_rick_test = "cp" + " ./" + target["parent-file"] + "/original-test.java" + " " + target["test-path"] + target["test-file"]
    output_to_log("[LOG] Invoking " + restore_rick_test)
    os.system(restore_rick_test)

In [None]:
def replace_original_RICK_test_with_variant(target, variant):
    output_to_log("[LOG] Replacing original RICK test with variant " + variant)
    variant_test_file = "/test-" + variant + ".java"
    original_test_file = target["test-path"] + target["test-file"]
    replace_rick_test = "cp " + output_directory + target["mut"] + variant_test_file + " " + original_test_file
    output_to_log("[LOG] Invoking " + replace_rick_test)
    os.system(replace_rick_test)

In [None]:
def make_copies_of_original_RICK_test(target):
    output_to_log("[LOG] Generating variants of original RICK test")
    original_test_file = target["test-path"] + target["test-file"]
    
    test_files = [
        "/test-baseline.java",
        "/test-po.java",
        "/test-co.java",
        "/test-all.java"
    ]
    
    for i in range(len(test_files)):
        os.system("cp " + original_test_file + " " + output_directory + target["mut"] + test_files[i])
        
    if target["has-oo"]:
        os.system("cp " + original_test_file + " " + output_directory + target["mut"] + "/test-oo.java")
        create_oo_test(output_directory + target["mut"] + "/test-oo.java")
        
    create_baseline_test(output_directory + target["mut"] + test_files[0])
    create_po_test(output_directory + target["mut"] + test_files[1])
    create_co_test(output_directory + target["mut"] + test_files[2])
    create_all_oracles_test(output_directory + target["mut"] + test_files[3])

In [None]:
def create_baseline_test(test_file_name):
    # baseline - comment out all assertEquals and Mockito.verify statements"
    output_to_log("[LOG] Modifying " + test_file_name)
    for line in fileinput.input(test_file_name, inplace = 1): 
        if oo_string in line or po_string in line or co_string in line:
            print(re.sub(r"^(.*)", "//\\1", line), end = '')
        else:
            print(line, end = '')
    output_to_log("[LOG] Created baseline test at " + test_file_name)

In [None]:
def create_oo_test(test_file_name):
    # OO: comment out all Mockito.verify statements
    output_to_log("[LOG] Modifying " + test_file_name)
    for line in fileinput.input(test_file_name, inplace = 1): 
        if po_string in line or co_string in line:
            print(re.sub(r"^(.*)", "// \\1", line), end = '')
        else:
            print(line, end = '')   
    output_to_log("[LOG] Created OO test at " + test_file_name)

In [None]:
def create_po_test(test_file_name):
    # PO: comment out all assertEquals, Mockito.times() statements
    output_to_log("[LOG] Modifying " + test_file_name)
    for line in fileinput.input(test_file_name, inplace = 1): 
        if oo_string in line or co_string in line:
            print(re.sub(r"^(.*)", "// \\1", line), end = '')
        else:
            print(line, end = '')
    output_to_log("[LOG] Created PO test at " + test_file_name)

In [None]:
def create_co_test(test_file_name):
    # CO: comment out all assertEquals, Mockito.atLeastOnce() statements
    output_to_log("[LOG] Modifying " + test_file_name)
    for line in fileinput.input(test_file_name, inplace = 1): 
        if oo_string in line or po_string in line:
            print(re.sub(r"^(.*)", "// \\1", line), end = '')
        else:
            print(line, end = '')
    output_to_log("[LOG] Created CO test at " + test_file_name)

In [None]:
def create_all_oracles_test(test_file_name):
    # All oracles: Nothing to change, this test is equal to the original
    output_to_log("[LOG] Created test with all oracles at " + test_file_name)

In [None]:
def find_if_mutant_is_in_mut(target, mutant_file):
    search_string = "line number in original file: "
    found = ""
    with open(mutant_file, 'r') as fp:
        for l_no, line in enumerate(fp):
            # search string
            if search_string in line:
                found = line
                print("Processing mutant file", mutant_file)
                print("mutant found:", found)
                break
                
    result = int(re.search(r"(line number in original file:\s)(\d+)", found).group(2))
    return result >= target["mut-start-line"] and result <= target["mut-end-line"]

In [None]:
def get_mutant_files(parent):
    mutants = ('./' + parent + '/[0-9]*.java')
    mutant_files = sorted(glob.glob(mutants))
    return mutant_files

In [None]:
def copy_littledarwin_results(parent):
    output_to_log("[LOG] Copying mutants of " + parent)
    os.system('cp -r ' + littledarwin_results_path + parent + " ./")

In [None]:
def run_littledarwin(path):
    output_to_log("[LOG] Running littledarwin on " + path)
    process = subprocess.Popen(['python3', '-m', 'littledarwin', '-m', '-v', '-p', path, '--all'],
                               stdout=subprocess.PIPE, 
                               stderr=subprocess.PIPE)
    output_to_log("[LOG] Invoked LitteDarwin with {}".format(process.args))
    stdout, stderr = process.communicate()
    output_to_log(stdout.decode("utf-8"))
    output_to_log(stderr.decode("utf-8"))

In [None]:
def copy_mutant_files_to_output_directory(target):
    mutants_in_experiment_directory = experiment_directory + target["parent-file"]
    path_for_mutants = output_directory + target["mut"] + "/"
    output_to_log("[LOG] Copying littledarwin mutants to " + path_for_mutants)
    process = subprocess.Popen(['cp', '-r', mutants_in_experiment_directory, path_for_mutants],
                               stdout=subprocess.PIPE, 
                               stderr=subprocess.PIPE)
    output_to_log("[LOG] Invoked copying with {}".format(process.args))
    stdout, stderr = process.communicate()
    output_to_log(stdout.decode("utf-8"))
    output_to_log(stderr.decode("utf-8"))

In [None]:
def run_experiment_with_mutant_file(target, mutant_file, original_file):
    output_to_log("[LOG] Replacing original" + target["parent-file"] + " with " + mutant_file)
    replace_original_with_mutant = "cp " + mutant_file + " " + target["parent-path"] + target["parent-file"]
    output_to_log("[LOG] Invoking " + replace_original_with_mutant)
    os.system(replace_original_with_mutant)
    
    # cd into parent module, make a clean install, cd back into experiments directory
    install_parent_module(target)
        
    target_report = {
        "mut": target["mut"],
        "CUT": target["parent-fqn"],
        "mutant-file": mutant_file,
        "mutant-within-mut": True
    }
    
    # test execution
    target_report["result_baseline"] = run_baseline_tests_with_no_oracles(target)
    if target["has-oo"]:
        target_report["result-oo"] = run_oo(target)
    else:
        target_report["result-oo"] = ""
    target_report["result-po"] = run_po(target)
    target_report["result-co"] = run_co(target)
    target_report["result-all"] = run_all_oracles(target)
    
    mutation_report_file = output_directory + target["mut"] + "/mutation-report.json"
    
    with open(mutation_report_file, 'a+') as outfile:
        json.dump(target_report, outfile, indent=4)
        outfile.write(",\n")
        
    output_to_log("[LOG] Resetting " + target["parent-file"] + " to original")
    replace_mutant_with_original = "cp " + original_file + " " + target["parent-path"] + target["parent-file"]
    output_to_log("[LOG] Invoking " + replace_mutant_with_original)
    os.system(replace_mutant_with_original)

## Mutation analysis

In [None]:
def run_mutation_analysis(target):
    global mut_mutation_log_path
    mut_mutation_log_path = output_directory + target["mut"] + "/mutation-analysis.log"
    os.makedirs(os.path.dirname(mut_mutation_log_path), exist_ok=True)
    
    output_to_log("[LOG] Initiating analysis at " + str(datetime.datetime.now()))
    output_to_log("---------------------------------------------------")
    
    # housekeeping
    cleanup_existing_results()
    output_to_log("---------------------------------------------------")

    output_to_log("[LOG] Working with " + target["mut"])
    output_to_log("[LOG] Generating mutants for its parent " + target["parent-file"])
    output_to_log("---------------------------------------------------")
    
    # run littledarwin
    run_littledarwin(target["parent-path"])    
    output_to_log("---------------------------------------------------")
    
    # copy littledarwin results to current directory
    copy_littledarwin_results(target["parent-file"])
    output_to_log("---------------------------------------------------")

    # get list of mutant files
    mutant_files = get_mutant_files(target["parent-file"])
    output_to_log("[LOG] Generated " + str(len(mutant_files)) + " mutants")
    output_to_log("---------------------------------------------------")

    # get original file
    original_file = glob.glob(r'./' + target["parent-file"] + '/original.java')[0]
    
    # back-up original RICK test
    back_up_original_RICK_test(target)
    output_to_log("---------------------------------------------------")
    
    # create variants of original RICK test
    make_copies_of_original_RICK_test(target)
    output_to_log("---------------------------------------------------")
    
    for f in range(len(mutant_files)):
        if find_if_mutant_is_in_mut(target, mutant_files[f]):
            run_experiment_with_mutant_file(target, mutant_files[f], original_file)
            output_to_log("---------------------------------------------------")
    
    # copy all mutants to output folder
    copy_mutant_files_to_output_directory(target)
    
    output_to_log("---------------------------------------------------")
    output_to_log("[LOG] Finishing analysis at " + str(datetime.datetime.now()))
    
    output_to_log("====== END OF THE LINE ======")

In [None]:
def main():
    global experiment_directory
    global output_directory
    global littledarwin_results_path
    global oo_string
    global po_string
    global co_string

    experiment_directory = "/home/dee/dev/rick-experiments/mutation-analysis/"
    output_directory = experiment_directory + "output/"
    littledarwin_results_path = "/home/dee/anaconda3/lib/python3.8/site-packages/littledarwin/LittleDarwinResults/"

    oo_string = "assertEquals"
    po_string = "Mockito.atLeastOnce"
    co_string = "Mockito.times"
    
    os.chdir(experiment_directory)
    targets_pdfbox = read_mut_data("/home/dee/dev/rick-experiments/mutation-analysis/pdfbox/muts-pdfbox.json")
    targets_graphhopper = read_mut_data("/home/dee/dev/rick-experiments/mutation-analysis/graphhopper/muts-graphhopper.json")
    targets_gephi = read_mut_data("/home/dee/dev/rick-experiments/mutation-analysis/gephi/muts-gephi.json")    

    print("Found", len(targets_pdfbox), "MUTs for PDFBox")
    print("Found", len(targets_graphhopper), "MUTs for GraphHopper" )
    print("Found", len(targets_gephi), "MUTs for Gephi" )
    
    
    print("Working with PDFBox MUTs")
    for target in targets_pdfbox:
        print("Currently processing", target["mut"])
        print("Details:", target)
        run_mutation_analysis(target)
    
    print("Working with GraphHopper MUTs")
    for target in targets_graphhopper:
        print(target["mut"])
        print("Details:", target)
        run_mutation_analysis(target)
    
    print("Working with Gephi MUTs")
    for target in targets_gephi:
        print(target["mut"])
        print("Details:", target)
        run_mutation_analysis(target)


In [None]:
main()