In [15]:
import json
import kfp
import kfp.dsl as dsl
import kfp.components as comp
from kfp.components import OutputPath

#### Pipeline Configurations

In [16]:
# --- Kubeflow Pipelines endpoint -------------------------------------------
# Host name of the KFP deployment; replace with the value for your cluster.
KFP_HOST_NAME = "https://XYZ.pipelines.googleusercontent.com/"
# Caching policy flag for the KFP run (handy while debugging / developing).
disable_cache = False

# --- Data selection --------------------------------------------------------
# First year of financial data to use.
year_from = 1990
# How many of the most recent days are held out for validation.
holdout_days = 30

# --- Random-search hyperparameter optimization -----------------------------
# Number of random parameter combinations to try.
random_iterations = 3
# Candidate values for the number of trees in the forest.
n_estimators = [150, 250, 300, 400]
# Candidate values for the number of features considered at each split.
max_features = ['sqrt']
# Candidate values for the maximum depth of a tree.
max_depth = [5, 10, 25]
# Candidate values for the minimum number of samples needed to split a node.
min_samples_split = [2, 3, 5, 10]
# Candidate values for the minimum number of samples required at a leaf.
min_samples_leaf = [1, 2, 4]
# Candidate values for whether each tree is trained on a bootstrap sample.
bootstrap = [True]

In [17]:
# KFP API client bound to the host configured in KFP_HOST_NAME; it is used
# further down to submit the pipeline run.
client = kfp.Client(host=KFP_HOST_NAME)

#### Component - Download Raw Data

In [18]:
def download_raw_data(raw_data_path: str) -> str:
    '''Download the S&P 500 index (^GSPC) history from Yahoo Finance and store it as parquet.

    Args:
        raw_data_path: Destination handed to ``DataFrame.to_parquet``
            (the pipeline passes a ``gs://`` URI).

    Returns:
        ``raw_data_path`` unchanged, so the next component can read the file.

    Raises:
        RuntimeError: If the download yields no rows.
    '''
    import yfinance as yf

    # Ask for the whole history explicitly so the amount of data does not
    # depend on the default ``period`` of whichever yfinance version gets
    # installed in the component container.
    sp500_df = yf.download('^GSPC', period='max', progress=False)
    if sp500_df.empty:
        # Fail here instead of writing an empty file that would only break
        # the downstream components later.
        raise RuntimeError('Yahoo Finance returned no data for ^GSPC')

    print('Downloaded data...')
    print(sp500_df.head())
    print('trying to write to GS')
    sp500_df.to_parquet(raw_data_path, compression='GZIP')
    print('Done!')
    return raw_data_path

In [19]:
# Wrap the download function as a KFP component; the component spec is also
# written to download_raw_data.yaml.
download_raw_data_op = comp.create_component_from_func(
    func=download_raw_data,
    output_component_file='download_raw_data.yaml',
    packages_to_install=['yfinance', 'fastparquet', 'fsspec', 'gcsfs'],
)

#### Component - Feature Processing

