In [None]:
import subprocess
import os
from pathlib import Path

CP_SEP = ";" if os.name == "nt" else ":"

def compile_case(case_dir, classpath):
    """Compile every ``*.java`` file directly inside ``case_dir`` with ``javac``.

    Args:
        case_dir: Directory holding the Java sources (``str`` or ``Path``).
            Class files are written next to the sources (``-d .``).
        classpath: Classpath entries joined with ``CP_SEP``. Relative entries
            are interpreted relative to the *caller's* working directory.

    Returns:
        The ``subprocess.CompletedProcess`` of the ``javac`` call, with
        ``stdout`` and ``stderr`` captured as text.
    """
    # javac is started with cwd=case_dir, so any path that is relative to the
    # caller's working directory would be resolved against the wrong base.
    # Make everything absolute before handing it to the child process.
    case_dir = Path(os.path.abspath(case_dir))

    # Bare file names are enough because the process runs inside case_dir;
    # sorting keeps the command line deterministic.
    java_files = sorted(source.name for source in case_dir.glob("*.java"))

    cp_entries = [
        os.path.abspath(entry)
        for entry in str(classpath).split(CP_SEP)
        if entry  # drop empty segments (e.g. an empty classpath string)
    ]
    cp_entries.append(str(case_dir))

    cmd = [
        "javac",
        "-cp", CP_SEP.join(cp_entries),
        "-d", ".",
        *java_files
    ]

    return subprocess.run(
        cmd,
        cwd=case_dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )

In [None]:
def run_tests(case_dir, classpath):
    """Run the JUnit Platform console launcher on the classes in ``case_dir``.

    Args:
        case_dir: Directory containing the compiled test classes (``str`` or
            ``Path``). It is appended to the classpath, scanned for tests and
            also used as the reports directory.
        classpath: Classpath entries joined with ``CP_SEP``. Relative entries
            are interpreted relative to the *caller's* working directory.

    Returns:
        The ``subprocess.CompletedProcess`` of the ``java`` call, with
        ``stdout`` and ``stderr`` captured as text.
    """
    # The JVM is started with cwd=case_dir; relative paths would therefore be
    # resolved against case_dir instead of the caller's working directory
    # (reports would end up in a nested directory). Use absolute paths only.
    case_dir = os.path.abspath(case_dir)

    cp_entries = [
        os.path.abspath(entry)
        for entry in str(classpath).split(CP_SEP)
        if entry  # drop empty segments (e.g. an empty classpath string)
    ]
    cp_entries.append(case_dir)

    cmd = [
        "java",
        "-cp", CP_SEP.join(cp_entries),
        "org.junit.platform.console.ConsoleLauncher",
        "execute", # Explicitly tell it to execute
        "--scan-class-path",
        "--reports-dir", case_dir
    ]

    return subprocess.run(
        cmd,
        cwd=case_dir,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True
    )

In [None]:
import xml.etree.ElementTree as ET

def parse_report(report_dir):
    """Aggregate the JUnit XML reports found directly inside ``report_dir``.

    Args:
        report_dir: Directory to scan for ``*.xml`` files (``str`` or ``Path``).

    Returns:
        A dict with the summed ``tests``, ``failures``, ``errors`` and
        ``skipped`` counters plus the derived ``passed`` count. All values are
        0 when the directory contains no XML file.

    Raises:
        xml.etree.ElementTree.ParseError: If an XML file is malformed.
    """
    counters = ("tests", "failures", "errors", "skipped")
    stats = dict.fromkeys(counters, 0)

    # Path(...) lets callers pass plain strings as well as Path objects.
    for xml_file in Path(report_dir).glob("*.xml"):
        root = ET.parse(xml_file).getroot()

        # A <testsuites> wrapper carries the numbers on its nested <testsuite>
        # elements; fall back to the root itself for the plain single-suite
        # layout (or a wrapper without children).
        nested = root.findall("testsuite") if root.tag == "testsuites" else []
        for suite in nested or [root]:
            for key in counters:
                stats[key] += int(suite.attrib.get(key, 0))

    stats["passed"] = (
        stats["tests"]
        - stats["failures"]
        - stats["errors"]
        - stats["skipped"]
    )

    return stats

