In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Print the full path of every file under the competition input mount.
for root, _subdirs, files in os.walk('/kaggle/input'):
    for name in files:
        full_path = os.path.join(root, name)
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/home-credit-credit-risk-model-stability/sample_submission.csv
/kaggle/input/home-credit-credit-risk-model-stability/feature_definitions.csv
/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/test/test_deposit_1.parquet
/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/test/test_applprev_2.parquet
/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/test/test_static_cb_0.parquet
/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/test/test_static_0_0.parquet
/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/test/test_credit_bureau_a_1_3.parquet
/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/test/test_credit_bureau_a_1_2.parquet
/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/test/test_tax_registry_b_1.parquet
/kaggle/input/home-credit-credit-risk-model-stability/parquet_files/test/test_static_0_2.parquet
/kaggle/input/home-credit-credit-risk-model-st

In [2]:
import sys
from pathlib import Path
import subprocess
import os
import gc
from glob import glob
import pickle

import numpy as np
import pandas as pd
import polars as pl
from datetime import datetime
import seaborn as sns
import matplotlib.pyplot as plt

In [3]:
from sklearn.model_selection import TimeSeriesSplit, GroupKFold, StratifiedGroupKFold
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.metrics import roc_auc_score
import lightgbm as lgb

from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import EditedNearestNeighbours
from sklearn.preprocessing import OrdinalEncoder, LabelEncoder
# from woe_conversion.woe import *
from sklearn.impute import KNNImputer

from imblearn.combine import SMOTEENN
from collections import Counter

import warnings
warnings.filterwarnings("ignore")

In [4]:
import gc
# Force a full garbage collection; the returned number of unreachable objects
# found is what this cell displays.
gc.collect()

0

In [5]:
# Competition data locations under the (read-only) Kaggle input mount.
ROOT = Path("/kaggle/input/home-credit-credit-risk-model-stability")
TRAIN_DIR = ROOT.joinpath("parquet_files", "train")
TRAIN_CSV_DIR = ROOT.joinpath("csv_files", "train")
TEST_DIR = ROOT.joinpath("parquet_files", "test")

In [6]:
import threading

def process_files(filename, dir, feature_sheet_list):
    '''
    Compute per-column null counts of one parquet file and append them to a list.

    Parameters
    ----------
    filename : str
        File name relative to ``dir`` (e.g. ``"train_base.parquet"``).
    dir : str or os.PathLike
        Directory that contains ``filename``.
    feature_sheet_list : list
        Output list. One DataFrame is appended: indexed by column name, with
        columns ``nan_num`` (null count), ``df_len`` (row count) and ``file``.
    '''
    df = pd.read_parquet(os.path.join(dir, filename))
    print('Processing: ', filename)

    info = df.isnull().sum().to_frame('nan_num')
    info['df_len'] = df.shape[0]
    info['file'] = filename
    # Every file has a case_id column; give each file's row its own label so the
    # rows are not merged when sheets are later grouped by feature. splitext
    # strips any extension (the old filename[:-4] assumed ".csv" and left
    # ".par" on parquet names).
    stem = os.path.splitext(filename)[0]
    info = info.rename(index={'case_id': f'case_id_{stem}'})

    # Called from several threads; relies on list.append being atomic in CPython.
    feature_sheet_list.append(info)

In [7]:
def feature_sheet_process(file_list, dir, max_workers=None):
    '''
    Run process_files over several files concurrently and stack the results.

    Parameters
    ----------
    file_list : iterable of str
        File names inside ``dir``.
    dir : str or os.PathLike
        Directory containing the files.
    max_workers : int, optional
        Thread-pool size; defaults to one thread per file (the old behaviour).

    Returns
    -------
    pandas.DataFrame
        The per-file sheets concatenated in ``file_list`` order; an empty frame
        with the sheet columns when ``file_list`` is empty.

    Raises
    ------
    Exception
        Any exception raised by process_files for a file is re-raised here
        instead of being lost inside its worker thread.
    '''
    from concurrent.futures import ThreadPoolExecutor

    file_list = list(file_list)
    if not file_list:
        # pd.concat([]) would raise; return an empty sheet instead.
        return pd.DataFrame(columns=['nan_num', 'df_len', 'file'])

    # One private list per file so the output order matches file_list rather
    # than whichever thread happens to finish first.
    per_file_sheets = [[] for _ in file_list]
    workers = max_workers or len(file_list)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        futures = [pool.submit(process_files, name, dir, sheets)
                   for name, sheets in zip(file_list, per_file_sheets)]
        for future in futures:
            future.result()  # re-raise worker exceptions

    return pd.concat([sheet for sheets in per_file_sheets for sheet in sheets], axis=0)

In [8]:
# Disabled exploratory step (held in a string literal, so nothing runs): build
# per-file null-count sheets in batches of 8 files.
# NOTE(review): `train_list` is not defined in any visible cell; define it
# before re-enabling this code.
'''
data_store = {}
for i in range(0, 32, 8):
    train_feature_sheet = feature_sheet_process(train_list[i:i+8], TRAIN_DIR)
    data_store.update({f'feature_sheet_{i//8}': train_feature_sheet})
'''

"\ndata_store = {}\nfor i in range(0, 32, 8):\n    train_feature_sheet = feature_sheet_process(train_list[i:i+8], TRAIN_DIR)\n    data_store.update({f'feature_sheet_{i//8}': train_feature_sheet})\n"

In [9]:
# Disabled: concatenate the batch sheets into one frame.
# NOTE(review): the keys read here end in '.csv' ('feature_sheet_0.csv'), but the
# cell above stores them as 'feature_sheet_0', so re-enabling would raise KeyError.
'''
train_feature_sheet = data_store['feature_sheet_0.csv']
for i in range(1,4):
    train_feature_sheet = pd.concat([train_feature_sheet, data_store[f'feature_sheet_{i}.csv']], axis=0)
del data_store
'''

"\ntrain_feature_sheet = data_store['feature_sheet_0.csv']\nfor i in range(1,4):\n    train_feature_sheet = pd.concat([train_feature_sheet, data_store[f'feature_sheet_{i}.csv']], axis=0)\ndel data_store\n"

In [10]:
# Disabled: name the feature column.
# NOTE(review): 'Unnamed: 0' only exists when a sheet is read back from CSV; sheets
# built in memory by process_files keep feature names in the index, so
# reset_index() would be needed instead of this rename.
'''
train_feature_sheet = train_feature_sheet.rename(columns={'Unnamed: 0': 'feature'})
train_feature_sheet
'''

"\ntrain_feature_sheet = train_feature_sheet.rename(columns={'Unnamed: 0': 'feature'})\ntrain_feature_sheet\n"

In [11]:
# Disabled: sum null counts and row counts per feature across all files and
# compute each feature's overall null ratio.
'''
train_feature_sheet = train_feature_sheet.groupby('feature').agg({'nan_num': 'sum', 'df_len': 'sum', 'file': list}).reset_index()
train_feature_sheet['nan_ratio'] = train_feature_sheet['nan_num'] / train_feature_sheet['df_len']
train_feature_sheet
'''

"\ntrain_feature_sheet = train_feature_sheet.groupby('feature').agg({'nan_num': 'sum', 'df_len': 'sum', 'file': list}).reset_index()\ntrain_feature_sheet['nan_ratio'] = train_feature_sheet['nan_num'] / train_feature_sheet['df_len']\ntrain_feature_sheet\n"

In [12]:
# Disabled: sort the sheet alphabetically by feature name.
'''
# 按照feature首字母排序
train_feature_sheet = train_feature_sheet.sort_values(by='feature')
train_feature_sheet
'''

"\n# 按照feature首字母排序\ntrain_feature_sheet = train_feature_sheet.sort_values(by='feature')\ntrain_feature_sheet\n"

In [13]:
# Disabled: drop the per-file 'case_id_*' rows so only real features remain.
'''
# train_feature_sheet丢弃feature为'case_id'开头的行
train_feature_sheet_1 = train_feature_sheet[~train_feature_sheet['feature'].str.startswith('case_id')]
del train_feature_sheet
train_feature_sheet_1
'''

"\n# train_feature_sheet丢弃feature为'case_id'开头的行\ntrain_feature_sheet_1 = train_feature_sheet[~train_feature_sheet['feature'].str.startswith('case_id')]\ndel train_feature_sheet\ntrain_feature_sheet_1\n"

In [14]:
# Disabled: list features present in the data files but absent from
# feature_definitions.csv.
# NOTE(review): `os.path.joint` is a typo for `os.path.join` and would raise
# AttributeError if re-enabled.
'''
# feature_definitions里面的feature
feature_list = pd.read_csv(os.path.joint(ROOT, 'feature_definitions.csv'))
features1 = feature_list['Variable'].values
features0 = train_feature_sheet_1['feature'].values
features = list(set(features0) - set(features1))
features  # 在csv不在definitions里面的feature(就是train_base里面的)
'''

"\n# feature_definitions里面的feature\nfeature_list = pd.read_csv(os.path.joint(ROOT, 'feature_definitions.csv'))\nfeatures1 = feature_list['Variable'].values\nfeatures0 = train_feature_sheet_1['feature'].values\nfeatures = list(set(features0) - set(features1))\nfeatures  # 在csv不在definitions里面的feature(就是train_base里面的)\n"

