In [None]:
from multiprocessing import process
from nt import error
import pandas as pd
import logging
from pathlib import Path

# Log to both etl.log and the console.
# force=True (Python 3.8+) removes and closes any handlers already attached to
# the root logger before adding these. Without it, basicConfig is a silent
# no-op whenever the root logger already has handlers (e.g. when this notebook
# cell is re-run in the same kernel), while the FileHandler constructed below
# would still be opened and never closed.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.FileHandler("etl.log", encoding='utf-8'), logging.StreamHandler()],
    force=True,
)

# Raw orders CSV to read, and the output directory for the cleaned CSV and the
# data-quality report (created here, at import time, if it does not exist).
data_locate = Path('../raw/olist_orders_dataset.csv')
save_cleaned = Path('../processed')
save_cleaned.mkdir(parents=True, exist_ok=True)

def data_quality_report(df, stage_name):
    """
    Log and return a data-quality summary (missing rate, duplicate rate, ...).

    :param df: DataFrame to inspect
    :param stage_name: label for the stage, e.g. '原始数据' (raw) or '清洗后数据' (cleaned)
    :return: dict with keys 'stage', 'rows', 'cols', 'missing_count',
        'missing_rate', 'duplicated_count', 'duplicated_rate'; rates are
        percentages rounded to 2 decimals (0 when the denominator is 0)
    """
    total_rows, total_cols = df.shape
    total_cells = total_rows * total_cols
    # Plain ints so the returned dict is free of numpy scalar types
    missing_count = int(df.isnull().sum().sum())
    duplicated_count = int(df.duplicated().sum())

    # Guard on the actual denominators: a frame with rows but no columns has
    # zero cells, so checking only the row count would divide by zero.
    missing_rate = (missing_count / total_cells * 100) if total_cells > 0 else 0
    duplicated_rate = (duplicated_count / total_rows * 100) if total_rows > 0 else 0

    logging.info(f'[DATA QUALITY] {stage_name} - 行数: {total_rows} | 列数: {total_cols} | 缺失率: {missing_rate:.2f}% | 重复率: {duplicated_rate:.2f}%')

    # Structured result so several stages can be collected into one report
    return {
        'stage': stage_name,
        'rows': total_rows,
        'cols': total_cols,
        'missing_count': missing_count,
        'missing_rate': round(missing_rate, 2),
        'duplicated_count': duplicated_count,
        'duplicated_rate': round(duplicated_rate, 2),
    }


def extract_data(path):
    """
    Read a CSV file into a DataFrame.

    :param path: location of the CSV file
    :return: the loaded DataFrame; an empty DataFrame when the file is not found
        (the error is logged rather than raised)
    """
    logging.info(f'[EXTRACT] 正在加载数据: {path}')
    try:
        frame = pd.read_csv(path)
    except FileNotFoundError:
        logging.error(f'文件未找到: {path}')
        frame = pd.DataFrame()
    else:
        n_rows, n_cols = frame.shape
        logging.info(f'加载完成，共{n_rows}行，{n_cols}列')
    return frame

def transform_data(order_table):
    """
    Transform the orders table: drop invalid periods, fill missing values,
    remove duplicate rows and add derived features.

    :param order_table: orders DataFrame
    :return: the transformed DataFrame; an empty input is returned unchanged
    """
    # Column-name suffixes that mark a timestamp column. '_at' (not a bare
    # 'at') so that unrelated columns such as 'format' or 'lat' are not taken
    # for timestamps and filled with the 1970 placeholder.
    time_suffixes = ('date', '_at', '_timestamp')

    # 1. Nothing to do for empty input
    if order_table.empty:
        logging.warning('输入数据为空，跳过 transform 阶段')
        return order_table

    # 2. Drop periods identified as invalid (2016 data, Sept-Oct 2018)
    order_table = errors_data(order_table)

    # 3. Missing values
    logging.info('[TRANSFORM] 查看缺失值......')
    null_count = order_table.isnull().sum().sum()
    logging.info(f'缺失值总数为{null_count}')
    if null_count > 0:
        # Per-column missing counts, keeping only columns that have any
        loss_data = order_table.isnull().sum()
        loss_data = loss_data[loss_data > 0]
        logging.info(f'缺失列情况: \n{loss_data}')

        for col in loss_data.index:
            # str() so non-string column labels do not raise AttributeError
            if str(col).endswith(time_suffixes):
                logging.info(f'{col} 为时间列,缺失数:{loss_data[col]}进行时间填充')
                order_table[col] = fill_date(order_table[col], col)
            else:
                logging.info(f'{col} 为普通列，缺失数:{loss_data[col]}，填充为“未知”')
                order_table[col] = order_table[col].fillna('未知')
        logging.info('缺失值填充完成')
    else:
        logging.info('无缺失值')

    # 4. Duplicate rows
    duplicated_count = order_table.duplicated().sum()
    if duplicated_count != 0:
        logging.info(f'检测到{duplicated_count}条重复值，执行删除...')
        order_table = drop_duplicated(order_table)
        logging.info('重复值删除完成')
    else:
        logging.info('无重复值')

    # 5. Feature engineering
    order_table = feature_engineering(order_table)

    return order_table