In [None]:
from pathlib import Path
import re

PACKAGE_REGEX = re.compile(r'^\s*package\s+[\w\.]+;\s*$', re.MULTILINE)

def remove_package_declaration(java_file: Path):
    """Rewrite ``java_file`` without its package line and project-local imports.

    Two kinds of lines are dropped: the ``package ...;`` declaration and every
    ``import com.thealgorithms....;`` statement (those classes are provided
    locally). Leading whitespace of the resulting text is stripped before the
    file is written back as UTF-8.
    """
    local_import_regex = re.compile(
        r'^\s*import\s+com\.thealgorithms\.[\w\.]+;\s*$', re.MULTILINE
    )

    source = java_file.read_text(encoding="utf-8")

    # Apply the package rule first, then the import rule, on the running text.
    for unwanted in (PACKAGE_REGEX, local_import_regex):
        source = unwanted.sub("", source)

    java_file.write_text(source.lstrip(), encoding="utf-8")


def process_directory(root: Path):
    """Apply ``remove_package_declaration`` to every ``*.java`` file below ``root``.

    The search is recursive, so files in nested sub-directories are rewritten
    as well.
    """
    for source_path in root.rglob("*.java"):
        remove_package_declaration(source_path)


In [None]:
import re
import os

def extract_public_methods(file_path):
    """Return the names of the ``public`` methods declared in a Java source file.

    The detection is regex based: ``public``, an optional ``static``, a return
    type made of word characters, ``<``, ``>``, ``[`` and ``]`` (so
    ``List<String>`` or ``int[]`` match, types containing spaces or commas do
    not), the method name and an opening parenthesis. Constructors are not
    matched because they have no return type.

    Args:
        file_path: Path of the Java file to scan.

    Returns:
        The method names in file order; an empty list when the file does not
        exist (a message is printed in that case).
    """
    method_regex = r"public\s+(?:static\s+)?[\w<>[\]]+\s+(?P<method_name>\w+)\s*\("

    try:
        # The other helpers in this notebook read and write Java sources as
        # UTF-8; relying on the platform default here breaks on Windows.
        with open(file_path, 'r', encoding="utf-8") as f:
            content = f.read()
    except FileNotFoundError:
        print("File not found.")
        return []

    # finditer yields the non-overlapping matches in file order
    return [match.group("method_name") for match in re.finditer(method_regex, content)]


def overwrite_method_names_best_effort(target_file_path, new_names):
    """Rename public method definitions positionally, tolerating count mismatches.

    The i-th public method definition found in the file receives
    ``new_names[i]``. When the file and ``new_names`` differ in length a
    warning is printed and only the first ``min(found, provided)`` definitions
    are renamed. Only the definitions are touched; calls to the methods are
    left as they are.

    Args:
        target_file_path: Java file to rewrite in place (read/written as UTF-8).
        new_names: Replacement method names, in definition order.
    """
    # UTF-8 matches how the other helpers in this notebook write Java sources.
    with open(target_file_path, 'r', encoding="utf-8") as f:
        content = f.read()

    # Group 2 is the method name inside a public method signature
    pattern = r"(public\s+(?:static\s+)?[\w<>[\]]+\s+)(\w+)(\s*\()"

    matches = list(re.finditer(pattern, content))
    num_found = len(matches)
    num_provided = len(new_names)

    if num_found != num_provided:
        print(f"Warning: Mismatch found. File has {num_found} methods, but {num_provided} names provided.")
        print("Proceeding with best-effort matching...")

    # Assemble the result from the untouched slices between the matched names.
    # zip() stops at the shorter sequence, which is exactly the best-effort limit.
    pieces = []
    cursor = 0
    for match, new_name in zip(matches, new_names):
        start, end = match.span(2)
        pieces.append(content[cursor:start])
        pieces.append(new_name)
        cursor = end
    pieces.append(content[cursor:])

    with open(target_file_path, 'w', encoding="utf-8") as f:
        f.write("".join(pieces))

    #print(f"Successfully updated {limit} method(s) in {target_file_path}")


