In [1]:
import pandas as pd
import numpy as np
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

class FilePreprocessor:
    """Load a CSV/Excel table, drop empty columns and impute missing values.

    Each remaining column is classified as ``'id'``, ``'binary'``,
    ``'ordinal'`` or ``'continuous'`` and its missing values are filled with a
    type-specific strategy (0, mode and mean respectively). ID columns are
    left untouched.
    """

    def __init__(self, file_path):
        """
        Initialise the preprocessor.

        Args:
            file_path (str): Path of the CSV or Excel file to process.
        """
        self.file_path = Path(file_path)
        self.df = None
        self.column_types = {}  # column name -> detected type
        self.processed_df = None
        self.original_shape = None  # shape as loaded, before empty columns are dropped

    def load_file(self):
        """Read the input file into ``self.df`` and return it.

        Raises:
            ValueError: If the file extension is not .csv, .xlsx or .xls
                (compared case-insensitively).
        """
        suffix = self.file_path.suffix.lower()
        if suffix == '.csv':
            self.df = pd.read_csv(self.file_path)
        elif suffix in ('.xlsx', '.xls'):
            self.df = pd.read_excel(self.file_path)
        else:
            raise ValueError("不支持的文件格式。请使用CSV或Excel文件")

        # Remember the true original shape; self.df is overwritten later when
        # empty columns are removed.
        self.original_shape = self.df.shape

        print(f"加载文件成功，原始数据形状: {self.df.shape}")
        return self.df

    def remove_empty_columns(self):
        """Drop every column that contains only missing values.

        Loads the file first if it has not been loaded yet. Returns the
        updated ``self.df``.
        """
        if self.df is None:
            self.load_file()

        # Columns in which every cell is NaN.
        empty_cols = self.df.columns[self.df.isna().all()].tolist()

        if empty_cols:
            print(f"发现 {len(empty_cols)} 个空列: {empty_cols}")
            self.df = self.df.drop(columns=empty_cols)
            print("空列已删除")
        else:
            print("未发现空列")

        print(f"删除空列后数据形状: {self.df.shape}")
        return self.df

    @staticmethod
    def _coerce_numeric(values):
        """Return ``values`` converted to numbers, NaN where conversion fails."""
        try:
            return pd.to_numeric(values, errors='coerce')
        except (TypeError, ValueError):
            # Objects pandas cannot even attempt to parse: treat as non-numeric.
            return pd.Series(float('nan'), index=values.index, dtype=float)

    def detect_column_type(self, column):
        """
        Classify a column.

        Args:
            column (pd.Series): Column to inspect.

        Returns:
            str: ``'id'``, ``'binary'``, ``'ordinal'`` or ``'continuous'``.
        """
        non_null_vals = column.dropna()
        # Column labels may be ints or other non-string objects.
        col_name = str(column.name).lower()
        looks_like_id = 'id' in col_name or '编号' in col_name

        if len(non_null_vals) == 0:
            # Nothing to inspect: fall back to the column name.
            return 'id' if looks_like_id else 'continuous'

        # The first column of the loaded table is always treated as the ID.
        is_first_column = (
            self.df is not None
            and len(self.df.columns) > 0
            and column.name == self.df.columns[0]
        )
        if is_first_column or looks_like_id:
            return 'id'

        unique_vals = non_null_vals.nunique()

        # At most two distinct values -> binary, whether numeric or text.
        if unique_vals <= 2:
            return 'binary'

        # Ordinal: all values numeric, whole numbers, and few distinct levels.
        # The modulo test works for both int and float dtypes.
        numeric_vals = self._coerce_numeric(non_null_vals)
        if (
            numeric_vals.notna().all()
            and unique_vals <= 10
            and bool((numeric_vals % 1 == 0).all())
        ):
            return 'ordinal'

        return 'continuous'

    def process_binary_column(self, column):
        """Encode a binary column as 0/1, filling missing values with 0.

        Text labels are first encoded as 1, 2 in string-sorted order so the
        result does not depend on row order. Afterwards a single non-zero
        value becomes 1, and two non-zero values become 0 (smaller) and
        1 (larger).
        """
        numeric_col = self._coerce_numeric(column)

        # If coercion lost any real value the column holds text labels.
        if numeric_col.notna().sum() < column.notna().sum():
            labels = sorted(column.dropna().unique(), key=str)
            mapping = {label: code for code, label in enumerate(labels, start=1)}
            numeric_col = column.map(mapping)

        # Missing values are filled with 0.
        processed_col = numeric_col.fillna(0)

        unique_nonzero = processed_col[processed_col != 0].unique()
        if len(unique_nonzero) == 1:
            processed_col = processed_col.replace({unique_nonzero[0]: 1})
        elif len(unique_nonzero) == 2:
            low, high = sorted(unique_nonzero)
            processed_col = processed_col.replace({low: 0, high: 1})

        return processed_col

    def process_ordinal_column(self, column):
        """Fill missing values with the mode and return a numeric column."""
        mode_val = column.mode()
        # An empty mode means the column has no values at all, so there is
        # nothing meaningful to fill with.
        processed_col = column if mode_val.empty else column.fillna(mode_val.iloc[0])

        try:
            processed_col = pd.to_numeric(processed_col, errors='coerce')
        except (TypeError, ValueError):
            # Leave the column as-is if pandas cannot attempt the conversion.
            pass

        return processed_col

    def process_continuous_column(self, column):
        """Fill missing values with the column mean.

        Numbers stored as text are converted to numeric first. Columns that
        hold genuinely non-numeric values have no mean and are returned
        unchanged.
        """
        if (pd.api.types.is_numeric_dtype(column)
                or pd.api.types.is_datetime64_any_dtype(column)):
            return column.fillna(column.mean())

        numeric_col = self._coerce_numeric(column)
        if numeric_col.notna().sum() == column.notna().sum():
            # Every real value parsed as a number.
            return numeric_col.fillna(numeric_col.mean())

        return column

    def preprocess_file(self, save_output=True, output_path=None):
        """
        Run the whole pipeline: load, drop empty columns, impute, save.

        Args:
            save_output (bool): Whether to write the result to disk.
            output_path (str): Destination path. Defaults to the input path
                with ``_processed`` appended to the file stem.

        Returns:
            pd.DataFrame: The processed data frame.
        """
        print("开始文件预处理...")

        # 1. Load the file.
        self.load_file()

        # 2. Drop empty columns.
        self.remove_empty_columns()

        # 3. Work on a copy; start from a clean type table on every run.
        self.processed_df = self.df.copy()
        self.column_types = {}

        print("\n检测列类型并处理缺失值...")
        print("-" * 50)

        # 4. Classify and impute each column (ID columns are skipped).
        for i, col_name in enumerate(self.processed_df.columns):
            column = self.processed_df[col_name]

            col_type = self.detect_column_type(column)
            self.column_types[col_name] = col_type

            if col_type == 'id':
                print(f"列 {i+1}: {col_name} - ID列 (跳过处理)")
                continue
            elif col_type == 'binary':
                print(f"列 {i+1}: {col_name} - 二分类变量")
                self.processed_df[col_name] = self.process_binary_column(column)
            elif col_type == 'ordinal':
                print(f"列 {i+1}: {col_name} - 定序变量")
                self.processed_df[col_name] = self.process_ordinal_column(column)
            elif col_type == 'continuous':
                print(f"列 {i+1}: {col_name} - 连续变量")
                self.processed_df[col_name] = self.process_continuous_column(column)

        print("-" * 50)
        print(f"处理完成！最终数据形状: {self.processed_df.shape}")

        # 5. Save the result.
        if save_output:
            in_suffix = self.file_path.suffix
            if output_path is None:
                # Only the file name is rewritten; str.replace on the whole
                # path would also hit directory names containing the suffix.
                output_path = str(
                    self.file_path.with_name(
                        f"{self.file_path.stem}_processed{in_suffix}"
                    )
                )

            # Prefer the format implied by the output path; fall back to the
            # input format when the output extension is not recognised.
            out_suffix = Path(output_path).suffix.lower()
            write_csv = out_suffix == '.csv' or (
                out_suffix not in ('.xlsx', '.xls') and in_suffix.lower() == '.csv'
            )
            if write_csv:
                self.processed_df.to_csv(output_path, index=False)
            else:
                self.processed_df.to_excel(output_path, index=False)

            print(f"处理后的文件已保存至: {output_path}")

        return self.processed_df

    def get_summary(self):
        """Return a dict summarising the last run.

        Returns a plain message string instead if ``preprocess_file()`` has
        not been run yet.
        """
        if self.processed_df is None:
            return "请先运行 preprocess_file() 方法"

        # self.df no longer has the empty columns, so use the shape recorded
        # at load time when it is available.
        original_shape = getattr(self, 'original_shape', None) or self.df.shape
        detected = list(self.column_types.values())

        summary = {
            '原始文件': str(self.file_path),
            '原始数据形状': f"{original_shape[0]} 行 × {original_shape[1]} 列",
            '处理后形状': f"{self.processed_df.shape[0]} 行 × {self.processed_df.shape[1]} 列",
            '列类型统计': {
                'ID列': detected.count('id'),
                '二分类变量': detected.count('binary'),
                '定序变量': detected.count('ordinal'),
                '连续变量': detected.count('continuous'),
            },
            '列详细信息': self.column_types
        }

        return summary


