In [1]:
# ==============================================================================
# 多功能数据清洗与预处理脚本 (统一输出 Parquet)
#
# 主要变更:
# 1. (根据您的要求) 将 '常规数据清洗' 模式的输出从 .csv 修改为 .parquet 文件。
# 2. 现在两种模式的输出格式统一，便于后续处理。
# ==============================================================================
import pandas as pd
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import os
import time
import ipywidgets as widgets
from IPython.display import display, clear_output
from tqdm.auto import tqdm
import glob

# --- 提示: Parquet 功能需要额外的库 ---
# 如果您尚未安装，请在终端或代码单元格中运行:
# !pip install pyarrow
# 或者
# !pip install fastparquet

# --- Register tqdm with pandas so that `progress_apply` is available ---
# (process_review_file relies on Series.progress_apply to show a progress bar.)
tqdm.pandas()

# ==============================================================================
# 辅助函数 (通用)
# ==============================================================================

def scan_data_files(directory='data'):
    """Return the supported tabular files found directly inside *directory*.

    Supported files are those whose name ends in ``.csv``, ``.xlsx`` or
    ``.xls``; the comparison is case-insensitive so ``REPORT.CSV`` is found
    too. Sub-directories are never returned, even if their name looks like a
    data file.

    Args:
        directory: Folder to scan (not recursive). Defaults to ``'data'``.

    Returns:
        A sorted list of file paths (each prefixed with *directory*), or an
        empty list when the folder does not exist.
    """
    if not os.path.isdir(directory):
        print(f"警告：未找到 '{directory}' 文件夹，请创建并放入数据文件。")
        return []

    # Escape the directory so characters such as '[' in a folder name are
    # taken literally instead of being interpreted as glob wildcards.
    path_pattern = os.path.join(glob.escape(directory), '*.*')
    supported_suffixes = ('.csv', '.xlsx', '.xls')

    # glob gives no ordering guarantee; sort so the dropdown is stable.
    return sorted(
        path
        for path in glob.glob(path_pattern)
        if os.path.isfile(path) and path.lower().endswith(supported_suffixes)
    )

def load_data(file_path):
    """Load a tabular file into a DataFrame, picking the reader by extension.

    The extension is matched case-insensitively, so ``DATA.CSV`` and
    ``Report.XLSX`` are handled like their lowercase counterparts.

    Args:
        file_path: Path to a ``.csv``, ``.xlsx`` or ``.xls`` file.

    Returns:
        The loaded ``pandas.DataFrame``, or ``None`` (after printing an error)
        when the extension is not supported.
    """
    print(f"--- 正在加载文件: '{os.path.basename(file_path)}' ---")
    lowered_path = file_path.lower()

    if lowered_path.endswith(('.xlsx', '.xls')):
        df = pd.read_excel(file_path)
    elif lowered_path.endswith('.csv'):
        try:
            # Malformed rows are skipped rather than aborting the whole load.
            df = pd.read_csv(file_path, on_bad_lines='skip')
        except UnicodeDecodeError:
            # Not valid UTF-8: retry with GBK, the fallback encoding chosen
            # by this script for non-UTF-8 CSV exports.
            df = pd.read_csv(file_path, on_bad_lines='skip', encoding='gbk')
    else:
        print(f"错误：不支持的文件类型 for '{file_path}'。")
        return None

    print("✔ 文件加载成功!")
    return df

# ==============================================================================
# 模式一：评论文件情感分析 (专用函数)
# ==============================================================================

def _find_review_column(df):
    """Return the name of the column most likely to hold review text, or None.

    Well-known column names are tried first; a candidate only counts if it
    contains at least one non-blank value. Otherwise the object-dtype column
    with the longest average text wins. Columns without any non-blank text
    are skipped, so an empty column can never be selected.
    """
    def has_text(column):
        return column.dropna().astype(str).str.strip().ne('').any()

    priority_cols = ['reviews.text', 'review_text', 'content', 'comment', 'review']
    for p_col in priority_cols:
        if p_col in df.columns and has_text(df[p_col]):
            return p_col

    best_col, best_mean_len = None, float('-inf')
    for col in df.select_dtypes(include=['object']).columns:
        if not has_text(df[col]):
            # An all-NaN/blank column would give a NaN (or near-zero) mean
            # length; it carries no review text, so ignore it outright.
            continue
        mean_len = df[col].dropna().astype(str).str.len().mean()
        if mean_len > best_mean_len:  # strict '>' keeps the first column on ties
            best_col, best_mean_len = col, mean_len
    return best_col


