In [3]:
import pandas as pd
import glob
import os
import json
import re

# Directory that holds the parquet shards of the dataset.
# NOTE: this is a machine-specific absolute path; point it at your local copy.
#directory = r"F:\Desktop\yjs\a毕业设计论文进行\a_dataset\apr_original_data\raw_dataset\coconut_c2005_preprocessed\data"
directory = r"F:\Desktop\yjs\a毕业设计论文进行\a_dataset\apr_original_data\raw_dataset\coconut_python2010_preprocessed\data"

# Collect every parquet shard. The list is sorted because glob returns paths in
# arbitrary (file-system dependent) order, which would make the row order of
# `df` differ between runs and machines.
parquet_files = sorted(glob.glob(os.path.join(directory, "*.parquet")))
if not parquet_files:
    # Without this check pd.concat([]) fails with "No objects to concatenate",
    # which hides the real cause (wrong or empty directory).
    raise FileNotFoundError(f"No .parquet files found in {directory!r}")

# Read every shard and stack them into a single DataFrame with a fresh index.
df_list = [pd.read_parquet(file, engine="pyarrow") for file in parquet_files]
df = pd.concat(df_list, ignore_index=True)

# Preview the first 5 rows.
print(df.head())


                            rem  \
0                    print "di"   
1  print self.x, self.y, self.z   
2                                 
3      M = amax(new.rowind) + 1   
4   new.data = new.data * other   

                                                 add  \
0                                                      
1                                                      
2  expr = numexpr("2.0*a+3.0*c",[('a',float),('c'...   
3                      M = int(amax(new.rowind)) + 1   
4                                  new.data *= other   

                                             context  
0  def __init__(self, x, y, z, kind='linear', cop...  
1  def __init__(self, x, y, z, kind='linear', cop...  
2  def check_broadcasting(self): a = arange(100)....  
3  def Construct(s, ij=None, M=None ,N=None, nzma...  
4  def __mul__(self, other):  # implement matrix ...  


In [None]:
# To experiment on a small sample, use only the first 100 rows instead:
# df_subset = df.head(100).copy()
# Work on an independent copy of the full frame.
df_subset = df.copy(deep=True)

# 处理逻辑
def process_row(rem, add, context):
    """Build the fixed code for one row by swapping `rem` for `add` in `context`.

    Parameters:
    - rem: the removed snippet (text expected to be present in `context`)
    - add: the snippet that replaces `rem` (may be empty, i.e. a pure deletion)
    - context: the surrounding code in which the replacement is made

    Returns:
    - `context` with its single occurrence of `rem` replaced by `add`, or
    - None when the row is skipped: any of the three values is missing (NA),
      `rem` is empty, or `rem` does not occur exactly once in `context`
      (zero occurrences as well as several).

    NOTE(review): earlier revisions listed extra rules for brace placeholders
    (`rem` of "{}", "{", "}" or "{xxx}" replacing
    "{ // FIXME: not implemented ... }"). They sat after the generic rules,
    which already return for every non-empty `rem`, so they could never run
    and have been dropped. If they were meant to take precedence they must be
    reinstated *before* the generic replacement.
    """
    if pd.isna(rem) or pd.isna(add) or pd.isna(context):
        return None

    # An empty `rem` carries no anchor telling where `add` should be inserted.
    if rem == "":
        return None

    # The replacement is only unambiguous when `rem` occurs exactly once;
    # str.count counts literal, non-overlapping occurrences.
    if context.count(rem) != 1:
        return None

    # Covers both a real replacement and, when `add` is empty, a deletion.
    return context.replace(rem, add)

# Build the (buggy, fixed) pairs, keeping only the first copy of each row.
output_data = []
seen_entries = set()  # (rem, add, context) triples that were already exported

for rem, add, buggy_code in zip(df_subset["rem"], df_subset["add"], df_subset["context"]):
    fix_code = process_row(rem, add, buggy_code)

    # Skip rows that matched no rule or whose patched code is blank.
    if fix_code is None or fix_code.strip() == "":
        continue

    # The full input triple identifies a row for de-duplication.
    entry = (rem, add, buggy_code)
    if entry in seen_entries:
        continue
    seen_entries.add(entry)
    output_data.append({"buggy_code": buggy_code, "fix_code": fix_code})

# Write one JSON object per line (JSONL), keeping non-ASCII characters as-is.
output_path = "processed_data.jsonl"
with open(output_path, "w", encoding="utf-8") as f:
    f.writelines(json.dumps(record, ensure_ascii=False) + "\n" for record in output_data)

# Report how many lines were written.
line_count = len(output_data)
print(f"最终 JSONL 文件行数: {line_count}")
print(f"数据处理完成，已保存为 {output_path}")


最终 JSONL 文件行数: 119610
数据处理完成，已保存为 processed_data.jsonl


In [20]:
import black
import autopep8
import textwrap

def format_python_code(code: str, max_width=80) -> str:
    """
    Format Python source with `black`, falling back to `autopep8`.

    The source is handed to the formatters unchanged. It must not be
    pre-wrapped with `textwrap`: that is a prose wrapper which collapses
    newlines and indentation into single spaces and breaks lines at arbitrary
    spaces (including inside string literals and comments), corrupting the
    code before it can be parsed.

    Parameters:
    - code (str): the Python source to format
    - max_width (int): maximum line width passed to the formatters

    Returns:
    - the formatted source (str); if `black` rejects the input and `autopep8`
      raises as well, the original `code` is returned unchanged so that no
      data is lost.
    """
    try:
        # 1. Preferred path: black, honouring the requested line width.
        return black.format_str(code, mode=black.FileMode(line_length=max_width))
    except black.InvalidInput:
        print("[Warning] `black` 解析失败，尝试使用 `autopep8` 进行格式化...")
        try:
            # 2. Fallback: autopep8 with the same width limit.
            return autopep8.fix_code(code, options={"max_line_length": max_width})
        except Exception as e:
            # Deliberate best effort: report the failure and keep the input.
            print(f"[Error] `autopep8` 也失败了: {e}")
            return code

# Example input: a whole function flattened onto one very long line.
# The literal is split only in the source; the runtime string is unchanged.
sample_code = (
    "def __mul__(self, other):  # implement matrix multiplication and "
    "matrix-vector multiplication if isspmatrix(other): return self.matmat(other) "
    "elif isscalar(other): new = self.copy() new.data = new.data * other "
    "new._dtypechar = new.data.dtypechar new.ftype = _transtabl[new._dtypechar] "
    "return new else: return self.matvec(other)"
)

# Format the sample and show the result.
formatted_result = format_python_code(sample_code)
print(formatted_result)


# implement matrix multiplication and matrix-vector multiplication if isspmatrix(other): return self.matmat(other) elif isscalar(other): new = self.copy() new.data = new.data * other new._dtypechar = new.data.dtypechar new.ftype = _transtabl[new._dtypechar] return new else: return self.matvec(other)
def __mul__(self, other):

