In [4]:
import json
import kfp
import kfp.dsl as dsl
import kfp.components as comp
from kfp.components import OutputPath

# Pipeline Config

In [36]:
# --- Kubeflow Pipelines connection -------------------------------------------
# Host name of the KFP deployment; replace with the value for your own cluster.
KFP_HOST_NAME = 'https://453df4385c9a2ddb-dot-europe-west1.pipelines.googleusercontent.com/'
# Caching policy of the KFP run (toggling it is useful in debug/dev mode).
disable_cache = False

# --- Data selection ----------------------------------------------------------
# First year of financial data to use.
year_from = 1990
# How many of the most recent days are held out for validation.
holdout_days = 30

# --- Hyperparameter search ---------------------------------------------------
# Number of random iterations for the random-search hyperparameter optimization.
random_iterations = 3

# Random forest candidates
n_estimators = [150, 250, 300, 400]      # number of trees in the forest
max_features = ['sqrt']                  # features considered at every split
max_depth = [5, 10, 25]                  # maximum number of levels in a tree
min_samples_split = [2, 3, 5, 10]        # minimum samples required to split a node
min_samples_leaf = [1, 2, 4]             # minimum samples required at a leaf node
bootstrap = [True]                       # how samples are selected for each tree

# SVM candidates
kernel = ['poly', 'rbf', 'sigmoid']
gamma = [0.0001, 0.001, 0.005, 0.1, 1, 3, 5]
C = [0.1, 1, 100, 1000]
epsilon = [0.0001, 0.0005, 0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10]

# Neural network candidates
hidden_layer_sizes = [(1,), (50,), (100,), (1000,)]
activation = ["identity", "logistic", "tanh", "relu"]
solver = ["lbfgs", "sgd", "adam"]
learning_rate = ['constant', 'adaptive', 'invscaling']

In [37]:
# Handle to the KFP deployment; used at the end of the notebook to submit the run.
client = kfp.Client(
    host=KFP_HOST_NAME,
)

## Data

## Preprocessing

## RandomForest

In [38]:
def Random_Forest_Trainer(path_to_file_pp: str, random_forest_path: str, holdout_days: int, random_iter: int, 
                         grid_params_forest: str) -> str:
    '''Tune a random forest regressor with a randomized search and upload the best model to GCS.

    Args:
        path_to_file_pp: Location of the preprocessed dataset, read with ``pd.read_csv``.
            It must contain a ``price`` column, which is used as the target.
        random_forest_path: ``gs://bucket/object`` URL the pickled model is uploaded to.
        holdout_days: Rows whose index falls within the last ``holdout_days`` days are
            excluded from training.
        random_iter: Number of parameter settings sampled by ``RandomizedSearchCV``.
        grid_params_forest: JSON object mapping ``RandomForestRegressor`` parameter names
            to lists of candidate values.

    Returns:
        ``random_forest_path``, unchanged, so downstream tasks can locate the model.

    Raises:
        ValueError: If ``random_forest_path`` has no object name after the bucket.
    '''
    # Imports are kept inside the function: the function is turned into a KFP
    # component with create_component_from_func, so it has to be self-contained.
    import json
    import pandas as pd
    import _pickle as cPickle # save ML model
    from datetime import datetime, timedelta
    from google.cloud import storage # save the model to GCS
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.model_selection import train_test_split
    from sklearn.model_selection import RandomizedSearchCV
    from urllib.parse import urlparse

    # read the dataset
    gold_df = pd.read_csv(path_to_file_pp)

    # keep everything older than the holdout window for training
    # NOTE(review): read_csv is called without index_col/parse_dates, so the index
    # is presumably a plain integer range and this comparison against a datetime
    # will not work -- confirm how the preprocessing step writes the date column.
    holdout_start = datetime.today() - timedelta(days=holdout_days)
    gold_train_df = gold_df[gold_df.index < holdout_start]

    X, y = gold_train_df.drop('price', axis=1), gold_train_df['price']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=0)

    # candidate hyperparameters arrive as a JSON string
    random_grid = json.loads(grid_params_forest)

    # use the component input ``random_iter``; notebook globals are not visible
    # inside the component
    clf_grid = RandomizedSearchCV(estimator=RandomForestRegressor(), param_distributions=random_grid,
                                  n_iter=random_iter, cv=3, verbose=2, random_state=0, n_jobs=-1)
    clf_grid.fit(X_train, y_train)

    # initial evaluation on the random test split (predict, not a second fit)
    predictions = clf_grid.predict(X_test)
    mae_score = mean_absolute_error(y_test, predictions)
    mse_score = mean_squared_error(y_test, predictions)
    print('MAE RandomForest: ' + str(mae_score))
    print('MSE RandomForest: ' + str(mse_score))

    # save the best estimator to a temporary file before uploading it
    temp_model_path = '/tmp/model_RF.pickle'
    with open(temp_model_path, 'wb') as f:
        cPickle.dump(clf_grid.best_estimator_, f, -1)

    # split gs://bucket/object into the bucket (netloc) and the object name (path)
    parse = urlparse(url=random_forest_path, allow_fragments=False)
    model_path = parse.path[1:] if parse.path.startswith('/') else parse.path
    if not model_path:
        raise ValueError('No object name in model path: ' + str(random_forest_path))

    client = storage.Client()
    bucket = client.get_bucket(parse.netloc)
    model = bucket.blob(model_path)
    model.upload_from_filename(temp_model_path)

    return random_forest_path