def _sentiment_to_rating(sentiment):
    """Map a compound sentiment score to an estimated 1-5 star rating."""
    if sentiment >= 0.5:
        return 5
    if sentiment >= 0.05:
        return 4
    if sentiment > -0.05:
        return 3
    if sentiment > -0.5:
        return 2
    return 1


def process_review_file(file_path):
    """Run sentiment analysis on a review file and save the result as Parquet.

    The review text column is detected automatically, each non-null review is
    scored with VADER's compound score, and a 1-5 star rating is estimated
    from that score. The columns ``rating``, ``review_text`` and ``sentiment``
    are written to ``reviews_processed.parquet`` in the current working
    directory.

    Args:
        file_path: Path of the review file to process (see ``load_data``).

    Returns:
        None. Progress, the outcome and any error are printed/displayed; this
        is an interactive entry point, so errors are reported instead of
        being raised.
    """
    start_time = time.time()
    try:
        reviews_df = load_data(file_path)
        if reviews_df is None:
            return

        review_column_name = _find_review_column(reviews_df)
        if review_column_name is None:
            raise ValueError("未能在文件中找到有效的文本评论列。")

        print(f"自动检测到评论列为: '{review_column_name}'")
        # Work on the detected column directly instead of renaming it in
        # place: renaming to 'review_text' would create duplicate column
        # labels when the file already has an (empty) 'review_text' column.
        review_text = reviews_df[review_column_name].dropna().astype(str)

        analyzer = SentimentIntensityAnalyzer()
        print("⏳ 正在进行情感分析...")
        sentiment = review_text.progress_apply(
            lambda text: analyzer.polarity_scores(text)['compound']
        )
        print("正在估算星级...")
        rating = sentiment.apply(_sentiment_to_rating)

        final_df = pd.DataFrame({
            'rating': rating,
            'review_text': review_text,
            'sentiment': sentiment,
        })
        output_file = 'reviews_processed.parquet'
        print(f"\n正在将结果保存到 '{output_file}'...")
        final_df.to_parquet(output_file, index=False)

        end_time = time.time()
        print("\n" + "="*50)
        print("✅ 评论情感分析成功完成！")
        print(f"总耗时: {end_time - start_time:.2f} 秒")
        print(f"共处理了 {len(final_df)} 条有效评论。")
        print(f"结果已保存至 '{output_file}'。")
        print("="*50 + "\n处理后数据预览:")
        display(final_df.head())

    except Exception as e:
        # Top-level boundary of the interactive UI: report, don't crash the cell.
        print(f"\n❌ 处理过程中发生错误: {e}")

# ==============================================================================
# 模式二：常规数据清洗 (新增函数)
# ==============================================================================

