# Kubeflow - Pipeline steps development

In [1]:
!pip install xgboost



In [52]:
'''
Wrapper around the preprocessing function.
This code perform:

- Data loading from local files (debug) or GCP bigquery
- Save data as DMatrix
'''
import argparse
import logging
import numpy as np
import xgboost as xgb
from sklearn.model_selection import train_test_split
from google.cloud import storage

# logger configuration
logging.basicConfig(format='%(asctime)s | %(levelname)s: %(message)s', level=logging.NOTSET)

def _get_blob_bucket(bucket_name, path, destination):
    """Downloads a blob from the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    blob = bucket.blob(path)
    blob.download_to_filename(destination)
    
    logging.info('Exoplanets Pipeline: Blob {} downloaded to {}'.format(path, destination))
    return destination

def _upload_blob_bucket(bucket_name, source_file_name, destination):
    """Uploads a file to the bucket."""

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    blob = bucket.blob(destination)
    blob.upload_from_filename(source_file_name)
    
    logging.info('Exoplanets Pipeline: Blob {} uploaded to {}'.format(source_file_name, 
                                                                                   destination))
    
def _local_loader(path, label_column_index, skiprows=1, delimiter=','):
    '''
    Light-weight txt files data loader that uses numpy as backend
    '''
    dataset = {}
    raw = np.loadtxt(path, skiprows=1, delimiter=',')
    x = raw[:, label_column_index+1:]
    y = raw[:, label_column_index, np.newaxis] - 1. # -1 to report label in the range 0-1
    return x, y


def preprocess_data(bucket, local=True):
    LABEL_COLUMN_INDEX = 0
    
    logging.info('Exoplanets Pipeline - Preprocess: Paths creation with {0} bucket'.format(bucket))
    # define input bucket-path of the data
    train_dataset_path = _get_blob_bucket(bucket, "data/exoTrain.csv", "exoTrain.csv")
    test_dataset_path = _get_blob_bucket(bucket, "data/exoTest.csv", "exoTest.csv")
                              
    logging.info('Exoplanets Pipeline - Preprocess: Data import')
    x, y = _local_loader(path=train_dataset_path, label_column_index=LABEL_COLUMN_INDEX)
    # create an additonal evaluation split
    x_train, x_eval, y_train, y_eval = train_test_split(x, y,
                                                        stratify=y,
                                                        test_size=0.25)
    x_test, y_test = _local_loader(path=test_dataset_path, label_column_index=LABEL_COLUMN_INDEX)
    # save results as DMatrix for better performances on xgboost
    dtrain = xgb.DMatrix(x_train, label=y_train)
    deval = xgb.DMatrix(x_eval, label=y_eval)
    dtest = xgb.DMatrix(x_test, label=y_test)
    
    logging.info('Exoplanets Pipeline - Preprocess: Store data set as DMatrix')
    xgb.DMatrix.save_binary(dtrain, "train_dmatrix.data")
    xgb.DMatrix.save_binary(deval, "eval_dmatrix.data")
    xgb.DMatrix.save_binary(dtest, "test_dmatrix.data")
    
    # upload data on the storage
    _upload_blob_bucket(bucket, "train_dmatrix.data", "preprocess/train_dmatrix.data")
    _upload_blob_bucket(bucket, "eval_dmatrix.data", "preprocess/eval_dmatrix.data")
    _upload_blob_bucket(bucket, "test_dmatrix.data", "preprocess/test_dmatrix.data")
    
    logging.info('Exoplanets Pipeline - Preprocess: Completed')

In [53]:
bucket = "exoplanets_kubeflow"

In [54]:
logging.info('Exoplanets Pipeline - Preprocess: Running...')
preprocess_data(bucket)

2021-02-22 16:42:30,866 | INFO: Exoplanets Pipeline - Preprocess: Running...
2021-02-22 16:42:30,868 | INFO: Exoplanets Pipeline - Preprocess: Paths creation with exoplanets_kubeflow bucket
2021-02-22 16:42:30,869 | DEBUG: Checking None for explicit credentials as part of auth process...
2021-02-22 16:42:30,870 | DEBUG: Checking Cloud SDK credentials as part of auth process...
2021-02-22 16:42:30,872 | DEBUG: Cloud SDK credentials not found on disk; not using them
2021-02-22 16:42:30,873 | DEBUG: Checking for App Engine runtime as part of auth process...
2021-02-22 16:42:30,874 | DEBUG: No App Engine library was found so cannot authentication via App Engine Identity Credentials.
2021-02-22 16:42:30,876 | DEBUG: Making request: GET http://169.254.169.254
2021-02-22 16:42:30,879 | DEBUG: Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
2021-02-22 16:42:30,883 | DEBUG: Checking None for explicit credentials as part of auth process...
2021-02-22 16:

In [67]:
'''
Wrapper around the xgboost train function.
This code perform:

- Load DMatrix from GCP bucket
- Run XGBoost model Training
- Save model inside the Bucket
'''
import argparse
import logging
import joblib
import xgboost as xgb
from google.cloud import storage

# logger configuration
logging.basicConfig(format='%(asctime)s | %(levelname)s: %(message)s', level=logging.NOTSET)

def _get_blob_bucket(bucket_name, path, destination):
    """Downloads a blob from the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    blob = bucket.blob(path)
    blob.download_to_filename(destination)
    
    logging.info('Exoplanets Pipeline: Blob {} downloaded to {}'.format(path, 
                                                                        destination))
    return destination