In [20]:
def feature_processing(raw_data_path: str, feature_data_path: str, year_from: int) -> str:
    '''Calculate the features for our machine learning model.

    Every rolling statistic is shifted by one row, so the features of a given
    day only use values from previous days.

    Args:
        raw_data_path: Parquet file with the raw price history. It must have
            a datetime index and ``Close`` and ``Volume`` columns.
        feature_data_path: Destination parquet file for the feature table.
        year_from: First calendar year to keep; rows dated before
            January 1st of this year are dropped.

    Returns:
        ``feature_data_path`` unchanged, so the next component can read the file.
    '''
    import pandas as pd
    from datetime import datetime

    # read dataframe
    sp500_df = pd.read_parquet(raw_data_path)

    # feature table, aligned on the same dates as the raw data
    sp500_features_df = pd.DataFrame(index=sp500_df.index)

    # rolling statistics of the closing price over windows of different lengths
    close = sp500_df['Close']
    for window in (5, 30, 120, 365):
        suffix = '__' + str(window) + '_days'
        rolling_close = close.rolling(window)
        sp500_features_df['Close__rolling_mean' + suffix] = rolling_close.mean().shift(periods=1)
        sp500_features_df['Close__rolling_std' + suffix] = rolling_close.std().shift(periods=1)
        sp500_features_df['Close__rolling_max' + suffix] = rolling_close.max().shift(periods=1)
        sp500_features_df['Close__rolling_min' + suffix] = rolling_close.min().shift(periods=1)
        sp500_features_df['Close__rolling_range' + suffix] = (
            sp500_features_df['Close__rolling_max' + suffix]
            - sp500_features_df['Close__rolling_min' + suffix]
        )

    # rolling statistics of the traded volume (computed from 'Volume', not 'Close')
    volume = sp500_df['Volume']
    for window in (5, 10, 15):
        suffix = '__' + str(window) + '_days'
        rolling_volume = volume.rolling(window)
        sp500_features_df['Volume__rolling_max' + suffix] = rolling_volume.max().shift(periods=1)
        sp500_features_df['Volume__rolling_sum' + suffix] = rolling_volume.sum().shift(periods=1)

    # calendar features, one-hot encoded; each set is built from its own source
    day_of_week = pd.Series(sp500_df.index.dayofweek.values, index=sp500_df.index)
    quarter = pd.Series(sp500_df.index.quarter.values, index=sp500_df.index)
    sp500_features_df = pd.concat(
        [
            sp500_features_df,
            pd.get_dummies(day_of_week, prefix='day_of_week'),
            pd.get_dummies(quarter, prefix='quarter'),
        ],
        axis=1,  # passed by keyword: newer pandas no longer accepts it positionally
    )

    # let's not confuse our model with data from way back
    first_day = datetime(year=year_from, month=1, day=1)
    sp500_features_df = sp500_features_df[sp500_features_df.index >= first_day]

    # get label for feature dataset ('Close' becomes the first column)
    sp500_timeboxed_features_df = pd.merge(sp500_df['Close'], sp500_features_df, left_index=True, right_index=True)

    # write out to parquet
    sp500_timeboxed_features_df.to_parquet(feature_data_path, compression='GZIP')
    features_numbers = len(sp500_timeboxed_features_df.columns) - 1  # exclude the label
    total_days = len(sp500_timeboxed_features_df)
    print('Writing %s features for %s days' % (features_numbers, total_days))
    print('Done!')

    return feature_data_path

In [21]:
# Wrap the feature-processing function as a KFP component; the component spec
# is also written to feature_processing.yaml.
feature_processing_op = comp.create_component_from_func(
    func=feature_processing,
    output_component_file='feature_processing.yaml',
    packages_to_install=['fastparquet', 'fsspec', 'gcsfs'],
)

#### Component: Train Vanilla Random Forest Model

In [22]:
def train_vanilla_rf(feature_data_path: str, vanilla_model_path: str, holdout_days: int) -> str:
    '''Train a random forest model with default parameters and upload it to GCS.

    Args:
        feature_data_path: Parquet file with the feature table; the ``Close``
            column is the label.
        vanilla_model_path: ``gs://<bucket>/<object>`` URI the pickled model
            is uploaded to.
        holdout_days: Rows from the last ``holdout_days`` days (counted back
            from today) are excluded from training.

    Returns:
        ``vanilla_model_path`` unchanged.

    Raises:
        ValueError: If ``vanilla_model_path`` has no bucket or no object name.
    '''
    import pickle  # save ML model
    import pandas as pd
    from datetime import datetime, timedelta
    from google.cloud import storage  # save the model to GCS
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.model_selection import train_test_split
    from urllib.parse import urlparse

    # Validate the destination before spending time on training.
    parsed_uri = urlparse(url=vanilla_model_path, allow_fragments=False)
    # drop the single leading '/' that separates bucket and object name
    blob_name = parsed_uri.path[1:] if parsed_uri.path.startswith('/') else parsed_uri.path
    if not parsed_uri.netloc or not blob_name:
        raise ValueError('Expected a gs://<bucket>/<object> path, got: %s' % vanilla_model_path)

    # read dataframe
    sp500_timeboxed_features_df = pd.read_parquet(feature_data_path)

    # this will be our training set: everything older than the holdout window
    holdout_start = datetime.today() - timedelta(days=holdout_days)
    sp500_train_df = sp500_timeboxed_features_df[sp500_timeboxed_features_df.index < holdout_start]

    # get x and y
    x_train, y_train = sp500_train_df.drop('Close', axis=1), sp500_train_df['Close']
    # split the data for initial testing
    X_train, X_test, Y_train, Y_test = train_test_split(x_train, y_train, test_size=0.2, random_state=786)

    # train the model
    print('Training vanilla Random Forest models')
    print('Shape of X: %s, %s' % (len(x_train), len(x_train.columns)))
    # seeded so that repeated runs on the same data give the same model
    vanilla_rf = RandomForestRegressor(random_state=786)
    vanilla_rf.fit(X_train, Y_train)

    # some initial testing
    predictions_vanilla_rf = vanilla_rf.predict(X_test)
    print('mean absolute error without optimization: %s' % mean_absolute_error(Y_test, predictions_vanilla_rf))
    print('mean squared error without optimization is: %s' % mean_squared_error(Y_test, predictions_vanilla_rf))

    # write out output
    # save the model into temp
    temp_model_path = '/tmp/model.pickle'
    with open(temp_model_path, 'wb') as f:
        pickle.dump(vanilla_rf, f, pickle.HIGHEST_PROTOCOL)

    # get client and write to GCS
    client = storage.Client()
    bucket = client.get_bucket(parsed_uri.netloc)
    blob = bucket.blob(blob_name)
    blob.upload_from_filename(temp_model_path)

    return vanilla_model_path

