In [2]:
import luigi
import requests
import json
import pandas as pd
import datetime
import numpy as np

In [3]:
import matplotlib.pyplot as plt
import copy
from sklearn.model_selection import train_test_split

import pandas as pd
import numpy as np
import pickle
import random

from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split, ShuffleSplit, cross_val_score, learning_curve
from sklearn.model_selection import KFold, GridSearchCV, RandomizedSearchCV
from sklearn.metrics import classification_report, f1_score, precision_score, recall_score

from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
import xgboost as xgb, lightgbm as lgbm, catboost as catb

import seaborn as sns
from matplotlib import pyplot as plt
%matplotlib inline

In [4]:
def get_classification_report(y_train_true, y_train_pred, y_test_true, y_test_pred):
    """Print classification reports for the train and test splits, then the test confusion matrix.

    Everything is written to stdout; the function returns None.

    Args:
        y_train_true: true labels of the training rows.
        y_train_pred: predicted labels of the training rows.
        y_test_true: true labels of the test/validation rows.
        y_test_pred: predicted labels of the test/validation rows.
    """
    report_sections = (
        ('TRAIN', y_train_true, y_train_pred),
        ('TEST', y_test_true, y_test_pred),
    )
    for title, y_true, y_pred in report_sections:
        print(title + '\n\n' + classification_report(y_true, y_pred))

    print('CONFUSION MATRIX\n')
    # Rows are true labels, columns are predicted labels.
    confusion = pd.crosstab(y_test_true, y_test_pred)
    print(confusion)

