In [33]:
import json
import os
import logging
import pickle
import warnings

import gokart
from gokart.config_params import inherits_config_params
import luigi
import numpy as np
import pandas as pd
import psycopg2
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve, auc
from sklearn.model_selection import train_test_split
from sqlalchemy import create_engine

from utils import ml_utils, db_utils

# Silence every warning category for the rest of the kernel session.
# NOTE(review): this also hides scikit-learn's ConvergenceWarning from the
# LogisticRegression fits below, and any other library warnings — consider
# narrowing it (e.g. category=...) to the specific warning that was noisy.
warnings.filterwarnings("ignore")

In [3]:
# DB connection.
# Opens a connection for the 'dev' environment via the project helper
# db_utils.connect. `conn` is read as a notebook-global by LoadFromDB.run, so
# this cell must run before any pipeline build; the second value returned by
# connect is discarded.
conn, _ = db_utils.connect('dev')

In [47]:
class LoadFromDB(gokart.TaskOnKart):
    """Load the ``diabetes_diagnosis_results`` table and cast it to compact dtypes.

    Outputs (under ``{mode}/data/``):
        src_data:  the whole table as a DataFrame, cast with ``dtype``.
        src_index: the table's ``index`` column as one comma-separated string.

    Reads the notebook-global ``conn`` opened in an earlier cell.
    """
    mode = luigi.parameter.Parameter()
    # Compact but overflow-safe dtypes. np.int8 only holds -128..127 and an
    # integer astype can wrap larger values silently, so columns that can
    # exceed that range (the row index once the table has >128 rows, glucose
    # and blood-pressure readings, age) use wider integers. float32 avoids
    # float16's ~3-significant-digit rounding of bmi / pedigree values.
    # NOTE: this is a plain class attribute, not a luigi parameter, so it does
    # not affect the task id — outputs cached with the old dtypes are reused
    # until they are deleted.
    dtype = {
        'index': np.int64,
        'pregnancies': np.int8,
        'glucose': np.int16,
        'blood_pressure': np.int16,
        'skin_thickness': np.float32,
        'insulin': np.float32,
        'bmi': np.float32,
        'diabetes_pedigree_function': np.float32,
        'age': np.int16,
        'outcome': np.int8,
        'is_trained': bool
    }

    def output(self):
        return {'src_data': self.make_target(f'{self.mode}/data/src_data.pkl'),
                'src_index': self.make_target(f'{self.mode}/data/src_index.txt')}

    def run(self):
        src_df = pd.read_sql(sql="SELECT * FROM diabetes_diagnosis_results;", con=conn).astype(self.dtype)
        # A single comma-separated string of the row indices (not a list).
        src_index = ','.join(map(str, src_df['index'].tolist()))
        self.dump(src_df, 'src_data')
        self.dump(src_index, 'src_index')

        
class SplitTrainTestData(gokart.TaskOnKart):
    """Randomly split the loaded table into an 80% train and 20% test part.

    The split is reproducible (fixed random_state=0). Outputs ``train_data``
    and ``test_data`` DataFrames under ``{mode}/data/``.
    """
    mode = luigi.parameter.Parameter()

    def requires(self):
        return LoadFromDB(mode=self.mode)

    def output(self):
        train_target = self.make_target(f'{self.mode}/data/train_data.pkl')
        test_target = self.make_target(f'{self.mode}/data/test_data.pkl')
        return {'train_data': train_target, 'test_data': test_target}

    def run(self):
        full_frame = self.load('src_data')
        train_part, test_part = train_test_split(full_frame, test_size=0.2, random_state=0)
        for target_key, part in (('train_data', train_part), ('test_data', test_part)):
            self.dump(part, target_key)

        
