In [20]:
import re
import os

# Extract the code segment between two "--Call--" lines in a log file
def extract_call_segment(log_file_path, output_file_path):
    """Write the traced code lines of the first call, each joined with its pdb dump.

    The log is cut down to the lines from the first "--Call--" marker up to
    (not including) the second one, or to the end of the log when there is no
    second marker. Inside that window every line containing "##line:(" is
    kept. When the line directly after it starts with "(Pdb) ", that line's
    remainder is appended to the code line (separated by one space) and the
    pdb line itself is consumed, so it is never examined as a code line.

    Args:
        log_file_path: Path of the UTF-8 log file to read.
        output_file_path: Path of the UTF-8 text file to (over)write, one
            kept line per output line.

    Returns:
        None. If the log has no "--Call--" line an error is printed and no
        output file is written.
    """
    with open(log_file_path, 'r', encoding='utf-8') as log_file:
        log_lines = [raw.rstrip('\n') for raw in log_file.readlines()]

    # Positions of every line that is exactly the call marker (ignoring padding).
    marker_positions = [pos for pos, text in enumerate(log_lines) if text.strip() == "--Call--"]
    if not marker_positions:
        print("Error: No '--Call--' found in the log file.")
        return

    first_call = marker_positions[0]
    # Without a second marker the window runs to the end of the log.
    next_call = marker_positions[1] if len(marker_positions) > 1 else len(log_lines)
    window = log_lines[first_call:next_call]

    pdb_prefix = "(Pdb) "
    merged = []
    skip_next = False
    for pos, text in enumerate(window):
        if skip_next:
            # This line was already glued onto the previous code line.
            skip_next = False
            continue
        if "##line:(" not in text:
            continue
        has_dump = pos + 1 < len(window) and window[pos + 1].startswith(pdb_prefix)
        if has_dump:
            # Drop the leading "(Pdb) " and attach the dump to the code line.
            text = text + " " + window[pos + 1][len(pdb_prefix):]
            skip_next = True
        merged.append(text)

    with open(output_file_path, 'w', encoding='utf-8') as out_file:
        out_file.writelines(entry + '\n' for entry in merged)

def extract_call_code_only(log_file_path, output_file_path):
    """Write only the traced code lines of the first call, without pdb dumps.

    The log is restricted to the lines from the first "--Call--" marker up to
    (not including) the second one, or to the end of the log when there is no
    second marker. Every line in that window containing "##line:(" is copied
    unchanged to the output file.

    Args:
        log_file_path: Path of the UTF-8 log file to read.
        output_file_path: Path of the UTF-8 text file to (over)write, one
            kept line per output line.

    Returns:
        None. If the log has no "--Call--" line an error is printed and no
        output file is written.
    """
    with open(log_file_path, 'r', encoding='utf-8') as log_file:
        log_lines = [raw.rstrip('\n') for raw in log_file]

    # Positions of every line that is exactly the call marker (ignoring padding).
    marker_positions = [pos for pos, text in enumerate(log_lines) if text.strip() == "--Call--"]
    if not marker_positions:
        print("Error: No '--Call--' found in the log file.")
        return

    begin = marker_positions[0]
    if len(marker_positions) > 1:
        finish = marker_positions[1]
    else:
        # No second marker: keep everything up to the end of the log.
        finish = len(log_lines)

    traced = [text for text in log_lines[begin:finish] if "##line:(" in text]

    with open(output_file_path, 'w', encoding='utf-8') as out_file:
        out_file.write(''.join(text + '\n' for text in traced))

# Try the code-only extractor on a single log and write the result to a scratch file.
extract_call_code_only("chunk_1_2/669532/logger_PDBscript_669532.txt", "test_extract_content.txt")

In [None]:
from utile import *

def extract_all_call_segments(dataset_path, pdb_dir, output_dir):
    """Run both extractors for every problem listed in a JSONL dataset.

    For each record, the log at
    ``{pdb_dir}/{id}/logger_PDBscript_{id}.txt`` is processed twice: once by
    ``extract_call_segment`` (written to ``code_values_{id}.txt``) and once
    by ``extract_call_code_only`` (written to ``code_only_{id}.txt``). Both
    outputs go into ``{output_dir}/{id}``, which is created if missing.

    Args:
        dataset_path: Path handed to ``read_jsonl_file`` (presumably provided
            by the ``utile`` star import; each returned record must have an
            ``'id'`` key).
        pdb_dir: Directory containing one sub-directory of pdb logs per id.
        output_dir: Directory under which per-problem output folders are made.
    """
    for record in read_jsonl_file(dataset_path):
        problem_id = record['id']
        target_dir = f"{output_dir}/{problem_id}"
        log_path = f"{pdb_dir}/{problem_id}/logger_PDBscript_{problem_id}.txt"

        # Make sure the per-problem output folder exists before writing into it.
        os.makedirs(target_dir, exist_ok=True)
        extract_call_segment(log_path, f"{target_dir}/code_values_{problem_id}.txt")
        extract_call_code_only(log_path, f"{target_dir}/code_only_{problem_id}.txt")

