In [1]:
# Prerequisite: run `luigid` (the Luigi scheduler daemon)
import numpy as np
import pandas as pd
from datetime import datetime, date

import luigi
import pickle

from sklearn.base import BaseEstimator, TransformerMixin

import os
from pathlib import Path
from column_selector import ColumnSelector
# utils функции для датасета
from utils import reduce_mem_usage, merge_dfs, features_types, prepare_df_train, prepare_df_test

In [2]:
class DataPipeline(luigi.Task):
    """Luigi task that scores the test dataset with a pre-trained model.

    The task reads the train, test and feature CSV files, joins the features
    to both datasets, prepares them with the project helpers from ``utils``,
    loads a pickled estimator and writes the predicted probabilities for the
    test rows to ``answers_test.csv``.

    Parameters:
        train_csv: path to the training CSV (read with pandas defaults).
        test_csv: path to the test CSV (comma-separated, UTF-8).
        features_csv: path to the tab-separated features CSV whose first
            column is used as the index.
        model_file: path to the pickled estimator; it must expose
            ``predict_proba``.
    """
    train_csv = luigi.Parameter()
    test_csv = luigi.Parameter()
    features_csv = luigi.Parameter()
    model_file = luigi.Parameter()

    def output(self):
        """Return the target for the predictions file written by ``run``.

        The target has to name the file that ``run`` really creates;
        otherwise Luigi never sees the task as complete and re-runs it on
        every build.
        """
        return luigi.LocalTarget('answers_test.csv')

    def run(self):
        """Build the scoring dataset, predict and save the answers as CSV."""
        # 'Unnamed: 0' is the index column pandas adds when a frame is saved
        # with its index; tolerate files that were saved without it.
        df_train = pd.read_csv(self.train_csv)
        df_train = df_train.drop(columns='Unnamed: 0', errors='ignore')

        df_test = pd.read_csv(self.test_csv, encoding='utf8', sep=',')
        df_test = df_test.drop(columns='Unnamed: 0', errors='ignore')

        data = pd.read_csv(self.features_csv, sep='\t', index_col=[0])
        print("Датасеты импортированы")

        df_train = reduce_mem_usage(df_train)
        df_test = reduce_mem_usage(df_test)

        # Keep only the feature rows whose id occurs in train or in test.
        known_id = (data['id'].isin(df_train['id'].values)
                    | data['id'].isin(df_test['id'].values))
        data = data.loc[known_id]
        data = reduce_mem_usage(data)
        print("Датасеты оптимизированы")

        df_train['user_vas'] = df_train['id'].astype(str) + '_' + df_train['vas_id'].astype(str)

        # Drop train rows duplicated for the same user/service pair with a
        # different target, keeping the last occurrence.
        df_train = df_train.drop_duplicates('user_vas', keep="last")

        # Join the feature dataset to the train and test datasets.
        train_res = merge_dfs(df_train, data)
        test_res = merge_dfs(df_test, data)

        print("Датасет признаков присоединен к тесту и трейну")

        # The values returned alongside the prepared train frame are passed
        # on so the test frame is prepared with the same statistics.
        train_res, med_1, med_3, med_5, med_207 = prepare_df_train(train_res)
        test_res = prepare_df_test(test_res, med_1, med_3, med_5, med_207)
        print("Датасеты подготовлены к модели")

        # NOTE(review): the feature-type lists are not used for inference;
        # the call is retained because its side effects are not visible here.
        features_types(train_res)
        print("Определены типы признаков")

        # SECURITY: pickle.load executes arbitrary code from the file;
        # only point model_file at a trusted artifact.
        with open(self.model_file, 'rb') as model_file:
            model = pickle.load(model_file)
        print("Модель импортирована")

        preds = model.predict_proba(test_res)[:, 1]

        # Attach predictions positionally. Concatenating a fresh pd.Series
        # (index 0..n-1) would align on index labels and misplace values
        # whenever test_res does not carry a default RangeIndex.
        result = test_res[['id', 'vas_id', 'buy_time']].copy()
        result['target'] = preds
        print("Предсказания для теста сделаны")

        # Save to the declared Luigi target so the task registers as complete.
        result.to_csv(self.output().path, encoding="utf-8-sig", sep=';', index=False)

In [3]:
if __name__ == '__main__':
    # Input locations are resolved against the current working directory;
    # the test CSV is referenced by a bare relative name.
    project_root = Path(os.getcwd())
    data_dir = os.path.join(project_root, "data")
    models_dir = os.path.join(project_root, "models")

    train_csv = os.path.join(data_dir, "data_train.csv")
    test_csv = 'data_test.csv'
    features_csv = os.path.join(data_dir, "features.csv")
    model_file = os.path.join(models_dir, "xgb_estimator.pickle")

    # Schedule the single scoring task and run it.
    pipeline_task = DataPipeline(
        train_csv=train_csv,
        test_csv=test_csv,
        features_csv=features_csv,
        model_file=model_file,
    )
    luigi.build([pipeline_task])

DEBUG: Checking if DataPipeline(train_csv=/Users/mac/Documents/course_megafon/course_proj/data/data_train.csv, test_csv=data_test.csv, features_csv=/Users/mac/Documents/course_megafon/course_proj/data/features.csv, model_file=/Users/mac/Documents/course_megafon/course_proj/models/xgb_estimator.pickle) is complete
INFO: Informed scheduler that task   DataPipeline__Users_mac_Docum__Users_mac_Docum_data_test_csv_6ef31b432f   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 4391] Worker Worker(salt=824245955, workers=1, host=MacBook-Pro-di-Mac.local, username=mac, pid=4391) running   DataPipeline(train_csv=/Users/mac/Documents/course_megafon/course_proj/data/data_train.csv, test_csv=data_test.csv, features_csv=/Users/mac/Documents/course_megafon/course_proj/data/features.csv, model_file=/Users/mac/Documents/course_megafon/course_proj/models/xgb_estimator.pickle)
  mask |= (ar1 == a)

Датасеты импортированы
Датасеты оптимизированы
Датасет признаков присоединен к тесту и трейну
Добавлены новые фичи
Удалены столбцы с низкой значимостью
Добавлены новые фичи
Удалены столбцы с низкой значимостью
Датасеты подготовлены к модели
Определены типы признаков
Модель импортирована
Предсказания для теста сделаны


INFO: [pid 4391] Worker Worker(salt=824245955, workers=1, host=MacBook-Pro-di-Mac.local, username=mac, pid=4391) done      DataPipeline(train_csv=/Users/mac/Documents/course_megafon/course_proj/data/data_train.csv, test_csv=data_test.csv, features_csv=/Users/mac/Documents/course_megafon/course_proj/data/features.csv, model_file=/Users/mac/Documents/course_megafon/course_proj/models/xgb_estimator.pickle)
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   DataPipeline__Users_mac_Docum__Users_mac_Docum_data_test_csv_6ef31b432f   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=824245955, workers=1, host=MacBook-Pro-di-Mac.local, username=mac, pid=4391) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 DataPipeline(...)

This progress looks :) because there were