In [39]:
# Wrap the random forest trainer as a KFP component (spec is also written to YAML).
Random_Forest_Trainer_op = comp.create_component_from_func(
    Random_Forest_Trainer,
    output_component_file='Random_Forest_Trainer.yaml',
    packages_to_install=[
        'scikit-learn',
        'fastparquet',
        'fsspec',
        'gcsfs',
        'google-cloud-storage',
    ],
)

## SVM

In [40]:
def SVM_Trainer(path_to_file_pp: str, SVM_path: str, holdout_days: int, random_iter: int, 
                         grid_params_SVM: str) -> str:
    '''Tune a support vector regressor with a randomized search and upload the best model to GCS.

    Args:
        path_to_file_pp: Location of the preprocessed dataset, read with ``pd.read_csv``.
            It must contain a ``price`` column, which is used as the target.
        SVM_path: ``gs://bucket/object`` URL the pickled model is uploaded to.
        holdout_days: Rows whose index falls within the last ``holdout_days`` days are
            excluded from training.
        random_iter: Number of parameter settings sampled by ``RandomizedSearchCV``.
        grid_params_SVM: JSON object mapping ``SVR`` parameter names to lists of
            candidate values.

    Returns:
        ``SVM_path``, unchanged, so downstream tasks can locate the model.

    Raises:
        ValueError: If ``SVM_path`` has no object name after the bucket.
    '''
    # Imports are kept inside the function: the function is turned into a KFP
    # component with create_component_from_func, so it has to be self-contained.
    import json
    import pandas as pd
    import _pickle as cPickle # save ML model
    from datetime import datetime, timedelta
    from google.cloud import storage # save the model to GCS
    from sklearn.svm import SVR
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.model_selection import train_test_split
    from sklearn.model_selection import RandomizedSearchCV
    from urllib.parse import urlparse

    # read the dataset
    gold_df = pd.read_csv(path_to_file_pp)

    # keep everything older than the holdout window for training
    # NOTE(review): read_csv is called without index_col/parse_dates, so the index
    # is presumably a plain integer range and this comparison against a datetime
    # will not work -- confirm how the preprocessing step writes the date column.
    holdout_start = datetime.today() - timedelta(days=holdout_days)
    gold_train_df = gold_df[gold_df.index < holdout_start]

    X, y = gold_train_df.drop('price', axis=1), gold_train_df['price']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=0)

    # candidate hyperparameters arrive as a JSON string
    random_grid = json.loads(grid_params_SVM)

    # use the component input ``random_iter``; notebook globals are not visible
    # inside the component
    clf_grid = RandomizedSearchCV(estimator=SVR(), param_distributions=random_grid,
                                  n_iter=random_iter, cv=3, verbose=2, random_state=0, n_jobs=-1)
    clf_grid.fit(X_train, y_train)

    # initial evaluation on the random test split (predict, not a second fit)
    predictions = clf_grid.predict(X_test)
    mae_score = mean_absolute_error(y_test, predictions)
    mse_score = mean_squared_error(y_test, predictions)
    print('MAE SVM: ' + str(mae_score))
    print('MSE SVM: ' + str(mse_score))

    # save the best estimator to a temporary file before uploading it
    temp_model_path = '/tmp/model_SVM.pickle'
    with open(temp_model_path, 'wb') as f:
        cPickle.dump(clf_grid.best_estimator_, f, -1)

    # split gs://bucket/object into the bucket (netloc) and the object name (path)
    parse = urlparse(url=SVM_path, allow_fragments=False)
    model_path = parse.path[1:] if parse.path.startswith('/') else parse.path
    if not model_path:
        raise ValueError('No object name in model path: ' + str(SVM_path))

    client = storage.Client()
    bucket = client.get_bucket(parse.netloc)
    model = bucket.blob(model_path)
    model.upload_from_filename(temp_model_path)

    return SVM_path