def overwrite_method_names(target_file_path, new_names):
    """Rename public methods (definitions and calls) to ``new_names``.

    The i-th public method definition receives ``new_names[i]``. Every call
    ``oldName(`` is rewritten to the name its definition received. Definitions
    and calls are replaced in a single scan over the original text, so a new
    name that equals another method's old name (e.g. two methods swapping
    names) is never renamed a second time. If several definitions share one
    old name (overloads), calls use the name given to the last of them.

    Args:
        target_file_path: Java file to rewrite in place (read/written as UTF-8).
        new_names: Replacement method names, in definition order.

    Raises:
        ValueError: If the number of public method definitions differs from
            ``len(new_names)``; the file is left untouched in that case.
    """
    with open(target_file_path, 'r', encoding="utf-8") as f:
        content = f.read()

    # Signature of a public method: modifiers + return type, name, "("
    definition = (
        r"(?P<head>public\s+(?:static\s+)?[\w<>[\]]+\s+)"
        r"(?P<defined>\w+)"
        r"(?P<def_paren>\s*\()"
    )

    old_method_names = [m.group("defined") for m in re.finditer(definition, content)]

    if len(old_method_names) != len(new_names):
        raise ValueError(
            f"Count mismatch! File has {len(old_method_names)} methods, "
            f"but you provided {len(new_names)} names."
        )

    if old_method_names:
        method_name_mapping = dict(zip(old_method_names, new_names))
        called = "|".join(re.escape(name) for name in method_name_mapping)

        # The definition alternative is tried first at every position, so a
        # signature is consumed as a whole and its name is never seen by the
        # call alternative.
        combined = re.compile(
            rf"{definition}|\b(?P<called>{called})(?P<call_paren>\s*\()"
        )
        remaining_names = iter(new_names)

        def replacement_callback(match):
            if match.group("defined") is not None:
                # Definitions are renamed by position
                return f"{match.group('head')}{next(remaining_names)}{match.group('def_paren')}"
            # Calls are renamed by their old name
            return f"{method_name_mapping[match.group('called')]}{match.group('call_paren')}"

        content = combined.sub(replacement_callback, content)

    with open(target_file_path, 'w', encoding="utf-8") as f:
        f.write(content)

    #print(f"Successfully updated {len(new_names)} methods in {target_file_path}")


def rename_class_in_file(file_path):
    """Strip comments from a Java file and rename its class to the file name.

    The first class declared in the file is renamed to the file's base name
    (without ``.java``). Every identifier occurrence of the old class name is
    renamed with it: the declaration, constructors with or without a modifier,
    ``new Old(...)`` expressions, static references and type usages. String
    and character literals are left untouched, both while stripping comments
    and while renaming. The file is rewritten in place as UTF-8; it is left
    unchanged when no class declaration is found.

    Args:
        file_path: Path of the Java file to rewrite.
    """
    file_name = os.path.basename(file_path)
    new_class_name = file_name.replace(".java", "")

    with open(file_path, 'r', encoding="utf-8") as f:
        content = f.read()

    # Literals are matched explicitly so that "//" or "/*" inside a string
    # (e.g. "http://...") is not mistaken for the start of a comment.
    string_literal = r'"(?:\\.|[^"\\\n])*"'
    char_literal = r"'(?:\\.|[^'\\\n])*'"
    literals = rf"{string_literal}|{char_literal}"

    # remove comments to avoid false positives
    comment_scanner = re.compile(rf"(?P<literal>{literals})|//[^\n]*|/\*.*?\*/", re.DOTALL)
    content_no_comments = comment_scanner.sub(
        lambda m: m.group("literal") or "", content
    )

    # 1. Identify the OLD class name: the first identifier after the 'class'
    #    keyword that is not part of a literal
    declaration = re.compile(rf"{literals}|\bclass\s+(?P<name>[a-zA-Z0-9_]+)", re.DOTALL)
    old_class_name = next(
        (m.group("name") for m in declaration.finditer(content_no_comments) if m.group("name")),
        None,
    )
    if old_class_name is None:
        return

    # 2. Rename every whole-word occurrence outside of literals. A callback is
    #    used instead of a "\1name" template, which would be misread as a
    #    different group number if the new name started with a digit.
    reference = re.compile(
        rf"{literals}|\b(?P<ident>{re.escape(old_class_name)})\b", re.DOTALL
    )
    content_no_comments = reference.sub(
        lambda m: new_class_name if m.group("ident") else m.group(0),
        content_no_comments,
    )

    with open(file_path, 'w', encoding="utf-8") as f:
        f.write(content_no_comments)
    
    #print(f"Removed comments and updated class and constructor in: {file_path}")

