In [33]:
import pandas as pd
import duckdb
from sklearn.pipeline import Pipeline
import joblib
import json
import fastparquet
import os
import tqdm
import numpy as np
import warnings
from IPython import get_ipython

# Suppress all warnings
warnings.filterwarnings("ignore")

from sklearn.metrics import roc_auc_score
from sklearn.model_selection import GridSearchCV

# drawing
import seaborn as sns
import matplotlib.pyplot as plt


# transform
from sklearn.model_selection import train_test_split
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import FunctionTransformer
from sklearn.preprocessing import OneHotEncoder


# estimators
import xgboost as xgb

# functions & classes from neighboring ipynb file


In [4]:
def sample_df(df3, total_rows=100000, neg_percent=50, pos_percent=50, random_state=None):
    """Draw a sample of ``df3`` with a chosen share of positive and negative rows.

    Rows with ``flag == 1`` and rows with ``flag == 0`` are sampled separately
    (without replacement) and stacked, positives first.

    :param df3: frame containing a ``flag`` column with values 0 and 1
    :param total_rows: nominal sample size the percentages are applied to
    :param neg_percent: percentage of ``total_rows`` drawn from ``flag == 0`` rows
    :param pos_percent: percentage of ``total_rows`` drawn from ``flag == 1`` rows
    :param random_state: optional seed forwarded to ``DataFrame.sample`` so the
        draw can be reproduced; ``None`` keeps the previous unseeded behaviour
    :return: new frame with a fresh 0..n-1 index
    :raises ValueError: raised by pandas when a class has fewer rows than requested
    """
    n_pos = int(total_rows / 100 * pos_percent)
    n_neg = int(total_rows / 100 * neg_percent)

    df3_pos = df3[df3['flag'] == 1].sample(n_pos, random_state=random_state)
    df3_neg = df3[df3['flag'] == 0].sample(n_neg, random_state=random_state)

    # ignore_index gives the result unique labels. Previously both halves were
    # numbered from 0, so every label appeared twice and any later
    # index-aligned column assignment on the sample picked the wrong rows.
    return pd.concat([df3_pos, df3_neg], ignore_index=True)

In [5]:
def mean(lst):
    """Return the arithmetic mean of ``lst``: its sum divided by its length."""
    total = sum(lst)
    count = len(lst)
    return total / count

In [6]:
def overdue_and_quick_closure_duck(df_time_exp):
    """Add overdue / early-closure features and return them merged per client.

    Two row-level columns are written into ``df_time_exp`` itself (the input
    frame is modified in place):

    * ``closed_faster_than_expected`` - 1 when ``pre_fterm < pre_pterm``, else 0
    * ``overdue_severity`` - weighted sum of the ``is_zero_loans*`` columns
      with weights 1..5

    Two client-level columns are then computed per ``id`` as the sum of the
    row-level column divided by ``max(rn)``: ``prone_to_overdue`` and
    ``prone_to_close_faster``.

    :param df_time_exp: frame with ``id``, ``rn``, ``pre_fterm``, ``pre_pterm``
        and the five ``is_zero_loans*`` columns
    :return: new frame: the per-client columns left-joined with every row of
        ``df_time_exp`` on ``id`` (client columns come first)
    """
    # Vectorised cast instead of a per-element Python lambda.
    df_time_exp['closed_faster_than_expected'] = (
        df_time_exp['pre_fterm'] < df_time_exp['pre_pterm']
    ).astype('int64')

    df_time_exp['overdue_severity'] = (
        df_time_exp['is_zero_loans5']
        + df_time_exp['is_zero_loans530'] * 2
        + df_time_exp['is_zero_loans3060'] * 3
        + df_time_exp['is_zero_loans6090'] * 4
        + df_time_exp['is_zero_loans90'] * 5
    )

    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        # Explicit registration instead of relying on DuckDB finding the local
        # variable by name.
        con.register('df_time_exp', df_time_exp)
        rows = con.execute('''
        SELECT id,
               cast(SUM(overdue_severity) as float) / max(rn) AS prone_to_overdue,
               cast(SUM(closed_faster_than_expected) as float) / max(rn) AS prone_to_close_faster
        FROM df_time_exp
        GROUP BY id
        ORDER BY id
        ''').fetchall()

    # ORDER BY above makes the output row order deterministic: the left merge
    # follows the order of this per-client frame, and GROUP BY alone gives no
    # ordering guarantee.
    result = pd.DataFrame.from_records(
        rows, columns=['id', 'prone_to_overdue', 'prone_to_close_faster']
    )
    return pd.merge(result, df_time_exp, on='id', how='left')

In [7]:
def overdue_and_quick_closure_duck_optimised(df):
    """Add overdue / early-closure features to ``df`` in place and return it.

    Row-level columns:

    * ``closed_faster_than_expected`` - 1 when ``pre_fterm < pre_pterm``, else 0
    * ``overdue_severity`` - weighted sum of the ``is_zero_loans*`` columns
      with weights 1..5

    Client-level columns, repeated on every row of the client: the sum of the
    row-level column per ``id`` divided by ``max(rn)``, stored as
    ``prone_to_overdue`` and ``prone_to_close_faster``.

    :param df: frame with ``id``, ``rn``, ``pre_fterm``, ``pre_pterm`` and the
        five ``is_zero_loans*`` columns
    :return: the same ``df`` object with the four columns added
    """
    # Row-level columns are computed directly in pandas, the same way
    # overdue_and_quick_closure_duck does. The earlier SQL round trip copied
    # values back by index label and so required a default RangeIndex.
    df['closed_faster_than_expected'] = (df['pre_fterm'] < df['pre_pterm']).astype('int64')
    df['overdue_severity'] = (
        df['is_zero_loans5']
        + df['is_zero_loans530'] * 2
        + df['is_zero_loans3060'] * 3
        + df['is_zero_loans6090'] * 4
        + df['is_zero_loans90'] * 5
    )

    # One connection, closed on exit (two used to be opened and leaked).
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               cast(SUM(overdue_severity) as float) / max(rn) AS prone_to_overdue,
               cast(SUM(closed_faster_than_expected) as float) / max(rn) AS prone_to_close_faster
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(
        rows, columns=['id', 'prone_to_overdue', 'prone_to_close_faster']
    ).set_index('id')

    # Look each row's value up by its id. The previous positional copy of an
    # id-ordered join was only correct when df was already sorted by id and
    # carried a 0..n-1 index.
    for col in ('prone_to_overdue', 'prone_to_close_faster'):
        df[col] = df['id'].map(per_id[col])
    return df