def load_data(order_table, output_path=None):
    """
    Save a DataFrame to a CSV file (without the index).

    :param order_table: DataFrame to save
    :param output_path: target CSV path; when omitted, the file is written to
        ``save_cleaned / 'order_table_cleaned.csv'``. For an explicit path,
        missing parent directories are created.
    :return: None
    """
    if output_path is None:
        path = save_cleaned / 'order_table_cleaned.csv'
    else:
        path = Path(output_path)
        path.parent.mkdir(parents=True, exist_ok=True)
    order_table.to_csv(path, index=False)
    logging.info(f'[LOAD] 数据已保存至{path}')

def fill_date(time_col_data, data_col):
    """
    Fill missing values of a time column with the placeholder 1970-01-01 00:00:00.

    The placeholder only stands in for NULL and carries no meaning. For
    datetime64 columns it is filled as a ``pd.Timestamp`` so the column keeps
    its datetime dtype; other columns get the placeholder string.

    :param time_col_data: the time column (a pandas Series) to fill
    :param data_col: column name, used only in log messages
    :return: the filled Series. If filling fails, the error is logged and the
        input is returned unchanged, rather than ``None``, which would wipe the
        whole column when assigned back.
    """
    placeholder = '1970-01-01 00:00:00'
    try:
        if pd.api.types.is_datetime64_dtype(time_col_data):
            fill_value = pd.Timestamp(placeholder)
        else:
            fill_value = placeholder
        filled = time_col_data.fillna(fill_value)
    except Exception as e:
        # Best effort: keep the original data instead of losing the column
        logging.error(f'对{data_col}列填充失败,失败原因{e}')
        return time_col_data
    logging.info(f'对{data_col}列填充完成')
    return filled
        
def drop_duplicated(order_table):
    """
    Drop fully duplicated rows from a DataFrame, keeping the first occurrence.

    :param order_table: input DataFrame
    :return: a DataFrame without duplicate rows. If dropping fails, the error is
        logged and the input is returned unchanged, rather than ``None``, which
        would break every later step.
    """
    try:
        return order_table.drop_duplicates()
    except Exception as e:
        logging.error(f'删除重复值失败，失败原因{e}')
        return order_table

def errors_data(order_table, periods_to_remove=None):
    """
    Drop orders whose purchase month falls in a known-invalid period.

    Parses ``order_purchase_timestamp`` to datetime (unparseable values become
    NaT; those rows are kept) and adds ``year``/``month`` columns derived from
    it. Every row whose (year, month) is listed in ``periods_to_remove`` is then
    removed, and the number of rows in each listed period is logged.

    :param order_table: orders DataFrame with an ``order_purchase_timestamp``
        column; it is not modified, and a new DataFrame is returned
    :param periods_to_remove: iterable of (year, month) pairs to drop; defaults
        to Sept/Oct/Dec 2016 and Sept/Oct 2018
    :return: a new DataFrame without the removed periods, including the
        ``year`` and ``month`` columns
    """
    if periods_to_remove is None:
        periods_to_remove = [
            (2016, 9),   # Sept 2016
            (2016, 10),  # Oct 2016
            (2016, 12),  # Dec 2016
            (2018, 9),   # Sept 2018
            (2018, 10),  # Oct 2018
        ]

    # Work on a copy so the caller's DataFrame is not mutated
    order_table = order_table.copy()
    order_table['order_purchase_timestamp'] = pd.to_datetime(order_table['order_purchase_timestamp'], errors='coerce')
    # Year and month are used for the period filter
    order_table['year'] = order_table['order_purchase_timestamp'].dt.year
    order_table['month'] = order_table['order_purchase_timestamp'].dt.month

    # Start from an all-False Series (not a bare False) so an empty period list
    # keeps every row instead of turning into an invalid `~False` lookup.
    delete_condition = pd.Series(False, index=order_table.index)
    for year, month in periods_to_remove:
        period_mask = (order_table['year'] == year) & (order_table['month'] == month)
        logging.info(f"{year}年{month}月的记录数: {int(period_mask.sum())}")
        delete_condition |= period_mask

    return order_table[~delete_condition].copy()

