In [6]:
import pandas as pd
import numpy as np
from datetime import datetime
import os

def compare_csv_by_phone(csv1_path, csv2_path, output_excel_path=None, phone_col="26.请留下您的手机号"):
    """
    对比两个CSV文件中指定手机号列的相同部分，输出包含相同手机号的完整数据到Excel
    
    Args:
        csv1_path (str): 第一个CSV文件的完整路径
        csv2_path (str): 第二个CSV文件的完整路径
        output_excel_path (str, optional): 输出Excel文件的路径，默认自动生成
        phone_col (str, optional): 手机号列的表头名称，默认"请留下您的手机号"
    
    Returns:
        None: 直接生成Excel文件，同时在控制台打印分析结果
    """
    # --------------------------
    # 1. 验证输入文件是否存在
    # --------------------------
    for file_path in [csv1_path, csv2_path]:
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"文件不存在：{file_path}，请检查路径是否正确")
        if not file_path.endswith(".csv"):
            raise ValueError(f"文件格式错误：{file_path}，请确保输入为CSV文件")

    # --------------------------
    # 2. 读取CSV文件（处理编码和空值）
    # --------------------------
    print("="*60)
    print("开始读取CSV文件...")
    print(f"文件1：{os.path.basename(csv1_path)}")
    print(f"文件2：{os.path.basename(csv2_path)}")
    print("="*60)

    # 尝试多种编码读取（解决中文乱码问题）
    encodings = ["utf-8", "gbk", "gb2312", "utf-8-sig"]
    df1 = None
    df2 = None

    for encoding in encodings:
        try:
            if df1 is None:
                df1 = pd.read_csv(csv1_path, encoding=encoding, dtype={phone_col: str})  # 手机号按字符串读取（保留前导0）
            if df2 is None:
                df2 = pd.read_csv(csv2_path, encoding=encoding, dtype={phone_col: str})
            if df1 is not None and df2 is not None:
                print(f"✅ 成功使用 {encoding} 编码读取两个CSV文件")
                break
        except Exception as e:
            if df1 is None:
                print(f"❌ 用 {encoding} 编码读取文件1失败：{str(e)[:50]}")
            if df2 is None:
                print(f"❌ 用 {encoding} 编码读取文件2失败：{str(e)[:50]}")

    # 检查手机号列是否存在
    for df, file_name in [(df1, "文件1"), (df2, "文件2")]:
        if phone_col not in df.columns:
            raise KeyError(f"{file_name} 中缺少指定的手机号列：{phone_col}，请检查表头是否正确")

    # --------------------------
    # 3. 数据预处理（清洗手机号）
    # --------------------------
    print("\n" + "="*60)
    print("开始数据预处理...")

    def clean_phone(phone_series):
        """清洗手机号：去除空格、横杠等特殊字符，保留纯数字"""
        # 转换为字符串，处理NaN值
        cleaned = phone_series.fillna("").astype(str)
        # 去除非数字字符（空格、-、_等）
        cleaned = cleaned.str.replace(r"[^0-9]", "", regex=True)
        # 过滤空字符串和无效手机号（假设手机号长度为11位，可根据需求调整）
        cleaned = cleaned[(cleaned != "") & (cleaned.str.len().between(10, 13))]  # 兼容10-13位手机号/座机号
        return cleaned

    # 清洗两个文件的手机号列，同时保留原始数据关联
    df1["cleaned_phone"] = clean_phone(df1[phone_col])
    df2["cleaned_phone"] = clean_phone(df2[phone_col])

    # 统计原始数据量和有效手机号量
    print(f"文件1原始数据量：{len(df1)} 行")
    print(f"文件1有效手机号量（10-13位纯数字）：{df1['cleaned_phone'].notna().sum()} 个")
    print(f"文件2原始数据量：{len(df2)} 行")
    print(f"文件2有效手机号量（10-13位纯数字）：{df2['cleaned_phone'].notna().sum()} 个")

    # --------------------------
    # 4. 查找相同手机号并提取数据
    # --------------------------
    print("\n" + "="*60)
    print("开始查找相同手机号...")

    # 获取两个文件中都存在的有效手机号（交集）
    common_phones = set(df1["cleaned_phone"]) & set(df2["cleaned_phone"])
    common_phones = [p for p in common_phones if p != ""]  # 排除空字符串

    if not common_phones:
        print("❌ 未找到相同的手机号")
        # 生成空结果Excel（包含两个文件的表头，方便参考）
        df1_empty = df1.drop("cleaned_phone", axis=1).head(0)  # 仅保留表头
        df2_empty = df2.drop("cleaned_phone", axis=1).head(0)
    else:
        print(f"✅ 找到 {len(common_phones)} 个相同的手机号")
        # 提取包含相同手机号的完整行（保留原始数据，删除清洗辅助列）
        df1_common = df1[df1["cleaned_phone"].isin(common_phones)].drop("cleaned_phone", axis=1).reset_index(drop=True)
        df2_common = df2[df2["cleaned_phone"].isin(common_phones)].drop("cleaned_phone", axis=1).reset_index(drop=True)

    # --------------------------
    # 5. 生成输出Excel文件
    # --------------------------
    print("\n" + "="*60)
    print("开始生成Excel结果文件...")

    # 自动生成输出路径（若未指定）
    if output_excel_path is None:
        current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
        output_excel_path = f"CSV手机号匹配结果_{current_time}.xlsx"
        print(f"⚠️  未指定输出路径，自动生成：{output_excel_path}")

    # 使用ExcelWriter写入多个工作表
    with pd.ExcelWriter(output_excel_path, engine="openpyxl") as writer:
        if common_phones:
            # 写入两个文件的相同手机号数据
            df1_common.to_excel(writer, sheet_name=f"文件1_相同手机号数据", index=False)
            df2_common.to_excel(writer, sheet_name=f"文件2_相同手机号数据", index=False)
            # 写入相同手机号列表（单独工作表，方便查看）
            common_phones_df = pd.DataFrame({"相同手机号列表": sorted(common_phones)})
            common_phones_df.to_excel(writer, sheet_name="相同手机号清单", index=False)
            print(f"✅ 已写入3个工作表：")
            print(f"   - 文件1_相同手机号数据（{len(df1_common)} 行）")
            print(f"   - 文件2_相同手机号数据（{len(df2_common)} 行）")
            print(f"   - 相同手机号清单（{len(common_phones)} 个手机号）")
        else:
            # 无相同手机号时，写入空表头
            df1_empty.to_excel(writer, sheet_name="文件1_无相同数据", index=False)
            df2_empty.to_excel(writer, sheet_name="文件2_无相同数据", index=False)
            print(f"⚠️  无相同手机号，已写入空表头工作表（供参考原始结构）")

    # --------------------------
    # 6. 打印最终分析报告
    # --------------------------
    print("\n" + "="*60)
    print("【最终分析报告】")
    print("="*60)
    print(f"输入文件1：{csv1_path}")
    print(f"输入文件2：{csv2_path}")
    print(f"输出文件：{os.path.abspath(output_excel_path)}")
    print("-"*60)
    print(f"文件1有效手机号：{df1['cleaned_phone'].notna().sum()} 个")
    print(f"文件2有效手机号：{df2['cleaned_phone'].notna().sum()} 个")
    print(f"相同手机号数量：{len(common_phones)} 个")
    if common_phones:
        print(f"文件1匹配数据行数：{len(df1_common)} 行")
        print(f"文件2匹配数据行数：{len(df2_common)} 行")
    print("="*60)
    print("✅ 任务完成！")


