In [1]:
import json
import kfp
import kfp.dsl as dsl
import kfp.components as comp

In [2]:
# replace this value with the value of the KFP host name
KFP_HOST_NAME = 'https://31a528101f3f8d80-dot-us-central2.pipelines.googleusercontent.com/'
# number of latest days to holdout for validation
holdout_engine = 62
# number of random iterations for random search hyperparameter optimization
random_iterations = 3
# Number of trees in random forest
n_estimators = [150, 250, 300, 400]
# Number of features to consider at every split
max_features = ['auto', 'sqrt']
# Maximum number of levels in tree
max_depth = [5, 10, 50, 100, None]
# Minimum number of samples required to split a node
min_samples_split = [2, 3, 5, 10]
# Minimum number of samples required at each leaf node
min_samples_leaf = [1, 2, 4]

In [3]:
client = kfp.Client(host=KFP_HOST_NAME)

In [4]:
def get_clean_data(raw_data_path: str, cleaned_data_path: str) -> str:
    '''Hier moet de data import komen'''
    import pandas as pd
    
    #raw data import
    data = pd.read_csv(raw_data_path, sep=" ", header=None)
    
    data.columns = ['engine_id', 'cycle', 'setting1', 'setting2', 'setting3', 's1', 's2', 's3', 's4', 's5', 's6', 's7', 's8', 's9', 's10', 's11', 's12', 's13', 's14', 's15', 's16', 's17', 's18', 's19', 's20', 's21', 'NA', 'NA']
    del data['NA']

    # Creating and adding the RUL to the dataframe
    RUL_list = []
    for engine in set(data['engine_id']):
        max_cycle = data.loc[data['engine_id'] == engine].cycle.max()
    
        for cycle in list(data.loc[data['engine_id'] == engine].cycle):
            RUL_list.append(max_cycle - cycle +1)    
        

    data.insert(2, 'RUL', RUL_list)
    data.to_parquet(cleaned_data_path, compression='GZIP')
    
    return cleaned_data_path

In [5]:
get_clean_data_op = comp.create_component_from_func(
    get_clean_data, output_component_file = 'get_clean_data.yaml', packages_to_install=['fastparquet', 'fsspec', 'gcfs'])

In [6]:
def feature_processing(cleaned_data_path: str, feature_data_path: str) -> str:
    import pandas as pd
    from sklearn.cluster import KMeans
    
    data = pd.read_parquet(cleaned_data_path)
    
    engine = data.iloc[:,0].to_list()
    cycle = data.iloc[:,1].to_list()

    # Clustering the data
    X_cluster = data[['setting1', 'setting2', 'setting3']]

    # creates the clusters
    kmeans = KMeans(n_clusters=3).fit(X_cluster)
    data['settings_clusters'] = kmeans.predict(X_cluster)

    features = data.columns[3:-1]
    for feature in features:
        # Creating min, max and delta variables
        data['max_' + feature] = data.groupby('engine_id')[feature].cummax()
        data['min_' + feature] = data.groupby('engine_id')[feature].cummin()

        data['delta_' + feature] = data.groupby('engine_id')[feature].diff()
        data['delta_' + feature].fillna(0, inplace=True)

    
    data.to_parquet(feature_data_path, compression='GZIP')
    
    print('Created an saved features.')
    
    return feature_data_path

In [7]:
feature_processing_op = comp.create_component_from_func(
    feature_processing, output_component_file = 'feature_processing.yaml', packages_to_install=['fastparquet', 'fsspec', 'gcfs'])

