In [15]:
import os
import glob
import json
import pandas as pd
from pathlib import Path
from sklearn.model_selection import train_test_split

# === Configuration ===
# Root directory that is searched recursively for *.csv files.
DATA_ROOT_PATH = '../autodl-tmp/Data_数据/'
# Passed to train_test_split as `test_size`; an integer is an absolute row count.
TEST_SET_SIZE = 10000
# Path of the JSON file the sampled records are written to.
OUTPUT_FILENAME = '/root/RAG/test_data_10k.json'
# Seed passed to train_test_split as `random_state`.
RANDOM_STATE = 42

def read_csv_force(path):
    """Read a CSV file of unknown encoding on a best-effort basis.

    The raw bytes are decoded strictly as UTF-8 (with or without BOM) and then
    as GB18030 (a superset of GBK). Only if both strict decodes fail is the
    content decoded as UTF-8 with undecodable bytes dropped. Decoding strictly
    first matters: a lossy decode never fails, so putting it first would
    silently strip the text of every non-UTF-8 file instead of trying the
    next encoding.

    The delimiter is sniffed by pandas (``sep=None`` with the python engine)
    and malformed rows are skipped.

    Args:
        path: Path of the CSV file to read.

    Returns:
        The parsed ``pandas.DataFrame``, or an empty ``DataFrame`` when the
        file cannot be read or no decoding of it can be parsed. This function
        never raises.
    """
    # Local import: the notebook's import cell does not bring `io` into scope.
    from io import StringIO

    try:
        with open(path, "rb") as f:
            raw = f.read()
    except Exception:
        # Deliberate best-effort: an unreadable file is reported to the caller
        # as an empty frame, not as an exception.
        return pd.DataFrame()

    def _candidate_texts():
        # Strict decodes first so a wrong encoding is rejected, not mangled.
        for enc in ("utf-8-sig", "gb18030"):
            try:
                yield raw.decode(enc)
            except UnicodeDecodeError:
                continue
        # Last resort for files with stray bad bytes: drop what cannot decode.
        yield raw.decode("utf-8", errors="ignore")

    for text in _candidate_texts():
        try:
            return pd.read_csv(StringIO(text), sep=None, engine="python", on_bad_lines="skip")
        except Exception:
            # Parsing this decoding failed (e.g. empty content); try the next one.
            continue
    return pd.DataFrame()

# === Main logic ===
# Build a stratified test set from every CSV under DATA_ROOT_PATH:
# load -> normalize columns to (query, document, department) -> clean ->
# stratified sample of TEST_SET_SIZE rows -> dump as JSON.

# Accepted header spellings (compared after strip + lower-casing).
QUESTION_ALIASES = ('ask', 'question', 'query', '问题', '问')
ANSWER_ALIASES = ('answer', 'document', '答', '答案')
DEPARTMENT_ALIASES = ('department', '科室')


def _find_column(columns, aliases):
    """Return the first column whose stripped, lower-cased name is in `aliases`, else None."""
    for col in columns:
        # str() guards against non-string headers; strip() tolerates padded headers.
        if str(col).strip().lower() in aliases:
            return col
    return None


file_paths = glob.glob(os.path.join(DATA_ROOT_PATH, '**', '*.csv'), recursive=True)
if not file_paths:
    raise FileNotFoundError(f"路径 {DATA_ROOT_PATH} 下没有找到任何 CSV 文件。")

print(f"找到 {len(file_paths)} 个CSV文件，开始加载...")

df_list = []
for csv_path in file_paths:
    qa_frame = read_csv_force(csv_path)
    if qa_frame.empty:
        print(f"跳过空或损坏文件: {Path(csv_path).name}")
        continue

    q_col = _find_column(qa_frame.columns, QUESTION_ALIASES)
    a_col = _find_column(qa_frame.columns, ANSWER_ALIASES)
    dept_col = _find_column(qa_frame.columns, DEPARTMENT_ALIASES)

    if q_col is None or a_col is None:
        print(f"跳过文件 {Path(csv_path).name}（未找到问答列）")
        continue

    if dept_col is None:
        # No department column: use the name of the file's parent directory.
        qa_frame['department'] = Path(csv_path).parent.name
    else:
        qa_frame = qa_frame.rename(columns={dept_col: 'department'})

    qa_frame = qa_frame.rename(columns={q_col: 'query', a_col: 'document'})
    qa_frame = qa_frame.dropna(subset=['query', 'document', 'department'])
    df_list.append(qa_frame[['query', 'document', 'department']])

if not df_list:
    raise RuntimeError("未成功加载任何有效问答文件。")

# Every frame was already stripped of missing values above, so no second dropna is needed.
df = pd.concat(df_list, ignore_index=True)
print(f"总计 {len(df):,} 条原始问答记录。")

# === Data cleaning ===
# Keep only rows whose query is longer than 5 and whose document is longer than 10 characters.
df = df[df['query'].str.len() > 5]
df = df[df['document'].str.len() > 10]
# Keep only departments that have more than one row left after the length filters.
class_counts = df['department'].value_counts()
classes_to_keep = class_counts[class_counts > 1].index
df = df[df['department'].isin(classes_to_keep)]
print(f"数据清洗完成，剩余 {len(df):,} 条有效样本，{len(classes_to_keep)} 个科室。")

# === Stratified sampling ===
# Only the test part of the split is kept; the remainder is discarded.
_, test_df = train_test_split(
    df,
    test_size=TEST_SET_SIZE,
    stratify=df['department'],
    random_state=RANDOM_STATE
)
print(f"已通过分层抽样划分出 {len(test_df)} 条测试样本。")

# === Save ===
# os.path.dirname() is '' for a bare file name and os.makedirs('') raises,
# so only create the directory when the path actually has one.
output_dir = os.path.dirname(OUTPUT_FILENAME)
if output_dir:
    os.makedirs(output_dir, exist_ok=True)
test_data = test_df[['query', 'document', 'department']].to_dict('records')
with open(OUTPUT_FILENAME, 'w', encoding='utf-8') as f:
    # ensure_ascii=False keeps the Chinese text readable in the output file.
    json.dump(test_data, f, ensure_ascii=False, indent=2)

print(f"测试集已成功保存到: {OUTPUT_FILENAME}")

找到 6 个CSV文件，开始加载...
总计 433,567 条原始问答记录。
数据清洗完成，剩余 401,463 条有效样本，125 个科室。
已通过分层抽样划分出 10000 条测试样本。
测试集已成功保存到: /root/RAG/test_data_10k.json


In [4]:
# Show the kernel's current working directory.
import os
os.getcwd()

'/root/RAG'