In [26]:
# --------------------------------------------------
# Cell 0: 【⚠️【每次运行时请在此处配置】
# --------------------------------------------------
# --- 定义当前需要归档的年月 ---
ARCHIVE_YEAR = 2024
ARCHIVE_MONTH = 9
# ------------------------------------

In [None]:
# --------------------------------------------------
# Cell 1: 执行所有数据的归档
# --------------------------------------------------

In [None]:
import os
import shutil
from pathlib import Path

import duckdb
import pandas as pd

print("库导入完成。")

# --- 项目路径设置 ---
base_path = Path.cwd()
report_path = base_path / "报告数据"
temp_path = report_path / "temp"

# 源数据路径
anjian_data_path = report_path / "输入" / "安监数据"
zhuzhuyun_merge_path = temp_path / "3_猪猪云合并数据"
logistics_data_path = temp_path / "4_logistics数据"
transit_data_path = temp_path / "5_中转数据"
data_analysis_result_path = report_path / "输出" / "data_analysis_result"
data_analysis_result_path.mkdir(exist_ok=True)  # 确保目录存在

# --- 目标数据库路径 ---
db_path = report_path / "database"
db_path.mkdir(exist_ok=True)
print(f"路径设置完毕，将开始归档 {ARCHIVE_YEAR}年{ARCHIVE_MONTH}月 的数据。")


# --------------------------------------------------
# 定义归档函数
# --------------------------------------------------
def archive_data_safely(
    source_path: Path,
    table_name: str,
    db_base_path: Path,
    year: int,
    month: int,
    date_pattern_in_filename: str,
):
    """
    将数据归档至Parquet数据库

    :param source_path: 源文件夹路径。
    :param table_name: 在数据库中对应的表名。
    :param db_base_path: 数据库根路径。
    :param year: 归档年份。
    :param month: 归档月份。
    :param date_pattern_in_filename: 用于在文件名中识别月份的独特字符串模式。
    """
    print(f"--- 开始归档 {table_name} 数据至 {year}-{month:02d} ---")

    if not source_path.exists():
        print(f"⚠️ 源目录不存在: {source_path}，跳过归档。")
        return

    print(f"  - 正在使用模式 '{date_pattern_in_filename}' 筛选文件...")
    all_files = list(source_path.glob("*.xlsx"))
    files_to_process = [f for f in all_files if date_pattern_in_filename in f.name]

    if not files_to_process:
        print(f"✅ 在 {source_path} 中未找到匹配模式的文件，跳过归档。")
        return

    df_list = []
    for f in files_to_process:
        print(f"  - 正在读取: {f.name}")
        df_list.append(pd.read_excel(f, engine="openpyxl"))

    if not df_list:
        print("❌ 未能加载任何数据，归档中止。")
        return

    full_df = pd.concat(df_list, ignore_index=True)
    print(f"  - 成功合并 {len(files_to_process)} 个文件，共 {len(full_df)} 行数据。")

    # --- 【核心修正】开始：将'任务备注'添加到列表中 ---
    problematic_columns = ["单号", "快递单号", "派送时间", "任务备注"]
    for col in problematic_columns:
        if col in full_df.columns:
            print(f"  - 正在强制转换列 '{col}' 的类型为字符串...")
            full_df[col] = full_df[col].astype(str)
    # --- 【核心修正】结束 ---

    full_df["year"] = year
    full_df["month"] = month

    output_table_path = db_path / table_name
    final_partition_path = output_table_path / f"year={year}" / f"month={month}"
    temp_partition_path = output_table_path / f"year={year}" / f"month={month}_temp"

    temp_partition_path.mkdir(parents=True, exist_ok=True)

    try:
        output_file_path = temp_partition_path / "data.parquet"
        print(f"  - 正在写入数据到文件: {output_file_path}")
        full_df.to_parquet(output_file_path, engine="pyarrow", index=False)
    except Exception as e:
        print(f"❌ 写入临时目录失败: {e}")
        if temp_partition_path.exists():
            shutil.rmtree(temp_partition_path)
        return

    print("  - 临时写入成功，准备替换正式目录...")
    if final_partition_path.exists():
        print(f"  - 发现旧目录，正在删除: {final_partition_path}")
        shutil.rmtree(final_partition_path)

    os.rename(temp_partition_path, final_partition_path)
    print(f"✅ 成功归档 {table_name} 数据！")