In [41]:
# Wrap the SVM trainer as a KFP component (spec is also written to YAML).
SVM_Trainer_op = comp.create_component_from_func(
    SVM_Trainer,
    output_component_file='SVM_Trainer.yaml',
    packages_to_install=[
        'scikit-learn',
        'fastparquet',
        'fsspec',
        'gcsfs',
        'google-cloud-storage',
    ],
)

## MR (multiple linear regression)

In [42]:
def MR_Trainer(path_to_file_pp: str, MR_path: str, holdout_days: int) -> str:
    '''Fit a linear regression model and upload it to GCS.

    No hyperparameter search is performed for this model.

    Args:
        path_to_file_pp: Location of the preprocessed dataset, read with ``pd.read_csv``.
            It must contain a ``price`` column, which is used as the target.
        MR_path: ``gs://bucket/object`` URL the pickled model is uploaded to.
        holdout_days: Rows whose index falls within the last ``holdout_days`` days are
            excluded from training.

    Returns:
        ``MR_path``, unchanged, so downstream tasks can locate the model.

    Raises:
        ValueError: If ``MR_path`` has no object name after the bucket.
    '''
    # Imports are kept inside the function: the function is turned into a KFP
    # component with create_component_from_func, so it has to be self-contained.
    import pandas as pd
    import _pickle as cPickle # save ML model
    from datetime import datetime, timedelta
    from google.cloud import storage # save the model to GCS
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.model_selection import train_test_split
    from urllib.parse import urlparse

    # read the dataset
    gold_df = pd.read_csv(path_to_file_pp)

    # keep everything older than the holdout window for training
    # NOTE(review): read_csv is called without index_col/parse_dates, so the index
    # is presumably a plain integer range and this comparison against a datetime
    # will not work -- confirm how the preprocessing step writes the date column.
    holdout_start = datetime.today() - timedelta(days=holdout_days)
    gold_train_df = gold_df[gold_df.index < holdout_start]

    X, y = gold_train_df.drop('price', axis=1), gold_train_df['price']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=0)

    # create and fit the model
    clf = LinearRegression()
    clf.fit(X_train, y_train)

    # initial evaluation on the random test split (predict, not a second fit)
    predictions = clf.predict(X_test)
    mae_score = mean_absolute_error(y_test, predictions)
    mse_score = mean_squared_error(y_test, predictions)
    print('MAE MR: ' + str(mae_score))
    print('MSE MR: ' + str(mse_score))

    # save the fitted model to a temporary file before uploading it;
    # there is no search object here, so the estimator itself is pickled
    temp_model_path = '/tmp/model_MR.pickle'
    with open(temp_model_path, 'wb') as f:
        cPickle.dump(clf, f, -1)

    # split gs://bucket/object into the bucket (netloc) and the object name (path)
    parse = urlparse(url=MR_path, allow_fragments=False)
    model_path = parse.path[1:] if parse.path.startswith('/') else parse.path
    if not model_path:
        raise ValueError('No object name in model path: ' + str(MR_path))

    client = storage.Client()
    bucket = client.get_bucket(parse.netloc)
    model = bucket.blob(model_path)
    model.upload_from_filename(temp_model_path)

    return MR_path

In [43]:
# Wrap the linear regression trainer as a KFP component (spec is also written to YAML).
MR_Trainer_op = comp.create_component_from_func(
    MR_Trainer,
    output_component_file='MR_Trainer.yaml',
    packages_to_install=[
        'scikit-learn',
        'fastparquet',
        'fsspec',
        'gcsfs',
        'google-cloud-storage',
    ],
)