In [15]:
# Disabled: keep only defined features, save them, then filter the definitions to
# features with null ratio <= 0.7.
# NOTE(review): would fail if re-enabled:
#   - `feature0`/`feature1` should be `features0`/`features1`;
#   - `dir0` is undefined (the variable assigned is `dir`, a relative path rather
#     than ROOT);
#   - the data is pickled to feature_info.pkl but read back from feature_info.csv.
'''
# 保留feature_list中Variable列的值在train_feature_sheet_1中的行
train_feature_sheet_2 = train_feature_sheet_1[train_feature_sheet_1['feature'].isin(features1)]
feature_info = pl.DataFrame(train_feature_sheet_2.reset_index())
del train_feature_sheet_2, train_feature_sheet_1, feature0, features, feature1, feature_list

with open("feature_info.pkl", 'wb') as f:
    pickle.dump(feature_info, f)

dir = 'home-credit-credit-risk-model-stability'
feature_info = pl.read_csv("feature_info.csv")
feature_info = feature_info.filter(pl.col("nan_ratio") <= 0.7)
feat_defs = pl.read_csv(os.path.join(dir0, 'feature_definitions.csv'))
feat_defs = feat_defs.filter(pl.col("Variable").is_in(feature_info["feature"]))
del dir, feature_info, feat_defs
'''

'\n# 保留feature_list中Variable列的值在train_feature_sheet_1中的行\ntrain_feature_sheet_2 = train_feature_sheet_1[train_feature_sheet_1[\'feature\'].isin(features1)]\nfeature_info = pl.DataFrame(train_feature_sheet_2.reset_index())\ndel train_feature_sheet_2, train_feature_sheet_1, feature0, features, feature1, feature_list\n\nwith open("feature_info.pkl", \'wb\') as f:\n    pickle.dump(feature_info, f)\n\ndir = \'home-credit-credit-risk-model-stability\'\nfeature_info = pl.read_csv("feature_info.csv")\nfeature_info = feature_info.filter(pl.col("nan_ratio") <= 0.7)\nfeat_defs = pl.read_csv(os.path.join(dir0, \'feature_definitions.csv\'))\nfeat_defs = feat_defs.filter(pl.col("Variable").is_in(feature_info["feature"]))\ndel dir, feature_info, feat_defs\n'

In [16]:
# Keyword -> dtype-category lookups used by Pipeline.set_table_dtypes for columns
# whose suffix ("T" or "L") does not fix a type: words of the column's description
# in feature_definitions.csv are matched against these lists.
DATA_T = {"Num" : ["Number", "number", "Amount", "amount", "limit", "Value", "value", "sum", "Sum"], 
            "Factor": ["Flag", "flag", "Index", "index", "Indices", "indices", "Type", "type",
                      "Indicates", "indicates", "Status", "status", "Order", "Gender", "indicating", "reason", 
                      "Reason", "Classification", "classification", "Name", "District", "Zipcode", "address", "language", 
                      "Category", "category", "Role", "role", "Year", "year"],
            "Ratio": ["Rate", "rate", "Percentage", "percentage"], 
            "Time":  ["Date", "date"]}

# "Days" and "Average" are separate entries; a missing comma previously fused them
# into the single unmatched keyword "DaysAverage".
DATA_L = {"Num" : ["Number", "number", "Amount", "amount", "DPD", "Days", "Average", "average", "limit", "Value", "value", 
                      "Sum", "sum"], 
            "Factor": ["Flag", "flag", "Index", "index", "Indices", "indices", "Type", "type",
                      "Indicates", "indicates", "Status", "status", "Order", "Gender", "indicating", "reason", 
                      "Reason", "Classification", "classification", "Name", "District", "Zipcode", "address", "language", 
                      "Category", "category", "Role", "role", "Year", "year"],
            "Ratio": ["Rate", "rate", "Percentage", "percentage"],
            "Time":  ["Date", "date"]}

In [36]:
class Pipeline:
    """Polars preprocessing helpers: dtype coercion, date features and column filtering."""

    # Identifier / index columns: cast to Int64 by set_table_dtypes and never
    # treated as features by filter_cols.
    _ID_COLS = ("case_id", "WEEK_NUM", "num_group1", "num_group2", "target")

    # {Variable: [description words]} parsed from feature_definitions.csv on first
    # use, so the CSV is read once instead of once per table.
    _feature_words = None

    @staticmethod
    def _load_feature_words():
        """Return the cached {Variable: list of description words} map."""
        if Pipeline._feature_words is None:
            feat_defs = pl.read_csv(os.path.join(ROOT, 'feature_definitions.csv'))
            words = {}
            for variable, description in zip(feat_defs["Variable"].to_list(),
                                             feat_defs["Description"].to_list()):
                # First definition wins, like the original filter(...)[0] lookup.
                if variable not in words:
                    words[variable] = description.split(" ") if description else []
            Pipeline._feature_words = words
        return Pipeline._feature_words

    @staticmethod
    def _dtype_from_words(words):
        """Dtype implied by description keywords (DATA_T / DATA_L).

        Every word is checked; the last matching word decides, as the original
        sequential casts did. Returns None when no word matches.
        """
        target = None
        for raw_word in words:
            # Tokens come from a plain space split, so trailing punctuation
            # ("amount.", "(flag)") would otherwise hide a keyword.
            word = raw_word.strip(".,;:()")
            if word in DATA_T["Num"] or word in DATA_L["Num"]:
                target = pl.Float64
            elif word in DATA_T["Ratio"] or word in DATA_L["Ratio"]:
                target = pl.Float64
            elif word in DATA_T["Factor"] or word in DATA_L["Factor"]:
                target = pl.String
            elif word in DATA_T["Time"] or word in DATA_L["Time"]:
                target = pl.Date
        return target

    @staticmethod
    def set_table_dtypes(df):
        """Cast columns of a raw polars table to model-friendly dtypes.

        Rules, in order:
        - identifier columns -> Int64;
        - ``date_decision`` and ``*D`` -> Date;
        - ``*P`` / ``*A`` -> Float64;
        - ``*M`` -> String;
        - ``*T`` / ``*L`` -> dtype inferred from the column's description words.

        The T/L lookup previously read ``feat_df[0]["Description"].to_list()``,
        which is a list holding one list, so no keyword ever matched and those
        columns were never cast. T/L casts use ``strict=False``: unparseable
        values become null. A cast polars cannot perform at all leaves the
        column unchanged and prints a message.
        """
        feature_words = Pipeline._load_feature_words()
        for col in df.columns:
            if col in Pipeline._ID_COLS:
                df = df.with_columns(pl.col(col).cast(pl.Int64))
            elif col == "date_decision" or col[-1] == "D":
                df = df.with_columns(pl.col(col).cast(pl.Date))
            elif col[-1] in ("P", "A"):
                df = df.with_columns(pl.col(col).cast(pl.Float64))
            elif col[-1] == "M":
                df = df.with_columns(pl.col(col).cast(pl.String))
            elif col[-1] in ("T", "L"):
                target = Pipeline._dtype_from_words(feature_words.get(col, []))
                if target is not None and df[col].dtype != target:
                    try:
                        df = df.with_columns(pl.col(col).cast(target, strict=False))
                    except Exception as exc:  # unsupported cast: keep the original column
                        print(f"set_table_dtypes: keeping {col} as {df[col].dtype} ({exc})")
        return df

    @staticmethod
    def handle_dates(df, dir_path, base_file_name):
        """Add derived features for every Date column except ``date_decision``.

        For each such column three columns are added:
        - ``<col>_diff_daysP``: days from date_decision;
        - ``month_<col>``: month;
        - ``weekday_<col>``: weekday.
        ``date_decision`` comes from ``df`` itself when present, otherwise from
        ``base_file_name`` in ``dir_path`` (joined on case_id and dropped again).
        The base table (``date_decision`` and ``target`` present) also loses its
        ``MONTH`` column.

        Fixes over the previous version:
        - the ``drop("MONTH")`` result is now kept;
        - month/weekday columns are no longer all written to ``month_col`` /
          ``weekday_col``, because keyword names overrode the aliases;
        - a table that already has ``date_decision`` keeps it.
        """
        if "date_decision" in df.columns and "target" in df.columns and "MONTH" in df.columns:
            df = df.drop("MONTH")

        date_cols = [col for col in df.columns
                     if df[col].dtype == pl.Date and col != "date_decision"]
        if not date_cols:
            return df

        added_reference = "date_decision" not in df.columns
        if added_reference:
            date_decision = pl.read_parquet(os.path.join(dir_path, base_file_name)) \
                              .select(pl.col("case_id"), pl.col("date_decision").cast(pl.Date))
            df = df.join(date_decision, on="case_id", how="left")

        exprs = []
        for col in date_cols:
            diff_days = (pl.col(col) - pl.col("date_decision")).cast(pl.Duration)
            exprs.append(diff_days.dt.total_days().alias(f"{col}_diff_daysP"))
            exprs.append(pl.col(col).dt.month().cast(pl.UInt8).alias(f"month_{col}"))
            exprs.append(pl.col(col).dt.weekday().cast(pl.UInt8).alias(f"weekday_{col}"))
        df = df.with_columns(exprs)

        if added_reference:
            df = df.drop("date_decision")
        return df

    @staticmethod
    def filter_cols(df, max_null_ratio=0.7, max_categories=200):
        """Drop index columns and uninformative features.

        Removes:
        - WEEK_NUM / num_group1 / num_group2;
        - any feature with a null ratio above ``max_null_ratio``;
        - String features with a single distinct value or more than
          ``max_categories`` distinct values.
        ``target`` and ``case_id`` are always kept.
        """
        protected = set(Pipeline._ID_COLS)
        df = df.drop([col for col in ("WEEK_NUM", "num_group1", "num_group2") if col in df.columns])

        sparse = []
        for col in df.columns:
            if col in protected:
                continue
            null_ratio = df[col].is_null().mean()  # None for an empty frame
            if null_ratio is not None and null_ratio > max_null_ratio:
                sparse.append(col)
        df = df.drop(sparse)

        bad_categoricals = []
        for col in df.columns:
            if col not in protected and df[col].dtype == pl.String:
                n_distinct = df[col].n_unique()
                if n_distinct == 1 or n_distinct > max_categories:
                    bad_categoricals.append(col)
        return df.drop(bad_categoricals)
        

