Add the repository root to `sys.path` so that the project modules can be imported.

In [1]:
import sys

# Repository root, given relative to the current working directory.
ROOT_PATH = ".."

# Make the project modules importable. The membership check keeps this cell
# idempotent: re-running it does not append a duplicate entry every time.
if ROOT_PATH not in sys.path:
    sys.path.append(ROOT_PATH)

### 1. Preprocessing

In this step, we first generate the attributes used by our filtering framework:

- ```text```: str, content of the file
- ```filename```: str, name of the file without its extension
- ```lang```: str, natural language of the file content (predicted by the fastText language model)
- ```ext```: str, file extension (without the leading dot)
- ```file_size_in_byte```: int, size of the file in bytes
- ```program_lang```: str, programming language of the file
- ```doc_type```: str, document type, a super-category derived from the programming language; one of "code", "data" and "text"

In [2]:
import os
import fasttext
from utils.preprocessing import get_program_lang, get_doc_type
import pandas as pd

# Newly added: tree-sitter bindings used to compile the parser library below
from tree_sitter import Language

# Compile the parser library (the grammar directories listed below must exist).
build_dir = os.path.join(ROOT_PATH, 'build')
os.makedirs(build_dir, exist_ok=True)
parser_library = os.path.join(build_dir, 'my-languages.so')

# Fix: every grammar directory is given as a full path under ROOT_PATH.
language_dirs = [
    os.path.join(ROOT_PATH, grammar_name)
    for grammar_name in (
        'tree-sitter-c',
        'tree-sitter-cpp',
        'tree-sitter-java',
        'tree-sitter-javascript',
        'tree-sitter-go',
    )
]

# Report every grammar directory that is missing before the build is attempted.
for lang_dir in language_dirs:
    if os.path.exists(lang_dir):
        continue
    print(f"警告: 目录不存在 {lang_dir}")

Language.build_library(parser_library, language_dirs)


# generate a dataframe of attributes for each code file
def generate_attributes_df(raw_code_dir, lang_predictor):
    """Walk ``raw_code_dir`` and collect one row of attributes per file.

    Args:
        raw_code_dir: Directory that is scanned recursively with ``os.walk``.
        lang_predictor: Object whose ``predict(text)`` returns fastText-style
            predictions, i.e. ``predictions[0][0]`` is a label such as
            ``"__label__en"``.

    Returns:
        pandas.DataFrame with the columns ``ext``, ``text``, ``filename``
        (base name without extension), ``lang``, ``file_size_in_byte``,
        ``program_lang`` and ``doc_type``. The columns are present even when
        no file is found, so downstream row-wise steps see a stable schema.
    """
    columns = [
        'ext', 'text', 'filename', 'lang',
        'file_size_in_byte', 'program_lang', 'doc_type',
    ]

    code_list = []
    for root, dirs, files in os.walk(raw_code_dir):
        # Sorting in place makes the traversal, and therefore the row order,
        # reproducible across runs and file systems.
        dirs.sort()
        for file in sorted(files):
            name, ext = os.path.splitext(file)
            ext = ext[1:]  # drop the leading dot
            file_path = os.path.join(root, file)

            # Decode explicitly as UTF-8 instead of the platform default, and
            # replace undecodable bytes so that a single badly encoded file
            # does not abort the whole scan with UnicodeDecodeError.
            with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
                text = f.read()

            # The predictor is fed a single lower-cased line: newlines are
            # flattened to spaces before prediction.
            predictions = lang_predictor.predict(text.lower().replace("\n", " "))
            lang = predictions[0][0].replace('__label__', '')

            program_lang = get_program_lang(name, ext)

            code_list.append({
                'ext': ext,                                         # file extension
                'text': text,                                       # file content
                'filename': name,                                   # file name
                'lang': lang,                                       # language
                'file_size_in_byte': os.path.getsize(file_path),    # size on disk
                'program_lang': program_lang,                       # programming language
                'doc_type': get_doc_type(program_lang),             # documentation type
            })

    return pd.DataFrame(code_list, columns=columns)


# Inputs of the preprocessing step; both paths are resolved under ROOT_PATH.
raw_code_dir = f'{ROOT_PATH}/test_data/raw_code' # your source code directory
lang_predictor = fasttext.load_model(f'{ROOT_PATH}/artifacts/lang_predictor.bin') # fasttext language predictor