In [5]:
def my_gb(input_train, input_valid, input_need_predict, target_col, model_type, scale, random_state, scale_pos_weight, verbose=False):
    """Fit a gradient-boosting classifier, then optionally validate it and/or predict with it.

    Parameters
    ----------
    input_train : pandas.DataFrame
        Training rows, including the ``target_col`` column. Not modified.
    input_valid : pandas.DataFrame or bool
        Validation rows including ``target_col``; pass ``False`` to skip validation.
    input_need_predict : pandas.DataFrame or bool
        Rows to predict. ``target_col`` is dropped if present. Pass ``False`` to skip.
    target_col : str
        Name of the label column.
    model_type : str
        ``"xgb"``, ``"catb"``, ``"lgbm"``, or ``"all"``. ``"all"`` trains the three
        models and keeps the one with the best validation macro F1.
    scale : bool
        If true, standardize the feature columns (never the label) with a
        StandardScaler fitted on the training features only. The same scaler is
        applied to the validation and prediction frames.
    random_state : int
        Seed forwarded to the classifier.
    scale_pos_weight : float
        Forwarded to the classifier constructor.
    verbose : bool
        Print the call arguments before training.

    Returns
    -------
    The result of ``model.predict`` on the prediction frame when
    ``input_need_predict`` is given. Otherwise ``[[f1], [model]]``, where ``f1`` is
    the validation macro F1, or ``None`` when no validation frame was given.
    Returns ``False`` for an unknown ``model_type``.

    Raises
    ------
    ValueError
        If ``model_type == "all"`` and no validation frame is given, because the
        models cannot be compared without one.
    """
    import xgboost as xgb
    import catboost as catb
    import lightgbm as lgbm
    from sklearn.preprocessing import StandardScaler

    # A bool (callers pass False) marks an optional frame as "not provided".
    has_valid = type(input_valid) != bool
    has_predict = type(input_need_predict) != bool

    if verbose:
        print("Включена отладка:")
        print(f"    input_train.shape={input_train.shape}")
        if has_valid:
            print(f"    input_valid.shape={input_valid.shape}")
        if has_predict:
            print(f"    input_need_predict.shape={input_need_predict.shape}")
        print(f"    target_col={target_col}")
        print(f"    model_type={model_type}")
        print(f"    scale={scale}")
        print(f"    random_state={random_state}")
        print(f"    scale_pos_weight={scale_pos_weight}")

    print("====================================")
    print(f"Тип модели: {model_type}")
    if has_predict:
        print("Боевой запуск")

    if model_type == "all":
        if not has_valid:
            raise ValueError("model_type='all' requires input_valid to compare the models")
        # Forward the caller's settings to every candidate; each returns [[f1], [model]].
        candidates = {
            name: my_gb(input_train, input_valid, False, target_col, name,
                        scale, random_state, scale_pos_weight, verbose)
            for name in ("xgb", "catb", "lgbm")
        }
        # Highest validation macro F1 wins; on a tie the earlier model is kept.
        best_name = max(candidates, key=lambda name: candidates[name][0][0])
        if has_predict:
            # Refit the winner with the prediction frame so it goes through the same preprocessing.
            return my_gb(input_train, input_valid, input_need_predict, target_col, best_name,
                         scale, random_state, scale_pos_weight, verbose)
        return candidates[best_name]

    model_factories = {
        "xgb": lambda: xgb.XGBClassifier(random_state=random_state, scale_pos_weight=scale_pos_weight),
        "catb": lambda: catb.CatBoostClassifier(silent=True, random_state=random_state, scale_pos_weight=scale_pos_weight),
        "lgbm": lambda: lgbm.LGBMClassifier(random_state=random_state, n_jobs=-1, scale_pos_weight=scale_pos_weight),
    }
    if model_type not in model_factories:
        print(f"Неправильный параметр model_type: {model_type}")
        return False
    model = model_factories[model_type]()

    # Separate the label before any scaling so the target itself is never standardized.
    # drop() returns new frames, so the caller's inputs are left untouched.
    train = input_train.drop(columns=[target_col])
    train_target = input_train[target_col]
    valid = valid_target = need_predict = None
    if has_valid:
        valid = input_valid.drop(columns=[target_col])
        valid_target = input_valid[target_col]
    if has_predict:
        # The prediction frame may or may not carry a (placeholder) label column.
        need_predict = input_need_predict.drop(columns=[target_col], errors="ignore")

    if scale:
        print("Масштабирую данные")
        # Fit on training features only; other frames reuse those statistics (no leakage).
        scaler = StandardScaler().fit(train)

        def _rescale(frame):
            # Apply the training-fitted scaler, keeping column names and row index.
            return pd.DataFrame(scaler.transform(frame), columns=frame.columns, index=frame.index)

        train = _rescale(train)
        if has_valid:
            valid = _rescale(valid)
        if has_predict:
            need_predict = _rescale(need_predict)

    model.fit(train, train_target)

    model_f1_score = None
    if has_valid:
        print("Произвожу валидацию")
        pred_train = model.predict(train)
        pred_valid = model.predict(valid)
        # get_classification_report prints its own output and returns None.
        get_classification_report(train_target, pred_train, valid_target, pred_valid)
        model_f1_score = f1_score(valid_target, pred_valid, average='macro')

    if has_predict:
        print(f"Прогнозирую {target_col}")
        return model.predict(need_predict)

    print("\n\n")
    return [[model_f1_score], [model]]

In [6]:
class DatasetTrainPrepare(luigi.Task):
    """Load the raw training offers and remove the ``Unnamed: 0`` column.

    Reads ``data_train.csv`` indexed by ``id`` and writes the cleaned frame,
    index included, to ``output_train.csv``.
    """

    def run(self):
        # 'Unnamed: 0' is presumably a row index left by an earlier to_csv export.
        cleaned = pd.read_csv('data_train.csv', index_col='id').drop(columns='Unnamed: 0')
        target = self.output()
        with target.open('w') as handle:
            cleaned.to_csv(handle, index=True)

    def output(self):
        return luigi.LocalTarget(path='output_train.csv')

        
class DatasetTestPrepare(luigi.Task):
    """Load the raw test offers and remove the ``Unnamed: 0`` column.

    Reads ``data_test.csv`` indexed by ``id`` and writes the cleaned frame,
    index included, to ``output_test.csv``. No task visible in this notebook
    requires it; ModelFinalPrediction reads ``data_test.csv`` directly.
    """

    def run(self):
        raw_test = pd.read_csv('data_test.csv', index_col='id')
        cleaned_test = raw_test.drop(columns='Unnamed: 0')
        with self.output().open('w') as handle:
            cleaned_test.to_csv(handle, index=True)

    def output(self):
        return luigi.LocalTarget(path='output_test.csv')
    
