In [1]:
import json
import kfp
import kfp.dsl as dsl
import kfp.components as comp
from kfp.components import OutputPath

ModuleNotFoundError: No module named 'kfp'

#### Pipeline Configurations

In [2]:
# Host name of the Kubeflow Pipelines deployment; replace this placeholder with
# the value of your own KFP host name.
KFP_HOST_NAME = 'https://XYZ.pipelines.googleusercontent.com/'
# Caching policy of the KFP run (useful in debug/dev mode). The value is passed
# to the pipeline as the `disable_cache` argument further down.
disable_cache = False

In [3]:
# Client for the Kubeflow Pipelines API at KFP_HOST_NAME; used at the end of the
# notebook to submit the pipeline run.
client = kfp.Client(host=KFP_HOST_NAME)

NameError: name 'kfp' is not defined

#### Component - Download Raw Data

In [18]:
def download_raw_data(raw_data_path: str) -> str:
    '''Load the raw weather CSV from local storage, clean it and store it as parquet.

    Reads ``raw_data2.csv`` from the working directory, removes every comma
    from the cell values (column names are left untouched), replaces empty or
    whitespace-only cells with NaN and writes the result to ``raw_data_path``
    as a GZIP-compressed parquet file.

    Args:
        raw_data_path: Destination of the parquet file.

    Returns:
        The ``raw_data_path`` that was written, so the next step can read it.
    '''
    # Imports are local, like in the other component functions of this
    # notebook, so that the function is self-contained.
    import numpy as np
    import pandas as pd

    try:
        df = pd.read_csv("raw_data2.csv", on_bad_lines='skip')
    except TypeError:
        # Older pandas releases only know the previous spelling of this option.
        df = pd.read_csv("raw_data2.csv", error_bad_lines=False)
    print('Downloaded data...')

    def dfreplace(df, *args, **kwargs):
        '''Apply ``Series.str.replace`` to all cells of ``df`` in one pass.'''
        s = pd.Series(df.values.flatten())
        s = s.str.replace(*args, **kwargs)
        # to_numpy() always yields an ndarray, which is needed for reshape()
        return pd.DataFrame(s.to_numpy().reshape(df.shape), df.index, df.columns)

    # literal (non-regex) removal of the commas inside the values
    weather_df = dfreplace(df, ',', '', regex=False)

    for column in weather_df.columns:
        as_text = weather_df[column].astype(str)
        # cells that are empty or consist of whitespace only count as missing
        is_blank = as_text.str.match(r'^\s*$', na=False)
        # assign the whole column back instead of using chained indexing
        weather_df[column] = as_text.mask(is_blank, np.nan)

    print(weather_df.head())
    print('trying to write to GS')
    weather_df.to_parquet(raw_data_path, compression='GZIP')
    print('Done!')
    return raw_data_path

In [19]:
# Wrap download_raw_data as a KFP component; the component definition is also
# written to download_raw_data.yaml.
download_raw_data_op = comp.create_component_from_func(
    download_raw_data,
    output_component_file='download_raw_data.yaml',
    packages_to_install=[
        'fastparquet',
        'fsspec',
        'gcsfs',
    ],
)

#### Component - Feature Processing

In [20]:
def feature_processing(raw_data_path: str, feature_data_path: str) -> str:
    '''Calculate the features for our machine learning model.

    Reads the raw parquet file, derives year / month / day columns from the
    ``YYYYMMDD,`` column, converts every column that can be parsed to float,
    drops the manually selected irrelevant columns and all rows with missing
    values, and writes the result as a GZIP-compressed parquet file.

    Args:
        raw_data_path: Location of the parquet file with the raw data.
        feature_data_path: Destination of the parquet file with the features.

    Returns:
        The ``feature_data_path`` that was written.
    '''
    import pandas as pd

    # read dataframe; work on a copy so the frame that was read is not
    # modified through an alias
    features_df = pd.read_parquet(raw_data_path).copy()

    # create variables for years, months and days from the date string
    date_text = features_df['YYYYMMDD,']
    features_df['YYYY'] = date_text.str.slice(0, 4)
    features_df['MM'] = date_text.str.slice(4, 6)
    features_df['DD'] = date_text.str.slice(6, 8)

    # convert to float where possible; columns that cannot be converted stay as they are
    for column in features_df.columns:
        try:
            features_df[column] = features_df[column].astype(float)
        except (ValueError, TypeError):
            pass
    features_df = features_df.drop('YYYYMMDD,', axis=1)

    # remove irrelevant features manually
    features_df = features_df.drop(
        columns=['STN,', 'EV24', 'NG,', 'TN,', 'TNH,', 'TX,', 'TXH,', 'T10N,', 'T10NH,'])
    # discard every row that still has a missing value
    features_df = features_df.dropna()

    features_df.to_parquet(feature_data_path, compression='GZIP')
    # one column is not counted as a feature ('TG,' is used as the target by the training steps)
    features_numbers = len(features_df.columns) - 1
    print('Writing %s features' % features_numbers)
    print('Done!')

    return feature_data_path

