In [1]:
import os
import pandas as pd

def process_csv_file(file_path):
    """
    先尝试用标准 csv 解析；失败再退到暴力正则清洗
    """
    # print(f"\n处理文件: {file_path}")

    # 0. 先按“正常 csv”读一次
    try:
        df = pd.read_csv(file_path, encoding='utf-8-sig')
        # 如果列名里出现大量空列，也算失败
        if df.shape[1] > 50 or df.columns.str.contains('Unnamed').sum() > df.shape[1] // 2:
            raise pd.errors.ParserError("大量空列，视为脏文件")
        # print("✅ 标准 csv 读取成功，跳过暴力清洗")
    except (pd.errors.ParserError, UnicodeDecodeError, KeyError):
        print("⚠️  标准 csv 解析失败，进入暴力正则清洗...")
        df = _dirty_csv_handler(file_path)   # 你的老逻辑封装一下

    # 1. 统一补零、加序号、写文件
    new_path = file_path.replace('.csv', '_filled.csv')
    df = _post_process(df, new_path)          # 把路径显式传进去
    return df


# ------------ 下面只是把原来的“暴力清洗”抽出来，方便复用 ------------
def _dirty_csv_handler(file_path):
    """你原来的正则清洗逻辑，原封不动搬进来"""
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        lines = f.readlines()

    processed_lines, max_columns = [], 0
    import re
    for line in lines:
        line = line.strip()
        if not line:
            continue
        # 只有脏文件才暴力替换
        normalized_line = re.sub(r'\s+', ',', line)
        parts = [p.strip() for p in normalized_line.split(',') if p.strip()]
        max_columns = max(max_columns, len(parts))
        processed_lines.append(parts)

    # 补列
    uniform_data = []
    for parts in processed_lines:
        while len(parts) < max_columns:
            parts.append('0')
        uniform_data.append(parts)

    df = pd.DataFrame(uniform_data)
    return df

def _post_process(df: pd.DataFrame, new_path: str) -> pd.DataFrame:
    """
    1. 空值 / NaN / "提取失败" → 置 0
    2. 在最前面插入一行“列序号”，**保留原有表头**
    3. 写入 CSV：第 1 行序号，第 2 行表头，第 3 行起数据
    """
    # 1. 置零处理（跳过即将写入的序号行，只处理原数据）
    for i in range(len(df)):
        for j in range(len(df.columns)):
            val = df.iat[i, j]
            val_str = str(val).strip()
            if pd.isna(val) or val_str == '' or val_str.lower() == 'nan' or val_str == '提取失败':
                df.iat[i, j] = 0

    # 2. 构造序号行
    index_row = [str(i + 1) for i in range(df.shape[1])]

    # 3. 写文件：先序号行 → 再带表头的数据
    with open(new_path, 'w', encoding='utf-8-sig') as f:
        # 第 1 行：序号
        f.write(','.join(index_row) + '\n')
        # 第 2 行起：DataFrame（带表头）
        df.to_csv(f, index=False, header=True, encoding='utf-8-sig')

    # print(f"✅ 已保存: {new_path}  （第1行为序号，第2行为原表头）")
    return df

def main():
    """主函数：批量处理CSV文件"""
    # 获取当前目录中的所有.csv文件
    csv_files = [file for file in os.listdir('.') 
                 if file.endswith('.csv') 
                 and not file.endswith('_filled.csv')]
    
    if not csv_files:
        print("当前目录中没有找到CSV文件。")
        return
    
    print(f"找到 {len(csv_files)} 个CSV文件待处理")
    
    # 处理每个文件
    for csv_file in csv_files:
        process_csv_file(csv_file)
    
    print(f"\n所有文件处理完成！")

if __name__ == "__main__":
    main()



找到 18 个CSV文件待处理

所有文件处理完成！


In [None]:
import os
import shutil

# 源文件夹路径（包含要移动的.csv文件）
source_folder = r"/path/to/file"  # 替换为你的源文件夹路径
# 目标文件夹路径（文件将被移动到这个位置）
destination_folder = r"/path/to/file"  # 替换为目标文件夹路径

# 确保目标文件夹存在，如果不存在则创建
if not os.path.exists(destination_folder):
    os.makedirs(destination_folder)

# 遍历源文件夹中的所有文件
for filename in os.listdir(source_folder):
    # 检查文件扩展名是否为.csv
    if filename.endswith("filled.csv"):
        # 构造完整的文件路径
        source_file_path = os.path.join(source_folder, filename)
        destination_file_path = os.path.join(destination_folder, filename)
        
        # 移动文件
        shutil.copy(source_file_path, destination_file_path)
        print(f"移动文件：{filename} 到 {destination_folder}")


# 遍历源文件夹中的所有文件
for filename in os.listdir(source_folder):
    # 检查文件扩展名是否为.csv
    if filename.endswith(".csv"):
        os.remove(filename)


print("所有.csv文件已移动完成。")