In [None]:
from pathlib import Path
import shutil
import re

# Root of the playground; the working directories below all live inside it.
_PLAYGROUND_ROOT = Path("../input/testplayground")

ORIGINAL_DIR = _PLAYGROUND_ROOT / "original"   # implementations to turn into cases
TESTS_DIR = _PLAYGROUND_ROOT / "tests"         # one <base>Test.java per class
DATASET_DIR = _PLAYGROUND_ROOT / "dataset"     # generated case directories
SUPPORT_DIR = _PLAYGROUND_ROOT / "support"     # helper sources copied into every case

# File name layout: ClassName_KF_Version_Prompt.java
# (none of the four parts may contain an underscore)
IMPL_PATTERN = re.compile(
    r"^(?P<base>[^_]+)_(?P<KeyFactor>[^_]+)_(?P<Version>[^_]+)_(?P<Prompt>[^_]+)\.java$"
)

# One {"class_name": ..., "method_names": [...]} entry per reference implementation
case_class_function_name_mapping = list()

def build_dataset():
    """Create one self-contained case directory per implementation in ORIGINAL_DIR.

    For every ``<base>_<KeyFactor>_<Version>_<Prompt>.java`` file a directory
    ``DATASET_DIR/new/<base>/<KeyFactor>/<Version>/<Prompt>`` is filled with:

    * the implementation, copied as ``<base>.java`` with its class renamed to
      match the file name,
    * all ``*.java`` files from SUPPORT_DIR,
    * the test ``<base>Test.java`` from TESTS_DIR.

    The ``KF0/v0/nop`` variant of a class is its reference: its public method
    names are recorded in ``case_class_function_name_mapping`` and written
    over the public method names of all other variants of that class (this
    assumes the method order is the same across variants). References are
    processed before all other files so the result does not depend on the
    order in which the file system lists them.

    Files whose name does not match IMPL_PATTERN, or whose class has no test,
    are skipped with a message.
    """
    def is_reference(match):
        return (
            match.group("KeyFactor"), match.group("Version"), match.group("Prompt")
        ) == ("KF0", "v0", "nop")

    candidates = []
    for impl_file in ORIGINAL_DIR.rglob("*.java"):
        match = IMPL_PATTERN.match(impl_file.name)
        if not match:
            print(f"Skipping {impl_file.name} (does not match pattern)")
            continue
        candidates.append((impl_file, match))

    # References first (False sorts before True), then by path for determinism.
    candidates.sort(key=lambda item: (not is_reference(item[1]), str(item[0])))

    support_files = list(SUPPORT_DIR.glob("*.java"))

    for impl_file, match in candidates:
        base = match.group("base")
        key_factor = match.group("KeyFactor")
        version = match.group("Version")
        prompt = match.group("Prompt")

        test_file = TESTS_DIR / f"{base}Test.java"
        if not test_file.exists():
            print(f"⚠ No test found for {base}, skipping")
            continue

        case_dir = DATASET_DIR / "new" / base / key_factor / version / prompt
        case_dir.mkdir(parents=True, exist_ok=True)
        case_impl = case_dir / f"{base}.java"

        # Copy + rename implementation
        shutil.copyfile(impl_file, case_impl)

        # copy over supporting files
        for support_file in support_files:
            shutil.copyfile(support_file, case_dir / support_file.name)

        # rename class name in implementation to match file name
        rename_class_in_file(case_impl)

        if is_reference(match):
            # Replace a stale entry instead of appending a duplicate, so that
            # re-running this function keeps one entry per class. The list is
            # modified in place because other code may hold a reference to it.
            case_class_function_name_mapping[:] = [
                entry for entry in case_class_function_name_mapping
                if entry["class_name"] != base
            ]
            case_class_function_name_mapping.append({
                "class_name": base,
                "method_names": extract_public_methods(case_impl)
            })
        else:
            mapping = next(
                (m for m in case_class_function_name_mapping if m["class_name"] == base),
                None,
            )
            if mapping is None:
                print(f"missing mapping for {base}/{key_factor}/{version}/{prompt}")
            else:
                try:
                    overwrite_method_names(target_file_path=case_impl, new_names=mapping["method_names"])
                except ValueError as e:
                    print(f"Error: {e}: Trying best-effort alternative")
                    try:
                        overwrite_method_names_best_effort(target_file_path=case_impl, new_names=mapping["method_names"])
                    except ValueError as e:
                        print(f"Error: {e}: Best-effort alternative also failed")

        # Copy test file
        shutil.copyfile(test_file, case_dir / test_file.name)