# --------------------------------------------------
# 执行所有数据的归档
# --------------------------------------------------
print(f"\n===== 开始执行 {ARCHIVE_YEAR} 年 {ARCHIVE_MONTH} 月的数据归档任务 =====")

# 为不同文件名规则定义日期匹配模式
anjian_pattern = f"{ARCHIVE_YEAR}年{ARCHIVE_MONTH}月"
common_pattern = f"_{ARCHIVE_YEAR}{ARCHIVE_MONTH:02d}"

# 1. 归档 "安监数据"
archive_data_safely(
    anjian_data_path,
    "anjian_data",
    db_path,
    ARCHIVE_YEAR,
    ARCHIVE_MONTH,
    date_pattern_in_filename=anjian_pattern,
)

# 2. 归档 "3_猪猪云合并数据"
archive_data_safely(
    zhuzhuyun_merge_path,
    "zhuzhuyun_merged_data",
    db_path,
    ARCHIVE_YEAR,
    ARCHIVE_MONTH,
    date_pattern_in_filename=common_pattern,
)

# 3. 归档 "4_logistics数据"
archive_data_safely(
    logistics_data_path,
    "logistics_data",
    db_path,
    ARCHIVE_YEAR,
    ARCHIVE_MONTH,
    date_pattern_in_filename=common_pattern,
)

# 4. 归档 "5_中转数据"
archive_data_safely(
    transit_data_path,
    "transit_data",
    db_path,
    ARCHIVE_YEAR,
    ARCHIVE_MONTH,
    date_pattern_in_filename=common_pattern,
)

# 5. 归档 "data_analysis_result"
archive_data_safely(
    data_analysis_result_path,
    "data_analysis_result",
    db_path,
    ARCHIVE_YEAR,
    ARCHIVE_MONTH,
    date_pattern_in_filename=common_pattern,
)

print("\n===== 所有归档任务执行完毕 =====")

库导入完成。
路径设置完毕，将开始归档 2024年9月 的数据。

===== 开始执行 2024 年 9 月的数据归档任务 =====
--- 开始归档 anjian_data 数据至 2024-09 ---
  - 正在使用模式 '2024年9月' 筛选文件...
  - 正在读取: 2024年9月极兔抽样.xlsx
  - 正在读取: 2024年9月德邦抽样.xlsx
  - 正在读取: 2024年9月京东抽样.xlsx
  - 正在读取: 2024年9月韵达抽样.xlsx
  - 正在读取: 2024年9月EMS抽样.xlsx
  - 正在读取: 2024年9月顺丰抽样.xlsx
  - 正在读取: 2024年9月中通抽样.xlsx
  - 正在读取: 2024年9月邮政抽样.xlsx
  - 正在读取: 2024年9月圆通抽样.xlsx
  - 正在读取: 2024年9月申通抽样.xlsx
  - 成功合并 10 个文件，共 707869 行数据。
  - 正在强制转换列 '单号' 的类型为字符串...
  - 正在强制转换列 '派送时间' 的类型为字符串...
  - 正在写入数据到文件: /Users/lava/Documents/国家邮政局发展研究中心实习/python_data_analysis/报告数据/database/anjian_data/year=2024/month=9_temp/data.parquet
  - 临时写入成功，准备替换正式目录...
✅ 成功归档 anjian_data 数据！
--- 开始归档 zhuzhuyun_merged_data 数据至 2024-09 ---
  - 正在使用模式 '_202409' 筛选文件...
  - 正在读取: 圆通_202409.xlsx
  - 正在读取: 顺丰_202409.xlsx
  - 正在读取: 申通_202409.xlsx
  - 正在读取: 德邦_202409.xlsx
  - 正在读取: 韵达_202409.xlsx
  - 正在读取: 邮政_202409.xlsx
  - 正在读取: 京东_202409.xlsx
  - 正在读取: 极兔_202409.xlsx
  - 正在读取: 中通_202409.xlsx
  - 正在读取: EMS_202409.xlsx

In [None]:
# --------------------------------------------------
# Cell 2: 验证归档数据
# --------------------------------------------------
print("\n===== 开始执行【增强版】数据验证 =====")
con = duckdb.connect()

# ---- 验证参数 ----
# 确保这里的年月和您归档的年月一致
VERIFY_YEAR = ARCHIVE_YEAR
VERIFY_MONTH = ARCHIVE_MONTH