def main():
    """Command-line entry point.

    Parses the input path and the optional ``--output`` / ``--no-save``
    flags, runs the preprocessing pipeline, then prints the summary and the
    first five processed rows. Any exception is reported together with its
    traceback instead of propagating.
    """
    import argparse
    import traceback

    cli = argparse.ArgumentParser(description='文件预处理程序')
    cli.add_argument('file_path', help='输入文件路径')
    cli.add_argument('--output', '-o', help='输出文件路径（可选）')
    cli.add_argument('--no-save', action='store_true', help='不保存输出文件')
    options = cli.parse_args()

    try:
        worker = FilePreprocessor(options.file_path)
        result = worker.preprocess_file(
            save_output=not options.no_save,
            output_path=options.output,
        )

        # Summary banner.
        rule = "=" * 50
        print("\n" + rule)
        print("处理摘要")
        print(rule)

        report = worker.get_summary()
        if isinstance(report, dict):
            # These two entries hold nested dicts and are printed line by line.
            nested_sections = ('列详细信息', '列类型统计')
            for title, payload in report.items():
                if title not in nested_sections:
                    print(f"{title}: {payload}")
                    continue
                print(f"\n{title}:")
                for label, detail in payload.items():
                    print(f"  {label}: {detail}")

        print("\n前5行数据预览:")
        print(result.head())

    except Exception as exc:
        print(f"处理过程中发生错误: {exc}")
        traceback.print_exc()