In [None]:
from pathlib import Path

# Playground root and the dataset directory inside it
DIRECTORY = Path("../input/testplayground/")
DATASET = DIRECTORY / "dataset"

# Populate the case directories from the original implementations
build_dataset()

In [None]:
# Rewrite every *.java file below DATASET (recursively): drop the `package`
# declaration and the `import com.thealgorithms.*` lines.
process_directory(DATASET)

In [None]:
import json

import tqdm

LIB_DIR = Path("lib")

# The jars are referenced below DIRECTORY / LIB_DIR, so that is also where they
# have to be discovered. Absolute paths keep the classpath valid even though
# javac/java are started with a different working directory.
classpath = CP_SEP.join(
    str(jar.resolve()) for jar in sorted((DIRECTORY / LIB_DIR).glob("*.jar"))
)

# Directory holding the cases that failed in an earlier run
directory_to_run = DATASET.joinpath("failing", "new")

# Snippets (top-level folders below "new") to compile and test in this run.
# To re-run everything that failed earlier, build the list from
# directory_to_run instead (it only exists after a previous run):
#   folder_names = [f.name for f in directory_to_run.iterdir() if f.is_dir()]
folder_names = ["JugglerSequence"]

case_test_results = []

# Absolute root, so the case paths stay valid inside the subprocesses
new_root = DATASET.joinpath("new").resolve()

# Pre-scan case directories so we know total progress
all_case_dirs = sorted(
    case
    for case in new_root.rglob("*")
    if case.is_dir() and any(case.glob("*.java"))
)
case_dirs = [
    case
    for case in all_case_dirs
    if case.relative_to(new_root).parts[0] in folder_names
]
print(
    f"Skipping {len(all_case_dirs) - len(case_dirs)} case(s) "
    "since they are not in the specified list of folders to run"
)

for case in tqdm.tqdm(case_dirs, desc="Compiling and running test cases"):
    # path of the case relative to the "new" root: <base>/<KF>/<version>/<prompt>
    path = case.relative_to(new_root)

    compile_res = compile_case(case, classpath)

    if compile_res.returncode != 0:
        case_test_results.append({
            "case": str(path),
            "status": "compile_error",
            "error": compile_res.stderr
        })

        # move failing compiles into the "failing" directory for easier access
        # NOTE(review): this target omits the "new" path segment that the other
        # cells use below "failing" (see directory_to_run) - confirm intended.
        failing_dir = DATASET.joinpath("failing", path.parent)
        failing_dir.mkdir(parents=True, exist_ok=True)
        shutil.move(str(case), str(failing_dir / case.name))

        continue

    # The launcher writes its XML reports into the case directory
    test_result = run_tests(case, classpath)

    stats = parse_report(case)
    stats["case"] = str(path)
    stats["status"] = "ok"
    case_test_results.append(stats)