In [21]:
# Wrap feature_processing as a KFP component; the component definition is also
# written to feature_processing.yaml.
feature_processing_op = comp.create_component_from_func(
    feature_processing,
    output_component_file='feature_processing.yaml',
    packages_to_install=[
        'fastparquet',
        'fsspec',
        'gcsfs',
    ],
)

#### Component: Train Vanilla Random Forest Model

In [22]:
def train_vanilla_rf(feature_data_path: str, vanilla_model_path: str) -> str:
    '''Train a baseline regression model with default parameters and upload it to GCS.

    Despite the function name, the estimator fitted here is a scikit-learn
    ``LinearRegression``. The features are split 80/20 (``random_state=1``),
    the model is fitted on the larger part, the errors on the smaller part are
    printed, and the pickled model is uploaded to ``vanilla_model_path``.

    Args:
        feature_data_path: Location of the parquet file with the features;
            the column ``TG,`` is the prediction target.
        vanilla_model_path: URL of the form ``gs://<bucket>/<object>`` the
            pickled model is uploaded to.

    Returns:
        The ``vanilla_model_path`` the model was uploaded to.

    Raises:
        ValueError: If ``vanilla_model_path`` contains no object name.
    '''
    import pandas as pd
    import _pickle as cPickle # save ML model
    from google.cloud import storage # save the model to GCS
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split
    from urllib.parse import urlparse

    # read dataframe
    weather_features_df = pd.read_parquet(feature_data_path)

    # get x and y
    x_train, y_train = weather_features_df.drop('TG,', axis=1), weather_features_df['TG,']
    # split the data for initial testing
    X_train, X_test, Y_train, Y_test = train_test_split(x_train, y_train, test_size=0.2, random_state=1)

    # train the model
    print('Training regression model')
    print('Shape of X: %s, %s' % (len(x_train), len(x_train.columns)))
    regression_model = LinearRegression()
    regression_model.fit(X_train, Y_train)

    # some initial testing
    predictions = regression_model.predict(X_test)
    print('mean absolute error without optimization: %s' % mean_absolute_error(Y_test, predictions))
    print('mean squared error without optimization is: %s' % mean_squared_error(Y_test, predictions))

    # write out output
    # save the model into temp
    with open('/tmp/model.pickle', 'wb') as f:
        cPickle.dump(regression_model, f, -1)

    # get client and write to GCS
    # parse model write path for GS: netloc is the bucket, path is the object name
    parse = urlparse(url=vanilla_model_path, allow_fragments=False)
    # the object name must not start with the slash that separates it from the bucket
    model_path = parse.path[1:] if parse.path.startswith('/') else parse.path
    if not model_path:
        raise ValueError('No object name in model path: %s' % vanilla_model_path)

    client = storage.Client()
    bucket = client.get_bucket(parse.netloc)
    blob = bucket.blob(model_path)
    blob.upload_from_filename('/tmp/model.pickle')

    return vanilla_model_path

In [23]:
# Wrap train_vanilla_rf as a KFP component; the component definition is also
# written to train_vanilla_rf.yaml.
train_vanilla_rf_op = comp.create_component_from_func(
    train_vanilla_rf,
    output_component_file='train_vanilla_rf.yaml',
    packages_to_install=[
        'scikit-learn',
        'fastparquet',
        'fsspec',
        'gcsfs',
        'google-cloud-storage',
    ],
)

#### Component: Random Search for Random Forest