## NN (neural network)

In [44]:
def NN_Trainer(path_to_file_pp: str, NN_path: str, holdout_days: int, random_iter: int, 
                         grid_params_NN: str) -> str:
    '''Tune an MLP regressor with a randomized search and upload the best model to GCS.

    Args:
        path_to_file_pp: Location of the preprocessed dataset, read with ``pd.read_csv``.
            It must contain a ``price`` column, which is used as the target.
        NN_path: ``gs://bucket/object`` URL the pickled model is uploaded to.
        holdout_days: Rows whose index falls within the last ``holdout_days`` days are
            excluded from training.
        random_iter: Number of parameter settings sampled by ``RandomizedSearchCV``.
        grid_params_NN: JSON object mapping ``MLPRegressor`` parameter names to lists
            of candidate values.

    Returns:
        ``NN_path``, unchanged, so downstream tasks can locate the model.

    Raises:
        ValueError: If ``NN_path`` has no object name after the bucket.
    '''
    # Imports are kept inside the function: the function is turned into a KFP
    # component with create_component_from_func, so it has to be self-contained.
    import json
    import pandas as pd
    import _pickle as cPickle # save ML model
    from datetime import datetime, timedelta
    from google.cloud import storage # save the model to GCS
    from sklearn.neural_network import MLPRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.model_selection import train_test_split
    from sklearn.model_selection import RandomizedSearchCV
    from urllib.parse import urlparse

    # read the dataset
    gold_df = pd.read_csv(path_to_file_pp)

    # keep everything older than the holdout window for training
    # NOTE(review): read_csv is called without index_col/parse_dates, so the index
    # is presumably a plain integer range and this comparison against a datetime
    # will not work -- confirm how the preprocessing step writes the date column.
    holdout_start = datetime.today() - timedelta(days=holdout_days)
    gold_train_df = gold_df[gold_df.index < holdout_start]

    X, y = gold_train_df.drop('price', axis=1), gold_train_df['price']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=0)

    # candidate hyperparameters arrive as a JSON string
    random_grid = json.loads(grid_params_NN)

    # use the component input ``random_iter``; notebook globals are not visible
    # inside the component
    clf_grid = RandomizedSearchCV(estimator=MLPRegressor(), param_distributions=random_grid,
                                  n_iter=random_iter, cv=3, verbose=2, random_state=0, n_jobs=-1)
    clf_grid.fit(X_train, y_train)

    # initial evaluation on the random test split (predict, not a second fit)
    predictions = clf_grid.predict(X_test)
    mae_score = mean_absolute_error(y_test, predictions)
    mse_score = mean_squared_error(y_test, predictions)
    print('MAE NN: ' + str(mae_score))
    print('MSE NN: ' + str(mse_score))

    # save the best estimator to a temporary file before uploading it
    temp_model_path = '/tmp/model_NN.pickle'
    with open(temp_model_path, 'wb') as f:
        cPickle.dump(clf_grid.best_estimator_, f, -1)

    # split gs://bucket/object into the bucket (netloc) and the object name (path)
    parse = urlparse(url=NN_path, allow_fragments=False)
    model_path = parse.path[1:] if parse.path.startswith('/') else parse.path
    if not model_path:
        raise ValueError('No object name in model path: ' + str(NN_path))

    client = storage.Client()
    bucket = client.get_bucket(parse.netloc)
    model = bucket.blob(model_path)
    model.upload_from_filename(temp_model_path)

    # return this trainer's own path (the original referenced the undefined MR_path)
    return NN_path

In [45]:
# Wrap the neural network trainer as a KFP component (spec is also written to YAML).
NN_Trainer_op = comp.create_component_from_func(
    NN_Trainer,
    output_component_file='NN_Trainer.yaml',
    packages_to_install=[
        'scikit-learn',
        'fastparquet',
        'fsspec',
        'gcsfs',
        'google-cloud-storage',
    ],
)