class DatasetFeaturesFilter(luigi.Task):
    """Keep only the feature rows whose ``id`` occurs in the prepared training set.

    Reads the tab-separated ``features.csv`` (indexed by ``id``), drops its
    ``Unnamed: 0`` column, and writes the matching rows to
    ``output_features_filtered.csv``.
    """

    def requires(self):
        return DatasetTrainPrepare()

    def run(self):
        data_features = pd.read_csv('features.csv', sep='\t', index_col='id')
        data_features = data_features.drop('Unnamed: 0', axis=1)
        data_train = pd.read_csv(self.input().path, index_col='id')

        # Filter with isin() rather than .loc[data_train.index].
        # .loc raises KeyError for training ids that have no features.
        # .loc also repeats a user's feature rows once per occurrence of the id in
        # the training data, which the downstream join then multiplies again.
        data_features = data_features[data_features.index.isin(data_train.index)]

        with self.output().open('w') as f:
            data_features.to_csv(f, index=True)

    def output(self):
        return luigi.LocalTarget(path='output_features_filtered.csv')
    
class DatasetTrainTestConcat(luigi.Task):
    """Join training offers with user features and split them into train and hold-out sets.

    Rows whose feature snapshot is newer than the offer are removed, exact
    duplicate rows are removed, and ``buy_time`` is taken from the offer.
    25% of the rows go to ``output_ready_to_test.csv`` (read by CompareModels);
    the rest go to this task's output, ``output_ready_to_train.csv``.
    """

    def requires(self):
        return DatasetFeaturesFilter()

    def run(self):
        # Load the prepared offers (written by DatasetTrainPrepare) and the filtered features.
        data_train = pd.read_csv('output_train.csv', index_col='id')
        data_features = pd.read_csv(self.input().path, index_col='id')

        # Enrich offers with features. Both carry buy_time, hence the _x (offer)
        # and _y (feature snapshot) suffixes.
        merged = pd.merge(data_train, data_features, how="left", left_index=True, right_index=True)

        # Use positional boolean masks, not drop(<rows>.index). `id` is not unique
        # after the join, so dropping by label would also discard the valid rows
        # of the same user.
        # Drop rows whose feature snapshot is newer than the offer. Comparisons
        # with NaN are False, so offers without features are kept.
        future_snapshot = (merged['buy_time_x'] < merged['buy_time_y']).to_numpy()
        merged = merged[~future_snapshot]
        # Drop exact duplicate rows (id included), keeping the first occurrence.
        duplicate_row = merged.reset_index().duplicated().to_numpy()
        merged = merged[~duplicate_row]

        merged = (
            merged
            .assign(buy_time=merged['buy_time_x'].to_numpy())
            .drop(['buy_time_x', 'buy_time_y'], axis=1)
        )

        # NOTE(review): rows of the same id can land in both parts. Consider a
        # group-aware split if per-user leakage matters — TODO confirm.
        train_part, holdout_part = train_test_split(merged, test_size=0.25, random_state=42)

        # Hold-out rows consumed by CompareModels; the LocalTarget writes them atomically.
        with luigi.LocalTarget(path='output_ready_to_test.csv').open('w') as f:
            holdout_part.to_csv(f, index=True)
        with self.output().open('w') as f:
            train_part.to_csv(f, index=True)

    def output(self):
        return luigi.LocalTarget(path='output_ready_to_train.csv')
    