In [24]:
def hyp_tune_train_rf(feature_data_path: str, tuned_model_path: str) -> str:
    '''Train the model for the "tuned" slot of the pipeline and upload it to GCS.

    TODO: the random search with cross validation that this step is meant to
    perform is not implemented yet. For now the step fits a scikit-learn
    ``LinearRegression`` with default parameters on an 80/20 split
    (``random_state=1``), prints the errors on the smaller part and uploads
    the pickled model to ``tuned_model_path``.

    Args:
        feature_data_path: Location of the parquet file with the features;
            the column ``TG,`` is the prediction target.
        tuned_model_path: URL of the form ``gs://<bucket>/<object>`` the
            pickled model is uploaded to.

    Returns:
        The ``tuned_model_path`` the model was uploaded to.

    Raises:
        ValueError: If ``tuned_model_path`` contains no object name.
    '''
    import pandas as pd
    import _pickle as cPickle # save ML model
    from google.cloud import storage # save the model to GCS
    from sklearn.linear_model import LinearRegression
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.model_selection import train_test_split
    from urllib.parse import urlparse

    # read dataframe
    weather_features_df = pd.read_parquet(feature_data_path)

    # get x and y
    x_train, y_train = weather_features_df.drop('TG,', axis=1), weather_features_df['TG,']
    # split the data for initial testing
    X_train, X_test, Y_train, Y_test = train_test_split(x_train, y_train, test_size=0.2, random_state=1)

    # train the model
    print('Training regression model')
    print('Shape of X: %s, %s' % (len(x_train), len(x_train.columns)))
    regression_model = LinearRegression()
    regression_model.fit(X_train, Y_train)

    # some initial testing
    predictions = regression_model.predict(X_test)
    print('mean absolute error without optimization: %s' % mean_absolute_error(Y_test, predictions))
    print('mean squared error without optimization is: %s' % mean_squared_error(Y_test, predictions))

    temp_model_path = '/tmp/model.pickle'

    # write out output
    # save the model into temp
    with open(temp_model_path, 'wb') as f:
        cPickle.dump(regression_model, f, -1)

    # get client and write to GCS
    # parse model write path for GS: netloc is the bucket, path is the object name
    parse = urlparse(url=tuned_model_path, allow_fragments=False)
    # the object name must not start with the slash that separates it from the bucket
    model_path = parse.path[1:] if parse.path.startswith('/') else parse.path
    if not model_path:
        raise ValueError('No object name in model path: %s' % tuned_model_path)

    client = storage.Client()
    bucket = client.get_bucket(parse.netloc)
    # separate name for the blob so the trained estimator is not shadowed
    blob = bucket.blob(model_path)
    blob.upload_from_filename(temp_model_path)

    return tuned_model_path

In [25]:
# Wrap hyp_tune_train_rf as a KFP component; the component definition is also
# written to hyp_tune_train_rf.yaml.
hyp_tune_train_rf_op = comp.create_component_from_func(
    hyp_tune_train_rf,
    output_component_file='hyp_tune_train_rf.yaml',
    packages_to_install=[
        'scikit-learn',
        'fastparquet',
        'fsspec',
        'gcsfs',
        'google-cloud-storage',
    ],
)

#### Component: Evaluate The Models