In [8]:
def credit_status_prevalent_atm(df):
    """Add ``credit_status_prevalent_atm`` to ``df`` in place and return it.

    The new column holds, for every row, the SQL ``mode`` (most frequent
    value) of ``enc_loans_credit_status`` among all rows with the same ``id``.

    :param df: frame with ``id`` and ``enc_loans_credit_status`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               mode(enc_loans_credit_status) AS credit_status_prevalent_atm
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'credit_status_prevalent_atm'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['credit_status_prevalent_atm'] = df['id'].map(
        per_id.set_index('id')['credit_status_prevalent_atm']
    )
    return df

In [9]:
def credit_status_general_mean(df):
    """Add ``credit_status_general_mean`` to ``df`` in place and return it.

    For every ``id`` the value is ``sum(enc_loans_credit_status)`` cast to
    SQL ``float`` and divided by ``max(rn)``; it is repeated on each row of
    that id.

    :param df: frame with ``id``, ``rn`` and ``enc_loans_credit_status`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               cast(sum(enc_loans_credit_status) as float) / max(rn) AS credit_status_general_mean
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'credit_status_general_mean'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['credit_status_general_mean'] = df['id'].map(
        per_id.set_index('id')['credit_status_general_mean']
    )
    return df

In [10]:
def credit_line_length(df):
    """Add ``credit_line_length`` to ``df`` in place and return it.

    The value is ``count(rn)`` over all rows sharing the row's ``id``, i.e.
    the number of non-null ``rn`` entries for that id.

    :param df: frame with ``id`` and ``rn`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               count(rn) AS credit_line_length
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'credit_line_length'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['credit_line_length'] = df['id'].map(per_id.set_index('id')['credit_line_length'])
    return df

In [11]:
def ohe_auto(df):
    """One-hot encode every low-cardinality column and append the result.

    A column is encoded when it is neither ``id`` nor ``flag`` and has between
    3 and 9 distinct values (a missing value counts as one distinct value).
    The original columns are kept; the encoded columns are appended with the
    names produced by ``OneHotEncoder.get_feature_names_out``.

    :param df: input frame
    :return: new frame ``df`` + one-hot columns (a plain copy of ``df`` when no
        column qualifies)
    """
    cols_to_encode = [
        col for col in df.columns
        if col not in ('id', 'flag') and 3 <= df[col].nunique(dropna=False) < 10
    ]

    # Nothing to encode: fitting the encoder on zero columns would fail.
    if not cols_to_encode:
        return df.copy()

    ohe = OneHotEncoder(sparse_output=False)
    encoded = ohe.fit_transform(df[cols_to_encode])

    # Reuse df's index. With a fresh 0..n-1 index the column-wise concat
    # aligned on labels and produced NaN-padded extra rows for any frame whose
    # index was not the default RangeIndex.
    df_ohe = pd.DataFrame(encoded, columns=ohe.get_feature_names_out(), index=df.index)
    return pd.concat([df, df_ohe], axis=1)



def ohe_handpicked(df):
    """One-hot encode a fixed, hand-picked set of columns and append the result.

    The original columns are kept; the encoded columns are appended with the
    names produced by ``OneHotEncoder.get_feature_names_out``.

    :param df: frame containing every column listed in ``cols_to_encode``
    :return: new frame ``df`` + one-hot columns
    """
    cols_to_encode = [
        'pre_loans_outstanding',
        'pre_loans_max_overdue_sum',
        # Deliberately left out by the author:
        #   'pre_loans_credit_cost_rate', 'pre_util',
        #   'pre_over2limit', 'pre_maxover2limit',
        'enc_loans_account_holder_type',
        'enc_loans_account_cur',
        'enc_loans_credit_status',
        'enc_loans_credit_type',
    ]

    ohe = OneHotEncoder(sparse_output=False)
    encoded = ohe.fit_transform(df[cols_to_encode])

    # Reuse df's index. With a fresh 0..n-1 index the column-wise concat
    # aligned on labels and produced NaN-padded extra rows for any frame whose
    # index was not the default RangeIndex.
    df_ohe = pd.DataFrame(encoded, columns=ohe.get_feature_names_out(), index=df.index)
    return pd.concat([df, df_ohe], axis=1)



In [12]:
def dropping_cols(df):
    """Return ``df`` without the 25 columns ``enc_paym_0`` .. ``enc_paym_24``.

    The input frame is left untouched; pandas raises ``KeyError`` when any of
    the columns is absent.
    """
    # Same 25 names that were previously spelled out one by one.
    paym_columns = [f'enc_paym_{idx}' for idx in range(25)]
    return df.drop(columns=paym_columns)

In [13]:
def answers_func(df):
    """Left-join the target file onto ``df`` by ``id`` and return the result.

    The targets are read from ``train_target.csv`` in the working directory,
    falling back to ``train_data/train_target.csv`` when that file is missing.
    """
    try:
        targets = pd.read_csv('train_target.csv')
    except FileNotFoundError:
        targets = pd.read_csv('train_data/train_target.csv')

    labelled = df.merge(targets, how='left', on='id')
    return labelled
    

In [14]:
def answers_func_optimised(df):
    """Add the target ``flag`` column to ``df`` in place and return it.

    Targets are read from ``train_target.csv`` in the working directory,
    falling back to ``train_data/train_target.csv`` when that file is missing.
    Each row receives the flag recorded for its ``id``; ids that do not occur
    in the target file get NaN.

    :param df: frame with an ``id`` column
    :return: the same ``df`` object with ``flag`` added (or overwritten)
    """
    try:
        answers = pd.read_csv('train_target.csv')
    except FileNotFoundError:
        answers = pd.read_csv('train_data/train_target.csv')

    # Direct lookup by id. The former inner join followed by a positional copy
    # shifted every later label as soon as one id was missing from the target
    # file, or when df was not sorted by id with a 0..n-1 index.
    # drop_duplicates keeps the lookup index unique, which map() requires.
    flag_by_id = answers.drop_duplicates('id').set_index('id')['flag']
    df['flag'] = df['id'].map(flag_by_id)
    return df

def answers_func_optimised_for_pipe(df_og):
    """Return a copy of ``df_og`` with the target ``flag`` column added.

    Targets are read from ``train_target.csv`` in the working directory,
    falling back to ``train_data/train_target.csv`` when that file is missing.
    Each row receives the flag recorded for its ``id``; ids that do not occur
    in the target file get NaN. The input frame is not modified.

    :param df_og: frame with an ``id`` column
    :return: copy of ``df_og`` with ``flag`` added (or overwritten)
    """
    df = df_og.copy()
    try:
        answers = pd.read_csv('train_target.csv')
    except FileNotFoundError:
        answers = pd.read_csv('train_data/train_target.csv')

    # Direct lookup by id. The former inner join followed by a positional copy
    # shifted every later label as soon as one id was missing from the target
    # file, or when the frame was not sorted by id with a 0..n-1 index.
    # drop_duplicates keeps the lookup index unique, which map() requires.
    flag_by_id = answers.drop_duplicates('id').set_index('id')['flag']
    df['flag'] = df['id'].map(flag_by_id)
    return df