In [23]:
# Wrap the vanilla training function as a KFP component; the component spec
# is also written to train_vanilla_rf.yaml.
train_vanilla_rf_op = comp.create_component_from_func(
    func=train_vanilla_rf,
    output_component_file='train_vanilla_rf.yaml',
    packages_to_install=['scikit-learn', 'fastparquet', 'fsspec', 'gcsfs', 'google-cloud-storage'],
)

#### Component: Random Search for Random Forest

In [24]:
def hyp_tune_train_rf(feature_data_path: str, tuned_model_path: str,
                      holdout_days: int, random_iterations: int, random_params: str) -> str:
    '''Random search with cross validation to find the best parameters for our random forest.

    The best estimator found by the search is pickled and uploaded to GCS.

    Args:
        feature_data_path: Parquet file with the feature table; the ``Close``
            column is the label.
        tuned_model_path: ``gs://<bucket>/<object>`` URI the pickled model is
            uploaded to.
        holdout_days: Rows from the last ``holdout_days`` days (counted back
            from today) are excluded from training.
        random_iterations: Number of parameter combinations sampled by the
            random search.
        random_params: JSON string mapping ``RandomForestRegressor`` parameter
            names to lists of candidate values.

    Returns:
        ``tuned_model_path`` unchanged.

    Raises:
        ValueError: If ``tuned_model_path`` has no bucket or no object name.
    '''
    import json
    import pickle  # save ML model
    import pandas as pd
    from datetime import datetime, timedelta
    from google.cloud import storage  # save the model to GCS
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.model_selection import train_test_split
    from sklearn.model_selection import RandomizedSearchCV
    from urllib.parse import urlparse

    # Validate the destination before spending time on the search.
    parsed_uri = urlparse(url=tuned_model_path, allow_fragments=False)
    # drop the single leading '/' that separates bucket and object name
    blob_name = parsed_uri.path[1:] if parsed_uri.path.startswith('/') else parsed_uri.path
    if not parsed_uri.netloc or not blob_name:
        raise ValueError('Expected a gs://<bucket>/<object> path, got: %s' % tuned_model_path)

    # read dataframe
    sp500_timeboxed_features_df = pd.read_parquet(feature_data_path)

    # this will be our training set: everything older than the holdout window
    holdout_start = datetime.today() - timedelta(days=holdout_days)
    sp500_train_df = sp500_timeboxed_features_df[sp500_timeboxed_features_df.index < holdout_start]

    # get x and y
    x_train, y_train = sp500_train_df.drop('Close', axis=1), sp500_train_df['Close']
    # split the data for initial testing
    X_train, X_test, Y_train, Y_test = train_test_split(x_train, y_train, test_size=0.2, random_state=786)

    # create random grid
    random_grid = json.loads(random_params)

    # train the model
    rf = RandomForestRegressor(random_state=42)
    # Random search of parameters, using 3 fold cross validation and sampling
    # `random_iterations` combinations from the grid
    rf_random = RandomizedSearchCV(estimator=rf, param_distributions=random_grid, n_iter=random_iterations,
                                   cv=3, verbose=2, random_state=42, n_jobs=-1)
    # Fit on the training split only: X_test/Y_test must stay unseen, otherwise
    # the errors reported below would be measured on data the model was fitted on.
    rf_random.fit(X_train, Y_train)

    # some initial testing
    predictions_tuned_rf = rf_random.predict(X_test)
    mae_score = mean_absolute_error(Y_test, predictions_tuned_rf)
    mse_score = mean_squared_error(Y_test, predictions_tuned_rf)
    print('mean absolute error with random search optimization: %s' % mae_score)
    print('mean squared error with random search optimization is: %s' % mse_score)

    temp_model_path = '/tmp/model.pickle'

    # write out output
    # save the model into temp
    with open(temp_model_path, 'wb') as f:
        pickle.dump(rf_random.best_estimator_, f, pickle.HIGHEST_PROTOCOL)

    # get client and write to GCS
    client = storage.Client()
    bucket = client.get_bucket(parsed_uri.netloc)
    model = bucket.blob(blob_name)
    model.upload_from_filename(temp_model_path)

    return tuned_model_path