In [None]:
# summarize the number of passed and failed tests
# Only cases that compiled carry test counters ("passed", "failures", "errors").
ok_results = [result for result in case_test_results if result["status"] == "ok"]

passed_tests = sum(1 for result in ok_results if result.get("passed", 0) > 0)
# parse_report stores failing tests under "failures" and "errors";
# there is no "failed" key.
failed_tests = sum(
    1 for result in ok_results
    if result.get("failures", 0) + result.get("errors", 0) > 0
)
compile_errors = sum(1 for result in case_test_results if result["status"] == "compile_error")

print(f"Total cases with passed tests: {passed_tests}")
print(f"Total cases with failed tests: {failed_tests}")
print(f"Total cases with compile errors: {compile_errors}")

# print cases where there is a compile error
for result in case_test_results:
    if result["status"] == "compile_error":
        print(f"Compile error in case {result['case']}:")
        print(result["error"])

In [None]:
# go into each case, parse the report and move the case to "failing" or "passing"
new_cases_root = DATASET.joinpath("new")

# Take a snapshot first: the loop moves directories out of the tree, which must
# not happen while the lazy rglob iterator is still walking it.
case_dirs_to_sort = [
    case
    for case in new_cases_root.rglob("*")
    if case.is_dir() and any(case.glob("*.java"))
]

for case in case_dirs_to_sort:
    relative_case = case.relative_to(DATASET)
    print(f"Processing case: {relative_case}")

    # read xml report
    report_stats = parse_report(case)

    print(report_stats)

    if report_stats["tests"] == 0:
        # No report (or no executed test): the case was not evaluated, so it
        # must not be filed as passing.
        print(f"No test results found for {relative_case}; leaving it in place")
        continue

    print(f"Total cases with passed tests: {report_stats['passed']}")
    print(f"Total cases with failed tests: {report_stats['failures'] + report_stats['errors']}")

    has_failures = report_stats["failures"] > 0 or report_stats["errors"] > 0
    outcome = "failing" if has_failures else "passing"

    # move the case into the "failing"/"passing" directory for easier access.
    # The destination repeats the case name (<...>/<prompt>/<prompt>); the
    # later cells index into exactly this layout, so it is kept as is.
    target_dir = DATASET.joinpath(outcome, relative_case)
    target_dir.mkdir(parents=True, exist_ok=True)

    print(f"Moving {outcome} case {relative_case} from {case} to {target_dir / case.name}")

    shutil.move(str(case), str(target_dir / case.name))

In [None]:
# build a dataframe with all snippets, variants, and versions
import pandas as pd

all_code_path = Path("../input/testplayground/all_code")

all_codes = []

for case in all_code_path.rglob("*"):
    if not case.is_dir():
        continue

    # the directory name identifies the snippet
    for impl_file in case.glob("*.java"):
        match = IMPL_PATTERN.match(impl_file.name)
        if not match:
            print(f"Skipping {impl_file.name} (does not match pattern)")
            continue

        # Only the file name is needed here; the file content is not read.
        all_codes.append({
            "snippet": case.name,
            "key_factor": match.group("KeyFactor"),
            "version": match.group("Version"),
            "prompt": match.group("Prompt")
        })

# Explicit columns keep the frame usable by the later cells even when no
# implementation file was found.
df_test_results = pd.DataFrame(
    all_codes, columns=["snippet", "key_factor", "version", "prompt"]
)

# print dimensions
print(f"All code dataframe dimensions: {df_test_results.shape}")
print(df_test_results.head(5))


In [None]:
# Roots of the cases that were sorted into "passing" / "failing"
currently_passing = DATASET / "passing" / "new"
currently_failing = DATASET / "failing" / "new"