def _upload_blob_bucket(bucket_name, source_file_name, destination):
    """Uploads a file to the bucket."""

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    blob = bucket.blob(destination)
    blob.upload_from_filename(source_file_name)
    
    logging.info('Exoplanets Pipeline: Blob {} uploaded to {}'.format(source_file_name,
                                                                      destination))
def _xgb_parameters(scale_positive):
    
    parameters = {
    'eta': 0.05,
    'objective': 'binary:logistic',
    'max_depth': 7, # depth of the trees in the boosting process
    'min_child_weight': 1, 
    'colsample_bytree': 0.8,
    'scale_pos_weight' : scale_positive
    }
    
    return parameters

def trainer(bucket):
    
    logging.info('Exoplanets Pipeline - Trainer: Load DMatrices from {0} bucket'.format(bucket))
    # define input bucket-path of the data
    train_dmatrix_path = _get_blob_bucket(bucket, "preprocess/train_dmatrix.data", "train_dmatrix.data")
    eval_dmatrix_path = _get_blob_bucket(bucket, "preprocess/eval_dmatrix.data", "eval_dmatrix.data")
    
    dtrain = xgb.DMatrix(train_dmatrix_path)
    deval = xgb.DMatrix(eval_dmatrix_path)
    
    label, count = np.unique(dtrain.get_label(), return_counts=True)
    scale_positive = count[0] / count[1]
    watchlist = [(dtrain, 'train'), (deval, 'eval')]
    
    ESTIMATORS = 300
    model = xgb.train(_xgb_parameters(scale_positive), dtrain, ESTIMATORS, watchlist, early_stopping_rounds=10)
    
    logging.info('Exoplanets Pipeline - Trainer: Storing model on the bucket')
    joblib.dump(model, 'xgb_model.pkl') 
    _upload_blob_bucket(bucket, "xgb_model.pkl", "model/xgb_model.pkl")
    
    logging.info('Exoplanets Pipeline - Trainer: Job Completed')

In [68]:
bucket = "exoplanets_kubeflow"

In [69]:
logging.info('Exoplanets Pipeline - Trainer: Running...')
trainer(bucket)

2021-02-22 16:46:52,290 | INFO: Exoplanets Pipeline - Train: Running...
2021-02-22 16:46:52,292 | INFO: Exoplanets Pipeline - Trainer: Load DMatrices from exoplanets_kubeflow bucket
2021-02-22 16:46:52,293 | DEBUG: Checking None for explicit credentials as part of auth process...
2021-02-22 16:46:52,295 | DEBUG: Checking Cloud SDK credentials as part of auth process...
2021-02-22 16:46:52,296 | DEBUG: Cloud SDK credentials not found on disk; not using them
2021-02-22 16:46:52,297 | DEBUG: Checking for App Engine runtime as part of auth process...
2021-02-22 16:46:52,298 | DEBUG: No App Engine library was found so cannot authentication via App Engine Identity Credentials.
2021-02-22 16:46:52,298 | DEBUG: Making request: GET http://169.254.169.254
2021-02-22 16:46:52,302 | DEBUG: Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
2021-02-22 16:46:52,306 | DEBUG: Checking None for explicit credentials as part of auth process...
2021-02-22 16:46:52,30

