# In [1]:
"""数据集预处理类"""

import logging
import os
from pathlib import Path
from typing import Optional, Union

import numpy as np
import pandas as pd
from rich.logging import RichHandler
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# In [15]:
class DataPreProcessorV1:
    """Load a CSV dataset and prepare it for model training.

    Intended call order: construct (loads the CSV into ``self.df``), then
    ``init_Xy`` (feature/target split, train/test split, label encoding),
    then ``scaler_X`` (standardization), then ``group_Xy`` (group the scaled
    training set by the sensitive attributes).
    """

    def __init__(
        self,
        data_path: Union[str, os.PathLike],
        *,
        sensi_names: Optional[list[str]] = None,
        np_seed: int = 42,
        level: Optional[int] = None,
    ) -> None:
        """Seed NumPy, set up logging and load the CSV file.

        Args:
            data_path: Path of the CSV file to load.
            sensi_names: Names of the sensitive-attribute columns; required
                by ``scaler_X`` and used as grouping keys by ``group_Xy``.
            np_seed: Seed passed to ``np.random.seed`` (this seeds NumPy's
                global generator) and reused as ``random_state`` for the
                train/test split.
            level: Logging level for this instance's logger; ``None`` means
                ``logging.INFO``.

        Raises:
            FileNotFoundError: If ``data_path`` does not exist.
            TypeError: If ``data_path`` does not end with ``csv``.
        """
        # Seed the global NumPy random number generator.
        np.random.seed(np_seed)
        self.seed = np_seed
        self.sensi_names = sensi_names
        # The logger must exist before init_df, which logs the frame shape.
        self.init_logger(level=level)
        self.init_df(data_path)

    def init_logger(self, logger_name="default_dataset", level=None) -> None:
        """Configure rich-formatted logging and create ``self.logger``.

        Args:
            logger_name: Name of the logger stored on the instance.
            level: Level applied to that logger; ``None`` means
                ``logging.INFO``.
        """
        if level is None:
            level = logging.INFO
        log_format = "%(message)s"
        # basicConfig does nothing when the root logger already has handlers,
        # so repeated instantiation does not stack handlers. The root level is
        # NOTSET; filtering for this class happens on the named logger below.
        logging.basicConfig(
            level="NOTSET",
            format=log_format,
            datefmt="[%X]",
            handlers=[RichHandler()],
        )
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(level)

    def init_df(self, data_path: Union[str, os.PathLike]):
        """Read the CSV file at ``data_path`` into ``self.df``.

        Args:
            data_path: A string or ``os.PathLike`` (e.g. ``pathlib.Path``)
                pointing at the CSV file. It is read with ``latin-1``
                encoding.

        Raises:
            FileNotFoundError: If the path does not exist.
            TypeError: If the path does not end with ``csv``.
        """
        # Normalise to a plain string so pathlib.Path inputs support the
        # ``endswith`` check below.
        path = os.fspath(data_path)
        if not os.path.exists(path):
            raise FileNotFoundError(f"{data_path} 文件路径不存在")
        if not path.endswith("csv"):
            raise TypeError("文件类型错误，应该是 csv 文件")
        self.df = pd.read_csv(path, encoding="latin-1")
        self.logger.debug(f"df 包含 {self.df.shape[0]} 行数据，{self.df.shape[1]} 列")

    @staticmethod
    def _mark_missing(features, marker="?"):
        """Return a copy of ``features`` with ``marker`` cells set to NaN."""
        # ``replace`` returns a new object; the result must be kept.
        return features.replace(marker, np.nan)

    def init_Xy(self, label, mapper, *, test_size=0.3):
        """Split ``self.df`` into features and target, then split and encode.

        Sets ``self.X`` (all columns except ``label``, with ``"?"`` replaced
        by NaN) and ``self.y`` (``label`` column passed through ``mapper``),
        then runs ``split_Xy`` and ``label_X``.

        Args:
            label: Name of the target column.
            mapper: Mapping handed to ``Series.map`` to turn raw target
                values into model labels.
            test_size: Forwarded to ``split_Xy``.
        """
        self.X = self._mark_missing(self.df.drop(label, axis=1))
        self.y = self.df[label].map(mapper)
        # ``Series.map`` with a dict yields NaN for values missing from the
        # mapping; surface that instead of passing NaN labels on silently.
        unmapped = int(self.y.isna().sum())
        if unmapped:
            self.logger.warning(
                "%d rows have no label after mapping column %r", unmapped, label
            )
        self.split_Xy(test_size=test_size)
        self.label_X()

    def split_Xy(self, test_size=0.3):
        """Split ``self.X`` / ``self.y`` into train and test parts.

        Sets ``self.X_train``, ``self.X_test``, ``self.y_train`` and
        ``self.y_test``; ``self.seed`` is used as ``random_state``.

        Args:
            test_size: Forwarded to ``train_test_split``.

        Raises:
            ValueError: If ``self.seed`` is ``None``.
        """
        if self.seed is None:
            raise ValueError("self.seed 未定义")
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            self.X, self.y, test_size=test_size, random_state=self.seed
        )

    def label_X(self):
        """Label-encode the ``object`` columns of the train and test features.

        Sets ``self.X_train_label`` and ``self.X_test_label``. One encoder per
        column is fitted on the training part only and then applied to the
        test part; the fitted encoders are kept in ``self.label_encoders``
        (column name -> encoder).
        """
        object_columns = self.X.select_dtypes("object").columns
        self.X_train_label = self.X_train.copy()
        self.X_test_label = self.X_test.copy()
        self.label_encoders = {}
        for feature in object_columns:
            encoder = preprocessing.LabelEncoder()
            self.X_train_label[feature] = encoder.fit_transform(self.X_train[feature])
            self.X_test_label[feature] = encoder.transform(self.X_test[feature])
            self.label_encoders[feature] = encoder

    @staticmethod
    def _row_keys(rows):
        """Return one hashable key per row: tuples for a frame, scalars for a series."""
        if isinstance(rows, pd.DataFrame):
            return list(rows.itertuples(index=False, name=None))
        return rows.tolist()

    @classmethod
    def _build_names_map(cls, original, scaled):
        """Map each distinct row of ``original`` to the same row of ``scaled``.

        Both arguments must have the same length and row order. Pairing the
        same physical rows keeps the mapping correct when group counts tie
        and when a value is NaN. Keys appear in order of first occurrence.
        """
        first_seen = ~original.duplicated().to_numpy()
        keys = cls._row_keys(original[first_seen])
        values = cls._row_keys(scaled[first_seen])
        return dict(zip(keys, values))

    def scaler_X(self):
        """Standardize the label-encoded features.

        Sets ``self.X_train_label_scale`` and ``self.X_test_label_scale``
        (the scaler is fitted on the training part only) and
        ``self.names_map_scaler``, which maps every original combination of
        sensitive-attribute values in the training set to its scaled
        counterpart. When ``self.sensi_names`` is a list, keys and values are
        tuples with one entry per name.

        Raises:
            ValueError: If ``self.sensi_names`` is ``None``.
        """
        if self.sensi_names is None:
            raise ValueError("self.sensi_names 未定义")

        feature_columns = self.X_train.columns
        # Pass ``columns`` and ``index`` explicitly so the scaled frames line
        # up with the original ones.
        scaler = StandardScaler()
        self.X_train_label_scale = pd.DataFrame(
            scaler.fit_transform(self.X_train_label),
            columns=feature_columns,
            index=self.X_train.index,
        )
        self.X_test_label_scale = pd.DataFrame(
            scaler.transform(self.X_test_label),
            columns=feature_columns,
            index=self.X_test.index,
        )
        # Map original sensitive values -> scaled numeric values.
        self.names_map_scaler = self._build_names_map(
            self.X_train[self.sensi_names],
            self.X_train_label_scale[self.sensi_names],
        )

    def group_Xy(self):
        """Group the scaled training set by the sensitive attributes.

        Sets ``self.grouped`` to a list of ``(group_key, features, labels)``
        tuples, where ``labels`` are the ``self.y_train`` entries sharing the
        index labels of ``features``.
        """
        features, labels = self.X_train_label_scale, self.y_train
        self.grouped = [
            (name, data, labels.loc[data.index])
            for name, data in features.groupby(self.sensi_names)
        ]


        
if __name__ == "__main__":
    # Demo run: load ../input/adult.csv, encode and scale it, then group the
    # scaled training set by the "sex" column.
    processor = DataPreProcessorV1("../input/adult.csv", sensi_names=["sex"])
    processor.init_Xy("income", {"<=50K": 0, ">50K": 1})
    # A bare expression inside this ``if`` block is never displayed (not even
    # in a notebook cell), so the results are printed explicitly.
    print(processor.X_train_label)
    processor.scaler_X()
    processor.group_Xy()
    print(len(processor.grouped))