class ModelTrainCATBClassifier(luigi.Task):
    """Train a CatBoost classifier via ``my_gb`` and pickle it to ``model_catb.pkl``.

    ``output_ready_to_train.csv`` is split 75/25 into fit and validation rows.
    The pickle holds the one-element list ``[model]``, the second element of
    ``my_gb``'s return value; CompareModels unwraps it with ``[0]``.
    """

    def requires(self):
        return DatasetTrainTestConcat()

    def run(self):
        data_train = pd.read_csv(self.input().path, index_col='id')
        print(data_train.head())

        random_state = 42
        # Split the rows positionally. Splitting data_train.index and re-selecting
        # with .loc duplicates rows and leaks them into both parts whenever an
        # id occurs more than once.
        fit_part, valid_part = train_test_split(data_train, test_size=0.25, random_state=random_state)

        # 16.29 is passed as scale_pos_weight; presumably the negative/positive
        # class ratio — TODO confirm.
        result = my_gb(fit_part, valid_part, False, 'target', "catb", False, random_state, 16.29, True)

        # Write through the luigi target so a failed run never leaves a partial pickle.
        with self.output().open('w') as fid:
            pickle.dump(result[1], fid)

    def output(self):
        # Nop format: the target holds a binary pickle, not text.
        return luigi.LocalTarget(path='model_catb.pkl', format=luigi.format.Nop)
    
class ModelTrainXGBClassifier(luigi.Task):
    """Train an XGBoost classifier via ``my_gb`` and pickle it to ``model_xgb.pkl``.

    ``output_ready_to_train.csv`` is split 75/25 into fit and validation rows.
    The pickle holds the one-element list ``[model]``, the second element of
    ``my_gb``'s return value; CompareModels unwraps it with ``[0]``.
    """

    def requires(self):
        return DatasetTrainTestConcat()

    def run(self):
        data_train = pd.read_csv(self.input().path, index_col='id')
        print(data_train.head())

        random_state = 42
        # Split the rows positionally. Splitting data_train.index and re-selecting
        # with .loc duplicates rows and leaks them into both parts whenever an
        # id occurs more than once.
        fit_part, valid_part = train_test_split(data_train, test_size=0.25, random_state=random_state)

        # 16.29 is passed as scale_pos_weight; presumably the negative/positive
        # class ratio — TODO confirm.
        result = my_gb(fit_part, valid_part, False, 'target', "xgb", False, random_state, 16.29, True)

        # Write through the luigi target so a failed run never leaves a partial pickle.
        with self.output().open('w') as fid:
            pickle.dump(result[1], fid)

    def output(self):
        # Nop format: the target holds a binary pickle, not text.
        return luigi.LocalTarget(path='model_xgb.pkl', format=luigi.format.Nop)
    

    
class ModelTrainLGBMClassifier(luigi.Task):
    """Train a LightGBM classifier via ``my_gb`` and pickle it to ``model_lgbm.pkl``.

    ``output_ready_to_train.csv`` is split 75/25 into fit and validation rows.
    The pickle holds the one-element list ``[model]``, the second element of
    ``my_gb``'s return value; CompareModels unwraps it with ``[0]``.
    """

    def requires(self):
        return DatasetTrainTestConcat()

    def run(self):
        data_train = pd.read_csv(self.input().path, index_col='id')
        print(data_train.head())

        random_state = 42
        # Split the rows positionally. Splitting data_train.index and re-selecting
        # with .loc duplicates rows and leaks them into both parts whenever an
        # id occurs more than once.
        fit_part, valid_part = train_test_split(data_train, test_size=0.25, random_state=random_state)

        # 16.29 is passed as scale_pos_weight; presumably the negative/positive
        # class ratio — TODO confirm.
        result = my_gb(fit_part, valid_part, False, 'target', "lgbm", False, random_state, 16.29, True)

        # Write through the luigi target so a failed run never leaves a partial pickle.
        with self.output().open('w') as fid:
            pickle.dump(result[1], fid)

    def output(self):
        # Nop format: the target holds a binary pickle, not text.
        return luigi.LocalTarget(path='model_lgbm.pkl', format=luigi.format.Nop)
    