# One row of attributes per file found under raw_code_dir.
code_df = generate_attributes_df(raw_code_dir, lang_predictor)

# Show the frame size, then preview the first rows as the cell output.
print(f"code_df.shape: {code_df.shape}")
code_df.head()



code_df.shape: (120, 7)


Unnamed: 0,ext,text,filename,lang,file_size_in_byte,program_lang,doc_type
0,h,/*********************************************...,blkpg,en,1189,c,code
1,h,/*********************************************...,compat,en,738,c,code
2,h,// Copyright (c) 2012 The LevelDB Authors. All...,filter_block,en,2316,c,code
3,h,/*********************************************...,nlm,en,1611,c,code
4,h,// Copyright (c) 2014 The Bitcoin developers\n...,sha256,en,681,c,code


### 2. Quality Signal Computing

In this step, we compute the quality signal of each file.

In [3]:
from pipeline.compute_quality_signals import ComputeCodeQualitySignal
from tqdm import tqdm
import json


def compute_qs(row, ccqs: ComputeCodeQualitySignal):
    """Evaluate the quality signals for one row of the attribute table.

    The seven attribute fields of ``row`` are forwarded as keyword arguments
    to ``ccqs.evaluate`` and its result is returned unchanged.
    """
    attribute_names = (
        'text',
        'filename',
        'lang',
        'ext',
        'file_size_in_byte',
        'program_lang',
        'doc_type',
    )
    evaluate_kwargs = {attribute: row[attribute] for attribute in attribute_names}
    return ccqs.evaluate(**evaluate_kwargs)

# Compute the quality signals row by row, with a tqdm progress bar.
ccqs = ComputeCodeQualitySignal()
tqdm.pandas()
qsc_results = code_df.progress_apply(compute_qs, axis=1 ,args=(ccqs,))


def _extract_quality_signal(line):
    """Return the ``quality_signal`` entry of one serialized result, or None.

    Args:
        line: One element of ``qsc_results``, expected to be a JSON string
            that decodes to a dict.

    Returns:
        The value stored under ``"quality_signal"``. None is returned, after
        printing a warning, when ``line`` cannot be decoded or when the
        decoded payload carries no such entry.
    """
    try:
        data = json.loads(line)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers results that are not str/bytes at all (e.g. None).
        print(f"警告: 无法解析 JSON: {line}")
        return None

    # A payload that is not a dict cannot carry the entry either.
    score = data.get("quality_signal") if isinstance(data, dict) else None
    if score is None:
        print(f"警告: 结果中缺少 quality_signal，原始数据: {data}")
    return score


qsc_scores = [_extract_quality_signal(line) for line in qsc_results.tolist()]

# assign() attaches the scores positionally, so the result does not depend on
# code_df carrying a default RangeIndex (a column-wise concat would align on
# index labels instead).
qsc_score_df = code_df.assign(quality_signal=qsc_scores)
qsc_score_df.head(5)

100%|██████████| 120/120 [00:02<00:00, 43.87it/s]


Unnamed: 0,ext,text,filename,lang,file_size_in_byte,program_lang,doc_type,quality_signal
0,h,/*********************************************...,blkpg,en,1189,c,code,"{""qsc_syntax_c"": 0}"
1,h,/*********************************************...,compat,en,738,c,code,"{""qsc_syntax_c"": 1}"
2,h,// Copyright (c) 2012 The LevelDB Authors. All...,filter_block,en,2316,c,code,"{""qsc_syntax_c"": 0}"
3,h,/*********************************************...,nlm,en,1611,c,code,"{""qsc_syntax_c"": 1}"
4,h,// Copyright (c) 2014 The Bitcoin developers\n...,sha256,en,681,c,code,"{""qsc_syntax_c"": 0}"


### 3. Filtering

In this step, we filter the files based on the quality signals of each file.

Result for one file:
- ```effective```: Whether this file is retained or not.
- ```hit_map```: The hit status of each filtering rule.
- ```err_msg```: [optional] Recording the error message if exists.

In [4]:
from pipeline.compute_filtering import CodeFilter

def filter_code(row, code_filter: CodeFilter):
    """Run the filter on one row and return its result unchanged.

    Only ``doc_type``, ``lang``, ``program_lang`` and ``quality_signal`` are
    read from ``row``; they are passed to ``code_filter.evaluate`` by keyword.
    """
    field_names = ('doc_type', 'lang', 'program_lang', 'quality_signal')
    return code_filter.evaluate(**{field: row[field] for field in field_names})

# Apply the filter row by row (with a progress bar) and decode every JSON result.
code_filter = CodeFilter()
tqdm.pandas()
filtered_results = list(
    map(
        json.loads,
        qsc_score_df.progress_apply(filter_code, axis=1, args=(code_filter,)).tolist(),
    )
)

100%|██████████| 120/120 [00:00<00:00, 23996.02it/s]


In [5]:
# get the final clean code files and save to the targeted directory

# The filter results are built on qsc_score_df's own index, so the column-wise
# concat pairs the rows by position even if that index is not a default
# RangeIndex.
clean_code_df = pd.concat(
    [qsc_score_df, pd.DataFrame(filtered_results, index=qsc_score_df.index)],
    axis=1,
)

# Keep the rows whose `effective` flag equals the string '1'. .copy() detaches
# the selection from the concatenated frame, so adding columns to it later
# does not raise SettingWithCopyWarning.
clean_code_df = clean_code_df[clean_code_df['effective'] == '1'].copy()

# Parse the syntax-check result stored in quality_signal
def parse_syntax_result(row):
    """Return the syntax-check flag recorded for the row's programming language.

    ``row['quality_signal']`` is decoded as JSON. For c, cpp, go, java and
    javascript the matching ``qsc_syntax_<lang>`` entry is returned (0 when the
    entry is absent). Any other language returns 1, because no syntax check
    exists for it. If anything fails, the error is printed and 0 is returned.
    """
    syntax_keys = {
        'c': 'qsc_syntax_c',
        'cpp': 'qsc_syntax_cpp',
        'go': 'qsc_syntax_go',
        'java': 'qsc_syntax_java',
        'javascript': 'qsc_syntax_javascript',
    }
    try:
        signals = json.loads(row['quality_signal'])
        signal_key = syntax_keys.get(row['program_lang'])
        if signal_key is None:
            # Languages without a syntax check are kept by default.
            return 1
        return signals.get(signal_key, 0)
    except Exception as e:
        print(f"解析语法检查结果出错: {e}")
        return 0  # drop the file when its result cannot be read

# Add the syntax-check result column. The flags are computed row by row and
# attached with assign(), which returns a new frame: the filtered selection is
# never mutated in place (no SettingWithCopyWarning) and an empty frame is
# handled as well.
syntax_flags = [parse_syntax_result(row) for _, row in clean_code_df.iterrows()]
clean_code_df = clean_code_df.assign(syntax_correct=syntax_flags)

# Filter further on the syntax-check result.
clean_code_df = clean_code_df[clean_code_df['syntax_correct'] == 1]

print(f'clean_code_df.shape: {clean_code_df.shape}')

target_dir = f"{ROOT_PATH}/test_data/clean_code"

# Every remaining row already has effective == '1', so no per-row check is needed.
for index, row in clean_code_df.iterrows():
    cur_dir = f"{target_dir}/{row['program_lang']}"
    os.makedirs(cur_dir, exist_ok=True)
    # `filename` holds the base name without extension; put the extension back
    # so that e.g. foo.c and foo.h do not overwrite each other in one folder.
    # NOTE(review): files with the same base name and extension that come from
    # different source sub-directories still map to the same output path.
    out_name = f"{row['filename']}.{row['ext']}" if row['ext'] else row['filename']
    file_path = os.path.join(cur_dir, out_name)
    # Explicit encoding: do not depend on the platform default when writing.
    with open(file_path, 'w', encoding='utf-8') as f:
        f.write(row['text'])

print(f'Successfully saved {clean_code_df.shape[0]} clean code files to path: {target_dir}')



# Debug info: type and distribution of the `effective` values.
print("Effective值的类型和分布:")
print(clean_code_df['effective'].value_counts())
print(clean_code_df['effective'].dtype)

# Save the quality-signal results to a CSV file.
qsc_score_df.to_csv(f"{ROOT_PATH}/test_data/quality_signal_results.csv", index=False)

# Also save the detailed (decoded) JSON results.
with open(f"{ROOT_PATH}/test_data/quality_signal_details.json", 'w') as f:
    f.write(json.dumps([json.loads(line) for line in qsc_results.tolist()], indent=2))

clean_code_df.shape: (111, 11)
Successfully saved 111 clean code files to path: ../test_data/clean_code
Effective值的类型和分布:
effective
1    111
Name: count, dtype: int64
object