In [8]:
def train_vanilla_gbr(feature_data_path: str, vanilla_model_store_path: str, holdout_engine: int) -> None:
    import pandas as pd
    import _pickle as cPickle
    from google.cloud import storage
    from urlib.parse import urlparse
    from sklearn.model_selection import train_test_split
    from sklearn import ensemble
    from sklearn import metrics
    
    data = pd.read_parquet(feature_data_path)
    
    RUL_df = data.loc[data.engine_id != holdout_engine].iloc[:,2:].copy()
    
    labels = RUL_df['RUL']
    features = RUL_df.iloc[:,1:]
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size = 0.2, random_state = 42)
    
    gbr_non_opt = ensemble.GradientBoostingRegressor()
    gbr_non_opt.fit(X_train, y_train)
    
    pred_non_opt = gbr_non_opt.predict(X_test)
    print('MAE: %s' % metrics.mean_absolute_error(y_test, pred_non_opt))
    print('MSE: %s' % metrics.mean_squared_error(y_test, pred_non_opt))
    
    with open('/tmp/model.pickle', 'wb') as f:
        cPickle.dump(gbr_non_opt, f, -1)
        
    parse = urlparse(url=vanilla_model_store_path, allow_fragments = False)
    if parse.path[0] == '/':
        model_path = parse.path[1:]
    client = storage.Client()
    bucket = client.get_bucket(parse.netloc)
    blob = bucket.blob(model_path)
    blob.upload_from_filename('/tmp/model.pickle')
        
    return vanilla_model_store_path

In [9]:
train_vanilla_gbr_op = comp.create_component_from_func(
    train_vanilla_gbr, output_component_file = 'train_vanilla_gbr.yaml', packages_to_install=['fastparquet', 'fsspec', 'gcfs', 'scikit-learn', 'google-cloud-storage'])

In [10]:
def hyp_tune_train_gbr(feature_data_path: str, tuned_model_store_path: str,
                       holdout_engine: int, random_iterations: int, 
                       random_params: str) -> str:
    import json
    import pandas as pd
    import _pickle as cPickle
    from google.cloud import storage
    from urlib.parse import urlparse
    from sklearn.model_selection import train_test_split, RandomizedSearchCV
    from sklearn import ensemble
    from sklearn import metrics
    
    data = pd.read_parquet(feature_data_path)
    
    RUL_df = data.loc[data.engine_id != holdout_engine].iloc[:,2:].copy()
    
    labels = RUL_df['RUL']
    features = RUL_df.iloc[:,1:]
    X_train, X_test, y_train, y_test = train_test_split(features, labels, test_size = 0.2, random_state = 42)
    
    random_grid = json.loads(random_params)
    
    gbr = ensemble.GradientBoostingRegressor()
    gbr_random = RandomizedSearchCV(estimator = gbr, param_distributions = random_grid, n_iter = 10, cv = 3, verbose = 2)
    gbr_random.fit(X_train, y_train)

    val_pred_random = gbr_random.predict(X_test)
    MAE_random = metrics.mean_absolute_error(y_test, val_pred_random)
    MSE_random = metrics.mean_squared_error(y_test, val_pred_random)
    print('MAE: %s' % MAE_random)
    print('MSE: %s' % MSE_random)
        
    temp_model_path = '/tmp/model.pickle'
        
    with open(temp_model_path, 'wb') as f:
        cPickle.dump(gbr_random.best_estimator_, f, -1)
        
    parse = urlparse(url=tuned_model_store_path, allow_fragments = False)
    if parse.path[0] =='/':
        model_path = parse.path[1:]
    client = storage.Client()
    bucket = client.get_bucket(parse.netloc)
    model = bucket.blob(model_path)
    model.upload_from_filename(temp_model_path)
    
    return tuned_model_store_path

In [11]:
hyp_tune_train_gbr_op = comp.create_component_from_func(
    hyp_tune_train_gbr, output_component_file = 'hyp_tune_train_gbr.yaml', packages_to_install=['fastparquet', 'fsspec', 'gcfs', 'scikit-learn', 'google-cloud-storage'])