if __name__ == "__main__":
    # --------------------------
    # 请在此处修改文件路径！！！
    # --------------------------
    CSV1_PATH = r'C:/Users/Administrator/Downloads/骑手筛选/1-1.csv'  # 第一个CSV文件路径
    CSV2_PATH = r'C:/Users/Administrator/Downloads/骑手筛选/1-2.csv'  # 第二个CSV文件路径
    OUTPUT_EXCEL_PATH = r"C:/Users/Administrator/Downloads/骑手筛选/手机号相同筛选结果.xlsx"  # 输出Excel路径（可选）

    try:
        # 执行对比任务
        compare_csv_by_phone(
            csv1_path=CSV1_PATH,
            csv2_path=CSV2_PATH,
            output_excel_path=OUTPUT_EXCEL_PATH,
            phone_col="26.请留下您的手机号"  # 手机号列名（若表头不同，需修改此处）
        )
    except Exception as e:
        print(f"\n❌ 程序执行出错：{str(e)}")
        # 打印详细错误栈（便于排查问题）
        import traceback
        traceback.print_exc()

开始读取CSV文件...
文件1：1-1.csv
文件2：1-2.csv
✅ 成功使用 utf-8 编码读取两个CSV文件

开始数据预处理...
文件1原始数据量：129 行
文件1有效手机号量（10-13位纯数字）：129 个
文件2原始数据量：263 行
文件2有效手机号量（10-13位纯数字）：259 个

开始查找相同手机号...
❌ 未找到相同的手机号

开始生成Excel结果文件...
⚠️  无相同手机号，已写入空表头工作表（供参考原始结构）

【最终分析报告】
输入文件1：C:/Users/Administrator/Downloads/骑手筛选/1-1.csv
输入文件2：C:/Users/Administrator/Downloads/骑手筛选/1-2.csv
输出文件：C:\Users\Administrator\Downloads\骑手筛选\手机号相同筛选结果.xlsx
------------------------------------------------------------
文件1有效手机号：129 个
文件2有效手机号：259 个
相同手机号数量：0 个
✅ 任务完成！