In [48]:
def eval_models(path_to_file_pp: str, random_forest_path: str, SVM_path: str, MR_path: str, NN_path: str, holdout_days: int) -> str:
    '''Evaluate the trained models on the holdout dataset and report the best one.

    Each model is downloaded from GCS, unpickled and scored with the mean absolute
    error (MAE) on the rows of the last ``holdout_days`` days. The model with the
    lowest MAE is considered the best.

    Args:
        path_to_file_pp: Location of the preprocessed dataset, read with ``pd.read_csv``.
            It must contain a ``price`` column, which is used as the target.
        random_forest_path: ``gs://bucket/object`` URL of the pickled random forest model.
        SVM_path: ``gs://bucket/object`` URL of the pickled SVM model.
        MR_path: ``gs://bucket/object`` URL of the pickled linear regression model.
        NN_path: ``gs://bucket/object`` URL of the pickled neural network model.
        holdout_days: Size, in days, of the most recent window used for evaluation.

    Returns:
        The path of the model with the lowest MAE on the holdout set.

    Raises:
        AttributeError: If one of the model blobs does not exist in its bucket.
    '''
    # Imports are kept inside the function: the function is turned into a KFP
    # component with create_component_from_func, so it has to be self-contained.
    import pandas as pd
    from io import BytesIO
    from datetime import datetime, timedelta
    import _pickle as cPickle # load ML model
    from google.cloud import storage # read the models from GCS
    from sklearn.metrics import mean_absolute_error
    from urllib.parse import urlparse
    from collections import namedtuple

    # read dataframe
    gold_df = pd.read_csv(path_to_file_pp)

    # this will be our holdout (validation) set: the most recent rows
    # NOTE(review): read_csv is called without index_col/parse_dates, so the index
    # is presumably a plain integer range and this comparison against a datetime
    # will not work -- confirm how the preprocessing step writes the date column.
    holdout_start = datetime.today() - timedelta(days=holdout_days)
    gold_holdout_df = gold_df[gold_df.index > holdout_start]

    # get x and y
    x_val, y_val = gold_holdout_df.drop('price', axis=1), gold_holdout_df['price']

    def get_mae(model_path):
        '''Return the MAE on the holdout set of the model stored at ``model_path``.'''
        parse = urlparse(url=model_path, allow_fragments=False)
        # object name inside the bucket, without the leading slash of the URL path
        blob_name = parse.path[1:] if parse.path.startswith('/') else parse.path

        client = storage.Client()
        bucket = client.get_bucket(parse.netloc)
        blob = bucket.get_blob(blob_name)
        if blob is None:
            raise AttributeError('No files to download') 
        model_bytestream = BytesIO(blob.download_as_string())
        # SECURITY: unpickling executes arbitrary code; only load models from
        # buckets this pipeline itself writes to.
        model = cPickle.load(model_bytestream)
        predictions = model.predict(x_val)
        return mean_absolute_error(y_val, predictions)

    Models = namedtuple('Model', 'type score path')
    candidates = (
        ('RF', random_forest_path),
        ('SVM', SVM_path),
        ('MR', MR_path),
        ('NN', NN_path),
    )
    m_list = [Models(model_type, get_mae(model_path), model_path)
              for model_type, model_path in candidates]

    # MAE is an error measure, so the best model is the one with the LOWEST score
    best_model = min(m_list, key=lambda model: model.score)
    print('Best Model: ', best_model)
    path = best_model.path

    # Add saved predictions

    return path

In [51]:
# Wrap the model evaluation step as a KFP component (spec is also written to YAML).
eval_models_op = comp.create_component_from_func(
    eval_models,
    output_component_file='eval_models.yaml',
    packages_to_install=[
        'scikit-learn',
        'fastparquet',
        'fsspec',
        'gcsfs',
        'google-cloud-storage',
    ],
)

## KubeFlow