In [25]:
# Wrap the random-search training function as a KFP component; the component
# spec is also written to hyp_tune_train_rf.yaml.
hyp_tune_train_rf_op = comp.create_component_from_func(
    func=hyp_tune_train_rf,
    output_component_file='hyp_tune_train_rf.yaml',
    packages_to_install=['scikit-learn', 'fastparquet', 'fsspec', 'gcsfs', 'google-cloud-storage'],
)

#### Component: Evaluate The Models

In [26]:
def eval_models(feature_data_path: str, vanilla_model_path: str, tuned_model_path: str, holdout_days: int) -> str:
    '''Evaluate different models on holdout dataset to see which model performs the best.

    Both models are scored with the mean absolute error (MAE) on the rows of
    the last ``holdout_days`` days; the model with the LOWEST error wins.

    Args:
        feature_data_path: Parquet file with the feature table; the ``Close``
            column is the label.
        vanilla_model_path: ``gs://<bucket>/<object>`` URI of the pickled
            default-parameter model.
        tuned_model_path: ``gs://<bucket>/<object>`` URI of the pickled tuned
            model.
        holdout_days: Size of the holdout window in days, counted back from today.

    Returns:
        The GCS path of the model with the lowest holdout MAE.

    Raises:
        ValueError: If the holdout window contains no rows, or a model path
            has no bucket or no object name.
        AttributeError: If a model file does not exist in the bucket.
    '''
    import pickle  # load ML model
    import pandas as pd
    from datetime import datetime, timedelta
    from google.cloud import storage  # read the models from GCS
    from sklearn.metrics import mean_absolute_error
    from urllib.parse import urlparse
    from collections import namedtuple

    # read dataframe
    sp500_timeboxed_features_df = pd.read_parquet(feature_data_path)

    # this will be our holdout set: only the most recent days
    holdout_start = datetime.today() - timedelta(days=holdout_days)
    sp500_holdout_df = sp500_timeboxed_features_df[sp500_timeboxed_features_df.index > holdout_start]
    if sp500_holdout_df.empty:
        raise ValueError('No rows found in the last %s days to evaluate on' % holdout_days)

    # get x and y
    x_val, y_val = sp500_holdout_df.drop('Close', axis=1), sp500_holdout_df['Close']

    client = storage.Client()

    def get_mae(model_path):
        '''this function evaluates a model on our holdout dataset given just the model path'''
        parsed_uri = urlparse(url=model_path, allow_fragments=False)
        # drop the single leading '/' that separates bucket and object name
        blob_name = parsed_uri.path[1:] if parsed_uri.path.startswith('/') else parsed_uri.path
        if not parsed_uri.netloc or not blob_name:
            raise ValueError('Expected a gs://<bucket>/<object> path, got: %s' % model_path)

        bucket = client.get_bucket(parsed_uri.netloc)
        blob = bucket.get_blob(blob_name)
        if blob is None:
            raise AttributeError('No files to download')
        # NOTE: unpickling executes arbitrary code; only point this component
        # at model files written by trusted pipeline steps.
        model = pickle.loads(blob.download_as_string())
        predictions = model.predict(x_val)
        return mean_absolute_error(y_val, predictions)

    Models = namedtuple('Model', 'type score path')
    m_list = [
        Models('vanilla', get_mae(vanilla_model_path), vanilla_model_path),
        Models('tuned', get_mae(tuned_model_path), tuned_model_path),
    ]

    # MAE is an error measure, so the best model is the one with the smallest score
    best_model = min(m_list, key=lambda model: model.score)
    print('Best Model: ', best_model)
    return best_model.path

