# 数据预处理工具
## 异常值和缺失值处理

本notebook用于处理dataset/B题因子库文件夹内的数据文件，支持多种异常值和缺失值处理方法。

In [1]:
import pandas as pd
import numpy as np
import os
import glob
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer, KNNImputer
import warnings
warnings.filterwarnings('ignore')

plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False

ModuleNotFoundError: No module named 'seaborn'

## 配置参数

In [None]:
# 数据路径配置
DATA_PATH = '../dataset/B题因子库'
OUTPUT_PATH = '../dataset/B题因子库_processed'

# 异常值处理方法
OUTLIER_METHODS = {
    'iqr': 'IQR方法',
    'zscore': 'Z-score方法', 
    'isolation_forest': '孤立森林',
    'percentile': '百分位数方法'
}

# 缺失值处理方法
MISSING_METHODS = {
    'drop': '删除缺失值',
    'mean': '均值填充',
    'median': '中位数填充',
    'mode': '众数填充',
    'forward_fill': '前向填充',
    'backward_fill': '后向填充',
    'interpolate': '线性插值',
    'knn': 'KNN填充'
}

# 创建输出目录
os.makedirs(OUTPUT_PATH, exist_ok=True)

## 数据加载和探索

In [None]:
def load_data_files(data_path):
    """加载数据文件"""
    file_patterns = ['*.csv', '*.xlsx', '*.xls']
    data_files = []
    
    for pattern in file_patterns:
        files = glob.glob(os.path.join(data_path, pattern))
        data_files.extend(files)
    
    print(f"找到 {len(data_files)} 个数据文件:")
    for file in data_files:
        print(f"  - {os.path.basename(file)}")
    
    return data_files

# 加载数据文件
data_files = load_data_files(DATA_PATH)

In [None]:
def load_single_file(file_path):
    """加载单个数据文件"""
    try:
        if file_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif file_path.endswith(('.xlsx', '.xls')):
            df = pd.read_excel(file_path)
        else:
            raise ValueError(f"不支持的文件格式: {file_path}")
        
        print(f"成功加载文件: {os.path.basename(file_path)}")
        print(f"数据形状: {df.shape}")
        print(f"列名: {list(df.columns)}")
        
        return df
    except Exception as e:
        print(f"加载文件失败 {file_path}: {e}")
        return None

# 示例：加载第一个文件
if data_files:
    sample_df = load_single_file(data_files[0])
    if sample_df is not None:
        display(sample_df.head())
        display(sample_df.info())

## 异常值检测和处理函数

In [None]:
def detect_outliers_iqr(data, factor=1.5):
    """使用IQR方法检测异常值"""
    Q1 = data.quantile(0.25)
    Q3 = data.quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - factor * IQR
    upper_bound = Q3 + factor * IQR
    return (data < lower_bound) | (data > upper_bound)

def detect_outliers_zscore(data, threshold=3):
    """使用Z-score方法检测异常值"""
    z_scores = np.abs(stats.zscore(data, nan_policy='omit'))
    return z_scores > threshold

def detect_outliers_percentile(data, lower_percentile=1, upper_percentile=99):
    """使用百分位数方法检测异常值"""
    lower_bound = data.quantile(lower_percentile/100)
    upper_bound = data.quantile(upper_percentile/100)
    return (data < lower_bound) | (data > upper_bound)

def detect_outliers_isolation_forest(data, contamination=0.1):
    """使用孤立森林检测异常值"""
    from sklearn.ensemble import IsolationForest
    
    # 处理缺失值
    data_clean = data.dropna()
    if len(data_clean) == 0:
        return pd.Series([False] * len(data), index=data.index)
    
    iso_forest = IsolationForest(contamination=contamination, random_state=42)
    outliers = iso_forest.fit_predict(data_clean.values.reshape(-1, 1))
    
    # 创建完整的异常值标记
    result = pd.Series([False] * len(data), index=data.index)
    result.loc[data_clean.index] = outliers == -1
    
    return result

## 缺失值处理函数

In [None]:
def handle_missing_values(df, method='mean', columns=None):
    """处理缺失值"""
    df_processed = df.copy()
    
    if columns is None:
        columns = df.select_dtypes(include=[np.number]).columns
    
    for col in columns:
        if col not in df.columns:
            continue
            
        if method == 'drop':
            df_processed = df_processed.dropna(subset=[col])
        elif method == 'mean':
            df_processed[col].fillna(df_processed[col].mean(), inplace=True)
        elif method == 'median':
            df_processed[col].fillna(df_processed[col].median(), inplace=True)
        elif method == 'mode':
            mode_val = df_processed[col].mode()
            if len(mode_val) > 0:
                df_processed[col].fillna(mode_val[0], inplace=True)
        elif method == 'forward_fill':
            df_processed[col].fillna(method='ffill', inplace=True)
        elif method == 'backward_fill':
            df_processed[col].fillna(method='bfill', inplace=True)
        elif method == 'interpolate':
            df_processed[col] = df_processed[col].interpolate(method='linear')
        elif method == 'knn':
            imputer = KNNImputer(n_neighbors=5)
            df_processed[col] = imputer.fit_transform(df_processed[[col]]).flatten()
    
    return df_processed

