In [1]:
import os
import json
import pandas as pd
import altair as alt
import difflib
import re

In [2]:
def instruction_finetune_dataset(base_path):
    """
    Reconfigure the dataset to combine information from each subfolder
    into a single text file for each main folder.

    :param base_path: Path to the dataset's root directory.
    :param output_path: Path to save the reconfigured files.
    """
    processed_data = []
    # Iterate through each folder in the base path
    for main_folder in os.listdir(base_path):
        main_folder_path = os.path.join(base_path, main_folder)
        print(main_folder_path)
        print("\n\n")
        if not os.path.isdir(main_folder_path):
            continue

        # output_file = os.path.join(output_path, f"{main_folder}_bug.txt")
        
        # with open(output_file, 'w', encoding='utf-8') as outfile:
        # Iterate through subfolders
        for subfolder in sorted(os.listdir(main_folder_path)):
            
            subfolder_path = os.path.join(main_folder_path, subfolder)
            print(subfolder_path)
            print("\n\n")
            if not os.path.isdir(subfolder_path):
                continue

            # Define file paths
            method_before_path = os.path.join(subfolder_path, 'method_before.txt')
            method_after_path = os.path.join(subfolder_path, 'method_after.txt')
            bug_json_path = os.path.join(subfolder_path, 'bug.json')

            # Read content from files if they exist
            method_before = read_file(method_before_path)
            method_after = read_file(method_after_path)
            bug_info = read_json(bug_json_path)
            bug_info = {k:v for k, v in bug_info.items() if k in ['bug_type', 'qualifier']}
            # Write to the output file
            # input_text = f"""
            #             ---
            #             Method Level Code containing bug:\n
            #             {method_before}
            #             \n\n
            #             Error Information:\n
            #             {str(bug_info)}
            #             ---"""
            # output_text = method_after
            
            processed_data.append({
                "BuggyCode": method_before,
                "BugInfo": bug_info,
                "FixedCode": method_after
            })

    return processed_data

def read_file(file_path):
    """Read the content of a file if it exists."""
    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read().strip()
    return "File not found."

def read_json(file_path):
    """Read and parse a JSON file if it exists."""
    if os.path.exists(file_path):
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    return {"error": "File not found."}

base_path = "InferredBugs/inferredbugs/java" 
output_path = "Processed_InferredBugs_InstructionFineTune"
processed_data = instruction_finetune_dataset(base_path)


InferredBugs/inferredbugs/java/shadowsocks-java



InferredBugs/inferredbugs/java/shadowsocks-java/1



InferredBugs/inferredbugs/java/shadowsocks-java/2



InferredBugs/inferredbugs/java/shadowsocks-java/3



InferredBugs/inferredbugs/java/find-sec-bugs



InferredBugs/inferredbugs/java/find-sec-bugs/1



InferredBugs/inferredbugs/java/find-sec-bugs/2



InferredBugs/inferredbugs/java/find-sec-bugs/3



InferredBugs/inferredbugs/java/find-sec-bugs/4



InferredBugs/inferredbugs/java/find-sec-bugs/5



InferredBugs/inferredbugs/java/find-sec-bugs/6



InferredBugs/inferredbugs/java/find-sec-bugs/7



InferredBugs/inferredbugs/java/find-sec-bugs/8



InferredBugs/inferredbugs/java/jansi



InferredBugs/inferredbugs/java/jansi/1



InferredBugs/inferredbugs/java/ShiroJwt



InferredBugs/inferredbugs/java/ShiroJwt/1



InferredBugs/inferredbugs/java/ShiroJwt/2



InferredBugs/inferredbugs/java/ShiroJwt/3



InferredBugs/inferredbugs/java/ShiroJwt/4



InferredBugs/inferredbugs/java/bigque

## Analyze Bug Distribution

In [3]:
bug_type_dict = {}
for json in processed_data:
    bug_type_dict[json['BugInfo']['bug_type']] = bug_type_dict.get(json['BugInfo']['bug_type'], 0) + 1
for key in bug_type_dict:
    bug_type_dict[key] = round(bug_type_dict[key]/len(processed_data) * 100 ,2)

In [4]:
bug_type_dict

{'THREAD_SAFETY_VIOLATION': 22.74,
 'RESOURCE_LEAK': 34.95,
 'NULL_DEREFERENCE': 40.24,
 'CHECKERS_PRINTF_ARGS': 0.96,
 'CHECKERS_IMMUTABLE_CAST': 0.2,
 'INTERFACE_NOT_THREAD_SAFE': 0.8,
 'UNSAFE_GUARDED_BY_ACCESS': 0.11}

In [36]:
data = {'THREAD_SAFETY_VIOLATION': 22.74,
        'RESOURCE_LEAK': 34.95,
        'NULL_DEREFERENCE': 40.24,
        'CHECKERS_PRINTF_ARGS': 0.96,
        'CHECKERS_IMMUTABLE_CAST': 0.2,
        'INTERFACE_NOT_THREAD_SAFE': 0.8,
        'UNSAFE_GUARDED_BY_ACCESS': 0.11}

df = pd.DataFrame(list(data.items()), columns=['Bug Type', 'Percentage Count'])

df = df.sort_values(by='Percentage Count', ascending=False)

chart = alt.Chart(df).mark_bar().encode(
    x=alt.X('Percentage Count', axis=alt.Axis(title='Percentage Count')),
    y=alt.Y('Bug Type', axis=alt.Axis(title='Bug Type',), sort='-x'),
    tooltip=['Bug Type', 'Percentage Count'] 
).properties(
    title='Distribution of Bug Types in InferredBugs JAVA Subset',
    width = 600,
    height = 400
).interactive()