[0]	train-logloss:0.64799	eval-logloss:0.64869
[1]	train-logloss:0.60588	eval-logloss:0.60710
[2]	train-logloss:0.56759	eval-logloss:0.56965
[3]	train-logloss:0.53262	eval-logloss:0.53510
[4]	train-logloss:0.50117	eval-logloss:0.50456
[5]	train-logloss:0.47181	eval-logloss:0.47592
[6]	train-logloss:0.44448	eval-logloss:0.44922
[7]	train-logloss:0.41927	eval-logloss:0.42450
[8]	train-logloss:0.39592	eval-logloss:0.40177
[9]	train-logloss:0.37429	eval-logloss:0.38037
[10]	train-logloss:0.35414	eval-logloss:0.36023
[11]	train-logloss:0.33542	eval-logloss:0.34145
[12]	train-logloss:0.31688	eval-logloss:0.32368
[13]	train-logloss:0.30054	eval-logloss:0.30776
[14]	train-logloss:0.28526	eval-logloss:0.29293
[15]	train-logloss:0.26999	eval-logloss:0.27843
[16]	train-logloss:0.25665	eval-logloss:0.26549
[17]	train-logloss:0.24413	eval-logloss:0.25301
[18]	train-logloss:0.23224	eval-logloss:0.24110
[19]	train-logloss:0.22105	eval-logloss:0.22995
[20]	train-logloss:0.21060	eval-logloss:0.21934
[2

2021-02-22 16:48:16,359 | INFO: Exoplanets Pipeline - Train: Storing model on the bucket
2021-02-22 16:48:16,393 | DEBUG: Checking None for explicit credentials as part of auth process...
2021-02-22 16:48:16,394 | DEBUG: Checking Cloud SDK credentials as part of auth process...
2021-02-22 16:48:16,395 | DEBUG: Cloud SDK credentials not found on disk; not using them
2021-02-22 16:48:16,396 | DEBUG: Checking for App Engine runtime as part of auth process...
2021-02-22 16:48:16,397 | DEBUG: No App Engine library was found so cannot authentication via App Engine Identity Credentials.
2021-02-22 16:48:16,399 | DEBUG: Making request: GET http://169.254.169.254
2021-02-22 16:48:16,402 | DEBUG: Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
2021-02-22 16:48:16,406 | DEBUG: Checking None for explicit credentials as part of auth process...
2021-02-22 16:48:16,407 | DEBUG: Checking Cloud SDK credentials as part of auth process...
2021-02-22 16:48:16,408 

In [70]:
'''
Wrapper around to perform prediction over a test dataset.
This code perform:

- Load model from GCP bucket
- Run prediction over test set
- Save confusion matrix inside the bucket
'''
import argparse
import logging
import numpy as np
import joblib
import xgboost as xgb
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import confusion_matrix
from google.cloud import storage

# logger configuration
logging.basicConfig(format='%(asctime)s | %(levelname)s: %(message)s', level=logging.NOTSET)

def _get_blob_bucket(bucket_name, path, destination):
    """Downloads a blob from the bucket."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    blob = bucket.blob(path)
    blob.download_to_filename(destination)
    
    logging.info('Exoplanets Pipeline: Blob {} downloaded to {}'.format(path, destination))
    return destination

def _upload_blob_bucket(bucket_name, source_file_name, destination):
    """Uploads a file to the bucket."""

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    blob = bucket.blob(destination)
    blob.upload_from_filename(source_file_name)
    
    logging.info('Exoplanets Pipeline: Blob {} uploaded to {}'.format(source_file_name,
                                                                      destination))
    
def prediction(bucket):
    
    logging.info('Exoplanets Pipeline - Prediction: Load model from {0} bucket'.format(bucket))
    # define input bucket-path of the model
    model_path = _get_blob_bucket(bucket, "model/xgb_model.pkl", "xgb_model.pkl")
    test_dmatrix_path = _get_blob_bucket(bucket, "preprocess/test_dmatrix.data", "test_dmatrix.data")
    
    model = joblib.load(model_path)
    dtest = xgb.DMatrix(test_dmatrix_path)
    
    logging.info('Exoplanets Pipeline - Prediction: Perform Prediction')
    y_pred = model.predict(dtest)
    y_pred_binary = np.around(y_pred)
    
    logging.info('Exoplanets Pipeline - Prediction: Calculare metrics')
    cm = confusion_matrix(dtest.get_label(), y_pred_binary)
    precision = precision_score(dtest.get_label(), y_pred_binary)
    recall = recall_score(dtest.get_label(), y_pred_binary)
    
    logging.info('Exoplanets Pipeline - Prediction: Confusion matrix: \n {0}'.format(cm))
    logging.info('Exoplanets Pipeline - Prediction: Precision {0}'.format(precision))
    logging.info('Exoplanets Pipeline - Prediction: Recall {0}'.format(recall))
    
    logging.info('Exoplanets Pipeline - Prediction: Complete')


In [71]:
bucket = "exoplanets_kubeflow"

In [72]:
logging.info('Exoplanets Pipeline - Prediction: Running...')
prediction(bucket)

2021-02-22 16:48:19,962 | INFO: Exoplanets Pipeline - Prediction: Running...
2021-02-22 16:48:19,963 | INFO: Exoplanets Pipeline - Prediction: Load model from exoplanets_kubeflow bucket
2021-02-22 16:48:19,965 | DEBUG: Checking None for explicit credentials as part of auth process...
2021-02-22 16:48:19,966 | DEBUG: Checking Cloud SDK credentials as part of auth process...
2021-02-22 16:48:19,966 | DEBUG: Cloud SDK credentials not found on disk; not using them
2021-02-22 16:48:19,968 | DEBUG: Checking for App Engine runtime as part of auth process...
2021-02-22 16:48:19,969 | DEBUG: No App Engine library was found so cannot authentication via App Engine Identity Credentials.
2021-02-22 16:48:19,970 | DEBUG: Making request: GET http://169.254.169.254
2021-02-22 16:48:19,974 | DEBUG: Making request: GET http://metadata.google.internal/computeMetadata/v1/project/project-id
2021-02-22 16:48:19,978 | DEBUG: Checking None for explicit credentials as part of auth process...
2021-02-22 16:48:1