class PreprocessTrainData(gokart.TaskOnKart):
    """Median-impute the training features and persist the fitted imputer.

    ``mode`` selects the training input:
        'evaluate' -> the ``train_data`` split from SplitTrainTestData
        'deploy'   -> the full ``src_data`` table from LoadFromDB

    Outputs (``processed_train_data``, ``imputer``): the imputed feature
    columns followed by ``outcome``, and the fitted SimpleImputer so the test
    data can be transformed with the same medians.

    Raises:
        ValueError: if ``mode`` is not 'evaluate' or 'deploy'.
    """
    mode = luigi.parameter.Parameter()
    required_columns = [
        'pregnancies', 'glucose', 'blood_pressure', 'skin_thickness',
        'insulin', 'bmi', 'diabetes_pedigree_function', 'age', 'outcome'
    ]
    # Not used by this task; kept so existing references to
    # PreprocessTrainData.dtype still work. Copied from LoadFromDB instead of
    # duplicated so the two mappings cannot drift apart.
    dtype = dict(LoadFromDB.dtype)
    supported_modes = ('evaluate', 'deploy')

    def _validate_mode(self):
        """Raise a clear error for an unsupported mode (was a bare KeyError)."""
        if self.mode not in self.supported_modes:
            raise ValueError(
                f"mode must be one of {self.supported_modes}, got {self.mode!r}")

    def requires(self):
        self._validate_mode()
        if self.mode == 'evaluate':
            return SplitTrainTestData(mode=self.mode)
        return LoadFromDB(mode=self.mode)

    def output(self):
        return {'processed_train_data': self.make_target(f'{self.mode}/data/processed_train_data.pkl'),
                'imputer': self.make_target(f'{self.mode}/model/imputer.pkl')}

    def run(self):
        self._validate_mode()
        train_key = 'train_data' if self.mode == 'evaluate' else 'src_data'
        train_df = self.load(train_key)[self.required_columns]
        # drop/column selection already return new objects; no extra copy needed.
        train_X = train_df.drop(columns='outcome')
        train_y = train_df['outcome']
        imputer = SimpleImputer(strategy="median")
        processed_df = pd.DataFrame(imputer.fit_transform(train_X), columns=train_X.columns)
        # The imputed frame has a fresh RangeIndex, so attach labels positionally.
        processed_df['outcome'] = train_y.values
        self.dump(processed_df, 'processed_train_data')
        self.dump(imputer, 'imputer')
    

class PreprocessTestData(gokart.TaskOnKart):
    """Impute the test split with the imputer fitted on the training data.

    Output: the imputed feature columns followed by ``outcome``, in the same
    column order as the processed training data.
    """
    mode = luigi.parameter.Parameter()
    required_columns = [
        'pregnancies', 'glucose', 'blood_pressure', 'skin_thickness',
        'insulin', 'bmi', 'diabetes_pedigree_function', 'age', 'outcome'
    ]

    def requires(self):
        return dict(
            split=SplitTrainTestData(mode=self.mode),
            p_train=PreprocessTrainData(mode=self.mode),
        )

    def output(self):
        return self.make_target(f'{self.mode}/data/processed_test_data.pkl')

    def run(self):
        split_frames = self.load('split')
        test_frame = split_frames['test_data'][self.required_columns]
        features = test_frame.drop("outcome", axis=1)
        labels = test_frame['outcome'].values
        fitted_imputer = self.load('p_train')['imputer']
        imputed = pd.DataFrame(fitted_imputer.transform(features), columns=features.columns)
        # Positional assignment: the imputed frame has a fresh RangeIndex.
        imputed['outcome'] = labels
        self.dump(imputed)