In [27]:
# Wrap the evaluation function as a KFP component; the component spec is also
# written to eval_models.yaml.
eval_models_op = comp.create_component_from_func(
    func=eval_models,
    output_component_file='eval_models.yaml',
    packages_to_install=['scikit-learn', 'fastparquet', 'fsspec', 'gcsfs', 'google-cloud-storage'],
)

#### Create & Run KubeFlow Pipeline With Python SDK

In [28]:
@dsl.pipeline(
    name='SP500 Random Forest',
    description='Predicting closing value of SP500 with Random Forest',
)
def sp500_pipeline(raw_data_path, feature_data_path, vanilla_model_path, tuned_model_path,
                   year_from, holdout_days, random_iterations, random_params, disable_cache):
    '''Wire the components together: download -> features -> (vanilla | tuned) training -> evaluation.

    Both training steps consume the output of the feature step, and the
    evaluation step consumes the outputs of the feature step and of both
    training steps.
    '''
    download_raw_data_task = download_raw_data_op(raw_data_path)
    feature_processing_task = feature_processing_op(
        download_raw_data_task.output, feature_data_path, year_from)

    features = feature_processing_task.output
    train_vanilla_rf_task = train_vanilla_rf_op(features, vanilla_model_path, holdout_days)
    hyp_tune_train_rf_task = hyp_tune_train_rf_op(
        features, tuned_model_path, holdout_days, random_iterations, random_params)
    eval_models_task = eval_models_op(
        features, train_vanilla_rf_task.output, hyp_tune_train_rf_task.output, holdout_days)

    # NOTE(review): this condition is evaluated while the pipeline function is
    # being compiled. Presumably `disable_cache` is a pipeline-parameter
    # placeholder at that point rather than the run-time value, in which case
    # the branch would not follow the argument - TODO confirm.
    if disable_cache:
        every_task = (
            download_raw_data_task,
            feature_processing_task,
            train_vanilla_rf_task,
            hyp_tune_train_rf_task,
            eval_models_task,
        )
        for task in every_task:
            # "P0D": zero-day maximum staleness for cached results of this task
            task.execution_options.caching_strategy.max_cache_staleness = "P0D"
    
# Serialize the hyperparameter search space to JSON so it can be passed to the
# pipeline as a single string argument.
random_grid = json.dumps(dict(
    n_estimators=n_estimators,
    max_features=max_features,
    max_depth=max_depth,
    min_samples_split=min_samples_split,
    min_samples_leaf=min_samples_leaf,
    bootstrap=bootstrap,
))

# Argument values for this pipeline run: storage locations first, then the
# settings taken from the configuration cell.
arguments = dict(
    raw_data_path='gs://mlops-stock-prediction/raw/sp500.parquet',
    feature_data_path='gs://mlops-stock-prediction/feature_store/sp500_features.parquet',
    vanilla_model_path='gs://mlops-stock-prediction/model_store/vanilla/vanilla_rf.pickle',
    tuned_model_path='gs://mlops-stock-prediction/model_store/tuned/tuned_rf.pickle',
    year_from=year_from,
    holdout_days=holdout_days,
    random_iterations=random_iterations,
    random_params=random_grid,
    disable_cache=disable_cache,
)

# Submit a run of the pipeline through the client created earlier.
client.create_run_from_pipeline_func(sp500_pipeline, arguments=arguments)

RunPipelineResult(run_id=5d1c5816-857d-45fe-87b2-df7f4704b92f)