def evaluate_test_results(folder_to_search):
    """Collect the test results of all cases below ``folder_to_search``.

    Every directory that directly contains ``*.java`` files counts as one case.
    Snippet, key factor, version and prompt are taken from the case path
    relative to DATASET, counted from the end: ``parts[-5]`` .. ``parts[-2]``
    (the last part repeats the prompt in the layout produced by the sorting
    cell). The counters from ``parse_report`` are written into the matching
    rows of the global ``df_test_results`` (columns ``tests_passed``,
    ``tests_failed``, ``tests_errors``), and the totals are printed.

    Args:
        folder_to_search: Directory (``Path``) that is searched recursively.
    """
    snippets = 0
    tests_passed = 0
    tests_failed = 0

    for case in folder_to_search.rglob("*"):
        if not (case.is_dir() and any(case.glob("*.java"))):
            continue

        snippets += 1

        # extract snippet, KF, version and prompt from the path
        path_parts = case.relative_to(DATASET).parts

        snippet = path_parts[-5]
        key_factor = path_parts[-4]
        version = path_parts[-3]
        prompt = path_parts[-2]

        # read xml report
        report_stats = parse_report(case)

        # Count passed tests for every case, including cases that also have
        # failing tests; otherwise the printed total under-reports them.
        tests_passed += report_stats["passed"]
        tests_failed += report_stats["failures"] + report_stats["errors"]

        # add results to dataframe where snippet, key_factor, version, and prompt match
        mask = (
            (df_test_results["snippet"] == snippet)
            & (df_test_results["key_factor"] == key_factor)
            & (df_test_results["version"] == version)
            & (df_test_results["prompt"] == prompt)
        )
        df_test_results.loc[mask, "tests_passed"] = report_stats["passed"]
        df_test_results.loc[mask, "tests_failed"] = report_stats["failures"]
        df_test_results.loc[mask, "tests_errors"] = report_stats["errors"]

    print(f"Total snippets: {snippets}")
    print(f"Total tests passed: {tests_passed}")
    print(f"Total tests failed: {tests_failed}")

# Evaluate the passing cases first, then the failing ones
for results_root in (currently_passing, currently_failing):
    evaluate_test_results(results_root)

In [None]:

# Persist the collected results as CSV inside DIRECTORY (the row index is not written).
df_test_results.to_csv(DIRECTORY / "all_code_test_results.csv", index=False)

In [None]:
def rename_class_in_file_to_filename(file_path):
    """Rename the class declared in a Java file to the file's base name.

    The first class declaration outside of comments and literals determines
    the old class name. Every identifier occurrence of that name in code is
    then renamed: the declaration, constructors with or without a modifier,
    ``new Old(...)`` expressions, static references and type usages. Comments
    and string/character literals are kept exactly as they are. The file is
    rewritten in place as UTF-8; it is left unchanged when no class
    declaration is found.

    Args:
        file_path: Path of the Java file to rewrite.
    """
    file_name = os.path.basename(file_path)
    new_class_name = file_name.replace(".java", "")

    with open(file_path, 'r', encoding="utf-8") as f:
        content = f.read()

    # Spans that must never be inspected or modified: comments and literals.
    # Without this, "class" inside a Javadoc sentence such as "This class
    # implements ..." would be taken for the declaration.
    skipped = "|".join([
        r"//[^\n]*",                # line comment
        r"/\*.*?\*/",               # block comment / Javadoc
        r'"(?:\\.|[^"\\\n])*"',     # string literal
        r"'(?:\\.|[^'\\\n])*'",     # character literal
    ])

    # 1. Identify the OLD class name: first identifier after the 'class' keyword
    declaration = re.compile(rf"{skipped}|\bclass\s+(?P<name>[a-zA-Z0-9_]+)", re.DOTALL)
    old_class_name = next(
        (m.group("name") for m in declaration.finditer(content) if m.group("name")),
        None,
    )
    if old_class_name is None:
        return

    # 2. Rename every whole-word occurrence in code. A callback is used instead
    #    of a "\1name" template, which would be misread as a different group
    #    number if the new name started with a digit.
    reference = re.compile(
        rf"{skipped}|\b(?P<ident>{re.escape(old_class_name)})\b", re.DOTALL
    )
    content = reference.sub(
        lambda m: new_class_name if m.group("ident") else m.group(0),
        content,
    )

    with open(file_path, 'w', encoding="utf-8") as f:
        f.write(content)