In [18]:
class Aggregator:
    """Polars aggregation expressions that collapse depth-1/2 tables to one row per case_id.

    Add or remove expressions as needed: every extra expression adds one output
    column per matching input column, so memory grows quickly. Variants tried
    earlier and left out:
    - first_* for numeric and string columns;
    - max_*/min_*/mean_* for dates;
    - max_*/min_* for strings.

    All methods are static and are called as ``Aggregator.<name>(df)``.
    "last"/"first" depend on the row order within each group.
    """

    @staticmethod
    def num_expr(df):
        """max, last, mean and min of every Float64 column."""
        cols = [col for col in df.columns if df[col].dtype == pl.datatypes.Float64]
        expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
        expr_min = [pl.min(col).alias(f"min_{col}") for col in cols]
        expr_last = [pl.last(col).alias(f"last_{col}") for col in cols]
        expr_mean = [pl.mean(col).alias(f"mean_{col}") for col in cols]
        return expr_max + expr_last + expr_mean + expr_min

    @staticmethod
    def date_expr(df):
        """first and last value of every Date column."""
        cols = [col for col in df.columns if df[col].dtype == pl.Date]
        expr_last = [pl.last(col).alias(f"last_{col}") for col in cols]
        expr_first = [pl.first(col).alias(f"first_{col}") for col in cols]
        return expr_first + expr_last

    @staticmethod
    def str_expr(df):
        """last value and ``pl.count`` of every String column."""
        cols = [col for col in df.columns if df[col].dtype == pl.String]
        expr_last = [pl.last(col).alias(f"last_{col}") for col in cols]
        expr_count = [pl.count(col).alias(f"count_{col}") for col in cols]
        return expr_last + expr_count

    @staticmethod
    def count_expr(df):
        """max and last of every column whose name contains "num_group"."""
        cols = [col for col in df.columns if "num_group" in col]
        expr_max = [pl.max(col).alias(f"max_{col}") for col in cols]
        expr_last = [pl.last(col).alias(f"last_{col}") for col in cols]
        return expr_max + expr_last

    @staticmethod
    def get_exprs(df):
        """All aggregation expressions for ``df``: numeric, date, string, then num_group."""
        exprs = Aggregator.num_expr(df) + \
                Aggregator.date_expr(df) + \
                Aggregator.str_expr(df) + \
                Aggregator.count_expr(df)

        return exprs

In [67]:
def read_file(path, depth=None):
    """Load one parquet table and normalise its dtypes.

    For ``depth`` 1 or 2 the table is additionally aggregated to one row per
    case_id with the Aggregator expressions; any other depth returns it as-is.
    """
    frame = Pipeline.set_table_dtypes(pl.read_parquet(path))
    if depth not in (1, 2):
        return frame
    return frame.group_by("case_id").agg(Aggregator.get_exprs(frame))

def read_files(regex_path, depth=None):
    """Read every parquet chunk matching a glob pattern and stack them.

    Each chunk goes through read_file: dtype coercion and, for depth 1/2,
    per-case_id aggregation.

    Parameters
    ----------
    regex_path : str or os.PathLike
        A glob pattern (despite the name, not a regex),
        e.g. ``TRAIN_DIR / "train_static_0_*.parquet"``.
    depth : int or None
        Forwarded to read_file; 1 or 2 triggers aggregation.

    Returns
    -------
    polars.DataFrame
        The stacked chunks with one row per case_id.

    Raises
    ------
    FileNotFoundError
        If the pattern matches no file; pl.concat would otherwise fail with an
        unhelpful error.
    """
    # sorted() makes the chunk order reproducible, and with it the row kept for a
    # duplicated case_id below; raw glob() order depends on the filesystem.
    paths = sorted(glob(str(regex_path)))
    if not paths:
        raise FileNotFoundError(f"No files match {regex_path}")

    chunks = [read_file(path, depth) for path in paths]
    df = pl.concat(chunks, how="vertical_relaxed")
    # NOTE(review): if a case_id spans several chunks, only its row from the first
    # chunk survives (per-chunk aggregates are not merged). Confirm the chunks are
    # partitioned by case_id.
    return df.unique(subset=["case_id"], keep="first", maintain_order=True)

def to_pandas(df_data, cat_cols=None):
    """Convert a polars frame to pandas and mark categorical columns.

    With ``cat_cols=None`` every object-dtype column is treated as categorical.
    Returns ``(pandas_frame, categorical_column_list)``.
    """
    frame = df_data.to_pandas()
    categorical = list(frame.select_dtypes("object").columns) if cat_cols is None else cat_cols
    frame[categorical] = frame[categorical].astype("category")
    return frame, categorical