In [12]:
def eval_models(feature_data_path: str, vanilla_model_store_path, tuned_model_store_path: str, holdout_engine: int) -> None:
    '''Evaluate different models on holdout dataset to see which model performs the best'''
    import json
    import pandas as pd
    from io import BytesIO
    from datetime import datetime, timedelta
    import _pickle as cPickle # save ML model
    from google.cloud import storage # save the model to GCS
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.metrics import mean_absolute_error, mean_squared_error
    from sklearn.model_selection import train_test_split
    from sklearn.model_selection import RandomizedSearchCV
    from sklearn.model_selection import GridSearchCV
    from urllib.parse import urlparse
    from collections import namedtuple
    
    # read dataframe
    complete_df = pd.read_parquet(feature_data_path)
    
    # this will be our holdout set for validation
    holdout_df = complete_df.loc[complete_df.engine_id == holdout_engine].iloc[:,2:].copy()
    
    # get x and y
    x_val, y_val = holdout_df.drop('RUL', axis=1), holdout_df['RUL']
    
    def get_mae(model_path):
        '''this function evaluates a model on our holdout dataset given just the model path'''
        parse = urlparse(url=model_path, allow_fragments=False)

        if parse.path[0] =='/':
            model_path = parse.path[1:]

        client = storage.Client()
        bucket = client.get_bucket(parse.netloc)
        blob = bucket.get_blob(model_path)
        if blob is None:
            raise AttributeError('No files to download') 
        model_bytestream = BytesIO(blob.download_as_string())
        model = cPickle.load(model_bytestream)
        predictions = model.predict(x_val)
        return mean_absolute_error(y_val, predictions)
    
    Models = namedtuple('Model', 'type score path')
    m_list = list()
    
    vanilla_mae = get_mae(vanilla_model_store_path)
    m_list.append(Models('vanilla', vanilla_mae, vanilla_model_store_path))
    
    tuned_mae = get_mae(tuned_model_store_path)
    m_list.append(Models('tuned', tuned_mae, tuned_model_store_path))
    
    max_score = max([model.score for model in m_list])
    max_score_index = [model.score for model in m_list].index(max_score)
    print('Best Model: ', m_list[max_score_index])
    path = m_list[max_score_index].path
    return path

In [13]:
# create a KFP component
eval_models_op = comp.create_component_from_func(
    eval_models, output_component_file='eval_models.yaml', packages_to_install=['scikit-learn', 'fastparquet', 'fsspec', 'gcsfs', 'google-cloud-storage'])

In [14]:
@dsl.pipeline(
    name='RUL gbr',
    description='Predicting the Remaining Usefull Lifetime of aircraft engines.'
)
def RUL_pipeline(raw_data_path, cleaned_data_path, feature_data_path, vanilla_model_store_path, tuned_model_store_path, holdout_engine, random_iterations, random_params):
    
    get_clean_data_task = get_clean_data_op(raw_data_path, cleaned_data_path)
    feature_processing_task = feature_processing_op(get_clean_data_task.output, feature_data_path)
    train_vanilla_gbr_task = train_vanilla_gbr_op(feature_processing_task.output, vanilla_model_store_path, holdout_engine)
    hyp_tune_train_gbr_task = hyp_tune_train_gbr_op(feature_processing_task.output, tuned_model_store_path, holdout_engine, random_iterations, random_params)
    eval_models_task = eval_models_op(feature_processing_task.output, train_vanilla_gbr_task.output, hyp_tune_train_gbr_task.output, holdout_engine)
    
random_grid = {'n_estimators' : n_estimators,
               'max_depth' : max_depth,
               'max_features' : max_features,
               'min_samples_split' : min_samples_split,
               'min_samples_leaf' : min_samples_leaf
}
    
arguments = {'raw_data_path': 'gs://DE_A3/raw/DataTrain.txt',
            'cleaned_data_path' : 'gs://DE_A3/cleaned/cleaned_data.parquet',
            'feature_data_path' : 'gs://DE_A3/feature_store/RUL_features.parquet',
            'vanilla_model_store_path' : 'gs://DE_A3/model_store/vanilla/vanilla_gbr.pickle',
            'tuned_model_store_path' : 'gs://DE_A3/model_store/tuned/tuned_gbr.pickle',
            'holdout_engine': holdout_engine,
            'random_iterations': random_iterations,
            'random_params': random_grid,
}

    
client.create_run_from_pipeline_func(RUL_pipeline, arguments=arguments)

RuntimeError: This task has multiple outputs. Use `task.outputs[<output name>]` dictionary to refer to the one you need.