# Process the ten dataset shards chunk_4_1 .. chunk_4_10: each shard's pdb logs are read from
# the directory named after the shard, and all results are collected under "code_sum_chunk4".
for i in range(1,11):    
    extract_all_call_segments(f"star_coder/chunk_4/chunk_4_{i}.jsonl", f"chunk_4_{i}", "code_sum_chunk4")

In [30]:
# Load the code lines from a txt file as a single string (empty files and files with more than 15 lines are skipped)
def load_code_lines(file_path, max_lines=15):
    """Load a small text file as one newline-joined string.

    Args:
        file_path: Path of the UTF-8 text file to read.
        max_lines: Largest number of lines a file may have and still be
            loaded. Defaults to 15; pass ``None` to disable the limit.

    Returns:
        The file's lines joined with ``'\\n'`` (no trailing newline), or
        ``None`` when the file has no lines or has more than ``max_lines``
        lines.
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        lines = [line.rstrip('\n') for line in f]

    if not lines:
        return None
    if max_lines is not None and len(lines) > max_lines:
        return None
    # Join the lines into a single string with newlines
    return '\n'.join(lines)

# Preview the loaded code-with-values text for one problem (id 63 under code_sum_chunk1).
print(load_code_lines("code_sum_chunk1/63/code_values_63.txt"))

-> def repeat_string(s):    ##line:(1) {'s': 'abc', 'repeat': <class 'itertools.repeat'>}
-> return ' '.join(repeat(s, 3))    ##line:(2) {'s': 'abc', 'repeat': <class 'itertools.repeat'>}
-> return ' '.join(repeat(s, 3))    ##line:(2) {'s': 'abc', 'repeat': <class 'itertools.repeat'>, '__return__': 'abc abc abc'}


In [39]:
# Merge the code_sum_chunks (get rid of the empty and long files)
def merge_code_sum_chunk(chunk_dir, output_file):
    """Merge the per-problem extraction files of one chunk into a JSON file.

    Every entry of ``chunk_dir`` is treated as a problem id. A problem is
    kept only if both ``code_only_{id}.txt`` and ``code_values_{id}.txt``
    exist inside its sub-directory and ``load_code_lines`` returns a
    non-empty string for each of them (so empty and over-long files are
    dropped). The result maps each kept id to
    ``{"code_only": ..., "code_values": ...}``.

    Args:
        chunk_dir: Directory containing one sub-directory per problem id.
        output_file: Path of the UTF-8 JSON file to (over)write.

    Returns:
        None. The number of kept problems is printed.
    """
    # Imported locally so this cell does not rely on `json` leaking into the
    # namespace from another cell or from a star import.
    import json

    merged_dict = {}
    # os.listdir returns entries in arbitrary order; sorting keeps the JSON
    # output reproducible from run to run.
    for problem_id in sorted(os.listdir(chunk_dir)):
        # the name of the subdir is the problem id
        subdir_path = os.path.join(chunk_dir, problem_id)
        code_only_path = f"{subdir_path}/code_only_{problem_id}.txt"
        code_values_path = f"{subdir_path}/code_values_{problem_id}.txt"

        # Skip anything that lacks one of the two files; this also skips
        # stray non-directory entries in chunk_dir.
        if not (os.path.isfile(code_only_path) and os.path.isfile(code_values_path)):
            continue

        code_only = load_code_lines(code_only_path)
        code_values = load_code_lines(code_values_path)
        if code_only and code_values:
            merged_dict[problem_id] = {
                "code_only": code_only,
                "code_values": code_values,
            }

    # Print the number of problems in the merged dictionary
    print(f"Number of problems in the merged dictionary: {len(merged_dict)}")

    with open(output_file, 'w', encoding='utf-8') as outfile:
        json.dump(merged_dict, outfile, ensure_ascii=False, indent=4)

# Merge everything extracted under "code_sum_chunk4" into a single JSON file.
merge_code_sum_chunk("code_sum_chunk4", "code_sum_chunk4.json")

Number of problems in the merged dictionary: 7255