class TrainModel(gokart.TaskOnKart):
    """Fit a LogisticRegression on the preprocessed training data.

    Parameters:
        mode: forwarded to PreprocessTrainData ('evaluate' or 'deploy').
        C: inverse regularisation strength passed to LogisticRegression.

    Output: the fitted model, pickled under ``{mode}/model/``.
    """
    mode = luigi.parameter.Parameter()
    C = luigi.parameter.FloatParameter(default=1.0)

    def requires(self):
        return PreprocessTrainData(mode=self.mode)

    def output(self):
        return self.make_target(f'{self.mode}/model/model.pkl')

    def run(self):
        train_df = self.load('processed_train_data')
        # Select the target by name rather than "last column" so the split
        # does not silently go wrong if the upstream column order changes.
        train_X = train_df.drop(columns='outcome')
        train_y = train_df['outcome']
        # NOTE(review): features are unscaled and warnings are ignored
        # notebook-wide, so a ConvergenceWarning (default max_iter=100) would
        # go unnoticed — confirm the fit converges.
        model = LogisticRegression(C=self.C).fit(train_X, train_y)
        self.dump(model)
        
        
class EvaluateModel(gokart.TaskOnKart):
    """Score the trained model on the processed test split by ROC AUC.

    Parameters:
        mode: forwarded to the upstream tasks.
        C: regularisation strength of the TrainModel run to evaluate.

    Output: a text line ``"C: <C>, AUC: <auc>"`` under ``{mode}/result/``.
    """
    mode = luigi.parameter.Parameter()
    C = luigi.parameter.FloatParameter(default=1.0)

    def requires(self):
        return {
            'p_test': PreprocessTestData(mode=self.mode),
            'train_model': TrainModel(mode=self.mode, C=self.C)
        }

    def output(self):
        return self.make_target(f'{self.mode}/result/result.txt')

    def run(self):
        test_df = self.load('p_test')
        model = self.load('train_model')
        # Select the target by name rather than relying on it being last.
        test_X = test_df.drop(columns='outcome')
        test_y = test_df['outcome']
        # Score of the second class in model.classes_ (the positive label).
        pred_y = model.predict_proba(test_X)[:, 1]
        fpr, tpr, _ = roc_curve(test_y, pred_y)
        result = f'C: {model.C}, AUC: {auc(fpr, tpr)}'
        self.dump(result)

In [52]:
# Evaluate: train on the training split with this C and write the test AUC
# via EvaluateModel (mode='evaluate').
C = 10
evaluate_task = EvaluateModel(mode='evaluate', C=C)
gokart.build(evaluate_task, log_level=logging.DEBUG, return_value=False)

DEBUG: Checking if EvaluateModel(mode=evaluate, C=10) is complete
DEBUG: Checking if PreprocessTestData(mode=evaluate) is complete
DEBUG: Checking if TrainModel(mode=evaluate, C=10) is complete
INFO: Informed scheduler that task   EvaluateModel_10_evaluate_adea6c2b9d   has status   PENDING
DEBUG: Checking if PreprocessTrainData(mode=evaluate) is complete
INFO: Informed scheduler that task   TrainModel_10_evaluate_adea6c2b9d   has status   PENDING
INFO: Informed scheduler that task   PreprocessTrainData_evaluate_92a9778673   has status   DONE
INFO: Informed scheduler that task   PreprocessTestData_evaluate_92a9778673   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
INFO: [pid 325] Worker Worker(salt=319762226, workers=1, host=88071f242bda, username=root, pid=325) running   TrainModel(mode=evaluate, C=10)
INFO: [pid 325] Worker Worker(salt=319762226, workers=1, host=88071f242bda, username=roo

In [54]:
# Deploy: fit the final model on the full table (mode='deploy') with this C.
C = 5
deploy_task = TrainModel(mode='deploy', C=C)
gokart.build(deploy_task, log_level=logging.DEBUG, return_value=False)

DEBUG: Checking if TrainModel(mode=deploy, C=5) is complete
INFO: Informed scheduler that task   TrainModel_5_deploy_ebb6c431b5   has status   DONE
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=989349803, workers=1, host=88071f242bda, username=root, pid=325) was stopped. Shutting down Keep-Alive thread
INFO: 
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 complete ones were encountered:
    - 1 TrainModel(...)

Did not run any tasks
This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