if __name__ == "__main__":
    # Demo run executed when the cell/script is run directly.
    rule = "=" * 50
    print("文件预处理程序")
    print(rule)

    # Usage example 1: point the preprocessor at a file directly.
    #     preprocessor = FilePreprocessor("your_file.csv")
    #     processed_df = preprocessor.preprocess_file()

    # Usage example 2: a fixed, machine-specific input path.
    file_path =r"C:\Users\DXW\Desktop\半月板手术_predictor.xlsx"

    if not file_path:
        print("使用命令行运行: python script.py your_file.csv")
    else:
        preprocessor = FilePreprocessor(file_path)
        processed_df = preprocessor.preprocess_file()

        # Summary banner.
        print("\n" + rule)
        print("处理摘要")
        print(rule)
        summary = preprocessor.get_summary()

        if isinstance(summary, dict):
            # These two entries hold nested dicts and are printed line by line.
            nested_sections = ('列详细信息', '列类型统计')
            for title, payload in summary.items():
                if title not in nested_sections:
                    print(f"{title}: {payload}")
                    continue
                print(f"\n{title}:")
                for label, detail in payload.items():
                    print(f"  {label}: {detail}")

        print("\n前5行数据预览:")
        print(processed_df.head())

文件预处理程序
开始文件预处理...
加载文件成功，原始数据形状: (300, 356)
发现 59 个空列: ['Unnamed: 0', 'Unnamed: 3', 'Unnamed: 4', 'Unnamed: 5', 'Unnamed: 6', 'Unnamed: 7', 'Unnamed: 8', 'HPNRDK', 'HPNLDK', 'BPDK', 'Unnamed: 176', 'Unnamed: 198', 'Unnamed: 200', 'Unnamed: 202', 'Unnamed: 203', 'Unnamed: 204', 'Unnamed: 217', 'Unnamed: 218', 'Unnamed: 219', 'Unnamed: 220', 'Unnamed: 221', 'Unnamed: 222', 'Unnamed: 223', 'Unnamed: 224', 'Unnamed: 226', 'Unnamed: 228', 'Unnamed: 229', 'Unnamed: 240', 'Unnamed: 241', 'Unnamed: 245', 'Unnamed: 249', 'Unnamed: 255', 'Unnamed: 262', 'Unnamed: 265', 'Unnamed: 274', 'Unnamed: 278', 'Unnamed: 279', 'Unnamed: 280', 'Unnamed: 281', 'Unnamed: 282', 'Unnamed: 283', 'Unnamed: 284', 'Unnamed: 285', 'Unnamed: 286', 'Unnamed: 287', 'Unnamed: 288', 'Unnamed: 289', 'Unnamed: 290', 'Unnamed: 291', 'Unnamed: 292', 'Unnamed: 293', 'Unnamed: 301', 'Unnamed: 302', 'Unnamed: 310', 'Unnamed: 311', 'Unnamed: 328', 'Unnamed: 330', 'Unnamed: 332', 'Unnamed: 334']
空列已删除
删除空列后数据形状: (300, 297)

检测列类