# --- 验证查询 1: 核对 anjian_data 的总行数 ---
try:
    print("\n[1/3] 正在验证 'anjian_data'...")
    table_path = str(db_path / "anjian_data")
    count = con.execute(f"""
        SELECT COUNT(*) 
        FROM read_parquet('{table_path}/*/*/*.parquet') 
        WHERE year={VERIFY_YEAR} AND month={VERIFY_MONTH}
    """).fetchone()[0]

    print(
        f"✅ 'anjian_data' 表 {VERIFY_YEAR}-{VERIFY_MONTH:02d} 的总行数为: {count} 行。"
    )
    print("  - (请与 Cell 3 输出的 '成功合并' 行数对比，应一致)")

except Exception as e:
    print(f"❌ 'anjian_data' 验证失败: {e}")

# --- 验证查询 2: 抽样检查 zhuzhuyun_merged_data 的数据和类型 ---
try:
    print("\n[2/3] 正在验证 'zhuzhuyun_merged_data'...")
    table_path = str(db_path / "zhuzhuyun_merged_data")
    sample_df = con.execute(f"""
        SELECT *
        FROM read_parquet('{table_path}/*/*/*.parquet') 
        WHERE year={VERIFY_YEAR} AND month={VERIFY_MONTH}
        LIMIT 5
    """).df()

    print(f"✅ 'zhuzhuyun_merged_data' 表数据抽样成功:")
    print(sample_df)
    print("\n  - 抽样数据类型信息:")
    sample_df.info()
    print("  - (请检查 '快递单号' 列的 Dtype 是否为 object 或 string)")

except Exception as e:
    print(f"❌ 'zhuzhuyun_merged_data' 验证失败: {e}")

# --- 验证查询 3: 按公司分组统计 logistics_data 行数 ---
try:
    print("\n[3/3] 正在验证 'logistics_data'...")
    table_path = str(db_path / "logistics_data")
    company_col_name = "企业"

    # 首先检查'企业'列是否存在
    all_columns = (
        con.execute(
            f"DESCRIBE SELECT * FROM read_parquet('{table_path}/*/*/*.parquet')"
        )
        .df()["column_name"]
        .tolist()
    )

    if company_col_name in all_columns:
        grouped_df = con.execute(f"""
            SELECT "{company_col_name}", COUNT(*) as row_count
            FROM read_parquet('{table_path}/*/*/*.parquet') 
            WHERE year={VERIFY_YEAR} AND month={VERIFY_MONTH}
            GROUP BY "{company_col_name}"
            ORDER BY row_count DESC
        """).df()

        print(f"✅ 'logistics_data' 表按 '{company_col_name}' 列分组统计成功:")
        print(grouped_df)
    else:
        print(
            f"⚠️ 在 'logistics_data' 表中未找到列 '{company_col_name}'，跳过分组验证。"
        )

except Exception as e:
    print(f"❌ 'logistics_data' 验证失败: {e}")

print("\n===== 数据验证执行完毕 =====")


===== 开始执行【增强版】数据验证 =====

[1/3] 正在验证 'anjian_data'...
✅ 'anjian_data' 表 2025-06 的总行数为: 781973 行。
  - (请与 Cell 3 输出的 '成功合并' 行数对比，应一致)

[2/3] 正在验证 'zhuzhuyun_merged_data'...
✅ 'zhuzhuyun_merged_data' 表数据抽样成功:
              快递单号 快递公司                 揽收时间                 最新时间     时效  \
0  777314022551584   申通  2025-06-09 13:03:38  2025-06-14 16:56:54  124小时   
1  776103484916314   申通  2025-06-09 17:41:19  2025-06-14 23:51:25  127小时   
2  773360013887328   申通  2025-06-09 20:51:03  2025-06-14 20:50:44  120小时   
3  773360177148225   申通  2025-06-09 18:16:44  2025-06-11 21:42:39   52小时   
4  777313398347326   申通  2025-06-09 16:25:36  2025-06-11 08:43:25   41小时   

    发出至今   最新至今  条数                                           最后1条物流信息 物流状态  \
0  445小时  321小时  12  【驿站】包裹已签收！如有问题请联系：代收点13321544018，您的快递已经妥投，投诉电话...  已签收   
1  440小时  314小时  12  您的快件已由【门口】签收，如有疑问请联系派件员：13927019001，有事先呼叫我，勿找平...  已签收   
2  437小时  317小时  12  您的快件已由【西陇大队驿站代签收】签收，如有疑问请联系派件员：17825286652，有事先...  已签收   
3  439小时  388小时  1