def process_general_file(file_path):
    """Apply generic cleaning to a tabular file and save the result as Parquet.

    Steps: print the raw shape and column summary, drop fully duplicated
    rows, report (but do not fill or drop) missing values, then write the
    frame to ``data/<input name>_cleaned.parquet``.

    Args:
        file_path: Path of the file to clean (see ``load_data``).

    Returns:
        None. Progress, the outcome and any error are printed/displayed; this
        is an interactive entry point, so errors are reported instead of
        being raised.
    """
    start_time = time.time()
    try:
        df = load_data(file_path)
        if df is None:
            return

        print("\n--- 开始常规数据清洗 ---")

        # 1. Print information about the raw data.
        print("\n1. 原始数据信息:")
        print(f"原始形状: {df.shape}")
        # DataFrame.info() prints its report itself and returns None, so it
        # must not be wrapped in display(), which would echo a stray "None".
        df.info()

        # 2. Remove duplicated rows.
        initial_rows = len(df)
        df.drop_duplicates(inplace=True)
        print(f"\n2. 重复值处理: 已移除 {initial_rows - len(df)} 个重复行。")

        # 3. Report columns that contain missing values.
        missing_values = df.isnull().sum()
        missing_values = missing_values[missing_values > 0]
        print("\n3. 缺失值报告:")
        if not missing_values.empty:
            print("发现以下列存在缺失值:")
            display(missing_values)
        else:
            print("数据完整，没有发现缺失值。")

        # 4. Save the cleaned data as Parquet,
        #    e.g. amazon_products.xls -> data/amazon_products_cleaned.parquet
        base_name = os.path.basename(file_path)
        name_part, _ = os.path.splitext(base_name)
        output_file = os.path.join('data', f"{name_part}_cleaned.parquet")

        print(f"\n正在将清洗结果保存到 '{output_file}'...")
        df.to_parquet(output_file, index=False)

        end_time = time.time()
        print("\n" + "="*50)
        print("✅ 常规数据清洗成功完成！")
        print(f"总耗时: {end_time - start_time:.2f} 秒")
        print(f"清洗后形状: {df.shape}")
        print(f"结果已保存至 '{output_file}'。")
        print("="*50 + "\n清洗后数据预览:")
        display(df.head())

    except Exception as e:
        # Top-level boundary of the interactive UI: report, don't crash the cell.
        print(f"\n❌ 处理过程中发生错误: {e}")

# ==============================================================================
# 交互逻辑主函数 (路由)
# ==============================================================================

def main_process_router(cleaning_mode, file_path):
    """Dispatch the selected file to the handler for the selected mode.

    The output area is cleared first. If no real file is selected (an empty
    value or the ``'None'`` placeholder), a prompt is printed and nothing
    else happens. An unrecognised mode prints an error message.

    Args:
        cleaning_mode: Label of the chosen cleaning mode.
        file_path: Path of the chosen file, or ``'None'``/empty for none.
    """
    clear_output(wait=True)

    nothing_selected = (not file_path) or file_path == 'None'
    if nothing_selected:
        print("请从下拉菜单中选择一个文件。")
        return

    selected_name = os.path.basename(file_path)
    print(f"当前模式: 【{cleaning_mode}】 | 当前文件: 【{selected_name}】")
    print("-" * 60)

    # Guard-clause dispatch: each recognised mode handles the file and exits.
    if cleaning_mode == '评论情感分析':
        process_review_file(file_path)
        return
    if cleaning_mode == '常规数据清洗':
        process_general_file(file_path)
        return

    print("错误：未知的清洗模式。")

# ==============================================================================
# 创建并显示交互式控件
# ==============================================================================

# 1. Build the list of file options.
#    'None' is a placeholder first entry; main_process_router treats it as
#    "no file selected". The data folder is scanned once, when this runs.
file_options = ['None'] + scan_data_files()

# 2. Create the widgets.
#   - Cleaning mode selection (radio buttons); the option labels are the
#     exact strings main_process_router compares against.
mode_selector = widgets.RadioButtons(
    options=['评论情感分析', '常规数据清洗'],
    description='选择清洗模式:',
    style={'description_width': 'initial'}
)

#   - File selection (dropdown)
file_dropdown = widgets.Dropdown(
    options=file_options,
    description='选择数据文件:',
    style={'description_width': 'initial'},
    layout={'width': 'max-content'}
)

# 3. Group the widgets and bind their values to the router's keyword
#    arguments (cleaning_mode <- radio buttons, file_path <- dropdown).
ui_controls = widgets.VBox([mode_selector, file_dropdown])
output_area = widgets.interactive_output(main_process_router, {
    'cleaning_mode': mode_selector,
    'file_path': file_dropdown
})

# 4. Show the controls together with the output area.
print("请选择清洗模式，并从下拉菜单中选择要处理的文件。")
display(ui_controls, output_area)

请选择清洗模式，并从下拉菜单中选择要处理的文件。


VBox(children=(RadioButtons(description='选择清洗模式:', options=('评论情感分析', '常规数据清洗'), style=DescriptionStyle(descri…

Output()