## 数据处理主函数

In [None]:
def process_dataframe(df, outlier_method='iqr', missing_method='mean', 
                     outlier_action='remove', columns=None):
    """
    处理数据框的异常值和缺失值
    
    参数:
    - df: 输入数据框
    - outlier_method: 异常值检测方法 ('iqr', 'zscore', 'isolation_forest', 'percentile')
    - missing_method: 缺失值处理方法 ('drop', 'mean', 'median', 'mode', 'forward_fill', 'backward_fill', 'interpolate', 'knn')
    - outlier_action: 异常值处理动作 ('remove', 'replace_nan', 'cap')
    - columns: 要处理的列名列表，None表示处理所有数值列
    """
    
    df_processed = df.copy()
    
    if columns is None:
        columns = df.select_dtypes(include=[np.number]).columns
    
    print(f"处理列: {list(columns)}")
    print(f"异常值检测方法: {OUTLIER_METHODS.get(outlier_method, outlier_method)}")
    print(f"缺失值处理方法: {MISSING_METHODS.get(missing_method, missing_method)}")
    
    # 异常值处理
    outlier_stats = {}
    
    for col in columns:
        if col not in df.columns:
            continue
            
        print(f"\n处理列: {col}")
        
        # 检测异常值
        if outlier_method == 'iqr':
            outliers = detect_outliers_iqr(df_processed[col])
        elif outlier_method == 'zscore':
            outliers = detect_outliers_zscore(df_processed[col])
        elif outlier_method == 'isolation_forest':
            outliers = detect_outliers_isolation_forest(df_processed[col])
        elif outlier_method == 'percentile':
            outliers = detect_outliers_percentile(df_processed[col])
        else:
            outliers = pd.Series([False] * len(df_processed), index=df_processed.index)
        
        outlier_count = outliers.sum()
        outlier_stats[col] = outlier_count
        print(f"  检测到 {outlier_count} 个异常值")
        
        # 处理异常值
        if outlier_action == 'remove':
            df_processed = df_processed[~outliers]
        elif outlier_action == 'replace_nan':
            df_processed.loc[outliers, col] = np.nan
        elif outlier_action == 'cap':
            Q1 = df_processed[col].quantile(0.25)
            Q3 = df_processed[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            df_processed[col] = df_processed[col].clip(lower_bound, upper_bound)
    
    # 缺失值处理
    missing_before = df_processed.isnull().sum()
    df_processed = handle_missing_values(df_processed, missing_method, columns)
    missing_after = df_processed.isnull().sum()
    
    print("\n=== 处理结果 ===")
    print(f"原始数据形状: {df.shape}")
    print(f"处理后数据形状: {df_processed.shape}")
    print(f"异常值统计: {outlier_stats}")
    print(f"缺失值处理前: {missing_before.sum()}")
    print(f"缺失值处理后: {missing_after.sum()}")
    
    return df_processed, outlier_stats

## 数据可视化函数

In [None]:
def visualize_data_quality(df, title="数据质量分析"):
    """可视化数据质量"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    
    if len(numeric_cols) == 0:
        print("没有数值列可以可视化")
        return
    
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    fig.suptitle(title, fontsize=16)
    
    # 缺失值热图
    sns.heatmap(df[numeric_cols].isnull(), cbar=True, ax=axes[0,0])
    axes[0,0].set_title('缺失值分布')
    
    # 缺失值统计
    missing_counts = df[numeric_cols].isnull().sum()
    missing_counts = missing_counts[missing_counts > 0]
    if len(missing_counts) > 0:
        missing_counts.plot(kind='bar', ax=axes[0,1])
        axes[0,1].set_title('各列缺失值数量')
        axes[0,1].tick_params(axis='x', rotation=45)
    else:
        axes[0,1].text(0.5, 0.5, '无缺失值', ha='center', va='center', transform=axes[0,1].transAxes)
        axes[0,1].set_title('各列缺失值数量')
    
    # 数据分布（选择前几列）
    cols_to_plot = numeric_cols[:min(4, len(numeric_cols))]
    for i, col in enumerate(cols_to_plot):
        if i < 2:
            df[col].hist(bins=30, ax=axes[1,i], alpha=0.7)
            axes[1,i].set_title(f'{col} 分布')
    
    plt.tight_layout()
    plt.show()

def compare_before_after(df_before, df_after, column):
    """比较处理前后的数据"""
    fig, axes = plt.subplots(1, 2, figsize=(12, 4))
    
    # 处理前
    df_before[column].hist(bins=30, ax=axes[0], alpha=0.7, color='red')
    axes[0].set_title(f'{column} - 处理前')
    axes[0].axvline(df_before[column].mean(), color='black', linestyle='--', label='均值')
    axes[0].legend()
    
    # 处理后
    df_after[column].hist(bins=30, ax=axes[1], alpha=0.7, color='green')
    axes[1].set_title(f'{column} - 处理后')
    axes[1].axvline(df_after[column].mean(), color='black', linestyle='--', label='均值')
    axes[1].legend()
    
    plt.tight_layout()
    plt.show()

## 批量处理函数

In [None]:
def batch_process_files(data_files, outlier_method='iqr', missing_method='mean', 
                       outlier_action='remove', save_results=True):
    """
    批量处理多个数据文件
    """
    results = {}
    
    for file_path in data_files:
        print(f"\n{'='*50}")
        print(f"处理文件: {os.path.basename(file_path)}")
        print(f"{'='*50}")
        
        # 加载数据
        df = load_single_file(file_path)
        if df is None:
            continue
        
        # 显示原始数据质量
        print("\n原始数据质量:")
        visualize_data_quality(df, f"原始数据 - {os.path.basename(file_path)}")
        
        # 处理数据
        df_processed, outlier_stats = process_dataframe(
            df, outlier_method, missing_method, outlier_action
        )
        
        # 显示处理后数据质量
        print("\n处理后数据质量:")
        visualize_data_quality(df_processed, f"处理后数据 - {os.path.basename(file_path)}")
        
        # 保存结果
        if save_results:
            output_file = os.path.join(OUTPUT_PATH, f"processed_{os.path.basename(file_path)}")
            if output_file.endswith(('.xlsx', '.xls')):
                output_file = output_file.rsplit('.', 1)[0] + '.csv'
            
            df_processed.to_csv(output_file, index=False)
            print(f"\n结果已保存到: {output_file}")
        
        results[os.path.basename(file_path)] = {
            'original_shape': df.shape,
            'processed_shape': df_processed.shape,
            'outlier_stats': outlier_stats,
            'missing_before': df.isnull().sum().sum(),
            'missing_after': df_processed.isnull().sum().sum()
        }
    
    return results

## 使用示例

In [None]:
# 配置处理参数
OUTLIER_METHOD = 'iqr'  # 可选: 'iqr', 'zscore', 'isolation_forest', 'percentile'
MISSING_METHOD = 'mean'  # 可选: 'drop', 'mean', 'median', 'mode', 'forward_fill', 'backward_fill', 'interpolate', 'knn'
OUTLIER_ACTION = 'replace_nan'  # 可选: 'remove', 'replace_nan', 'cap'

print(f"异常值检测方法: {OUTLIER_METHODS.get(OUTLIER_METHOD)}")
print(f"缺失值处理方法: {MISSING_METHODS.get(MISSING_METHOD)}")
print(f"异常值处理动作: {OUTLIER_ACTION}")

In [None]:
# 批量处理所有文件
if data_files:
    results = batch_process_files(
        data_files, 
        outlier_method=OUTLIER_METHOD,
        missing_method=MISSING_METHOD,
        outlier_action=OUTLIER_ACTION,
        save_results=True
    )
    
    # 显示处理结果摘要
    print("\n" + "="*60)
    print("处理结果摘要")
    print("="*60)
    
    for filename, stats in results.items():
        print(f"\n文件: {filename}")
        print(f"  原始形状: {stats['original_shape']}")
        print(f"  处理后形状: {stats['processed_shape']}")
        print(f"  异常值统计: {stats['outlier_stats']}")
        print(f"  缺失值: {stats['missing_before']} -> {stats['missing_after']}")
else:
    print("未找到数据文件，请检查路径设置")

## 单文件处理示例

In [None]:
# 如果需要单独处理某个文件，可以使用以下代码
# 替换为您要处理的具体文件路径
SINGLE_FILE_PATH = None  # 例如: '../dataset/B题因子库/your_file.csv'

if SINGLE_FILE_PATH and os.path.exists(SINGLE_FILE_PATH):
    print(f"处理单个文件: {SINGLE_FILE_PATH}")
    
    # 加载数据
    df = load_single_file(SINGLE_FILE_PATH)
    
    if df is not None:
        # 显示原始数据信息
        print("\n原始数据信息:")
        display(df.describe())
        
        # 处理数据
        df_processed, outlier_stats = process_dataframe(
            df, 
            outlier_method='iqr',
            missing_method='mean',
            outlier_action='replace_nan'
        )
        
        # 显示处理后数据信息
        print("\n处理后数据信息:")
        display(df_processed.describe())
        
        # 比较处理前后（选择一个数值列）
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            compare_before_after(df, df_processed, numeric_cols[0])
else:
    print("请设置SINGLE_FILE_PATH变量来处理单个文件")