def feature_engineering(order_table):
    """
    Add the ``days`` feature: fulfilment time in whole days.

    ``year``/``month`` are already added by ``errors_data`` and are not
    recomputed here. ``order_delivered_customer_date`` is parsed to datetime
    (unparseable values become NaT, giving NaN days). ``days`` is the delivery
    date minus ``order_purchase_timestamp``; negative results (e.g. from the
    1970-01-01 placeholder fill) are replaced by -1.

    :param order_table: DataFrame whose ``order_purchase_timestamp`` is already
        datetime; it is modified in place
    :return: the same DataFrame, with ``days`` added
    """
    delivered = pd.to_datetime(order_table['order_delivered_customer_date'], errors='coerce')
    order_table['order_delivered_customer_date'] = delivered

    elapsed = delivered - order_table['order_purchase_timestamp']
    order_table['days'] = elapsed.dt.days

    # Sentinel -1 for any negative duration
    negative = order_table['days'] < 0
    order_table.loc[negative, 'days'] = -1
    return order_table


def etl_pipeline(input_path):
    """
    Run the full ETL flow on the orders CSV and write a data-quality report.

    The steps are:
    extract -> quality report (raw) -> transform -> quality report (cleaned)
    -> save cleaned CSV -> save the before/after report to
    ``save_cleaned / 'data_quality_report.csv'``.

    If nothing is extracted (missing file or zero rows), the pipeline stops
    before writing anything. This way a previously cleaned CSV is not
    overwritten with an empty one.

    :param input_path: path of the raw orders CSV
    :return: None
    """
    order_table_raw = extract_data(input_path)
    if order_table_raw.empty:
        logging.error(f'[ETL] 未加载到任何数据，终止流程: {input_path}')
        return

    quality_before = data_quality_report(order_table_raw, '原始数据')

    order_table_processed = transform_data(order_table_raw)
    quality_after = data_quality_report(order_table_processed, '清洗后数据')

    load_data(order_table_processed)

    # One table with the before/after metrics. utf-8-sig writes a BOM
    # (commonly used so spreadsheet tools detect UTF-8).
    report_df = pd.DataFrame([quality_before, quality_after])
    report_path = save_cleaned / 'data_quality_report.csv'
    report_df.to_csv(report_path, index=False, encoding='utf-8-sig')
    logging.info(f' 数据质量报告已保存至: {report_path}')


if __name__ == "__main__":
    # Entry point: run the whole pipeline on the configured raw orders CSV.
    etl_pipeline(data_locate)
    

2025-10-15 16:06:41,458 [INFO] [EXTRACT] 正在加载数据: ..\raw\olist_orders_dataset.csv
2025-10-15 16:06:41,715 [INFO] 加载完成，共99441行，8列
2025-10-15 16:06:41,818 [INFO] [DATA QUALITY] 原始数据 - 行数: 99441 | 列数: 8 | 缺失率: 0.62% | 重复率: 0.00%
2025-10-15 16:06:41,848 [INFO] 2016年9月的记录数: 4
2025-10-15 16:06:41,853 [INFO] 2016年10月的记录数: 324
2025-10-15 16:06:41,855 [INFO] 2016年12月的记录数: 1
2025-10-15 16:06:41,855 [INFO] 2018年9月的记录数: 16
2025-10-15 16:06:41,857 [INFO] 2018年10月的记录数: 4
2025-10-15 16:06:41,884 [INFO] [TRANSFORM] 查看缺失值......
2025-10-15 16:06:41,897 [INFO] 缺失值总数为4739
2025-10-15 16:06:41,913 [INFO] 缺失列情况: 
order_approved_at                 135
order_delivered_carrier_date     1716
order_delivered_customer_date    2888
dtype: int64
2025-10-15 16:06:41,914 [INFO] order_approved_at 为时间列,缺失数:135进行时间填充
2025-10-15 16:06:41,916 [INFO] 对order_approved_at列填充完成
2025-10-15 16:06:41,919 [INFO] order_delivered_carrier_date 为时间列,缺失数:1716进行时间填充
2025-10-15 16:06:41,923 [INFO] 对order_delivered_carrier_date列填充完成
2025-10