# Add text labels to the chart
text = chart.mark_text(
    align='left',
    baseline='middle',
    dx=5
).encode(
    text=alt.Text('Percentage Count', format='.2f')  # Format the text to 2 decimal places
)

final_chart = chart + text

final_chart = final_chart.configure_axis(
    grid=True
).configure_mark(
    color='lightblue'
).configure_view(
    strokeWidth=0
).configure(
    background='white'
)

final_chart.save('bug_type_distribution_bar_chart.json')


the convert_dtype parameter is deprecated and will be removed in a future version.  Do ``ser.astype(object).apply()`` instead if you want ``convert_dtype=False``.



In [37]:
final_chart


the convert_dtype parameter is deprecated and will be removed in a future version.  Do ``ser.astype(object).apply()`` instead if you want ``convert_dtype=False``.



## Extract Bug And Prepare Data For SFT And ORPO

In [7]:


def tokenize_code(code: str):
    return re.findall(r'\w+|[^\s\w]', code)

def map_tokens_to_lines(code):
    lines = code.splitlines()
    token_to_line = {}
    token_idx = 0
    for line_num, line in enumerate(lines):
        tokens = tokenize_code(line)
        for token in tokens:
            token_to_line[token_idx] = (line_num, line.strip())
            token_idx += 1
    return token_to_line

def extract_localized_bug_from_tokens(buggy_code: str, fixed_code: str):
    tokens_buggy = tokenize_code(buggy_code)
    tokens_fixed = tokenize_code(fixed_code)
    
    matcher = difflib.SequenceMatcher(None, tokens_buggy, tokens_fixed)
    token_map = map_tokens_to_lines(buggy_code)

    changed_lines = set()
    for tag, i1, i2, j1, j2 in matcher.get_opcodes():
        if tag in ("replace", "delete"):
            for token_idx in range(i1, i2):
                line_num, line_text = token_map.get(token_idx, (None, None))
                if line_num is not None:
                    changed_lines.add((line_num, line_text))

    sorted_lines = sorted(list(changed_lines), key=lambda x: x[0])
    localized_lines = [line for _, line in sorted_lines]
    return '\n'.join(localized_lines)

def generate_alpaca_prompt_orpo(buggy_code, fixed_code, patch):
    prompt = """Below is an instruction that describes a task, paired with an input that provides further context. Write a response that appropriately completes the request, do not provide explanations just the answer

    ### Instruction:
    You are an expert JAVA coder. Given an input buggy method code written in JAVA, information about the bug, and the part of the code containing the probably bug (localized code containing bug), your task is to
    analyse the inputs and fix the buggy method and return the fixed method in JAVA. Only return the fixed code, no further explanation required.
    
    ### Input:
    #### JAVA Method containing bug:
    {}
    
    #### Bug Information:
    {}
    
    #### Localized Part of the Code Containing Bug:
    {}
    
    ### Response:"""

    prompt = prompt.format(buggy_code, fixed_code, patch)
    return prompt


def get_orpo_data(json_list):
    for entry in json_list:
        buggy_code = entry.get("BuggyCode", "")
        fixed_code = entry.get("FixedCode", "")
        bug_info = entry.get("BugInfo", "")
        patch = extract_localized_bug_from_tokens(buggy_code, fixed_code)
        # patch_fix = get_structured_patch(buggy_code, fixed_code)
        

        prompt = generate_alpaca_prompt_orpo(buggy_code, bug_info, patch)
        entry['prompt'] = prompt
        entry['chosen'] = fixed_code
        entry['rejected'] = buggy_code
        entry["LocalizedBug"] = patch
        entry['LocalizedPatchLength'] = len(patch)
        entry['CodeLength'] = len(buggy_code)
        # entry["PatchFix"] = patch_fix
    return json_list



In [8]:
orpo_dataset_inferredbugs = get_orpo_data(processed_data)

## Data Filtering

In [17]:
import re
non_english_pattern = re.compile(r'[^\x00-\x7F]+')

def is_english_java_code(example):
    buggy_code = example.get("BuggyCode", "")
    return not non_english_pattern.search(buggy_code)
filtered_data = [ex for ex in orpo_dataset_inferredbugs if is_english_java_code(ex)]
len(filtered_data)

7969

In [18]:
filtered_localized_data = []
for entry in filtered_data:
    if (entry['LocalizedPatchLength'] < 300 and entry['LocalizedPatchLength'] > 0) and (entry['CodeLength'] < 1200):
        filtered_localized_data.append(entry)

In [19]:
len(filtered_localized_data)

3422

## Save The Final Data

In [22]:
def dump_file_for_localization(data_list, output_file):
    keys_to_keep = ["BuggyCode", "BugInfo", "LocalizedBug", "LocalizedPatchLength", "FixedCode"]
    with open(output_file, "w", encoding="utf-8") as f:
        for item in data_list:
            filtered_item = {key: item[key] for key in keys_to_keep if key in item}
            f.write(json.dumps(filtered_item) + "\n")

In [35]:
import random
import json

data = filtered_localized_data

random.seed(42)
training_data = random.sample(data, 3000)

remaining_data = [item for item in data if item not in training_data]

test_data = random.sample(remaining_data, 300)

dump_file_for_localization(training_data, "InferredBugs_SFT_Train_with_bug_localization_it3.jsonl")
dump_file_for_localization(test_data, "InferredBugs_SFT_Test_with_bug_localization_it3.jsonl")


with open("InferredBugs_ORPO_Train_with_bug_localization.jsonl", "w") as train_file:
    for item in training_data:
        json.dump(item, train_file)
        train_file.write("\n")

with open("InferredBugs_ORPO_Test_with_bug_localization.jsonl", "w") as test_file:
    for item in test_data:
        json.dump(item, test_file)
        test_file.write("\n")

print("Training and test data successfully saved.")


Training and test data successfully saved.