In [15]:
def read_parquet_dataset_from_local(path_to_dataset: str = 'train_data/', start_from: int = 0,
                                     num_parts_to_read: int = 2, columns=None, verbose=True,
                                   transformers: list = None,
                                   ):
    """
    Supplied helper that assembles the final dataset, with local changes:

    1) it also accepts a list of functions; each is wrapped in a
       ``FunctionTransformer`` and becomes a step of a data-processing pipeline
       that is applied to every partition right after it is read;
    2) parquet files are read with fastparquet (per the author, pandas/pyarrow
       killed the kernel immediately).

    Reads ``num_parts_to_read`` partitions, converts them to ``pd.DataFrame``,
    concatenates them and replaces every missing value with 0.

    :param path_to_dataset: directory holding the partitions (files whose name
        starts with ``train``)
    :param start_from: index of the first partition to read
    :param num_parts_to_read: number of partitions to read
    :param columns: list of columns to read from each partition (``None`` = all)
    :param verbose: print the discovered files and the chunks being read
    :param transformers: functions applied, in order, to every partition;
        ``None`` or an empty list means the partitions are used as read
    :return: pd.DataFrame
    """
    # Function-scope import of the non-deprecated progress bar; the old
    # ``tqdm.tqdm_notebook`` alias is scheduled for removal in tqdm 5.
    from tqdm.notebook import tqdm as notebook_tqdm

    # ``.csv`` files are skipped: the target file ``train_target.csv`` may live
    # in this same directory, matches the ``train`` prefix and cannot be parsed
    # as parquet.
    dataset_paths = sorted(
        os.path.join(path_to_dataset, filename)
        for filename in os.listdir(path_to_dataset)
        if filename.startswith('train') and not filename.endswith('.csv')
    )

    start_from = max(0, start_from)
    chunks = dataset_paths[start_from: start_from + num_parts_to_read]
    if verbose:
        print(dataset_paths)
        print('Reading chunks:\n')
        for chunk in chunks:
            print(chunk)

    steps = [(func.__name__ + '_trf', FunctionTransformer(func)) for func in (transformers or [])]
    # sklearn cannot run a Pipeline without steps, so the default call (no
    # transformers) skips the pipeline instead of failing on it.
    pipeline = Pipeline(steps) if steps else None

    res = []
    for chunk_path in notebook_tqdm(chunks, desc="Reading dataset with pandas"):
        if verbose:
            print('chunk_path', chunk_path)
        # ``columns`` was documented but never forwarded to the reader.
        chunk = fastparquet.ParquetFile(chunk_path).to_pandas(columns=columns)
        if pipeline is not None:
            chunk = pipeline.fit_transform(chunk)
        res.append(chunk)

    res = pd.concat(res).reset_index(drop=True)

    # Replace every missing value with 0.
    return res.fillna(0)

In [16]:
def read_parquet_dataset_from_local_og(path_to_dataset: str, start_from: int = 0,
                                     num_parts_to_read: int = 2, columns=None, verbose=True,
                                   ) -> pd.DataFrame:
    """
    Supplied helper that assembles the final dataset, with one local change:
    parquet files are read with fastparquet (per the author, pandas/pyarrow
    killed the kernel immediately).

    Reads ``num_parts_to_read`` partitions, converts them to ``pd.DataFrame``
    and returns their concatenation with a fresh 0..n-1 index.

    :param path_to_dataset: directory holding the partitions (files whose name
        starts with ``train``)
    :param start_from: index of the first partition to read
    :param num_parts_to_read: number of partitions to read
    :param columns: list of columns to read from each partition (``None`` = all)
    :param verbose: print the discovered files and the chunks being read
    :return: pd.DataFrame
    """
    # Function-scope import of the non-deprecated progress bar; the old
    # ``tqdm.tqdm_notebook`` alias is scheduled for removal in tqdm 5.
    from tqdm.notebook import tqdm as notebook_tqdm

    # ``.csv`` files are skipped: a target file such as ``train_target.csv``
    # stored next to the partitions matches the ``train`` prefix and cannot be
    # parsed as parquet.
    dataset_paths = sorted(
        os.path.join(path_to_dataset, filename)
        for filename in os.listdir(path_to_dataset)
        if filename.startswith('train') and not filename.endswith('.csv')
    )

    start_from = max(0, start_from)
    chunks = dataset_paths[start_from: start_from + num_parts_to_read]
    if verbose:
        print(dataset_paths)
        print('Reading chunks:\n')
        for chunk in chunks:
            print(chunk)

    res = []
    for chunk_path in notebook_tqdm(chunks, desc="Reading dataset with pandas"):
        if verbose:
            print('chunk_path', chunk_path)
        # ``columns`` was documented but never forwarded to the reader.
        res.append(fastparquet.ParquetFile(chunk_path).to_pandas(columns=columns))

    return pd.concat(res).reset_index(drop=True)

In [17]:
def str_cols_for_sql_2(df) -> str:
    """Build the ``t1.``-qualified, comma-separated column list of ``df`` for SQL.

    Plain identifier names are emitted bare (``t1.id,t1.rn``), exactly as
    before. Any other name - one containing spaces, dots, brackets, quotes or
    starting with a digit - is emitted as a double-quoted SQL identifier.
    The earlier ``str(list).replace(...)`` approach stripped such characters
    out of the name or produced invalid SQL for names like ``x_3.0``.

    :param df: frame whose column names are listed
    :return: string such as ``t1.id,t1.rn``; empty string for a frame without columns
    """
    def _qualified(col):
        name = str(col)
        if name.isidentifier():
            return f't1.{name}'
        # Standard SQL quoting: wrap in double quotes, double any inner quote.
        escaped = name.replace('"', '""')
        return f't1."{escaped}"'

    return ','.join(_qualified(col) for col in df.columns)