# Prepare a manual inspection of one snippet: gather all of its failing
# variants (plus the test, once) in a separate project directory.
dir_test_project = Path("../TestProject/")
input_string = "ADFGVXCipher"

key_factor = ["KF0", "KF1", "KF2"]
versions = ["v0", "v1", "v2", "v3", "v4", "v5"]
prompts = ["nop", "pKF0", "pKF1", "pKF2"]

# The test file is identical for every variant, so it is copied only once
test_file_done = False

for kf in key_factor:
    for version in versions:
        for prompt in prompts:
            case_dir = DATASET.joinpath("failing", "new", input_string, kf, version, prompt)
            if not case_dir.exists():
                continue

            print(f"Case: {input_string}_{kf}_{version}_{prompt}")

            # the sources sit one level deeper, in a folder named like the prompt
            test_file = case_dir / prompt / f"{input_string}Test.java"
            if not test_file.exists():
                print(f"Test file not found for {input_string}/{kf}/{version}/{prompt}/{prompt}")
            elif not test_file_done:
                target_test_file = dir_test_project / f"test/{input_string}Test.java"
                shutil.copyfile(test_file, target_test_file)
                print(f"Copied {test_file} to {target_test_file}")
                test_file_done = True

            # implementation of this variant
            java_file = case_dir / prompt / f"{input_string}.java"
            if not java_file.exists():
                print(f"File not found for {input_string}/{kf}/{version}/{prompt}/{prompt}")
                continue

            # the copy gets a variant-specific file name and a matching class name
            target_file = dir_test_project / f"src/{input_string}_{kf}_{version}_{prompt}.java"
            shutil.copyfile(java_file, target_file)
            rename_class_in_file_to_filename(target_file)
            print(f"Copied and adjusted {java_file} to {target_file}")




In [None]:
# read .csv file again for further analysis
df_results = pd.read_csv(DIRECTORY / "all_code_test_results.csv")

# number of unique snippets that had tests run (i.e. tests_passed + tests_failed + tests_errors > 0)
# Rows without results hold NaN, so their total is NaN and they are not counted.
df_results["total_tests"] = df_results["tests_passed"] + df_results["tests_failed"] + df_results["tests_errors"]
num_snippets_with_tests = df_results[df_results["total_tests"] > 0]["snippet"].nunique()
print(f"Number of unique snippets with tests run: {num_snippets_with_tests}")

# if the values in the column "man_tests_passed" are empty, fill them with the values from "tests_passed"
if "man_tests_passed" in df_results.columns and df_results["man_tests_passed"].isnull().all():
    df_results["man_tests_passed"] = df_results["tests_passed"]
    df_results["man_tests_failed"] = df_results["tests_failed"]
    df_results["man_tests_errors"] = df_results["tests_errors"]

# run some analysis
# first: show total number of tests passed and failed
total_passed = df_results["tests_passed"].sum()
total_failed = df_results["tests_failed"].sum() + df_results["tests_errors"].sum()
print(f"Total tests passed: {total_passed}")
print(f"Total tests failed: {total_failed}")

# second: summary per version and key factor, with percentages
# (the numbers are test counts summed over all rows of the group)
for version in versions:
    for kf in key_factor:
        subset = df_results[(df_results["version"] == version) & (df_results["key_factor"] == kf)]
        total_cases = len(subset)
        failed_cases = subset["tests_failed"].sum() + subset["tests_errors"].sum()
        passed_cases = subset["tests_passed"].sum()
        executed = passed_cases + failed_cases

        print(f"Version: {version}, Key Factor: {kf}")
        print(f"Total cases: {total_cases}")
        if executed > 0:
            print(f"Passed cases: {passed_cases} ({passed_cases / executed * 100:.2f}%)")
            print(f"Failed cases: {failed_cases} ({failed_cases / executed * 100:.2f}%)")
        else:
            # No test ran for this group; a percentage would divide by zero.
            print(f"Passed cases: {passed_cases} (n/a)")
            print(f"Failed cases: {failed_cases} (n/a)")
        print("-" * 30)