class CompareModels(luigi.Task):
    """Select the trained model with the best macro F1 on the hold-out rows.

    Loads the three pickles written by the ModelTrain* tasks (each holds a
    one-element ``[model]`` list) and scores every model on
    ``output_ready_to_test.csv``. The bare winning model is pickled to
    ``taki_best_model.pkl``.
    """

    # Candidate pickles in tie-break order: on equal F1 the earlier entry wins.
    MODEL_PATHS = ('model_xgb.pkl', 'model_catb.pkl', 'model_lgbm.pkl')

    def requires(self):
        return ModelTrainLGBMClassifier(), ModelTrainXGBClassifier(), ModelTrainCATBClassifier()

    def run(self):
        data_test = pd.read_csv('output_ready_to_test.csv', index_col='id')
        y_test = data_test['target']
        X_test = data_test.drop('target', axis=1)

        models = []
        for path in self.MODEL_PATHS:
            # These pickles are produced by this pipeline; never load untrusted files this way.
            with open(path, 'rb') as fid:
                models.append(pickle.load(fid)[0])

        # f1_score expects (y_true, y_pred).
        f1_scores = [f1_score(y_test, model.predict(X_test), average='macro') for model in models]
        print(f"F1 (macro): {dict(zip(self.MODEL_PATHS, f1_scores))}")
        best_model = models[int(np.argmax(f1_scores))]

        # Write through the luigi target so a failed run never leaves a partial pickle.
        with self.output().open('w') as fid:
            pickle.dump(best_model, fid)

    def output(self):
        # Nop format: the target holds a binary pickle, not text.
        return luigi.LocalTarget(path='taki_best_model.pkl', format=luigi.format.Nop)

    
class ModelFinalPrediction(luigi.Task):
    """Predict ``target`` for the offers in ``data_test.csv`` with the model chosen by CompareModels.

    Offers are left-joined with one feature row per ``id``, and ``buy_time`` is
    taken from the offer. The result is ``data_test`` plus a predicted
    ``target`` column, written to ``answers_test.csv``.
    """

    def requires(self):
        return CompareModels()

    def run(self):
        # Load the test offers and drop the 'Unnamed: 0' column.
        data_test = pd.read_csv('data_test.csv', index_col='id').drop('Unnamed: 0', axis=1)
        data_features_all = pd.read_csv('features.csv', sep='\t', index_col='id')

        # Use isin() instead of .loc[data_test.index]. Test ids without features
        # then no longer raise KeyError, and repeated test ids no longer
        # duplicate feature rows.
        data_features = data_features_all[data_features_all.index.isin(data_test.index)]
        # Reduce to one row per id. After a stable sort by buy_time,
        # groupby().last() takes the last non-null value of each column.
        # NOTE(review): training discards snapshots newer than the offer, whereas
        # here the newest snapshot is used regardless of the offer date — confirm
        # this is intended.
        data_features = (
            data_features
            .drop('Unnamed: 0', axis=1)
            .sort_values(by=['buy_time'], kind='mergesort')
            .groupby(level=0)
            .last()
        )

        # The right side has a unique index, so the left join keeps data_test's
        # rows and order.
        X_test = pd.merge(data_test, data_features, how="left", left_index=True, right_index=True)
        X_test = (
            X_test
            .assign(buy_time=X_test['buy_time_x'].to_numpy())
            .drop(['buy_time_x', 'buy_time_y'], axis=1)
        )

        # The pickle is produced by CompareModels in this pipeline.
        with open(self.input().path, 'rb') as fid:
            best_model = pickle.load(fid)

        answers = best_model.predict(X_test)
        data_test['target'] = answers

        with self.output().open('w') as f:
            data_test.to_csv(f, index=True)

    def output(self):
        return luigi.LocalTarget(path='answers_test.csv')

In [12]:
if __name__ == '__main__':
    # Building the final task pulls in the whole pipeline through each task's requires().
    final_task = ModelFinalPrediction()
    luigi.build([final_task])
    

DEBUG: Checking if ModelFinalPrediction() is complete
DEBUG: Checking if CompareModels() is complete
INFO: Informed scheduler that task   ModelFinalPrediction__99914b932b   has status   PENDING
INFO: Informed scheduler that task   CompareModels__99914b932b   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 3088] Worker Worker(salt=695487131, workers=1, host=DESKTOP-TTT8HHQ, username=admin, pid=3088) running   ModelFinalPrediction()
INFO: [pid 3088] Worker Worker(salt=695487131, workers=1, host=DESKTOP-TTT8HHQ, username=admin, pid=3088) done      ModelFinalPrediction()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ModelFinalPrediction__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=695487131, workers=1, host=DESKTOP-TTT8HHQ, usern