In [18]:
def max_rn(df):
    """Return, for every ``id``, the row(s) of ``df`` whose ``rn`` is the largest.

    :param df: frame with ``id`` and ``rn`` columns
    :return: new frame with the same columns as ``df``, sorted by ``id`` and
        re-indexed 0..n-1; an id keeps several rows if its maximal ``rn``
        occurs more than once
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute(f'''
        SELECT {str_cols_for_sql_2(df)}
        FROM df t1
        JOIN (
            SELECT id, MAX(rn) AS max_rn
            FROM df
            GROUP BY id
        ) t2 ON t1.id = t2.id AND t1.rn = t2.max_rn;
        ''').fetchall()

    result = pd.DataFrame.from_records(rows, columns=list(df.columns))
    return result.sort_values(by='id').reset_index(drop=True)


In [19]:
def zero_imputer(res):
    """Replace every missing value in ``res`` with 0, in place, and return ``res``.

    Only columns that actually contain missing values are touched.
    """
    null_counts = res.isnull().sum()
    columns_with_nulls = list(null_counts.index[null_counts > 0])

    for name in columns_with_nulls:
        missing_rows = res[name].isna()
        res.loc[missing_rows, name] = 0

    return res

In [20]:

def custom_agregating(df):
    """Collapse ``df`` to one row per ``id`` with per-value occurrence counts.

    For every column in ``old_cols`` and every distinct value of that column,
    a count column ``<column>_<value>`` is produced holding how many rows of
    each id carry that value (NaN when the id never has it). The count columns
    are then left-joined with the id's last row (largest ``rn``), as returned
    by ``max_rn``.

    :param df: frame with ``id``, ``rn`` and every column listed in ``old_cols``
    :return: new frame: ``id``, the count columns, then the columns of the
        last row of each id
    """
    old_cols = [
        'pre_since_opened',
        'pre_since_confirmed',
        'pre_loans_outstanding',
        'pre_loans_total_overdue',
        'pre_loans_max_overdue_sum',
        'pre_loans_credit_cost_rate',
        'pre_loans5',
        'pre_loans530',
        'pre_loans3060',
        'pre_loans6090',
        'pre_loans90',
        'pre_util',
        'pre_over2limit',
        'pre_maxover2limit',
        'is_zero_util',
        'is_zero_over2limit',
        'is_zero_maxover2limit',
        'enc_loans_account_holder_type',
        'enc_loans_credit_status',
        'enc_loans_credit_type',
        'enc_loans_account_cur',
        'pclose_flag',
        'fclose_flag',
    ]

    tst_df = pd.DataFrame({'id': df['id'].unique()})

    # One connection for all queries, closed on exit (a new one used to be
    # opened, and leaked, for every distinct value).
    with duckdb.connect(':memory:') as con:
        con.register('df', df)

        for old_col in old_cols:
            # Distinct values, most frequent first (order of the count columns).
            for entry in df[old_col].value_counts().keys():
                value = entry.item() if hasattr(entry, 'item') else entry

                # Integral floats are labelled like ints ("3.0" -> "3").
                if isinstance(value, float) and value.is_integer():
                    label = str(int(value))
                else:
                    label = str(value)

                # The value is bound as a query parameter. It used to be
                # recovered from the last two characters of the column name,
                # which filtered on the wrong number for values >= 100.
                rows = con.execute(
                    f'SELECT id, count({old_col}) FROM df WHERE {old_col} = ? GROUP BY id',
                    [value],
                ).fetchall()

                counts = pd.DataFrame.from_records(rows, columns=['id', f'{old_col}_{label}'])
                tst_df = pd.merge(tst_df, counts, on='id', how='left')

    # Last row per id; identical to the query that was inlined here before.
    last_rows = max_rn(df)

    return pd.merge(tst_df, last_rows, on='id', how='left')

In [21]:
def custom_agregating_optimised(df):
    """Collapse ``df`` to one row per ``id`` with per-value occurrence counts.

    For every column in ``old_cols`` and every distinct value of that column,
    a count column ``<column>_<value>_agg`` is produced holding how many rows
    of each id carry that value (0 when the id never has it). The columns of
    the id's last row (largest ``rn``, as returned by ``max_rn``) are then
    added; a last-row column with the same name as an existing column
    overwrites it.

    :param df: frame with ``id``, ``rn`` and every column listed in ``old_cols``
    :return: new frame with one row per id, in order of first appearance in ``df``
    """
    old_cols = [
        'pre_since_opened',
        'pre_since_confirmed',
        'pre_loans_outstanding',
        'pre_loans_total_overdue',
        'pre_loans_max_overdue_sum',
        'pre_loans_credit_cost_rate',
        'pre_loans5',
        'pre_loans530',
        'pre_loans3060',
        'pre_loans6090',
        'pre_loans90',
        'pre_util',
        'pre_over2limit',
        'pre_maxover2limit',
        'is_zero_util',
        'is_zero_over2limit',
        'is_zero_maxover2limit',
        'enc_loans_account_holder_type',
        'enc_loans_credit_status',
        'enc_loans_credit_type',
        'enc_loans_account_cur',
        'pclose_flag',
        'fclose_flag',
    ]

    tst_df = pd.DataFrame({'id': df['id'].unique()})

    # One connection for all queries, closed on exit (a new one used to be
    # opened, and leaked, for every distinct value).
    with duckdb.connect(':memory:') as con:
        con.register('df', df)

        for old_col in old_cols:
            # Distinct values, most frequent first (order of the count columns).
            for entry in df[old_col].value_counts().keys():
                value = entry.item() if hasattr(entry, 'item') else entry

                # Integral floats are labelled like ints ("3.0" -> "3").
                if isinstance(value, float) and value.is_integer():
                    label = str(int(value))
                else:
                    label = str(value)
                name = f'{old_col}_{label}_agg'

                # The value is bound as a query parameter. It used to be
                # recovered from the last two characters of the column name,
                # which filtered on the wrong number for values >= 100.
                rows = con.execute(
                    f'SELECT id, count({old_col}) FROM df WHERE {old_col} = ? GROUP BY id',
                    [value],
                ).fetchall()

                counts = pd.DataFrame.from_records(rows, columns=['id', name])
                # Look up by id; ids without the value stay NaN until the fill
                # below. The earlier positional copy of an id-ordered result
                # was only right when df was sorted by id.
                tst_df[name] = tst_df['id'].map(counts.set_index('id')[name])

    tst_df = zero_imputer(tst_df)

    # Last row per id, re-ordered to the id order of tst_df so the column copy
    # below lines up row for row.
    last_rows = max_rn(df)
    aligned = (
        last_rows.drop_duplicates('id')
        .set_index('id')
        .reindex(tst_df['id'])
        .reset_index(drop=True)
    )
    for col in aligned.columns:
        tst_df[col] = aligned[col]

    return tst_df

In [22]:
def custom_agregating_optimised_for_pipe(df):
    """Aggregate ``df`` per id and split the result into features and target.

    The aggregation itself is delegated to ``custom_agregating_optimised``;
    this function used to carry a line-for-line copy of that body and differed
    only in what it returned.

    :param df: frame accepted by ``custom_agregating_optimised`` that also
        contains a ``flag`` column
    :return: tuple ``(features, target)`` - the aggregated frame without the
        ``flag`` column, and the ``flag`` column as a Series
    :raises KeyError: when the aggregated frame has no ``flag`` column
    """
    tst_df = custom_agregating_optimised(df)
    return tst_df.drop('flag', axis=1), tst_df['flag']


In [23]:
def outstanding_loans_summ(df):
    """Return ``df`` sorted by ``id`` with an ``outstanding_loans_summ`` column.

    The new column holds ``sum(pre_loans_outstanding)`` over all rows sharing
    the row's ``id``. Unlike most sibling helpers this one does not modify
    its input: it works on a sorted, re-indexed (0..n-1) copy.

    :param df: frame with ``id`` and ``pre_loans_outstanding`` columns
    :return: new, id-sorted frame with the column added
    """
    # A stable sort keeps the original relative order of rows within each id
    # (the default quicksort does not guarantee that). drop=True discards the
    # old index directly instead of materialising and then dropping it.
    df = df.sort_values('id', kind='stable').reset_index(drop=True)

    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               sum(pre_loans_outstanding) AS outstanding_loans_summ
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'outstanding_loans_summ'])

    # Look the value up by id rather than copying a join result by position.
    df['outstanding_loans_summ'] = df['id'].map(per_id.set_index('id')['outstanding_loans_summ'])
    return df


def outstanding_loans_summ_sq(df):
    """Add ``outstanding_loans_summ_sq`` to ``df`` in place and return it.

    The value is ``power(sum(pre_loans_outstanding), 2)`` over all rows
    sharing the row's ``id`` - the square of the per-id sum.

    :param df: frame with ``id`` and ``pre_loans_outstanding`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               power(sum(pre_loans_outstanding), 2) AS outstanding_loans_summ_sq
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'outstanding_loans_summ_sq'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['outstanding_loans_summ_sq'] = df['id'].map(
        per_id.set_index('id')['outstanding_loans_summ_sq']
    )
    return df

In [24]:
def loans_overdue_summ(df):
    """Add ``loans_overdue_summ`` to ``df`` in place and return it.

    The value is ``sum(pre_loans_total_overdue)`` over all rows sharing the
    row's ``id``.

    :param df: frame with ``id`` and ``pre_loans_total_overdue`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               sum(pre_loans_total_overdue) AS loans_overdue_summ
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'loans_overdue_summ'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['loans_overdue_summ'] = df['id'].map(per_id.set_index('id')['loans_overdue_summ'])
    return df