def reduce_mem_usage(df, allow_float16=True):
    """Downcast numeric columns in place to the smallest dtype that holds their range.

    Only plain numpy integer and float columns are changed. bool, datetime,
    category, object and pandas extension (nullable) dtypes are left alone.
    Previously bool columns became float16 and datetime columns raised a
    TypeError in the range comparison.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to shrink; it is modified in place and also returned.
    allow_float16 : bool, default True
        float16 keeps only about 3 significant decimal digits, so amounts get
        rounded. Pass False to stop at float32.

    Returns
    -------
    pandas.DataFrame
        The same (mutated) frame.
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage of dataframe is {:.2f} MB'.format(start_mem))

    int_candidates = (np.int8, np.int16, np.int32, np.int64)
    float_candidates = (np.float16, np.float32) if allow_float16 else (np.float32,)

    for col in df.columns:
        col_type = df[col].dtype
        # Skip everything except plain numpy int/float: extension dtypes (category,
        # nullable Int64, ...) are not np.dtype instances, and bool is a numpy
        # dtype that must not be turned into a number.
        if not isinstance(col_type, np.dtype) or pd.api.types.is_bool_dtype(col_type):
            continue

        if pd.api.types.is_integer_dtype(col_type):
            c_min, c_max = df[col].min(), df[col].max()
            for candidate in int_candidates:
                bounds = np.iinfo(candidate)
                # Inclusive bounds: values equal to a type's min/max still fit.
                if bounds.min <= c_min and c_max <= bounds.max:
                    df[col] = df[col].astype(candidate)
                    break
        elif pd.api.types.is_float_dtype(col_type):
            c_min, c_max = df[col].min(), df[col].max()
            target = np.float64  # fallback, also used for all-NaN or infinite columns
            for candidate in float_candidates:
                bounds = np.finfo(candidate)
                if bounds.min <= c_min and c_max <= bounds.max:
                    target = candidate
                    break
            df[col] = df[col].astype(target)

    end_mem = df.memory_usage().sum() / 1024**2
    print('Memory usage after optimization is: {:.2f} MB'.format(end_mem))
    if start_mem > 0:
        print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) / start_mem))

    return df

In [20]:
# Sanity check: is the competition input directory mounted? (Displays True/False.)
os.path.exists("/kaggle/input/home-credit-credit-risk-model-stability")


True

In [21]:
# ROOT, TRAIN_DIR, TRAIN_CSV_DIR and TEST_DIR are already set in the path cell
# near the top of the notebook. Redefining them here would let two copies of the
# same constants drift apart.
print(TRAIN_CSV_DIR)


/kaggle/input/home-credit-credit-risk-model-stability/csv_files/train


In [76]:
# (file name or glob pattern, depth) for every training table, grouped by depth.
# Patterns containing "*" span several chunk files and go through read_files;
# single files go through read_file. Depth 1/2 tables get aggregated per case_id.
_TRAIN_TABLE_SPECS = {
    "df_base": [("train_base.parquet", None)],
    "depth_0": [
        ("train_static_cb_0.parquet", None),
        ("train_static_0_*.parquet", None),
    ],
    "depth_1": [
        ("train_applprev_1_*.parquet", 1),
        ("train_tax_registry_a_1.parquet", 1),
        ("train_tax_registry_b_1.parquet", 1),
        ("train_tax_registry_c_1.parquet", 1),
        ("train_credit_bureau_a_1_*.parquet", 1),
        ("train_credit_bureau_b_1.parquet", 1),
        ("train_other_1.parquet", 1),
        ("train_person_1.parquet", 1),
        ("train_deposit_1.parquet", 1),
        ("train_debitcard_1.parquet", 1),
    ],
    "depth_2": [
        ("train_credit_bureau_b_2.parquet", 2),
        ("train_credit_bureau_a_2_*.parquet", 2),
        ("train_applprev_2.parquet", 2),
        ("train_person_2.parquet", 2),
    ],
}


def _load_train_table(name, depth):
    """Load one training table (a single file or a glob of chunks) from TRAIN_DIR."""
    reader = read_files if "*" in name else read_file
    return reader(TRAIN_DIR / name, depth)


data_store = {
    group: [_load_train_table(name, depth) for name, depth in specs]
    for group, specs in _TRAIN_TABLE_SPECS.items()
}

with open('temporary_data_store.pkl', 'wb') as f:
    pickle.dump(data_store, f)

In [23]:
'''
WOE-Encoder:
@Writer: BertrandBrelier
@Copy right: MIT Liscence
@Link: https://github.com/BertrandBrelier/woe/blob/main/woe.py
'''

# DataFrame here is based on pandas rather than polars
class WoeConversion():
    """Weight-of-evidence (WOE) encoder for a binary 0/1 target on pandas DataFrames.

    Object-dtype features, and features that cannot be cast to float, are encoded
    per category by ConvertCategoricalFeatures. All other features are
    quantile-binned into ``nbins`` bins and encoded by ConvertContinuousFeatures.
    """
    def __init__(self, binarytarget, features, nbins=10):
        self.target = binarytarget
        self.features = features
        self.continuousvariables = []
        self.categoricalvariables = []
        self.categoricalmodel = None
        self.continuousmodel = None
        self.nbins = nbins

    def fit(self, df):
        """Learn WOE mappings from ``df`` (must contain the target and every feature).

        Raises
        ------
        ValueError
            If the target has missing values, is not integer-castable, or holds
            values other than 0 and 1. Previously this only printed a message
            and returned, leaving the encoder unfitted.
        """
        traindf = df.copy(deep=True)
        message = "ERROR : target variable must be a binary integer column with no missing values"
        try:
            traindf[self.target] = traindf[self.target].astype('int')
        except (ValueError, TypeError) as exc:
            raise ValueError(message) from exc
        if not set(traindf[self.target].unique()) <= {0, 1}:
            raise ValueError(message)

        # Reset so that a second fit does not list every feature twice.
        self.continuousvariables = []
        self.categoricalvariables = []
        for feat in self.features:
            if traindf[feat].dtypes == 'O':
                traindf[feat] = traindf[feat].astype(str)
                self.categoricalvariables.append(feat)
            else:
                try:
                    traindf[feat] = traindf[feat].astype(float)
                    self.continuousvariables.append(feat)
                except (ValueError, TypeError):
                    # Not numeric (e.g. a category column of strings): encode per category.
                    self.categoricalvariables.append(feat)

        self.categoricalmodel = ConvertCategoricalFeatures(binarytarget=self.target, CategoricalFeatures=self.categoricalvariables)
        self.categoricalmodel.fit(traindf)
        self.continuousmodel = ConvertContinuousFeatures(binarytarget=self.target, ContinuousFeatures=self.continuousvariables, NBins=self.nbins)
        self.continuousmodel.fit(traindf)

    def transform(self, testdf):
        """Return a copy of ``testdf`` with every fitted feature replaced by its WOE.

        Raises
        ------
        RuntimeError
            If called before a successful ``fit``.
        """
        if self.categoricalmodel is None or self.continuousmodel is None:
            raise RuntimeError("WoeConversion.transform called before a successful fit")
        tmpdf = testdf.copy(deep=True)
        tmpdf = self.categoricalmodel.transform(tmpdf)
        tmpdf = self.continuousmodel.transform(tmpdf)
        return tmpdf
        
class ConvertCategoricalFeatures():
    """Per-category weight-of-evidence encoding for a binary (0/1) target.

    WOE(category) = ln(share of all target==1 rows in the category /
                       share of all target==0 rows in the category),
    with zero counts replaced by 1 to keep the log finite. Categories with at
    most one positive row get the feature's smallest WOE; categories with
    <= 10 rows get WOE 0. Values unseen during fit are encoded as 0.
    """
    def __init__(self, binarytarget, CategoricalFeatures):
        self.target = binarytarget
        self.Model = {}
        self.Features = CategoricalFeatures

    def fit(self, traindf):
        """Learn {category: WOE} for every feature in ``self.Features``.

        Raises
        ------
        ValueError
            If there are features to encode but the target has no 1s or no 0s,
            since the WOE ratio would divide by zero.
        """
        NPositive = traindf[traindf[self.target] == 1].shape[0]
        NNegative = traindf[traindf[self.target] == 0].shape[0]
        if self.Features and (NPositive == 0 or NNegative == 0):
            raise ValueError("WOE needs at least one positive (1) and one negative (0) target row")
        for feature in self.Features:
            tmptraindf = traindf[[feature, self.target]].copy(deep=True)
            tmptraindf[self.target] = tmptraindf[self.target].astype(int)
            tmptraindf[feature] = tmptraindf[feature].astype(str)
            results = tmptraindf[[feature, self.target]].fillna("None").groupby([feature]).agg(['sum', 'count'])
            results = results.reset_index()
            results.columns = [feature, "Positive", "Count"]
            results["Negative"] = results["Count"] - results["Positive"]
            results["CountPositive"] = results["Positive"]
            # Replace 0 with 1 to avoid an infinite log.
            results.loc[results.Negative == 0, 'Negative'] = 1
            results.loc[results.Positive == 0, 'Positive'] = 1
            # Share of all target==1 rows that fall in this category.
            results["DG"] = results["Positive"] * 1. / NPositive
            # Share of all target==0 rows that fall in this category.
            results["DB"] = results["Negative"] * 1. / NNegative
            results["WOE"] = np.log(results["DG"] / results["DB"])
            # Order matters: small categories are zeroed first, so the minimum
            # assigned to categories with at most one positive already includes
            # those zeros. The last line re-zeroes small categories that the min
            # assignment overwrote.
            results.loc[results.Count <= 10, 'WOE'] = 0
            results.loc[results.CountPositive <= 1, 'WOE'] = results["WOE"].min()
            results.loc[results.Count <= 10, 'WOE'] = 0
            self.Model[feature] = dict(zip(results[feature], results.WOE))

    def train(self, traindf):
        """Alias of ``fit``."""
        self.fit(traindf)

    def transform(self, testdf):
        """Return a copy of ``testdf`` with each feature replaced by its category WOE.

        Categories not seen during fit map to 0. Neither ``testdf`` nor the
        fitted model is modified; previously both were mutated.
        """
        out = testdf.copy(deep=True)
        for feature in self.Features:
            # Same string normalisation as in fit so keys line up.
            values = out[feature].astype(str).fillna("None")
            out[feature] = values.map(self.Model[feature]).fillna(0.)
        return out

    
class ConvertContinuousFeatures():
    """Weight-of-evidence encoding of numeric features for a binary (0/1) target.

    Each feature is cut into up to NBins quantile bins. Edges are learned from
    the non-missing training values, with the outermost edges at -inf/+inf, and
    missing values form bin -1. Per bin:
        WOE = ln(share of target==1 rows / share of target==0 rows)
    with zero counts replaced by 1. Bins with at most one positive get the
    feature's smallest WOE, and bins with <= 10 rows get WOE 0.
    """
    def __init__(self, binarytarget, ContinuousFeatures, NBins):
        self.target = binarytarget
        self.Model = {}
        self.Features = ContinuousFeatures
        self.NBins = NBins
        self.BinModel = {}

    def train(self, traindf):
        """Alias of ``fit``."""
        self.fit(traindf)

    def _bin_edges(self, sorted_values):
        """Quantile bin edges from ascending non-missing values, bracketed by -inf/+inf.

        Duplicate and non-finite cut points are skipped, so pd.cut gets strictly
        increasing edges. With no values the result is [-inf, inf] (one bin);
        previously an all-NaN feature raised IndexError.
        """
        edges = [-np.inf]
        n_values = len(sorted_values)
        if n_values:
            for omega in range(1, self.NBins):
                value = float(sorted_values[int(omega * n_values / self.NBins)])
                if np.isfinite(value) and value not in edges:
                    edges.append(value)
        edges.append(np.inf)
        return edges

    @staticmethod
    def _assign_bins(values, edges):
        """Bin number 1..len(edges)-1 for each value (right-closed intervals); missing -> -1."""
        binned = pd.cut(values, edges, labels=list(range(1, len(edges))))
        return binned.cat.add_categories([-1]).fillna(-1)

    def fit(self, traindf):
        """Learn bin edges and {bin: WOE} for every feature in ``self.Features``.

        Raises
        ------
        ValueError
            If there are features to encode but the target has no 1s or no 0s.
        """
        NPositive = traindf[traindf[self.target] == 1].shape[0]
        NNegative = traindf[traindf[self.target] == 0].shape[0]
        if self.Features and (NPositive == 0 or NNegative == 0):
            raise ValueError("WOE needs at least one positive (1) and one negative (0) target row")
        for feature in self.Features:
            tmpdf = traindf[[feature, self.target]].copy(deep=True)
            present = np.sort(tmpdf[feature].dropna().to_numpy(dtype=float))
            self.BinModel[feature] = self._bin_edges(present)
            tmpdf["bin"] = self._assign_bins(tmpdf[feature], self.BinModel[feature])
            # observed=False keeps empty bins (notably -1 when training has no NaN),
            # so every bin transform can produce has a WOE entry. pandas >= 3
            # defaults to observed=True.
            results = tmpdf[["bin", self.target]].groupby(["bin"], observed=False).agg(['sum', 'count'])
            results = results.reset_index()
            results.columns = ["bin", "Positive", "Count"]
            results["Negative"] = results["Count"] - results["Positive"]
            results["CountPositive"] = results["Positive"]
            # Replace 0 with 1 to avoid an infinite log.
            results.loc[results.Negative == 0, 'Negative'] = 1
            results.loc[results.Positive == 0, 'Positive'] = 1
            # Share of all target==1 rows in this bin.
            results["DG"] = results["Positive"] * 1. / NPositive
            # Share of all target==0 rows in this bin.
            results["DB"] = results["Negative"] * 1. / NNegative
            results["WOE"] = np.log(results["DG"] / results["DB"])
            # Same ordering as the categorical encoder: zero small bins, assign the
            # min to bins with at most one positive, then re-zero small bins.
            results.loc[results.Count <= 10, 'WOE'] = 0
            results.loc[results.CountPositive <= 1, 'WOE'] = results["WOE"].min()
            results.loc[results.Count <= 10, 'WOE'] = 0
            self.Model[feature] = {int(b): woe for b, woe in zip(results["bin"], results["WOE"])}

    def transform(self, testdf):
        """Return a copy of ``testdf`` with each feature replaced by the WOE of its bin."""
        tmpdf = testdf.copy(deep=True)
        for feature in self.Features:
            bins = self._assign_bins(tmpdf[feature], self.BinModel[feature]).astype(int)
            # map() on plain ints replaces the deprecated replace() on categoricals;
            # 0 is a neutral fallback for any bin missing from the model.
            tmpdf[feature] = bins.map(self.Model[feature]).fillna(0.).astype(float)
        return tmpdf

In [24]:
class FeatureEngineering(object):
    '''Build the resampled, WOE-encoded training table.

    ``main`` resamples the base table's case ids, filters the tables stored in
    ``temporary_data_store.pkl`` to those ids, joins them on ``case_id``,
    applies ``Pipeline.filter_cols`` and WOE-encodes the result.

    NOTE(review): a later cell redefines ``FeatureEngineering``; whichever cell
    ran last decides which implementation is in effect.
    '''

    # Grouping/bookkeeping columns: never taken from joined tables and
    # dropped before encoding.
    _GROUP_COLS = ("WEEK_NUM", "num_group1", "num_group2", "max_num_group1",
                   "last_num_group1", "max_num_group2", "last_num_group2")
    # Store keys whose tables are filtered and date-processed in parallel.
    _DEPTH_KEYS = ("depth_0", "depth_1", "depth_2")
    # Neighbourhood size used by the 'ENN' strategy.
    _ENN_NEIGHBORS = 125

    def __init__(self, path_dir, based_file_name):
        self.based_file_name = based_file_name
        self.dir = path_dir

    def balance_dataset(self, X_features_str="case_id", y_features_str="target", 
                                under_ratio=0.5, over_ratio=0.5, batch_size=1000, 
                                strategy = 'ENN', output=False):
        '''Resample the base table batch by batch to reduce target imbalance.

        Over-/under-sampling to address the imbalanced labels. The only
        resampling feature is ``X_features_str``, so the rows that survive
        define which case ids the rest of the pipeline uses.

        Args:
            X_features_str: column used as the (single) resampling feature.
            y_features_str: label column.
            under_ratio: fraction of the majority-class count targeted per
                batch ('SMOTEENN' only).
            over_ratio: fraction of the minority-class count targeted per
                batch ('SMOTEENN' only).
            batch_size: number of rows resampled at a time.
            strategy: 'ENN' (EditedNearestNeighbours) or 'SMOTEENN'.
            output: write ``train_base_resample.csv`` instead of returning.

        Returns:
            ``(X_resampled, y_resampled)`` numpy arrays, or None when ``output``.

        Raises:
            ValueError: if ``strategy`` is not 'ENN' or 'SMOTEENN'.
        '''
        if strategy not in ('SMOTEENN', 'ENN'):
            # Previously an unknown strategy surfaced as a NameError on the sampler.
            raise ValueError(f"Unknown resampling strategy: {strategy!r}")

        # Only the two columns needed for resampling are read.
        df = pd.read_parquet(os.path.join(self.dir, self.based_file_name),
                             columns=[X_features_str, y_features_str])
        X = df[X_features_str].to_numpy().reshape(-1, 1)
        y = df[y_features_str].to_numpy()
        del df

        # Class distribution of the target
        class_distribution = Counter(y)
        majority_class = max(class_distribution, key=class_distribution.get)
        minority_class = min(class_distribution, key=class_distribution.get)

        # Resample batch by batch
        X_resampled_batches = []
        y_resampled_batches = []
        for start in range(0, len(X), batch_size):
            X_batch = X[start:start + batch_size]
            y_batch = y[start:start + batch_size]

            # The samplers raise on a single-class batch, and ENN also needs
            # more rows than neighbours (e.g. a short trailing batch): keep
            # such batches unchanged instead of aborting the whole run.
            if len(np.unique(y_batch)) < 2 or (strategy == 'ENN' and len(X_batch) <= self._ENN_NEIGHBORS):
                X_resampled_batches.append(X_batch)
                y_resampled_batches.append(y_batch)
                continue

            if strategy == 'SMOTEENN':
                # Per-batch class targets, capped at the batch size.
                sampler = SMOTEENN(sampling_strategy={
                    majority_class: min(int(class_distribution[majority_class] * under_ratio), len(X_batch)),
                    minority_class: min(int(class_distribution[minority_class] * over_ratio), len(X_batch)),
                })
            else:
                sampler = EditedNearestNeighbours(sampling_strategy='auto', n_neighbors=self._ENN_NEIGHBORS)
            X_resampled_batch, y_resampled_batch = sampler.fit_resample(X_batch, y_batch)
            X_resampled_batches.append(X_resampled_batch)
            y_resampled_batches.append(y_resampled_batch)

        # Merge the batches
        self.X_resampled = np.concatenate(X_resampled_batches)
        self.y_resampled = np.concatenate(y_resampled_batches)
        del X_resampled_batches, y_resampled_batches

        # Export csv
        if output:
            df_resample = pd.DataFrame({X_features_str: self.X_resampled.flatten(), y_features_str: self.y_resampled})
            df_resample.to_csv("train_base_resample.csv", index=False)
            return None
        return self.X_resampled, self.y_resampled

    def encoder(self, df):
        '''Fit a WOE encoder on ``df`` and return the encoded pandas DataFrame.

        Feature encoder. ``df`` is the joined polars training table, including
        the binary ``target`` column the encoder is fitted against.

        - ``case_id`` and ``date_decision`` are kept unencoded.
        - grouping columns (``WEEK_NUM``, ``num_group*`` and their aggregates)
          are dropped.
        - each ``pl.Date`` column is replaced by numeric ``month_<col>`` and
          ``weekday_<col>`` columns, which are encoded.
        - Int64 / Int32 / Float64 / String columns are encoded.

        The fitted encoder is pickled to ``encoder.pkl`` for reuse on test data.
        '''
        cols_woe = []
        for col in df.columns:
            dtype = df.schema[col]
            if col in ("case_id", "date_decision"):
                continue
            if col in self._GROUP_COLS:
                # Polars frames are immutable: the returned frame must be kept.
                df = df.drop(col)
            elif dtype == pl.Date:
                df = df.with_columns(
                    pl.col(col).dt.month().alias(f"month_{col}"),
                    pl.col(col).dt.weekday().alias(f"weekday_{col}"),
                ).drop(col)
                cols_woe.extend([f"month_{col}", f"weekday_{col}"])
            elif dtype in (pl.Int64, pl.Int32, pl.Float64, pl.String):
                cols_woe.append(col)

        woemodel = WoeConversion(binarytarget='target', features=cols_woe)
        # WoeConversion operates on pandas frames (pd.cut / groupby-agg).
        df_pd = df.to_pandas()
        woemodel.fit(df_pd)
        df_pd = woemodel.transform(df_pd)

        with open("encoder.pkl", "wb") as f:
            pickle.dump(woemodel, f)  # reloaded later to encode the test set
        return df_pd

    def df_data_compaction(self, df, base_case_ids):
        '''Keep the rows of ``df`` whose ``case_id`` is in ``base_case_ids``, then apply ``Pipeline.handle_dates``.

        ``base_case_ids`` may be a list or any array-like of ids.
        '''
        if not isinstance(base_case_ids, list):
            base_case_ids = np.asarray(base_case_ids).reshape(-1).tolist()
        df = df.filter(pl.col("case_id").is_in(base_case_ids))
        # NOTE(review): the original flagged this argument passing as possibly
        # buggy; confirm Pipeline.handle_dates' signature.
        df = df.pipe(Pipeline.handle_dates, self.dir, self.based_file_name)
        return df

    @staticmethod
    def _coalesce_into(X_train, value, column_name):
        '''Fill the nulls of ``column_name`` in ``X_train`` from ``value`` (matched on ``case_id``).'''
        right_name = f"{column_name}_right"  # polars' default suffix for a clashing column
        joined = X_train.join(value.select("case_id", column_name), on="case_id", how="left")
        return joined.with_columns(pl.coalesce(column_name, right_name).alias(column_name)).drop(right_name)

    def df_concatenates(self, data_store_dict):
        '''Left-join every stored table onto the base table by ``case_id``.

        ``data_store_dict`` maps keys to lists of polars tables, and
        ``data_store_dict["df_base"][0]`` is the base table. For every other
        table:
        - ``target`` and the grouping columns are ignored;
        - new columns are joined on;
        - a column already present (from the base or an earlier table) only
          has its nulls filled from the later table, so no ``*_right``
          duplicates are left behind.
        '''
        ignored = set(self._GROUP_COLS) | {"case_id", "target"}
        X_train = data_store_dict["df_base"][0]
        feature_processed = set(X_train.columns)
        for key, values in data_store_dict.items():
            print("Files is being concatenated: ", key)
            if key == "df_base":
                continue  # the base table is already the left side
            for value in values:
                feature_processing = []
                for column_name in value.columns:
                    if column_name in ignored:
                        continue
                    if column_name in feature_processed:
                        X_train = self._coalesce_into(X_train, value, column_name)
                    else:
                        feature_processing.append(column_name)
                        feature_processed.add(column_name)
                if feature_processing:
                    # Left join on case_id: the (filtered) base defines the rows.
                    X_train = X_train.join(value.select(["case_id", *feature_processing]),
                                           on="case_id", how="left")
        del feature_processed
        gc.collect()
        return X_train

    def _compact_store(self, case_ids):
        '''Load the pickled table store and keep only the rows of ``case_ids``.

        ``temporary_data_store.pkl`` may hold several pickled dicts mapping a
        key to a list of tables:
        - the base table is re-read from ``self.dir``/``self.based_file_name``
          and given month/weekday columns of ``date_decision``;
        - ``depth_*`` tables go through ``df_data_compaction`` in a thread pool;
        - any other key is passed through unchanged.
        '''
        from concurrent.futures import ThreadPoolExecutor

        data_dict = {}
        with open('temporary_data_store.pkl', 'rb') as f:
            while True:
                try:
                    data_loaded = pickle.load(f)
                except EOFError:
                    break  # every pickled object has been read
                for key, value in data_loaded.items():
                    print("Processing files: ", key)
                    if key == 'df_base':
                        df_base = read_file(os.path.join(self.dir, self.based_file_name))
                        data_dict[key] = [
                            df_base.filter(pl.col("case_id").is_in(case_ids)).with_columns(
                                pl.col("date_decision").dt.month().alias("month_decision"),
                                pl.col("date_decision").dt.weekday().alias("weekday_decision"),
                            )
                        ]
                        del df_base
                    elif key in self._DEPTH_KEYS:
                        with ThreadPoolExecutor(max_workers=3) as executor:
                            futures = [executor.submit(self.df_data_compaction, table, case_ids)
                                       for table in value]
                            data_dict[key] = [future.result() for future in futures]
                    else:
                        data_dict[key] = value
                    print("Files processed: ", key)
        return data_dict

    def main(self):
        '''Run the training pipeline; pickle and return the encoded training frame.

        The encoded frame is written to ``train_data_store.pkl``; the input
        store ``temporary_data_store.pkl`` is left untouched so the pipeline
        can be re-run.
        '''
        case_ids, _ = self.balance_dataset()
        case_ids = np.asarray(case_ids).reshape(-1).tolist()

        data_dict = self._compact_store(case_ids)
        # Join all tables
        train_data = self.df_concatenates(data_dict)
        del data_dict
        gc.collect()
        # Preliminary feature selection, then WOE encoding
        train_data = train_data.pipe(Pipeline.filter_cols)
        train_data = self.encoder(train_data)

        with open('train_data_store.pkl', 'wb') as f:
            pickle.dump(train_data, f)
        return train_data
      

In [28]:
class FeatureEngineering(object):
    '''Build the resampled, WOE-encoded training table.

    ``main`` resamples the base table's case ids, filters the tables stored in
    ``temporary_data_store.pkl`` to those ids, joins them on ``case_id``,
    applies ``Pipeline.filter_cols``, WOE-encodes the result and pickles it to
    ``train_data_store.pkl``.

    NOTE(review): this redefines the ``FeatureEngineering`` class from an
    earlier cell; whichever cell ran last is in effect.
    '''

    # Store keys whose tables are filtered and date-processed in parallel.
    _DEPTH_KEYS = ("depth_0", "depth_1", "depth_2")
    # Neighbourhood size used by the 'ENN' strategy.
    _ENN_NEIGHBORS = 125

    def __init__(self, path_dir, based_file_name):
        self.based_file_name = based_file_name
        self.dir = path_dir

    def balance_dataset(self, X_features_str="case_id", y_features_str="target", 
                                under_ratio=0.5, over_ratio=0.5, batch_size=1000, 
                                strategy = 'ENN', output=False):
        '''Resample the base table batch by batch to reduce target imbalance.

        Over-/under-sampling to address the imbalanced labels. The only
        resampling feature is ``X_features_str``, so the rows that survive
        define which case ids the rest of the pipeline uses.

        Args:
            X_features_str: column used as the (single) resampling feature.
            y_features_str: label column.
            under_ratio: fraction of the majority-class count targeted per
                batch ('SMOTEENN' only).
            over_ratio: fraction of the minority-class count targeted per
                batch ('SMOTEENN' only).
            batch_size: number of rows resampled at a time.
            strategy: 'ENN' (EditedNearestNeighbours) or 'SMOTEENN'.
            output: write ``train_base_resample.csv`` instead of returning.

        Returns:
            ``(X_resampled, y_resampled)`` numpy arrays, or None when ``output``.

        Raises:
            ValueError: if ``strategy`` is not 'ENN' or 'SMOTEENN'.
        '''
        if strategy not in ('SMOTEENN', 'ENN'):
            # Previously an unknown strategy surfaced as a NameError on the sampler.
            raise ValueError(f"Unknown resampling strategy: {strategy!r}")

        # Only the two columns needed for resampling are read.
        df = pd.read_parquet(os.path.join(self.dir, self.based_file_name),
                             columns=[X_features_str, y_features_str])
        X = df[X_features_str].to_numpy().reshape(-1, 1)
        y = df[y_features_str].to_numpy()
        del df

        # Class distribution of the target
        class_distribution = Counter(y)
        majority_class = max(class_distribution, key=class_distribution.get)
        minority_class = min(class_distribution, key=class_distribution.get)

        # Resample batch by batch
        X_resampled_batches = []
        y_resampled_batches = []
        for start in range(0, len(X), batch_size):
            X_batch = X[start:start + batch_size]
            y_batch = y[start:start + batch_size]

            # The samplers raise on a single-class batch, and ENN also needs
            # more rows than neighbours (e.g. a short trailing batch): keep
            # such batches unchanged instead of aborting the whole run.
            if len(np.unique(y_batch)) < 2 or (strategy == 'ENN' and len(X_batch) <= self._ENN_NEIGHBORS):
                X_resampled_batches.append(X_batch)
                y_resampled_batches.append(y_batch)
                continue

            if strategy == 'SMOTEENN':
                # Per-batch class targets, capped at the batch size.
                sampler = SMOTEENN(sampling_strategy={
                    majority_class: min(int(class_distribution[majority_class] * under_ratio), len(X_batch)),
                    minority_class: min(int(class_distribution[minority_class] * over_ratio), len(X_batch)),
                })
            else:
                sampler = EditedNearestNeighbours(sampling_strategy='auto', n_neighbors=self._ENN_NEIGHBORS)
            X_resampled_batch, y_resampled_batch = sampler.fit_resample(X_batch, y_batch)
            X_resampled_batches.append(X_resampled_batch)
            y_resampled_batches.append(y_resampled_batch)

        # Merge the batches
        self.X_resampled = np.concatenate(X_resampled_batches)
        self.y_resampled = np.concatenate(y_resampled_batches)
        del X_resampled_batches, y_resampled_batches

        # Export csv
        if output:
            df_resample = pd.DataFrame({X_features_str: self.X_resampled.flatten(), y_features_str: self.y_resampled})
            df_resample.to_csv("train_base_resample.csv", index=False)
            return None
        return self.X_resampled, self.y_resampled

    def encoder(self, df):
        '''Fit a WOE encoder on the joined training table and return the encoded frame.

        Args:
            df: polars DataFrame with ``case_id``, the binary ``target`` and features.

        Steps:
            - ``target`` is cast to Int32; ``case_id`` is left untouched.
            - ``WEEK_NUM``, ``num_group1``, ``num_group2``, ``MONTH`` and
              ``date_decision`` are dropped.
            - integer/float columns get nulls filled with the column mean, and
              string/boolean columns get nulls filled with ``"NA"``; all of
              these are encoded.
            - the frame is converted to pandas with Python dtypes and fed to
              ``WoeConversion``, which is pickled to ``encoder.pkl``.

        Returns:
            The WOE-encoded pandas DataFrame.
        '''
        polars_int = [pl.UInt8, pl.UInt16, pl.UInt32, pl.UInt64, pl.Int64, pl.Int32, pl.Int8, pl.Int16]
        polars_str = [pl.String, str, object, pl.Boolean]
        polars_flt = [pl.Float32, pl.Float64]
        dropped = {"WEEK_NUM", "num_group1", "num_group2", "MONTH", "date_decision"}

        cols_woe, fill_exprs, drop_cols = [], [], []
        for col, dtype in df.schema.items():
            if col == "case_id":
                continue
            if col == "target":
                fill_exprs.append(pl.col(col).cast(pl.Int32))
            elif col in dropped:
                drop_cols.append(col)
            elif dtype in polars_int or dtype in polars_flt:
                cols_woe.append(col)
                fill_exprs.append(pl.col(col).fill_null(pl.col(col).mean()))
            elif dtype in polars_str:
                cols_woe.append(col)
                fill_exprs.append(pl.col(col).fill_null(value="NA"))
        # One pass over the frame instead of one with_columns call per column.
        df = df.with_columns(fill_exprs).drop(drop_cols)

        def python_type(dtype):
            # Map a polars dtype to the Python type used for pandas astype.
            if dtype in polars_int:
                return int
            if dtype in polars_str:
                return str
            if dtype in polars_flt:
                return float
            return dtype

        dtype_dict = {col: python_type(dtype) for col, dtype in df.schema.items()}
        # to_pandas keeps typed columns (no intermediate all-object array).
        df_pd = df.to_pandas().astype(dtype_dict)

        woemodel = WoeConversion(binarytarget='target', features=cols_woe)
        woemodel.fit(df_pd)
        df_pd = woemodel.transform(df_pd)
        with open("encoder.pkl", "wb") as f:
            pickle.dump(woemodel, f)  # reloaded later to encode the test set
        return df_pd

    def df_data_compaction(self, df, base_case_ids):
        '''Keep the rows of ``df`` whose ``case_id`` is in ``base_case_ids``, then apply ``Pipeline.handle_dates``.

        ``base_case_ids`` may be a list or any array-like of ids.
        '''
        if not isinstance(base_case_ids, list):
            base_case_ids = np.asarray(base_case_ids).reshape(-1).tolist()
        df = df.filter(pl.col("case_id").is_in(base_case_ids))
        # NOTE(review): the original flagged this argument passing as possibly
        # buggy; confirm Pipeline.handle_dates' signature.
        df = df.pipe(Pipeline.handle_dates, self.dir, self.based_file_name)
        return df

    @staticmethod
    def _coalesce_into(X_train, value, column_name):
        '''Fill the nulls of ``column_name`` in ``X_train`` from ``value`` (matched on ``case_id``).'''
        right_name = f"{column_name}_right"  # polars' default suffix for a clashing column
        joined = X_train.join(value.select("case_id", column_name), on="case_id", how="left")
        return joined.with_columns(pl.coalesce(column_name, right_name).alias(column_name)).drop(right_name)

    def df_concatenates(self, data_store_dict):
        '''Left-join every stored table onto the base table by ``case_id``.

        ``data_store_dict["df_base"][0]`` is the base table. For every other
        table:
        - columns that also exist in the base table, ``target`` and
          ``pl.Date`` columns are skipped;
        - new columns are joined on;
        - a column already contributed by an earlier table has its nulls
          filled from the later table.
        '''
        base_columns = set(data_store_dict["df_base"][0].columns)
        X_train = data_store_dict["df_base"][0]
        feature_processed = set()
        for key, values in data_store_dict.items():
            if key == "df_base":
                continue  # the base table is already the left side
            for value in values:
                feature_processing = []
                for column_name, dtype in value.schema.items():
                    if column_name in base_columns or column_name == "target" or dtype == pl.Date:
                        continue
                    if column_name in feature_processed:
                        # The coalesced frame is kept (it used to be built and discarded).
                        X_train = self._coalesce_into(X_train, value, column_name)
                    else:
                        feature_processing.append(column_name)
                        feature_processed.add(column_name)
                if feature_processing:
                    # Left join on case_id: the (filtered) base defines the rows.
                    X_train = X_train.join(value.select(["case_id", *feature_processing]),
                                           on="case_id", how="left")
        del feature_processed
        gc.collect()
        return X_train

    def _compact_store(self, case_ids):
        '''Load the pickled table store and keep only the rows of ``case_ids``.

        ``temporary_data_store.pkl`` may hold several pickled dicts mapping a
        key to a list of tables:
        - the base table is re-read from ``self.dir``/``self.based_file_name``
          and given month/weekday columns of ``date_decision``;
        - ``depth_*`` tables go through ``df_data_compaction`` in a thread pool;
        - any other key is passed through unchanged.
        '''
        from concurrent.futures import ThreadPoolExecutor

        data_dict = {}
        with open('temporary_data_store.pkl', 'rb') as f:
            while True:
                try:
                    data_loaded = pickle.load(f)
                except EOFError:
                    break  # every pickled object has been read
                for key, value in data_loaded.items():
                    print("Processing files: ", key)
                    if key == 'df_base':
                        df_base = read_file(os.path.join(self.dir, self.based_file_name))
                        data_dict[key] = [
                            df_base.filter(pl.col("case_id").is_in(case_ids)).with_columns(
                                pl.col("date_decision").dt.month().alias("month_decision"),
                                pl.col("date_decision").dt.weekday().alias("weekday_decision"),
                            )
                        ]
                        del df_base
                    elif key in self._DEPTH_KEYS:
                        with ThreadPoolExecutor(max_workers=3) as executor:
                            futures = [executor.submit(self.df_data_compaction, table, case_ids)
                                       for table in value]
                            data_dict[key] = [future.result() for future in futures]
                    else:
                        data_dict[key] = value
                    print("Files processed: ", key)
        return data_dict

    def main(self):
        '''Run the training pipeline; pickle the encoded frame to ``train_data_store.pkl`` and return it.'''
        case_ids, _ = self.balance_dataset()
        case_ids = np.asarray(case_ids).reshape(-1).tolist()

        data_dict = self._compact_store(case_ids)
        # Join all tables
        train_data = self.df_concatenates(data_dict)
        del data_dict
        gc.collect()
        # Preliminary feature selection, then WOE encoding
        train_data = train_data.pipe(Pipeline.filter_cols)
        train_data = self.encoder(train_data)

        with open('train_data_store.pkl', 'wb') as f:
            pickle.dump(train_data, f)
        return train_data

In [31]:
class TestFeatureEngineering(object):
    '''Build the test table and encode it with the WOE encoder fitted on training data.

    ``main`` keeps every test case id and filters/joins the tables stored in
    ``temporary_data_store.pkl``. It then restricts the result to the training
    columns, encodes it and pickles it to ``test_data_store.pkl``.
    '''

    # Store keys whose tables are filtered and date-processed in parallel.
    _DEPTH_KEYS = ("depth_0", "depth_1", "depth_2")

    def __init__(self, path_dir, based_file_name):
        self.based_file_name = based_file_name
        self.dir = path_dir

    def df_data_compaction(self, df, base_case_ids):
        '''Keep the rows of ``df`` whose ``case_id`` is in ``base_case_ids``, then apply ``Pipeline.handle_dates``.

        ``base_case_ids`` may be a list or any array-like of ids.
        '''
        if not isinstance(base_case_ids, list):
            base_case_ids = np.asarray(base_case_ids).reshape(-1).tolist()
        df = df.filter(pl.col("case_id").is_in(base_case_ids))
        # NOTE(review): the original flagged this argument passing as possibly
        # buggy; confirm Pipeline.handle_dates' signature.
        df = df.pipe(Pipeline.handle_dates, self.dir, self.based_file_name)
        return df

    @staticmethod
    def _coalesce_into(X_train, value, column_name):
        '''Fill the nulls of ``column_name`` in ``X_train`` from ``value`` (matched on ``case_id``).'''
        right_name = f"{column_name}_right"  # polars' default suffix for a clashing column
        joined = X_train.join(value.select("case_id", column_name), on="case_id", how="left")
        return joined.with_columns(pl.coalesce(column_name, right_name).alias(column_name)).drop(right_name)

    def df_concatenates(self, data_store_dict):
        '''Left-join every stored table onto the base table by ``case_id``.

        ``data_store_dict["df_base"][0]`` is the base table. For every other
        table:
        - columns that also exist in the base table, ``target`` and
          ``pl.Date`` columns are skipped;
        - new columns are joined on;
        - a column already contributed by an earlier table has its nulls
          filled from the later table.
        '''
        base_columns = set(data_store_dict["df_base"][0].columns)
        X_train = data_store_dict["df_base"][0]
        feature_processed = set()
        for key, values in data_store_dict.items():
            if key == "df_base":
                continue  # the base table is already the left side
            for value in values:
                feature_processing = []
                for column_name, dtype in value.schema.items():
                    if column_name in base_columns or column_name == "target" or dtype == pl.Date:
                        continue
                    if column_name in feature_processed:
                        # The coalesced frame is kept (it used to be built and discarded).
                        X_train = self._coalesce_into(X_train, value, column_name)
                    else:
                        feature_processing.append(column_name)
                        feature_processed.add(column_name)
                if feature_processing:
                    # Left join on case_id: the base table defines the rows.
                    X_train = X_train.join(value.select(["case_id", *feature_processing]),
                                           on="case_id", how="left")
        del feature_processed
        gc.collect()
        return X_train

    def encoding_transform(self, df, encoder_fitted):
        '''Prepare ``df`` like the training encoder does, then apply ``encoder_fitted``.

        - ``WEEK_NUM``, ``num_group1``, ``num_group2``, ``MONTH`` and
          ``date_decision`` are dropped.
        - integer/float nulls are filled with the column mean, and
          string/boolean nulls with ``"NA"``.
        - the frame is converted to pandas with Python dtypes and passed to
          ``encoder_fitted.transform``.

        NOTE(review): the numeric fill uses the *test* column means; the
        training means are not stored with the encoder, so the fill can differ
        from training. TODO persist the training means.
        '''
        polars_int = [pl.UInt8, pl.UInt16, pl.UInt32, pl.UInt64, pl.Int64, pl.Int32, pl.Int8, pl.Int16]
        polars_str = [pl.String, str, object, pl.Boolean]
        polars_flt = [pl.Float32, pl.Float64]
        dropped = {"WEEK_NUM", "num_group1", "num_group2", "MONTH", "date_decision"}

        fill_exprs, drop_cols = [], []
        for col, dtype in df.schema.items():
            if col == "case_id":
                continue
            if col == "target":
                fill_exprs.append(pl.col(col).cast(pl.Int32))
            elif col in dropped:
                drop_cols.append(col)
            elif dtype in polars_int or dtype in polars_flt:
                fill_exprs.append(pl.col(col).fill_null(pl.col(col).mean()))
            elif dtype in polars_str:
                fill_exprs.append(pl.col(col).fill_null(value="NA"))
        # One pass over the frame instead of one with_columns call per column.
        df = df.with_columns(fill_exprs).drop(drop_cols)

        def python_type(dtype):
            # Map a polars dtype to the Python type used for pandas astype.
            if dtype in polars_int:
                return int
            if dtype in polars_str:
                return str
            if dtype in polars_flt:
                return float
            return dtype

        dtype_dict = {col: python_type(dtype) for col, dtype in df.schema.items()}
        df_pd = df.to_pandas().astype(dtype_dict)
        return encoder_fitted.transform(df_pd)

    @staticmethod
    def _align_to_train(test_data, train_columns):
        '''Select the training columns (except ``target``) in training order.

        Training columns absent from ``test_data`` are added as Float64 nulls,
        so the fitted encoder finds every feature it was trained on.
        '''
        present = set(test_data.columns)
        return test_data.select([
            pl.col(col) if col in present else pl.lit(None, dtype=pl.Float64).alias(col)
            for col in train_columns
            if col != "target"
        ])

    def _compact_store(self, df_base, case_ids):
        '''Load the pickled table store and keep only the rows of ``case_ids``.

        - the ``df_base`` key is replaced by the prepared ``df_base`` argument;
        - ``depth_*`` tables go through ``df_data_compaction`` in a thread pool;
        - any other key is passed through unchanged.
        '''
        from concurrent.futures import ThreadPoolExecutor

        data_dict = {}
        with open('temporary_data_store.pkl', 'rb') as f:
            while True:
                try:
                    data_loaded = pickle.load(f)
                except EOFError:
                    break  # every pickled object has been read
                for key, value in data_loaded.items():
                    print("Processing files: ", key)
                    if key == 'df_base':
                        data_dict[key] = [df_base]
                    elif key in self._DEPTH_KEYS:
                        with ThreadPoolExecutor(max_workers=3) as executor:
                            futures = [executor.submit(self.df_data_compaction, table, case_ids)
                                       for table in value]
                            data_dict[key] = [future.result() for future in futures]
                    else:
                        data_dict[key] = value
                    print("Files processed: ", key)
        return data_dict

    def main(self, encoder_fitted):
        '''Build, align and WOE-encode the test table; pickle it to ``test_data_store.pkl`` and return it.

        Args:
            encoder_fitted: the WOE encoder fitted on training data.
        '''
        df_base = read_file(os.path.join(self.dir, self.based_file_name))
        # Every test case is kept: there is no resampling at inference time.
        case_ids = df_base["case_id"].to_list()
        df_base = df_base.with_columns(
            pl.col("date_decision").dt.month().alias("month_decision"),
            pl.col("date_decision").dt.weekday().alias("weekday_decision"),
        )

        data_dict = self._compact_store(df_base, case_ids)
        del df_base
        # Join all tables
        test_data = self.df_concatenates(data_dict)
        del data_dict
        gc.collect()

        # Preliminary feature selection: keep exactly the training columns.
        with open('train_data_store.pkl', 'rb') as f:
            train_columns = list(pickle.load(f).columns)
        test_data = self._align_to_train(test_data, train_columns)

        # Encoding
        test_data = self.encoding_transform(test_data, encoder_fitted)
        with open('test_data_store.pkl', 'wb') as f:
            pickle.dump(test_data, f)
        return test_data

In [30]:
# Run the full training feature pipeline (resample, join, select, WOE-encode).
gc.collect()
FeatureEngineering(
    path_dir=TRAIN_DIR,
    based_file_name="train_base.parquet",
).main()

Processing files:  df_base
Files processed:  df_base
Processing files:  depth_0
Files processed:  depth_0
Processing files:  depth_1
Files processed:  depth_1
Processing files:  depth_2
Files processed:  depth_2


In [75]:
# Collect every test table, grouped by depth, and pickle the store that
# TestFeatureEngineering.main reads back from temporary_data_store.pkl.
# NOTE(review): `Test` is defined in a cell further down the notebook, so a
# fresh-kernel Restart & Run All raises NameError here until that cell is
# moved above this one.
data_store = dict(
    df_base=[read_file(TEST_DIR / "test_base.parquet")],
    depth_0=[
        read_file(TEST_DIR / "test_static_cb_0.parquet"),
        read_files(TEST_DIR / "test_static_0_*.parquet"),
    ],
    depth_1=[
        Test.read_files(TEST_DIR / "test_applprev_1_*.parquet", 1),
        read_file(TEST_DIR / "test_tax_registry_a_1.parquet", 1),
        read_file(TEST_DIR / "test_tax_registry_b_1.parquet", 1),
        read_file(TEST_DIR / "test_tax_registry_c_1.parquet", 1),
        read_files(TEST_DIR / "test_credit_bureau_a_1_*.parquet", 1),
        read_file(TEST_DIR / "test_credit_bureau_b_1.parquet", 1),
        read_file(TEST_DIR / "test_other_1.parquet", 1),
        read_file(TEST_DIR / "test_person_1.parquet", 1),
        read_file(TEST_DIR / "test_deposit_1.parquet", 1),
        read_file(TEST_DIR / "test_debitcard_1.parquet", 1),
    ],
    depth_2=[
        read_file(TEST_DIR / "test_credit_bureau_b_2.parquet", 2),
        read_files(TEST_DIR / "test_credit_bureau_a_2_*.parquet", 2),
        read_file(TEST_DIR / "test_applprev_2.parquet", 2),
        read_file(TEST_DIR / "test_person_2.parquet", 2),
    ],
)

with open('temporary_data_store.pkl', 'wb') as store_file:
    pickle.dump(data_store, store_file)

ComputeError: schema lengths differ

In [None]:
# Reload the WOE encoder that FeatureEngineering.encoder pickled during training.
# encoder.pkl is written by this notebook; never unpickle files from untrusted sources.
with open("encoder.pkl", "rb") as encoder_file:
    woe_encoder = pickle.load(encoder_file)

In [None]:
# Free the raw test tables before the test pipeline runs. `data_store` may not
# exist (its build cell raised, or this cell is being re-run), so guard the
# `del` instead of letting a NameError abort the run.
if "data_store" in globals():
    del data_store
gc.collect()
TestFeatureEngineering(path_dir= TEST_DIR, based_file_name="test_base.parquet").main(woe_encoder)

In [74]:
class Test:
    """Experimental helpers: read/aggregate a set of parquet files, and WoE-encode a frame.

    Both helpers are called on the class itself (e.g. ``Test.read_files(...)``); they
    are declared ``@staticmethod`` so calling them on an instance also works.
    """

    # Value that ``read_files`` maps back to null after concatenation.
    NULL_SENTINEL = "Temp_null_value"

    @staticmethod
    def read_files(regex_path, depth=None):
        """Read every parquet file matching ``regex_path`` and combine them into one frame.

        Args:
            regex_path: glob pattern (str or Path) selecting the parquet files.
            depth: when 1 or 2, each file is grouped by ``case_id`` and aggregated with
                ``Aggregator.get_exprs`` before concatenation; otherwise files are used as read.

        Returns:
            A polars DataFrame with unique ``case_id`` values (which duplicate row is
            kept is not specified by ``unique``).

        Raises:
            ValueError: if no file matches ``regex_path``.
        """
        # Sorted so the concatenation order does not depend on filesystem listing order.
        paths = sorted(glob(str(regex_path)))
        if not paths:
            raise ValueError(f"No parquet files match {regex_path}")

        chunks = []
        for path in paths:
            df = pl.read_parquet(path)
            df = df.pipe(Pipeline.set_table_dtypes)
            if depth in (1, 2):
                df = df.group_by("case_id").agg(Aggregator.get_exprs(df))
                # Fill nulls in every column with the string "0" in one pass.
                # NOTE(review): this fill never writes NULL_SENTINEL, so the
                # sentinel-to-null step below only affects literal "Temp_null_value"
                # values — confirm whether "0" or the sentinel is the intended fill.
                df = df.with_columns(pl.all().fill_null(value="0"))
            chunks.append(df)

        # "diagonal_relaxed" aligns columns by name (absent columns become null) and
        # relaxes dtypes to a common supertype; "vertical_relaxed" fails with
        # "schema lengths differ" when chunks end up with different column sets.
        df = pl.concat(chunks, how="diagonal_relaxed")

        # Round-trip through String so the sentinel can be mapped back to null, then let
        # set_table_dtypes re-derive the column dtypes.
        df = df.with_columns(pl.all().cast(pl.String))
        df = df.with_columns(
            [
                pl.when(pl.col(col) == Test.NULL_SENTINEL)
                .then(None)
                .otherwise(pl.col(col))
                .alias(col)
                for col in df.columns
            ]
        )
        df = df.pipe(Pipeline.set_table_dtypes)
        df = df.unique(subset=["case_id"])

        return df

    @staticmethod
    def encoder(df):
        """Impute missing values in ``df``, then fit and apply a WoE conversion.

        Args:
            df: polars DataFrame with a ``target`` column, used as the binary target.

        Returns:
            The pandas DataFrame returned by ``WoeConversion.transform``.
        """
        cols_woe = []
        polars_int = [pl.UInt8, pl.UInt16, pl.UInt32, pl.UInt64, pl.Int64, pl.Int32, pl.Int8, pl.Int16]
        polars_str = [pl.String, str, object, pl.Boolean]
        polars_flt = [pl.Float32, pl.Float64]
        dropped_cols = ["WEEK_NUM", "num_group1", "num_group2", "MONTH", "date_decision"]

        for col in df.columns:
            if col == "case_id":
                continue  # leave the id column untouched and out of the WoE features
            elif col == "target":
                df = df.with_columns(pl.col(col).cast(pl.Int32))
            elif col in dropped_cols:
                df = df.drop(col)  # removed so they are never encoded
            elif df[col].dtype in polars_int or df[col].dtype in polars_flt:
                # Numeric feature: replace nulls with the column mean.
                cols_woe.append(col)
                df = df.with_columns(pl.col(col).fill_null(pl.col(col).mean()).alias(col))
            elif df[col].dtype in polars_str:
                # String/boolean feature: replace nulls with "NA".
                cols_woe.append(col)
                df = df.with_columns(pl.col(col).fill_null(value="NA").alias(col))

        woemodel = WoeConversion(binarytarget='target', features=cols_woe)

        # Map polars dtypes to the Python types pandas.astype understands. Columns with
        # any other polars dtype (e.g. dates) are left to pandas' inferred dtype instead
        # of handing astype a polars dtype object it cannot interpret.
        dtype_dict = {}
        for col, dtype in zip(df.columns, df.dtypes):
            if dtype in polars_int:
                dtype_dict[col] = int
            elif dtype in polars_str:
                dtype_dict[col] = str
            elif dtype in polars_flt:
                dtype_dict[col] = float

        # Nulls must already be filled (done in the loop above): astype(int) cannot
        # represent missing values.
        df_pd = pd.DataFrame(df, columns=df.columns).astype(dtype_dict)

        woemodel.fit(df_pd)
        df_pd = woemodel.transform(df_pd)
        return df_pd