## End-to-end Heart Failure Prediction Pipeline

#### Building our lightweight pipeline components using Python

### Lightweight Python components

Lightweight Python components do not require you to build a new container image for every code change. They are intended for fast iteration in a notebook environment.

#### Building a lightweight Python component

To build a component, define a stand-alone Python function and call `kfp.components.func_to_container_op(func)` to convert it into a component that can be used in a pipeline.

There are several requirements for the function:

- The function should be stand-alone. It should not use any code declared outside of the function definition. Any imports should be added inside the main function. Any helper functions should also be defined inside the main function.

- The function can only import packages that are available in the base image. If you need a package that is not available, you can try to find a container image that already includes it. (As a workaround, you can use the `subprocess` module to run `pip install` for the required package.)

- If the function operates on numbers, the parameters need to have type hints. Supported types are `int`, `float` and `bool`. Everything else is passed as a string.
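
As a minimal sketch of a function that meets these requirements (the name `scaled_mean` and its parameters are hypothetical and not part of this pipeline), the example below keeps its import and its helper inside the body and annotates the numeric parameters:

```python
def scaled_mean(total: float, count: int, as_percent: bool = False) -> float:
    """Stand-alone function: everything it needs lives inside its own body."""
    # Imports go inside the function so the component can run it in isolation.
    import math

    # Helper functions are nested for the same reason.
    def safe_divide(numerator: float, denominator: int) -> float:
        return numerator / denominator if denominator else math.nan

    mean = safe_divide(total, count)
    return mean * 100 if as_percent else mean


# The function is ordinary Python, so it can be checked locally before conversion.
print(scaled_mean(3.0, 4))                   # 0.75
print(scaled_mean(3.0, 4, as_percent=True))  # 75.0
```

In the notebook, passing such a function to `kfp.components.func_to_container_op` would then turn it into a component.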

### Building Python function-based components

A Kubeflow Pipelines component is a self-contained set of code that performs one step in your ML workflow. A pipeline component is composed of:

- The component code, which implements the logic needed to perform a step in your ML workflow.

- A component specification, which defines the following:
    - The component's metadata: its name and description.
    - The component's interface: the component's inputs and outputs.
    - The component's implementation: the Docker container image to run, how to pass inputs to your component code, and how to get the component's outputs.
    

Python function-based components make it easier to iterate quickly by letting you build your component code as a Python function and generating the component specification for you.
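
To make the three parts of a specification concrete, the sketch below writes them out as a plain Python dictionary. The values (component name, description, command and arguments) are illustrative assumptions; the specification that the SDK generates for a real function will differ in detail.

```python
import json

# Illustrative only: the three parts of a component specification for a
# hypothetical "Preprocess" step. The values are assumptions, not SDK output.
component_spec = {
    # Metadata
    "name": "Preprocess",
    "description": "Cleans the heart failure dataset.",
    # Interface
    "inputs": [{"name": "data_path", "type": "String"}],
    "outputs": [],
    # Implementation
    "implementation": {
        "container": {
            "image": "tensorflow/tensorflow:latest-gpu-py3",
            "command": ["python3", "-c", "<generated program>"],
            "args": ["--data-path", {"inputValue": "data_path"}],
        }
    },
}

print(json.dumps(component_spec, indent=2))
```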

## Setup

In [None]:
!python -m pip install --user --upgrade pip

In [None]:
!pip3 install -U --user numpy==1.19.3

## Install or update the pipelines SDK

#### Run the following command to install the Kubeflow Pipelines SDK.

In [None]:
# You may need to restart your notebook kernel after updating the kfp SDK
!pip3 install --user --upgrade kfp

`Restart the kernel before you proceed`

In [None]:
# Restart kernel after the pip install
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

`Check if the install was successful:`

## Build the Components

#### Import the kfp and kfp.components packages.

In [None]:
# !pip3 install --upgrade numpy

In [None]:
import kfp                  # the Pipelines SDK. 
from kfp import compiler
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.components as comp
import os
import subprocess
import json

from kfp.dsl.types import Integer, GCSPath, String
import kfp.notebook

In [None]:
# where the outputs are stored
out_dir = "/home/jovyan/stage-f-07-heart-failure/data/out/"

## Create a release experiment in the Kubeflow pipeline

#### Kubeflow Pipelines requires an experiment before you can start a run. An experiment is a group of comparable runs.

In [None]:
EXPERIMENT_NAME = 'Heart Failure Prediction Pipeline'        # Name of the experiment in the UI
BASE_IMAGE = "tensorflow/tensorflow:latest-gpu-py3"    # Base image used for components in the pipeline

PROJECT_NAME = "Kubeflow-mlops-pipeline"

#### Create an instance of the kfp.Client class

In [None]:
client = kfp.Client()
exp = client.create_experiment(name=EXPERIMENT_NAME)

## Building Python function-based components

#### Define your component's code as a standalone python function.

### Preprocessing Function

In [None]:
def preprocess(data_path):
    
    # Install the pinned dependencies first; the imports below rely on them.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])

    # func_to_container_op requires packages to be imported inside of the function.
    import pandas as pd
    import numpy as np
    
    
    # Read the dataset as a csv file 
    df = pd.read_csv("https://raw.githubusercontent.com/HamoyeHQ/stage-f-07-heart-failure/master/data/heart_failure_clinical_records_dataset.csv")
    
    # Re-assign the features with binary numbers to a boolean label
    df['anaemia'] = np.where(df['anaemia'] == 1 ,True,False)
    df['diabetes'] = np.where(df['diabetes'] == 1, True, False)
    df['high_blood_pressure'] = np.where(df['high_blood_pressure'] == 1, True, False)
    df['smoking'] = np.where(df['smoking'] == 1, True, False)
    df['sex'] = np.where(df['sex'] == 1, 'Male','Female')
    
    
    # Print the number of missing values in each column.
    print(df.isnull().sum())
    
    # Drop any row that contains a missing value.
    df = df.dropna(how='any', axis=0)
    
    #output file to path
    # NPZ is a file format by numpy that provides storage of array data using gzip compression. 
    np.savez_compressed(f'{data_path}/preprocessed-data.npz',
                       df = df)
    print("Preprocessing Done")
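
The re-labelling and row-dropping steps can be tried without pandas. The sketch below applies the same idea to two hand-written rows (the sample values are made up for illustration) using plain dictionaries; the helper names `relabel` and `drop_incomplete` are hypothetical.

```python
# Columns that the dataset encodes as 0/1 flags.
BINARY_COLUMNS = ["anaemia", "diabetes", "high_blood_pressure", "smoking"]


def relabel(row):
    """Return a copy of the row with 0/1 flags as booleans and sex as text."""
    cleaned = dict(row)
    for column in BINARY_COLUMNS:
        cleaned[column] = row[column] == 1
    cleaned["sex"] = "Male" if row["sex"] == 1 else "Female"
    return cleaned


def drop_incomplete(rows):
    """Keep only the rows in which no value is missing (None)."""
    return [row for row in rows if all(value is not None for value in row.values())]


# Made-up sample rows; the second one has a missing value.
rows = [
    {"anaemia": 1, "diabetes": 0, "high_blood_pressure": 1, "smoking": 0, "sex": 1},
    {"anaemia": 0, "diabetes": None, "high_blood_pressure": 0, "smoking": 1, "sex": 0},
]

# Drop incomplete rows first so that relabelling never sees a missing value.
cleaned_rows = [relabel(row) for row in drop_incomplete(rows)]
print(cleaned_rows)
```

In this sketch the incomplete rows are dropped before re-labelling, because comparing a missing value with `1` would quietly produce `False` and hide the gap.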

### Analysis Function

In [None]:
# Exploratory Data Analysis
def Analyze(data_path):
    
    # Install the pinned dependencies first; the imports below rely on them.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])

    # func_to_container_op requires packages to be imported inside of the function.
    # IPython magics such as %matplotlib are not valid Python inside a component.
    import pandas as pd
    import numpy as np
    import matplotlib.pyplot as plt
    import seaborn as sns
    sns.set_style("whitegrid")
    sns.set_context("paper")
    
    # Read the dataset as a csv file
    df = pd.read_csv("https://raw.githubusercontent.com/HamoyeHQ/stage-f-07-heart-failure/master/data/heart_failure_clinical_records_dataset.csv")
    
    # Summary statistics of the data
    print(df.describe())
    
    # HeatMap Correlation plot
    plt.figure(figsize = (12,8))
    sns.heatmap(df.corr(), annot = True)
    
    # Split into Features and Labels
    x = df.drop('DEATH_EVENT', axis = 1)
    y = df['DEATH_EVENT']
    
    # Install the facets-overview package (shell magics such as !pip are not valid inside a function).
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'facets-overview'])
    
    train_data = x[0:150] 
    test_data = x[150: ]
    
    
    # Create the feature stats for the datasets and stringify it.
    import base64
    from facets_overview.generic_feature_statistics_generator import GenericFeatureStatisticsGenerator

    gfsg = GenericFeatureStatisticsGenerator()
    proto = gfsg.ProtoFromDataFrames([{'name': 'train', 'table': train_data},
                                  {'name': 'test', 'table': test_data}])
    protostr = base64.b64encode(proto.SerializeToString()).decode("utf-8")
    
    
    # Display the facets overview visualization for this data
    from IPython.core.display import display, HTML

    HTML_TEMPLATE = """
        <script src="https://cdnjs.cloudflare.com/ajax/libs/webcomponentsjs/1.3.3/webcomponents-lite.js"></script>
        <link rel="import" href="https://raw.githubusercontent.com/PAIR-code/facets/1.0.0/facets-dist/facets-jupyter.html" >
        <facets-overview id="elem"></facets-overview>
        <script>
          document.querySelector("#elem").protoInput = "{protostr}";
        </script>"""
    html = HTML_TEMPLATE.format(protostr=protostr)
    display(HTML(html))
    
    
    # For each factor, distinguish the patients who died from those who did not
    fig,ax = plt.subplots(2,3,figsize=(15,8))
    ax1,ax2,ax3,ax4, ax5, ax6 = ax.flatten()

    sns.countplot(df['anaemia'], hue = df["DEATH_EVENT"],ax=ax1)
    sns.countplot(df['diabetes'],hue = df["DEATH_EVENT"],ax=ax2)
    sns.countplot(df['high_blood_pressure'],hue = df["DEATH_EVENT"], ax=ax3)
    sns.countplot(df['sex'],hue = df["DEATH_EVENT"], ax=ax4)
    sns.countplot(df['smoking'],hue = df["DEATH_EVENT"], ax=ax5)
    sns.countplot(df['DEATH_EVENT'],hue = df["DEATH_EVENT"], ax=ax6)

### Feature Engineering Function

In [None]:
def feature_engineer(data_path):
    
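    # Install the pinned dependencies first; the imports below rely on them.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])

    # func_to_container_op requires packages to be imported inside of the function.
    import pandas as pd
    import numpy as np
    
    # Load the preprocessed data. The .npz file holds only the values, so the
    # column names (in the order of the source CSV) are restored here.
    columns = ['age', 'anaemia', 'creatinine_phosphokinase', 'diabetes', 'ejection_fraction',
               'high_blood_pressure', 'platelets', 'serum_creatinine', 'serum_sodium', 'sex',
               'smoking', 'time', 'DEATH_EVENT']
    preprocessed_data = np.load(f'{data_path}/preprocessed-data.npz', allow_pickle=True)
    df = pd.DataFrame(preprocessed_data['df'], columns=columns)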
    
    
    # Re-engineer some features based on generally accepted medical reference ranges
    
    # Normal creatinine_phosphokinase values range from 10 to 120 micrograms per liter (mcg/L)
    def set_cpk(row):
        if row['creatinine_phosphokinase'] >= 10 and row['creatinine_phosphokinase'] <= 120:
            return 'Normal'
        else:
            return 'Abnormal'
    df = df.assign(cp_desc = df.apply(set_cpk, axis =1))
    
    
    # Range of EJECTION FRACTION for Heart Failure
    def set_eject_fraction(row):
        if row['ejection_fraction'] > 50 and row['ejection_fraction'] <= 75:
            return 'Normal'
        else:
            return 'Abnormal'
    df['ejection_fraction_desc'] =  df.apply(set_eject_fraction, axis =1)
    
    
    # Range of PLATELETS for Male and Female
    def set_platelets(row):
        if row['sex'] == 'Female':
            if row['platelets'] >= 157000 and row['platelets'] <= 371000:
                return 'Normal'
            else:
                return 'Abnormal'
        elif row['sex'] == 'Male':
            if row['platelets'] >= 135000 and row['platelets'] <= 317000:
                return 'Normal'
            else:
                return 'Abnormal'
    df['platelets_desc'] = df.apply(set_platelets, axis = 1)
    
    
    # Range of SERUM SODIUM for Heart Failure
    def set_sodium(row):
        if row['serum_sodium'] >= 135 and row['serum_sodium'] <= 145:
            return 'Normal'
        else:
            return 'Abnormal'
    df['sodium_desc'] = df.apply(set_sodium, axis =1)
    
    # Range of SERUM CREATININE for Heart Failure (Varies for male and female)
    def set_creatinine(row):
        if row['sex'] == 'Female':
            if  row['serum_creatinine'] >= 0.5 and  row['serum_creatinine'] <= 1.1:
                return 'Normal'
            else:
                return 'Abnormal'
        elif row['sex'] == 'Male':
            if  row['serum_creatinine'] >= 0.6 and row['serum_creatinine'] <= 1.2:
                return 'Normal'
            else:
                return 'Abnormal'
    df['serum_creatinine_desc'] = df.apply(set_creatinine, axis = 1)
    
    
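    # Split into features and labels
    x = df.drop('DEATH_EVENT', axis=1)
    y = df['DEATH_EVENT'].astype(int)
    
    # Output file to path
    np.savez_compressed(f'{data_path}/feature-engineered-data.npz', 
                       x=x,
                       y=y)
    print("Feature Engineering Done")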
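
The range checks above all follow one pattern: look up a reference range, optionally by sex, and compare. A compact sketch of that pattern in plain Python is shown below; the `RANGES` table and the `describe` helper are hypothetical names, and only the ranges with inclusive bounds from the function above are included.

```python
# Reference ranges taken from the feature-engineering function above.
# Sex-specific ranges are keyed by sex; shared ranges use the key None.
RANGES = {
    "serum_sodium": {None: (135, 145)},
    "platelets": {"Female": (157000, 371000), "Male": (135000, 317000)},
    "serum_creatinine": {"Female": (0.5, 1.1), "Male": (0.6, 1.2)},
}


def describe(row, feature):
    """Label one feature of a row as 'Normal' or 'Abnormal' (bounds inclusive)."""
    ranges = RANGES[feature]
    low, high = ranges[row["sex"]] if row["sex"] in ranges else ranges[None]
    return "Normal" if low <= row[feature] <= high else "Abnormal"


# Made-up patient record for illustration.
patient = {"sex": "Female", "serum_sodium": 130, "platelets": 265000, "serum_creatinine": 1.9}
labels = {feature: describe(patient, feature) for feature in RANGES}
print(labels)
```

Keeping the ranges in one table makes it easier to review the thresholds in a single place than when they are spread over several nested functions.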

### Scaling and Transformation Function

In [None]:
def scale_transform(data_path):
    
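    # Install the pinned dependencies first; the imports below rely on them.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])

    # func_to_container_op requires packages to be imported inside of the function.
    import pandas as pd
    import numpy as np
    from imblearn.over_sampling import SMOTENC
    from sklearn.preprocessing import MinMaxScaler
    
    
    # Load the feature-engineered data. The .npz file holds only the values, so the
    # feature names (original features followed by the engineered ones) are restored here.
    feature_names = ['age', 'anaemia', 'creatinine_phosphokinase', 'diabetes', 'ejection_fraction',
                     'high_blood_pressure', 'platelets', 'serum_creatinine', 'serum_sodium', 'sex',
                     'smoking', 'time', 'cp_desc', 'ejection_fraction_desc', 'platelets_desc',
                     'sodium_desc', 'serum_creatinine_desc']
    feature_engineered_data = np.load(f'{data_path}/feature-engineered-data.npz', allow_pickle=True)
    x = pd.DataFrame(feature_engineered_data['x'], columns=feature_names)
    y = feature_engineered_data['y']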
    
    
    # Use SMOTE-NC (Synthetic Minority Over-sampling Technique for Nominal and Continuous
    # features) to correct the imbalance between the target classes
    smote = SMOTENC(random_state=1,categorical_features=[0,1,3,5,9,10,12,13,14,15,16])
    x_bal, y_bal = smote.fit_resample(x, y)
    x_bal = pd.DataFrame(x_bal, columns = x.columns)
    
    # Create dummy variables for 'sex' and the newly engineered features.
    encode = ['sex','cp_desc','ejection_fraction_desc','platelets_desc','sodium_desc','serum_creatinine_desc']
    x_bal = pd.get_dummies(x_bal,columns = encode, drop_first = True)
    
    # Separate the continuous columns from the rest so that only they are scaled
    data1 = x_bal[['age','creatinine_phosphokinase','ejection_fraction','platelets','serum_creatinine','serum_sodium','time']]
    data2 = x_bal.drop(['age','creatinine_phosphokinase','ejection_fraction','platelets','serum_creatinine','serum_sodium',
                        'time'], axis = 1)
    
    # Scale the continuous columns
    scaler = MinMaxScaler()
    data = pd.DataFrame(scaler.fit_transform(data1), columns = data1.columns)
    
    # Re-join the scaled columns with the remaining features
    x = pd.concat([data,data2], axis = 1)
    
    # Output file to path; save the balanced labels so that x and y stay aligned
    np.savez_compressed(f'{data_path}/scale_transform-data.npz', 
                       x=x,
                       y=y_bal)
    print("Scale and transform Done")
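
Min-max scaling itself is a one-line formula: each value is shifted by the column minimum and divided by the column range. A minimal pure-Python sketch follows; the helper name `min_max_scale` and the sample values are made up, and mapping a constant column to 0.0 is a choice made for this sketch.

```python
def min_max_scale(values):
    """Rescale numbers linearly so the smallest maps to 0.0 and the largest to 1.0."""
    low, high = min(values), max(values)
    span = high - low
    if span == 0:
        # A constant column has no spread; map every value to 0.0.
        return [0.0 for _ in values]
    return [(value - low) / span for value in values]


ages = [40, 55, 70, 100]  # made-up sample values
print(min_max_scale(ages))  # [0.0, 0.25, 0.5, 1.0]
```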

### Training Function

In [None]:
!pip install -q tf-nightly-2.0-preview

In [None]:
import tensorflow
tensorflow.__version__

In [None]:
%load_ext tensorboard

In [None]:
import tensorflow as tf
import datetime, os

logs_base_dir = "./logs"
os.makedirs(logs_base_dir, exist_ok=True)
%tensorboard --logdir {logs_base_dir}

#### If your component returns multiple outputs, annotate your function with the `typing.NamedTuple` type hint and use the `collections.namedtuple` function to return your function's outputs as a new subclass of tuple.

- You can also return metadata and metrics from your function.

    - Metadata helps you visualize pipeline results.
    - Metrics help you compare pipeline runs.
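
A small, self-contained sketch of this pattern is shown below. The function `evaluate` and its inputs are hypothetical; the metrics payload follows the same shape as the one used later in this notebook.

```python
import json
from collections import namedtuple
from typing import NamedTuple


def evaluate(correct: int, total: int) -> NamedTuple(
        'EvaluateOutput',
        [('accuracy', float), ('mlpipeline_metrics', 'Metrics')]):
    """Hypothetical step that returns a plain value plus a metrics payload."""
    accuracy = correct / total
    metrics = {
        'metrics': [{
            'name': 'accuracy-score',
            'numberValue': accuracy,
            'format': 'PERCENTAGE',
        }]
    }
    # Keep the field order identical to the order declared in the annotation.
    output = namedtuple('EvaluateOutput', ['accuracy', 'mlpipeline_metrics'])
    return output(accuracy, json.dumps(metrics))


result = evaluate(correct=45, total=60)
print(result.accuracy)  # 0.75
```

Declaring and returning the fields in the same order avoids two outputs being swapped without any error.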

In [None]:
from typing import NamedTuple
def training(data_path, classifier_file) -> NamedTuple(
    'TrainingOutput',
    [
        ('mlpipeline_ui_metadata', 'UI_metadata'),
        ('mlpipeline_metrics', 'Metrics')
    ]):
    
    # Install the pinned dependencies first; the imports below rely on them.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])
    
    # func_to_container_op requires packages to be imported inside of the function.
    # IPython magics such as %load_ext are not valid Python inside a component.
    import json
    import pickle
    from datetime import datetime
    import numpy as np
    from sklearn.model_selection import train_test_split
    from sklearn.ensemble import RandomForestClassifier
    
    # Load the transformed data
    scale_transformed_data = np.load(f'{data_path}/scale_transform-data.npz', allow_pickle=True)
    x = scale_transformed_data['x']
    y = scale_transformed_data['y']
    
    # Split data into training and testing sets
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size = 0.2, random_state = 0)
    
    # Instantiate classifier with obtained optimum parameters for training
    classifier = RandomForestClassifier(max_depth = 7, max_features= 'sqrt', random_state = 4, min_samples_leaf = 1,
                                min_samples_split = 5, n_estimators = 100)
    
    # Fit to x_train and y_train
    classifier.fit(x_train, y_train)
    
    # Log directory reported to the TensorBoard viewer (timestamp kept free of path separators)
    logdir = "/home/jovyan/stage-f-07-heart-failure/pipeline/logs/" + datetime.now().strftime("%Y%m%d-%H%M%S")
    
    # Export a sample tensorboard
    metadata = {
      'outputs' : [{
        'type': 'tensorboard',
        'source': logdir,
      }]
    }
    
    # Export metrics; numberValue must be numeric, so report the accuracy on the training set
    metrics = {
      'metrics': [{
          'name': 'train-accuracy',
          'numberValue':  float(classifier.score(x_train, y_train)),
        }]
    }
    
    # Output the split data file to path
    np.savez_compressed(f'{data_path}/train-test-data.npz', 
                       x_train=x_train,
                       x_test=x_test,
                       y_train=y_train,
                       y_test=y_test)
    
    # Save the classifier model to the designated file
    with open(f'{data_path}/{classifier_file}', 'wb') as file:
        pickle.dump(classifier, file)
        
    from collections import namedtuple
    training_output = namedtuple(
        'TrainingOutput',
        ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return training_output(json.dumps(metadata), json.dumps(metrics))

### Model Validation Function

In [None]:
from typing import NamedTuple
def model_validation(data_path, classifier_file) -> NamedTuple(
    'ModelvalidationOutputs',
    [
      ('recall', float),
      ('accuracy', float),
      ('precision', float),
      ('f1score', float),
      ('mlpipeline_ui_metadata', 'UI_metadata'),
      ('mlpipeline_metrics', 'Metrics')
    ]):
    
    # Install the pinned dependencies first; the imports below rely on them.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])
    
    # func_to_container_op requires packages to be imported inside of the function.
    import json
    import pickle
    import numpy as np
    from sklearn.metrics import classification_report, recall_score, accuracy_score, precision_score, f1_score
    
    # Load the test split written by the training step
    train_test_data = np.load(f'{data_path}/train-test-data.npz', allow_pickle=True)
    x_test  = train_test_data['x_test']
    y_test  = train_test_data['y_test']
    
    # Load the saved classifier model
    with open(f'{data_path}/{classifier_file}', 'rb') as file:
        classifier = pickle.load(file)
    
    # Predict on x_test
    y_pred = classifier.predict(x_test)
    
    # Model evaluation
    recall = recall_score(y_test,y_pred)
    accuracy = accuracy_score(y_test,y_pred)
    precision = precision_score(y_test,y_pred)
    f1score = f1_score(y_test,y_pred)

    # Export metrics
    metrics = {
      'metrics': [{
        'name': 'accuracy-score', # The name of the metric. Visualized as the column name in the runs table.
        'numberValue':  accuracy, # The value of the metric. Must be a numeric value.
        'format': "PERCENTAGE",   # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
      },{
        'name': 'recall-score',
        'numberValue': recall,
        'format': "PERCENTAGE",
      },{
        'name': 'precision-score',
        'numberValue': precision,
        'format': "PERCENTAGE",
      },{
        'name': 'f1score',
        'numberValue': f1score,
        'format': "PERCENTAGE",
      }]}
    
    # Classification report as text
    report = classification_report(y_test,y_pred)
    print(report)
    
    # Table for the Pipelines UI: one CSV row per class or average, with its scores
    report_dict = classification_report(y_test, y_pred, output_dict=True)
    header = ['label', 'precision', 'recall', 'f1-score', 'support']
    rows = [[label] + [scores[name] for name in header[1:]]
            for label, scores in report_dict.items() if isinstance(scores, dict)]
    metadata = {
      'outputs' : [{
        'type': 'table',
        'storage': 'inline',
        'format': 'csv',
        'header': header,
        'source': '\n'.join(','.join(str(value) for value in row) for row in rows)
      }]
    }
    with open('/mlpipeline-ui-metadata.json', 'w') as f:
      json.dump(metadata, f)
    
    # The report file
    with open(f'{data_path}/result.txt', 'w') as result:
        result.write("Report: {} ".format(report))
    
    # Output the test data and predictions to path
    np.savez_compressed(f'{data_path}/validated-data.npz', 
                       x_test=x_test,
                       y_test=y_test,
                       y_pred=y_pred)
    
    # Save the classifier model to the designated file
    with open(f'{data_path}/{classifier_file}', 'wb') as file:
        pickle.dump(classifier, file)
        
    # Return the outputs in the same order as declared in the annotation
    from collections import namedtuple
    model_eval_output = namedtuple(
        'ModelvalidationOutputs',
        ['recall', 'accuracy', 'precision', 'f1score', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return model_eval_output(recall, accuracy, precision, f1score, json.dumps(metadata), json.dumps(metrics))
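
To see what the four scores measure, they can be computed by hand from the counts of true and false positives and negatives. The sketch below does this in plain Python for made-up labels; `binary_scores` is a hypothetical helper, and returning 0.0 when a denominator is zero is a choice made for this sketch.

```python
def binary_scores(y_true, y_pred):
    """Compute accuracy, precision, recall and F1 for 0/1 labels."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)  # true positives
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)  # false positives
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)  # false negatives
    tn = sum(1 for t, p in pairs if t == 0 and p == 0)  # true negatives

    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1score": f1}


# Made-up labels: 2 true positives, 1 false positive, 2 false negatives, 3 true negatives.
y_true = [1, 1, 1, 0, 0, 0, 0, 1]
y_pred = [1, 1, 0, 0, 0, 1, 0, 0]
scores = binary_scores(y_true, y_pred)
print(scores)
```

For these labels the accuracy is 5/8 and the recall is 2/4, which shows how a model can look acceptable on accuracy while missing half of the positive cases.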

In [None]:
def Confusion_matrix(data_path, classifier_file):
    
    # Install the pinned dependencies first; the imports below rely on them.
    import sys, subprocess
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'])
    subprocess.run([sys.executable, '-m', 'pip', 'install', 'scikit-learn==0.22'])
    
    # func_to_container_op requires packages to be imported inside of the function.
    import pickle
    import numpy as np
    import matplotlib.pyplot as plt
    from sklearn.metrics import confusion_matrix, plot_confusion_matrix
    
    
    # Load the saved classifier model
    with open(f'{data_path}/{classifier_file}', 'rb') as file:
        classifier = pickle.load(file)
        
    # Load the validated data
    validated_data = np.load(f'{data_path}/validated-data.npz', allow_pickle=True)
    x_test = validated_data['x_test']
    y_test = validated_data['y_test']
    y_pred = validated_data['y_pred']
    
    
    # Confusion matrix
    matrix = confusion_matrix(y_test,y_pred)
    print(matrix)
    
    # Plot the confusion matrix; plot_confusion_matrix takes the estimator,
    # the features and the true labels, and computes the predictions itself
    plot_confusion_matrix(classifier, x_test, y_test,
                          cmap=plt.cm.Blues)
    plt.title('Confusion matrix for our classifier')
    plt.show()
    
    
#      metadata = {
#     'outputs' : [{
#       'type': 'confusion_matrix',
#       'format': 'csv',
#       'schema': [
#         {'name': 'target', 'type': 'CATEGORY'},
#         {'name': 'predicted', 'type': 'CATEGORY'},
#         {'name': 'count', 'type': 'NUMBER'},
#       ],
#       'source': <CONFUSION_MATRIX_CSV_FILE>,
#       # Convert vocab to string because for bealean values we want "True|False" to match csv data.
#       'labels': list(map(str, vocab)),
#     }]
#   }
#   with file_io.FileIO('/mlpipeline-ui-metadata.json', 'w') as f:
#     json.dump(metadata, f)

In [None]:
# def roc()

# Build a pipeline component from the function

#### Convert the function to a pipeline operation.

- Use `kfp.components.func_to_container_op` to return a factory function that you can use to create `kfp.dsl.ContainerOp` class instances for the pipeline. The `base_image` argument specifies the container image in which the function runs.

In [None]:
# Create the preprocessing lightweight component.
preprocess_op = comp.func_to_container_op(preprocess, base_image=BASE_IMAGE)

# Create the analysis lightweight component.
analyze_op = comp.func_to_container_op(Analyze, base_image=BASE_IMAGE)

# Create the feature engineering lightweight component.
feature_engineer_op = comp.func_to_container_op(feature_engineer, base_image=BASE_IMAGE)

# Create the scale and transform lightweight component.
scale_transform_op = comp.func_to_container_op(scale_transform, base_image=BASE_IMAGE)

# Create the training lightweight component.
training_op = comp.func_to_container_op(training, base_image=BASE_IMAGE)

# Create the model validation lightweight component.
model_validation_op = comp.func_to_container_op(model_validation, base_image=BASE_IMAGE)

# Create the confusion matrix lightweight component.
confusion_matrix_op = comp.func_to_container_op(Confusion_matrix, base_image=BASE_IMAGE)

# Create predict_classifier lightweight components.
# training_op = comp.func_to_container_op(training, base_image=BASE_IMAGE)

# Build Kubeflow Pipeline

- Our next step is to assemble the components that make up the pipeline. Define the pipeline using the *@dsl.pipeline* decorator.

- The pipeline function takes a number of parameters that are fed into the components during execution. Kubeflow Pipelines are created declaratively: the component code does not run when the pipeline function is defined, only after the pipeline has been compiled and submitted.

- A [Persistent Volume Claim](https://kubernetes.io/docs/concepts/storage/persistent-volumes/) can be quickly created using `VolumeOp` to save and persist data between the components. 
   - Note that while this is a great method to use locally, you could also use a `cloud bucket` for your persistent storage.
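
The pipeline in this notebook chains its steps by handing the volume of one step to the next, which fixes the order in which they run. The toy sketch below illustrates that idea with a dependency table; it is not how Kubeflow Pipelines schedules steps internally, the `execution_order` helper is hypothetical, and it assumes the dependencies contain no cycles.

```python
# Each step lists the steps whose volume it mounts (its upstream dependencies).
# The step names mirror the components in this notebook.
DEPENDENCIES = {
    "create_volume": [],
    "preprocess": ["create_volume"],
    "analyze": ["create_volume"],
    "feature_engineer": ["preprocess"],
    "scale_transform": ["feature_engineer"],
    "training": ["scale_transform"],
    "model_validation": ["training"],
    "confusion_matrix": ["model_validation"],
}


def execution_order(dependencies):
    """Return the steps in an order where every step follows its upstream steps."""
    ordered, visited = [], set()

    def visit(step):
        if step in visited:
            return
        visited.add(step)
        for upstream in dependencies[step]:
            visit(upstream)
        ordered.append(step)

    for step in dependencies:
        visit(step)
    return ordered


print(execution_order(DEPENDENCIES))
```

Steps that share only the initial volume, such as `preprocess` and `analyze` here, have no ordering constraint between them.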

In [None]:
# Define the pipeline with the Kubeflow Pipelines domain-specific language (DSL)
@dsl.pipeline(
    name='Heart Failure Prediction Pipeline',
    description='End-to-end machine learning pipeline to predict mortality by heart failure.'
)
# Parameters fed into the pipeline. The defaults match DATA_PATH and CLASSIFIER_PATH,
# which this notebook defines in a later cell.
def Heart_Failure_container_pipeline(
    data_path: str = '/mnt',
    classifier_file: str = 'heart_main.pkl',
):
    
    # Create a persistent volume
    # Define volume to share data between components
    vop = dsl.VolumeOp(
        name="create_volume",
        resource_name="data-volume", 
        size="1Gi", 
        modes=dsl.VOLUME_MODE_RWO)
    
    # Define Pipeline Components and dependencies
    # We do this with ContainerOp, an object that defines a pipeline component from a container.
    
    # Create Heart Failure preprocessing component.
    heart_failure_preprocessing_container = preprocess_op(data_path).add_pvolumes({data_path: vop.volume})
    
    # Create Heart Failure analysis component
    heart_failure_analyze_container = analyze_op(data_path).add_pvolumes({data_path: vop.volume})
    
    # Create Heart Failure Feature Engineering component
    heart_failure_feature_engineer_container = feature_engineer_op(data_path) \
                                                .add_pvolumes({data_path: heart_failure_preprocessing_container.pvolume})
    
    # Create Heart Failure Scale and transform component
    heart_failure_scale_transform_container = scale_transform_op(data_path) \
                                                .add_pvolumes({data_path: heart_failure_feature_engineer_container.pvolume})
    
    # Create Heart Failure training component
    heart_failure_training_container = training_op(data_path, classifier_file) \
                                        .add_pvolumes({data_path: heart_failure_scale_transform_container.pvolume})
    
    # Create Heart Failure model evaluation component
    heart_failure_model_validation_container = model_validation_op(data_path, classifier_file) \
                                        .add_pvolumes({data_path: heart_failure_training_container.pvolume})
    
    # Create Heart Failure confusion matrix component
    heart_failure_confusion_matrix_container = confusion_matrix_op(data_path, classifier_file) \
                                        .add_pvolumes({data_path: heart_failure_model_validation_container.pvolume})
    
    # Create Heart Failure ROC Curve component
#     heart_failure_roc_container = roc_op(data_path, classifier_file) \
#                                         .add_pvolumes({data_path: heart_failure_model_validation.pvolume})


    
    # Print the report written by the model validation step
    Heart_Failure_result_container = dsl.ContainerOp(
        name="Heart Failure prediction",  # the name displayed for the component execution during runtime.
        image='library/bash:4.4.23',      # Image tag for the Docker container to be used.
        pvolumes={data_path: heart_failure_confusion_matrix_container.pvolume}, # dictionary of paths and associated Persistent Volumes to be mounted to the container before execution.
        arguments=['cat', f'{data_path}/result.txt'] # arguments passed to the container at runtime.
    )

## Compile and run the pipeline

- Finally we feed our pipeline definition into the compiler and run it as an experiment. This will give us 2 links at the bottom that we can follow to the [Kubeflow Pipelines UI](https://www.kubeflow.org/docs/pipelines/overview/pipelines-overview/) where you can check logs, artifacts, inputs/outputs, and visually see the progress of your pipeline.


- Kubeflow Pipelines lets you group pipeline runs by Experiments. You can create a new experiment, or call `kfp.Client().list_experiments()` to see existing ones. If you don't specify the experiment name, the Default experiment will be used.

Define the constants that are used as inputs at various points in the pipeline.

In [None]:
DATA_PATH = '/mnt'  # mount your filesystems or devices
CLASSIFIER_PATH = 'heart_main.pkl'

In [None]:
pipeline_func = Heart_Failure_container_pipeline

In [None]:
experiment_name=EXPERIMENT_NAME
run_name = pipeline_func.__name__ + ' run'


arguments = {"data_path":DATA_PATH,
             "classifier_file":CLASSIFIER_PATH}


# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func,'{}.zip'.format(experiment_name))



# Submit pipeline directly from pipeline function
run_result = client.create_run_from_pipeline_func(pipeline_func, 
                                                  experiment_name=experiment_name, 
                                                  run_name=run_name, 
                                                  arguments=arguments)
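
Before submitting a run, it can help to confirm that the keys of `arguments` match the parameters of the pipeline function, since a misspelled key is easy to miss. The sketch below is one possible check using only the standard library; `check_arguments` is a hypothetical helper, and `example_pipeline` is a plain stand-in with the same parameters as the pipeline function above.

```python
import inspect


def check_arguments(pipeline_func, arguments):
    """Raise ValueError if arguments do not fit the function's parameters."""
    parameters = inspect.signature(pipeline_func).parameters
    unknown = sorted(set(arguments) - set(parameters))
    if unknown:
        raise ValueError(f"Unknown pipeline arguments: {unknown}")
    # Parameters without a default value must be supplied explicitly.
    missing = sorted(name for name, parameter in parameters.items()
                     if parameter.default is inspect.Parameter.empty
                     and name not in arguments)
    if missing:
        raise ValueError(f"Missing pipeline arguments: {missing}")


# Stand-in with the same parameters as the pipeline function (assumption for this sketch).
def example_pipeline(data_path: str = '/mnt', classifier_file: str = 'heart_main.pkl'):
    pass


check_arguments(example_pipeline, {"data_path": "/mnt", "classifier_file": "heart_main.pkl"})
print("arguments match the pipeline parameters")
```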