def loans_overdue_summ_sq(df):
    """Add ``loans_overdue_summ_sq`` to ``df`` in place and return it.

    The value is ``power(sum(pre_loans_total_overdue), 2)`` over all rows
    sharing the row's ``id`` - the square of the per-id sum.

    :param df: frame with ``id`` and ``pre_loans_total_overdue`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               power(sum(pre_loans_total_overdue), 2) AS loans_overdue_summ_sq
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'loans_overdue_summ_sq'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['loans_overdue_summ_sq'] = df['id'].map(per_id.set_index('id')['loans_overdue_summ_sq'])
    return df


In [25]:
def max_overdue(df):
    """Add ``max_overdue`` to ``df`` in place and return it.

    The value is ``max(pre_loans_max_overdue_sum)`` over all rows sharing the
    row's ``id`` (the largest overdue value in the id's whole history).

    :param df: frame with ``id`` and ``pre_loans_max_overdue_sum`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               max(pre_loans_max_overdue_sum) AS max_overdue
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'max_overdue'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['max_overdue'] = df['id'].map(per_id.set_index('id')['max_overdue'])
    return df


In [26]:
def credit_limit(df):
    """Add ``credit_limit`` to ``df`` in place and return it.

    The value is ``max(pre_loans_credit_limit)`` over all rows sharing the
    row's ``id`` (the largest credit limit in the id's whole history).

    :param df: frame with ``id`` and ``pre_loans_credit_limit`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               max(pre_loans_credit_limit) AS credit_limit
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'credit_limit'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['credit_limit'] = df['id'].map(per_id.set_index('id')['credit_limit'])
    return df