In [26]:
def eval_models(feature_data_path: str, vanilla_model_path: str, tuned_model_path: str) -> str:
    '''Evaluate the trained models on held-out data and report the best one.

    The features are split with the same parameters as in the training steps
    (``test_size=0.2``, ``random_state=1``); the 20% part is used as the
    validation set. Each model is downloaded from GCS, scored with the mean
    absolute error, and the model with the LOWEST error is selected.

    Args:
        feature_data_path: Location of the parquet file with the features;
            the column ``TG,`` is the prediction target.
        vanilla_model_path: ``gs://<bucket>/<object>`` URL of the vanilla model.
        tuned_model_path: ``gs://<bucket>/<object>`` URL of the tuned model.

    Returns:
        The path of the model with the lowest mean absolute error.

    Raises:
        AttributeError: If a model file cannot be found in the bucket.
    '''
    import pandas as pd
    from io import BytesIO
    import _pickle as cPickle # load ML model
    from google.cloud import storage # read the model from GCS
    from sklearn.metrics import mean_absolute_error
    from sklearn.model_selection import train_test_split
    from urllib.parse import urlparse
    from collections import namedtuple

    # read dataframe
    weather_features_df = pd.read_parquet(feature_data_path)

    # get x and y
    features, target = weather_features_df.drop('TG,', axis=1), weather_features_df['TG,']
    # TODO: replace with a dedicated holdout dataset. Until then the 20% part of
    # the split (same parameters as in the training steps) serves as validation set.
    _, x_val, _, y_val = train_test_split(features, target, test_size=0.2, random_state=1)

    def get_mae(model_path):
        '''this function evaluates a model on our holdout dataset given just the model path'''
        parse = urlparse(url=model_path, allow_fragments=False)
        # the object name must not start with the slash that separates it from the bucket
        blob_name = parse.path[1:] if parse.path.startswith('/') else parse.path

        client = storage.Client()
        bucket = client.get_bucket(parse.netloc)
        blob = bucket.get_blob(blob_name)
        if blob is None:
            raise AttributeError('No files to download')
        model_bytestream = BytesIO(blob.download_as_string())
        # SECURITY: unpickling executes arbitrary code; only load models from
        # buckets that are written exclusively by this pipeline.
        model = cPickle.load(model_bytestream)
        predictions = model.predict(x_val)
        return mean_absolute_error(y_val, predictions)

    Models = namedtuple('Model', 'type score path')
    m_list = [
        Models('vanilla', get_mae(vanilla_model_path), vanilla_model_path),
        Models('tuned', get_mae(tuned_model_path), tuned_model_path),
    ]

    # the score is an error measure, so the smallest value wins
    best_model = min(m_list, key=lambda candidate: candidate.score)
    print('Best Model: ', best_model)
    return best_model.path

In [27]:
# Wrap eval_models as a KFP component; the component definition is also
# written to eval_models.yaml.
eval_models_op = comp.create_component_from_func(
    eval_models,
    output_component_file='eval_models.yaml',
    packages_to_install=[
        'scikit-learn',
        'fastparquet',
        'fsspec',
        'gcsfs',
        'google-cloud-storage',
    ],
)

#### Create & Run KubeFlow Pipeline With Python SDK

In [28]:
@dsl.pipeline(
  name='Weather regression',
  description='Predicting temperature in Eindoven using linear regression'
)
def weather_pipeline(raw_data_path, feature_data_path, vanilla_model_path, tuned_model_path, disable_cache):
  # Step graph: download -> feature processing -> (vanilla training, tuned training) -> evaluation.
  # Each step receives the output of the step it depends on plus its own target path.
  download_step = download_raw_data_op(raw_data_path)
  feature_step = feature_processing_op(download_step.output, feature_data_path)
  vanilla_step = train_vanilla_rf_op(feature_step.output, vanilla_model_path)
  tuned_step = hyp_tune_train_rf_op(feature_step.output, tuned_model_path)
  eval_step = eval_models_op(feature_step.output, vanilla_step.output, tuned_step.output)

  # NOTE(review): disable_cache is a pipeline parameter here; confirm that it is
  # evaluated as a plain bool at this point and not as a pipeline placeholder,
  # otherwise this branch would always be taken.
  if disable_cache:
      # a maximum cache staleness of "P0D" is set on every step
      for step in (download_step, feature_step, vanilla_step, tuned_step, eval_step):
          step.execution_options.caching_strategy.max_cache_staleness = "P0D"
    
# Specify argument values for pipeline run.
# NOTE(review): the bucket and object names still refer to "stock-prediction" /
# "sp500"; confirm they are the intended locations for the weather data.
arguments = {'raw_data_path': 'gs://mlops-stock-prediction/raw/sp500.parquet',
            'feature_data_path': 'gs://mlops-stock-prediction/feature_store/sp500_features.parquet',
            'vanilla_model_path': 'gs://mlops-stock-prediction/model_store/vanilla/vanilla_rf.pickle',
            'tuned_model_path': 'gs://mlops-stock-prediction/model_store/tuned/tuned_rf.pickle',
            'disable_cache': disable_cache,
            }

# Create a pipeline run, using the client you initialized in a prior step.
# The pipeline function defined in this notebook is weather_pipeline.
client.create_run_from_pipeline_func(weather_pipeline, arguments=arguments)

RunPipelineResult(run_id=5d1c5816-857d-45fe-87b2-df7f4704b92f)