In [52]:
@dsl.pipeline(
  name='Gold Prediciton',
  description='Predicting closing value of Gold value with multiple models'
)
def sp500_pipeline(path_to_file, path_to_file_pp, random_forest_path, SVM_path, MR_path, NN_path, 
                   holdout_days, random_iterations, grid_params_forest, grid_params_SVM, grid_params_MR, grid_params_NN, disable_cache):
    '''Download and preprocess the data, train four models and evaluate them.

    Args:
        path_to_file: Location passed to the raw-data download step.
        path_to_file_pp: Location passed to the feature-processing step.
        random_forest_path, SVM_path, MR_path, NN_path: Destinations of the trained models.
        holdout_days: Number of most recent days held out for evaluation.
        random_iterations: Number of iterations of each randomized search.
        grid_params_forest, grid_params_SVM, grid_params_NN: JSON strings with the
            hyperparameter candidates of the respective trainer.
        grid_params_MR: Kept for interface compatibility; the linear regression
            trainer takes no hyperparameter grid, so it is not used.
        disable_cache: Whether to disable KFP caching for every task.

    NOTE(review): ``download_raw_data_op`` and ``feature_processing_op`` are not
    defined in the visible part of this notebook (the Data and Preprocessing
    sections are empty); they must be defined before this pipeline is compiled.
    '''
    # use the pipeline parameters for the data locations (the original body
    # referenced the undefined names raw_data_path / feature_data_path)
    download_raw_data_task = download_raw_data_op(path_to_file)
    feature_processing_task = feature_processing_op(download_raw_data_task.output, path_to_file_pp, year_from)

    features = feature_processing_task.output
    Random_Forest_Trainer_task = Random_Forest_Trainer_op(features, random_forest_path, holdout_days, random_iterations, grid_params_forest)
    SVM_Trainer_task = SVM_Trainer_op(features, SVM_path, holdout_days, random_iterations, grid_params_SVM)
    # the linear regression model has its own component and takes no search arguments
    MR_Trainer_task = MR_Trainer_op(features, MR_path, holdout_days)
    NN_Trainer_task = NN_Trainer_op(features, NN_path, holdout_days, random_iterations, grid_params_NN)
    eval_models_task = eval_models_op(features, Random_Forest_Trainer_task.output, SVM_Trainer_task.output,
                                      MR_Trainer_task.output, NN_Trainer_task.output, holdout_days)

    # NOTE(review): inside a pipeline function the arguments are presumably
    # compile-time placeholders rather than real booleans, in which case this
    # branch is always taken -- confirm against the KFP SDK version in use.
    if disable_cache:
        all_tasks = (
            download_raw_data_task,
            feature_processing_task,
            Random_Forest_Trainer_task,
            SVM_Trainer_task,
            MR_Trainer_task,
            NN_Trainer_task,
            eval_models_task,
        )
        for task in all_tasks:
            # a maximum staleness of zero days means cached results are never reused
            task.execution_options.caching_strategy.max_cache_staleness = "P0D"

# Hyperparameter candidates are passed to the components as JSON strings.

# Random forest search space
grid_params_forest = json.dumps({'n_estimators': n_estimators,
                           'max_features': max_features,
                           'max_depth': max_depth,
                           'min_samples_split': min_samples_split,
                           'min_samples_leaf': min_samples_leaf,
                           'bootstrap': bootstrap})

# SVM search space
grid_params_SVM = json.dumps({'kernel': kernel,
                           'C': C,
                           'gamma': gamma,
                           'epsilon': epsilon})

# Linear regression has nothing to search; an empty JSON object is supplied so
# that every parameter of the pipeline function receives a value.
grid_params_MR = json.dumps({})

# Neural network search space
grid_params_NN = json.dumps({'hidden_layer_sizes': hidden_layer_sizes,
                           'activation': activation,
                           'solver': solver,
                           'learning_rate': learning_rate})

# Specify argument values for pipeline run.
# The keys must match the parameter names of sp500_pipeline exactly.
arguments = {'path_to_file': 'gs://delabs2020/raw/sp500.parquet',
            'path_to_file_pp': 'gs://delabs2020/feature_store/sp500_features.parquet',
            'random_forest_path': 'gs://delabs2020/model_store/forest/forest_clf.pickle',
            'SVM_path': 'gs://delabs2020/model_store/tuned/SVM_clf.pickle',
            'MR_path': 'gs://delabs2020/model_store/tuned/MR_clf.pickle',
            'NN_path': 'gs://delabs2020/model_store/tuned/NN_rf.pickle',
            'holdout_days': holdout_days,
            'random_iterations': random_iterations,
            'grid_params_forest': grid_params_forest,
            'grid_params_SVM': grid_params_SVM,
            'grid_params_MR': grid_params_MR,
            'grid_params_NN': grid_params_NN,
            'disable_cache': disable_cache,
            }

# Create a pipeline run, using the client you initialized in a prior step.
client.create_run_from_pipeline_func(sp500_pipeline, arguments=arguments)

NameError: name 'download_raw_data_op' is not defined