In [27]:
def credit_costs(df):
    """Add ``credit_costs`` to ``df`` in place and return it.

    The value is ``sum(pre_loans_credit_cost_rate)`` over all rows sharing the
    row's ``id``.

    :param df: frame with ``id`` and ``pre_loans_credit_cost_rate`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               sum(pre_loans_credit_cost_rate) AS credit_costs
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'credit_costs'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['credit_costs'] = df['id'].map(per_id.set_index('id')['credit_costs'])
    return df

def credit_costs_sq(df):
    """Add ``credit_costs_sq`` to ``df`` in place and return it.

    The value is ``power(sum(pre_loans_credit_cost_rate), 2)`` over all rows
    sharing the row's ``id`` - the square of the per-id sum.

    :param df: frame with ``id`` and ``pre_loans_credit_cost_rate`` columns
    :return: the same ``df`` object with the column added
    """
    # The context manager closes the in-memory database; it used to be leaked.
    with duckdb.connect(':memory:') as con:
        con.register('df', df)
        rows = con.execute('''
        SELECT id,
               power(sum(pre_loans_credit_cost_rate), 2) AS credit_costs_sq
        FROM df
        GROUP BY id
        ''').fetchall()

    per_id = pd.DataFrame.from_records(rows, columns=['id', 'credit_costs_sq'])

    # Look the value up by id instead of copying an id-ordered join result by
    # position, which silently misassigned values unless df was sorted by id
    # with a default RangeIndex.
    df['credit_costs_sq'] = df['id'].map(per_id.set_index('id')['credit_costs_sq'])
    return df


# Default feature-engineering steps, in the order they are applied.
# Used as the default value of the ``transformers`` argument of ``transform``.
default_transformers = [
    answers_func_optimised_for_pipe,
    outstanding_loans_summ,
    outstanding_loans_summ_sq,
    loans_overdue_summ,
    loans_overdue_summ_sq,
    credit_limit,
    credit_costs,
    credit_costs_sq,
    max_overdue,
    credit_status_general_mean,
    credit_status_prevalent_atm,
    overdue_and_quick_closure_duck_optimised,
    credit_line_length,
    ohe_handpicked,
    dropping_cols,
    custom_agregating_optimised,
]

def transform(df,
                 transformers: list = default_transformers,
                ) -> pd.DataFrame:
    """Run ``df`` through a chain of feature-engineering functions.

    Every function is wrapped in a ``FunctionTransformer`` named
    ``<function name>_trf`` and the wrappers are chained in a sklearn
    ``Pipeline``, so each function receives the output of the previous one.

    :param df: input frame handed to the first function
    :param transformers: functions to apply, in order; defaults to
        ``default_transformers``
    :return: output of the last function, or ``df`` itself when
        ``transformers`` is empty or ``None``
    """
    # sklearn cannot run a Pipeline without steps; with nothing to apply the
    # input is returned unchanged instead of failing.
    if not transformers:
        return df

    steps = [(func.__name__ + '_trf', FunctionTransformer(func)) for func in transformers]
    return Pipeline(steps).fit_transform(df)

In [28]:
def column_checker(df):
    """Make sure ``df`` has every expected column and return it column-sorted.

    The expected column names are read from ``all_columns_list.txt`` (one name
    per line). When that file does not exist, the built-in list below is
    written to it first and then read back.

    Columns that are expected but absent are added to ``df`` in place, filled
    with 0.

    :param df: frame to check; missing columns are added to it
    :return: a reindexed frame whose columns are in sorted order
    """
    columns_file = 'all_columns_list.txt'

    def _read_listed_columns():
        # One column name per line; surrounding whitespace is dropped.
        with open(columns_file, 'r') as handle:
            return [row.strip() for row in handle.readlines()]

    try:
        all_columns = _read_listed_columns()

    except FileNotFoundError:

        all_columns = [
            'closed_faster_than_expected',     'credit_costs',     'credit_costs_sq',     'credit_limit',     'credit_line_length',     'credit_status_general_mean',     'credit_status_prevalent_atm',     'enc_loans_account_cur',     'enc_loans_account_cur_0',     'enc_loans_account_cur_0_agg',     'enc_loans_account_cur_1',     'enc_loans_account_cur_1_agg',     'enc_loans_account_cur_2',     'enc_loans_account_cur_2_agg',     'enc_loans_account_cur_3',     'enc_loans_account_cur_3_agg',     'enc_loans_account_holder_type',     'enc_loans_account_holder_type_0',     'enc_loans_account_holder_type_0_agg',     'enc_loans_account_holder_type_1',     'enc_loans_account_holder_type_1_agg',     'enc_loans_account_holder_type_2',     'enc_loans_account_holder_type_2_agg',
         'enc_loans_account_holder_type_3',     'enc_loans_account_holder_type_3_agg',     'enc_loans_account_holder_type_4',     'enc_loans_account_holder_type_4_agg',     'enc_loans_account_holder_type_5',     'enc_loans_account_holder_type_5_agg',     'enc_loans_account_holder_type_6',     'enc_loans_account_holder_type_6_agg',     'enc_loans_credit_status',     'enc_loans_credit_status_0',     'enc_loans_credit_status_0_agg',     'enc_loans_credit_status_1',     'enc_loans_credit_status_1_agg',     'enc_loans_credit_status_2',     'enc_loans_credit_status_2_agg',     'enc_loans_credit_status_3',     'enc_loans_credit_status_3_agg',     'enc_loans_credit_status_4',     'enc_loans_credit_status_4_agg',     'enc_loans_credit_status_5',     'enc_loans_credit_status_5_agg',     'enc_loans_credit_status_6',     'enc_loans_credit_status_6_agg',     'enc_loans_credit_type',     'enc_loans_credit_type_0',
         'enc_loans_credit_type_0_agg',    'enc_loans_credit_type_1',     'enc_loans_credit_type_1_agg',     'enc_loans_credit_type_2',     'enc_loans_credit_type_2_agg',     'enc_loans_credit_type_3',     'enc_loans_credit_type_3_agg',     'enc_loans_credit_type_4',     'enc_loans_credit_type_4_agg',     'enc_loans_credit_type_5',     'enc_loans_credit_type_5_agg',     'enc_loans_credit_type_6',     'enc_loans_credit_type_6_agg',     'enc_loans_credit_type_7',     'enc_loans_credit_type_7_agg',     'fclose_flag',     'fclose_flag_0_agg',     'fclose_flag_1_agg',     'flag',     'id',     'is_zero_loans3060',     'is_zero_loans5',     'is_zero_loans530',     'is_zero_loans6090',     'is_zero_loans90',     'is_zero_maxover2limit',
         'is_zero_maxover2limit_0_agg',     'is_zero_maxover2limit_1_agg',     'is_zero_over2limit',     'is_zero_over2limit_0_agg',     'is_zero_over2limit_1_agg',     'is_zero_util',     'is_zero_util_0_agg',     'is_zero_util_1_agg',     'loans_overdue_summ',     'loans_overdue_summ_sq',     'max_overdue',     'outstanding_loans_summ',     'outstanding_loans_summ_sq',     'overdue_severity',     'pclose_flag',     'pclose_flag_0_agg',     'pclose_flag_1_agg',     'pre_fterm',     'pre_loans3060',     'pre_loans3060_0_agg',     'pre_loans3060_1_agg',     'pre_loans3060_2_agg',     'pre_loans3060_3_agg',     'pre_loans3060_4_agg',     'pre_loans3060_5_agg',     'pre_loans3060_6_agg',     'pre_loans3060_7_agg',
         'pre_loans3060_8_agg',     'pre_loans3060_9_agg',     'pre_loans5',     'pre_loans530',     'pre_loans530_0_agg',     'pre_loans530_10_agg',     'pre_loans530_11_agg',     'pre_loans530_12_agg',     'pre_loans530_13_agg',     'pre_loans530_14_agg',     'pre_loans530_15_agg',     'pre_loans530_16_agg',     'pre_loans530_17_agg',     'pre_loans530_18_agg',     'pre_loans530_19_agg',     'pre_loans530_1_agg',     'pre_loans530_2_agg',     'pre_loans530_3_agg',     'pre_loans530_4_agg',     'pre_loans530_5_agg',     'pre_loans530_6_agg',     'pre_loans530_7_agg',     'pre_loans530_8_agg',     'pre_loans530_9_agg',     'pre_loans5_0_agg',     'pre_loans5_10_agg',     'pre_loans5_11_agg',     'pre_loans5_13_agg',     'pre_loans5_16_agg',     'pre_loans5_1_agg',
         'pre_loans5_2_agg',     'pre_loans5_3_agg',     'pre_loans5_5_agg',     'pre_loans5_6_agg',     'pre_loans5_7_agg',     'pre_loans5_8_agg',     'pre_loans5_9_agg',     'pre_loans6090',     'pre_loans6090_0_agg',     'pre_loans6090_1_agg',     'pre_loans6090_2_agg',     'pre_loans6090_3_agg',     'pre_loans6090_4_agg',     'pre_loans90',     'pre_loans90_10_agg',     'pre_loans90_13_agg',     'pre_loans90_14_agg',     'pre_loans90_19_agg',     'pre_loans90_2_agg',     'pre_loans90_3_agg',     'pre_loans90_8_agg',     'pre_loans_credit_cost_rate',     'pre_loans_credit_cost_rate_0_agg',     'pre_loans_credit_cost_rate_10_agg',     'pre_loans_credit_cost_rate_11_agg',     'pre_loans_credit_cost_rate_12_agg',     'pre_loans_credit_cost_rate_13_agg',     'pre_loans_credit_cost_rate_1_agg',     'pre_loans_credit_cost_rate_2_agg',     'pre_loans_credit_cost_rate_3_agg',     'pre_loans_credit_cost_rate_4_agg',     'pre_loans_credit_cost_rate_5_agg',     'pre_loans_credit_cost_rate_6_agg',     'pre_loans_credit_cost_rate_7_agg',
         'pre_loans_credit_cost_rate_8_agg',     'pre_loans_credit_cost_rate_9_agg',     'pre_loans_credit_limit',     'pre_loans_max_overdue_sum',     'pre_loans_max_overdue_sum_0',     'pre_loans_max_overdue_sum_0_agg',     'pre_loans_max_overdue_sum_1',     'pre_loans_max_overdue_sum_1_agg',     'pre_loans_max_overdue_sum_2',     'pre_loans_max_overdue_sum_2_agg',     'pre_loans_max_overdue_sum_3',     'pre_loans_max_overdue_sum_3_agg',     'pre_loans_next_pay_summ',     'pre_loans_outstanding',     'pre_loans_outstanding_1',     'pre_loans_outstanding_1_agg',     'pre_loans_outstanding_2',     'pre_loans_outstanding_2_agg',     'pre_loans_outstanding_3',     'pre_loans_outstanding_3_agg',     'pre_loans_outstanding_4',     'pre_loans_outstanding_4_agg',     'pre_loans_outstanding_5',     'pre_loans_outstanding_5_agg',     'pre_loans_total_overdue',
         'pre_loans_total_overdue_0_agg',     'pre_loans_total_overdue_1_agg',     'pre_maxover2limit',     'pre_maxover2limit_0_agg',     'pre_maxover2limit_10_agg',     'pre_maxover2limit_11_agg',     'pre_maxover2limit_12_agg',     'pre_maxover2limit_13_agg',     'pre_maxover2limit_14_agg',     'pre_maxover2limit_15_agg',     'pre_maxover2limit_16_agg',     'pre_maxover2limit_17_agg',     'pre_maxover2limit_18_agg',     'pre_maxover2limit_19_agg',     'pre_maxover2limit_1_agg',     'pre_maxover2limit_2_agg',     'pre_maxover2limit_3_agg',     'pre_maxover2limit_4_agg',     'pre_maxover2limit_5_agg',     'pre_maxover2limit_6_agg',     'pre_maxover2limit_7_agg',     'pre_maxover2limit_8_agg',     'pre_maxover2limit_9_agg',     'pre_over2limit',     'pre_over2limit_0_agg',     'pre_over2limit_10_agg',
         'pre_over2limit_11_agg',     'pre_over2limit_12_agg',     'pre_over2limit_13_agg',     'pre_over2limit_14_agg',     'pre_over2limit_15_agg',     'pre_over2limit_16_agg',     'pre_over2limit_17_agg',     'pre_over2limit_18_agg',     'pre_over2limit_19_agg',     'pre_over2limit_1_agg',     'pre_over2limit_2_agg',     'pre_over2limit_3_agg',     'pre_over2limit_4_agg',     'pre_over2limit_5_agg',     'pre_over2limit_6_agg',     'pre_over2limit_7_agg',     'pre_over2limit_8_agg',     'pre_over2limit_9_agg',     'pre_pterm',     'pre_since_confirmed',     'pre_since_confirmed_0_agg',     'pre_since_confirmed_10_agg',     'pre_since_confirmed_11_agg',     'pre_since_confirmed_12_agg',     'pre_since_confirmed_13_agg',     'pre_since_confirmed_14_agg',     'pre_since_confirmed_15_agg',     'pre_since_confirmed_16_agg',
         'pre_since_confirmed_17_agg',     'pre_since_confirmed_1_agg',     'pre_since_confirmed_2_agg',     'pre_since_confirmed_3_agg',     'pre_since_confirmed_4_agg',     'pre_since_confirmed_5_agg',     'pre_since_confirmed_6_agg',     'pre_since_confirmed_7_agg',     'pre_since_confirmed_8_agg',     'pre_since_confirmed_9_agg',     'pre_since_opened',     'pre_since_opened_0_agg',     'pre_since_opened_10_agg',     'pre_since_opened_11_agg',     'pre_since_opened_12_agg',     'pre_since_opened_13_agg',     'pre_since_opened_14_agg',
         'pre_since_opened_15_agg',     'pre_since_opened_16_agg',     'pre_since_opened_17_agg',     'pre_since_opened_18_agg',     'pre_since_opened_19_agg',     'pre_since_opened_1_agg',     'pre_since_opened_2_agg',     'pre_since_opened_3_agg',     'pre_since_opened_4_agg',     'pre_since_opened_5_agg',     'pre_since_opened_6_agg',     'pre_since_opened_7_agg',     'pre_since_opened_8_agg',     'pre_since_opened_9_agg',     'pre_till_fclose',     'pre_till_pclose',     'pre_util',     'pre_util_0_agg',     'pre_util_10_agg',     'pre_util_11_agg',     'pre_util_12_agg',     'pre_util_13_agg',
         'pre_util_14_agg',     'pre_util_15_agg',     'pre_util_16_agg',     'pre_util_17_agg',     'pre_util_18_agg',     'pre_util_19_agg',     'pre_util_1_agg',     'pre_util_2_agg',     'pre_util_3_agg',     'pre_util_4_agg',     'pre_util_5_agg',     'pre_util_6_agg',
         'pre_util_7_agg',     'pre_util_8_agg',     'pre_util_9_agg',     'prone_to_close_faster',     'prone_to_overdue',
         'rn']
        # Persist the built-in list so later calls read it from the file.
        with open(columns_file, 'w') as handle:
            for name in all_columns:
                handle.write(name + '\n')

        all_columns = _read_listed_columns()

    # Collect the absent names first, then add them (in listed order) as zeros.
    absent = [name for name in all_columns if name not in df.columns]
    for name in absent:
        df[name] = 0

    ordered = sorted(df.columns)
    return df.reindex(ordered, axis=1)

# base class

In [39]:
class Grad_2:
    'This class is made for streamlining the process of data preparation and making predictions with my model of choice'

    
    def __init__(self, path_to_dataset = 'train_data/', default_transformers = list):
        """
        Initializes Grad_2 

        :param path_to_dataset: path to fragmented dataset
        :param default_transformers: list of predetermined transformers for data
        """
        self.path_to_dataset = path_to_dataset
        self.default_transformers = default_transformers
        
        
    def prepare_dataset(self, path_to_dataset: str, start_from: int = 0,
                                         num_parts_to_read: int = 2, columns=None, verbose=True,
                                       transformers: list = [],
                                       ) -> pd.DataFrame:
        """
        Предоставленный инструмент по сбору итогового датасета.
        
        Доработки:
        1) теперь на вход так же принимает лист функций, в дальнейшем они оборачиваются в function_transformer 
        и служат этапами в пайплайне по обработке данных

        2) была заменена библиотека для чтения паркет-файлов - pandas и pyarrow у меня моментально 
        убивают ядро - вместо этого используется fastparquet - свою функцию выполняет.
        
        3) в следствии различного наполнения кусков сета данными и особенностей агрегации "на выходе" добавлен код, 
        который заполняет пустоты нулями, чтобы модель не ругалась

        читает num_parts_to_read партиций, преобразовывает их к pd.DataFrame и возвращает
        :param path_to_dataset: путь до директории с партициями
        :param start_from: номер партиции, с которой нужно начать чтение
        :param num_parts_to_read: количество партиций, которые требуется прочитать
        :param columns: список колонок, которые нужно прочитать из партиции
---new! :param transformers: лист функций-трансформеров данных                        
        :return: pd.DataFrame
        """

        res = []
        dataset_paths = sorted([os.path.join(path_to_dataset, filename) for filename in os.listdir(path_to_dataset)
                                  if filename.startswith('train')])
        print(dataset_paths)

        start_from = max(0, start_from)
        chunks = dataset_paths[start_from: start_from + num_parts_to_read]
        if verbose:
            print('Reading chunks:\n')
            for chunk in chunks:
                print(chunk)


        transformers_trf = []  
        for i in range(len(transformers)):  
                transformers_trf.append((transformers[i].__name__+'_trf', FunctionTransformer(transformers[i])))  
        pipeline = Pipeline(transformers_trf) 

        for chunk_path in tqdm.tqdm_notebook(chunks, desc="Reading dataset"): 
            print('chunk_path ', chunk_path)
            chunk = fastparquet.ParquetFile(chunk_path).to_pandas()

            chunk = pipeline.fit_transform(chunk)  

            res.append(chunk)


        res = pd.concat(res).reset_index(drop=True)

        keys = res.isnull().sum().keys()
        nan_values = res.isnull().sum()
        cols_w_nans = []
        for key in keys:
            if nan_values[key] > 0:
                cols_w_nans.append(key)

        for col in cols_w_nans:
            res.loc[res[col].isna() == True, col] = 0



        return res             
        
        
    def write_dataset_down(self, df: pd.DataFrame, path_to_dataset: str,):
        """"
        Writes dataset down in the same folder as its fragmented and non-aggregated part were stored in i.e. '/train_data'
        
        :param df: dataset to be written down to .pq format
        :param path_to_dataset: path to fragmented dataset
        :return: None
        
        ex.:
        class_exemplar.write_dataset_down(dataframe, 'train_data')
        """
        tst = sorted([os.path.join(path_to_dataset, filename) for filename in os.listdir(path_to_dataset)
                                          if filename.startswith('df_compiled_v')])
        versions = []
        for entry in tst:
            versions.append(eval(entry.split('v')[1][:entry.split('v')[1].find('_')]))
        next_version = max(versions)+1
        next_version

        path = f'{path_to_dataset}/df_compiled_v{next_version}_{df.shape[1]}_cols_full.pq'
        df.to_parquet(path)
        pass

    def fit_model(self, df: pd.DataFrame, test_size: float = 0.2, share: tuple = (60, 40)):
        """
        Fits XGBClassifier with prepared data, gives back fitted model, X_test and y_test

        The frame is split (stratified on ``flag``) into a training and a
        hold-out part; the training part is then resampled with ``sample_df``
        to the requested negative/positive shares before fitting. The fitted
        model is also stored as ``self.fitted_model``.

        :param df: dataset for model to be fit with; must contain a ``flag`` column
        :param test_size: fraction of ``df`` kept aside as the hold-out part
        :param share: (negative percent, positive percent) of the resampled
                      training set
        :return: xgboost.sklearn.XGBClassifier, pd.DataFrame, pd.Series

        ex.:
        model, X_test, y_test = class_exemplar.fit_model(dataframe)
        """
        neg_percent, pos_percent = share

        train_part, test_part = train_test_split(df, stratify=df['flag'], test_size=test_size, )

        # Size the resampled training set from the number of flag == 1 rows so
        # that the positive share can be drawn without exceeding what exists
        # (the "- 1" keeps the request just below the available positives).
        positives = train_part.flag.value_counts()[1]
        total_rows = round(positives / pos_percent * 100-1, 0)
        train_part = sample_df(train_part, total_rows, neg_percent, pos_percent)

        X, y = train_part.drop('flag', axis=1), train_part.flag
        X_test, y_test = test_part.drop('flag', axis=1), test_part.flag

        model = xgb.XGBClassifier(colsample_bytree=0.9, gamma=0.1, learning_rate=0.1, max_depth=5, n_estimators=400, subsample=0.8)

        model.fit(X, y) 

        self.fitted_model = model

        return model, X_test, y_test

    
    def write_down_model(self, model, model_filename: str):
        """"
        writes down fitted model for later use
        
        :param model: takes fitted model
        :param model_filename: str file name for written down model, without .pkl, just the name
        :return: None
        
        ex.:
        class_exemplar.write_down_model(fitted_model, 'model_1')
        """
        if type(model_filename) != str:
            return print('wrong filename type!')
            
        joblib.dump(model, f'models/{model_filename}.pkl')
        pass


    def load_model(self, path_to_model: str):
        """
        Reads a previously saved model back from disk with joblib.

        joblib unpickles the file, so only point this at files you trust.

        :param path_to_model: path to the saved model file
        :return: whatever object was stored in the file - supposedly a fitted
                 xgboost.sklearn.XGBClassifier

        ex.:
        loaded_model = class_exemplar.load_model('models/model_1.pkl')
        """
        return joblib.load(path_to_model)

    
    def predict(self, fitted_model, X_test, y_test, write_it_down = True ):
        """"
        Makes prediction based on a provided test-chunk of a dataset
        
        :param fitted_model: fitted model
        :param X_test: prediction data
        :param y_test: target data
        :write_it_down True/False: write down predictions to 'predicts/' folder. or dont
        :return: array
        
        ex.:
        prediced_probabilities = class_exemplar.predict(fitted_model, X_test, y_test, True):
        """
        y_pred_proba = fitted_model.predict_proba(X_test)
        roc_auc = roc_auc_score(y_test, y_pred_proba[:, 1])

        if write_it_down:
            with open('predicts/predictions.txt', 'w') as file:
                for item in y_pred_proba[:, 1]:
                    file.write(str(item) + '\n')
            
        
        return y_pred_proba[:, 1]
    
    
    
    
    def fit(self, 
            path_to_dataset: str, 
            start_from: int = 0,
            num_parts_to_read: int = 2,
            columns=None, 
            verbose=True,                                       
            transformers: list = [],
           ) -> pd.DataFrame:
        """"
        Combines:
        Grad_2.prepare_dataset method
        Grad_2.fit_model method
        
        basically preprocesses dataset and fits model in one go
        
        :param path_to_dataset: путь до директории с партициями
        :param start_from: номер партиции, с которой нужно начать чтение
        :param num_parts_to_read: количество партиций, которые требуется прочитать
        :param columns: список колонок, которые нужно прочитать из партиции
---new! :param transformers: лист функций-трансформеров данных                        
        :return:    xgboost.sklearn.XGBClassifier - fitted model,
                    pd.DataFrame - X_test prediction data, 
                    pd.DataFrame - y_test target array
        """
        res = self.prepare_dataset(path_to_dataset,
                                   start_from,
                                   num_parts_to_read, 
                                   columns, 
                                   verbose,
                                   transformers,
                                  )
        
        model, X_test, y_test = self.fit_model(res)
        
        return model, X_test, y_test
        

    