#Install Kubeflow Pipelines SDK

In [1]:
# Install the SDK
!pip3 install 'kfp>=0.1.31.2' --quiet

In [4]:
!which dsl-compile #Check if the install was successful

/usr/local/bin/dsl-compile


#Build the Components

In [2]:
import kfp
import kfp.components as comp

In [6]:
# Mount your Google drive folder on Colab
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [7]:
# where the outputs are stored
out_dir = "/content/gdrive/My Drive/"

In [9]:
def train(data_path, model_file):
  import numpy as np
  import pandas as pd
  from sklearn.model_selection import train_test_split
  import tensorflow as tf

  data = "https://raw.githubusercontent.com/Fitzpatrique/stage-f-09-campaign-finance/master/data/new_project_data2.csv"
  df = pd.read_csv(data)

  X = df[['can_off_dis', 'can_zip', 'ind_con', 'net_ope_exp', 'tot_con',
       'tot_dis', 'net_con', 'ope_exp', 'tot_rec', 'can_off_id', 'can_nam_id',
       'can_off_sta_id', 'can_par_aff_id', 'can_inc_cha_ope_sea_id',
       'can_cit_id', 'can_sta_id', 'cov_dur']]
  y = df[['winner_id']]

  #Perform train test split on the data
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=101)

  #Define the model 
  model  = tf.keras.Sequential([
                tf.keras.layers.Flatten(input_shape=(1,17)),
                tf.keras.layers.Dense(8, activation = 'relu'),
                tf.keras.layers.Dense(1, activation = 'sigmoid')
  ])

  model.compile(optimizer = 'adam', loss='binary_crossentropy', metrics =['accuracy'])


  num_epochs = 170

  history = model.fit(X_train, y_train, epochs = num_epochs,
                    validation_data = (X_test,y_test))
  
  #Save the model
  model.save('/content/gdrive/My Drive/saved_model')

In [10]:
classifier = train(out_dir, "model")

Epoch 1/170
Epoch 2/170
Epoch 3/170
Epoch 4/170
Epoch 5/170
Epoch 6/170
Epoch 7/170
Epoch 8/170
Epoch 9/170
Epoch 10/170
Epoch 11/170
Epoch 12/170
Epoch 13/170
Epoch 14/170
Epoch 15/170
Epoch 16/170
Epoch 17/170
Epoch 18/170
Epoch 19/170
Epoch 20/170
Epoch 21/170
Epoch 22/170
Epoch 23/170
Epoch 24/170
Epoch 25/170
Epoch 26/170
Epoch 27/170
Epoch 28/170
Epoch 29/170
Epoch 30/170
Epoch 31/170
Epoch 32/170
Epoch 33/170
Epoch 34/170
Epoch 35/170
Epoch 36/170
Epoch 37/170
Epoch 38/170
Epoch 39/170
Epoch 40/170
Epoch 41/170
Epoch 42/170
Epoch 43/170
Epoch 44/170
Epoch 45/170
Epoch 46/170
Epoch 47/170
Epoch 48/170
Epoch 49/170
Epoch 50/170
Epoch 51/170
Epoch 52/170
Epoch 53/170
Epoch 54/170
Epoch 55/170
Epoch 56/170
Epoch 57/170
Epoch 58/170
Epoch 59/170
Epoch 60/170
Epoch 61/170
Epoch 62/170
Epoch 63/170
Epoch 64/170
Epoch 65/170
Epoch 66/170
Epoch 67/170
Epoch 68/170
Epoch 69/170
Epoch 70/170
Epoch 71/170
Epoch 72/170
Epoch 73/170
Epoch 74/170
Epoch 75/170
Epoch 76/170
Epoch 77/170
Epoch 78

In [23]:
def predict(data_path, model_file):
  import tensorflow as tf
  import numpy as np
  import pandas as pd
  from sklearn.model_selection import train_test_split
  

  data = "https://raw.githubusercontent.com/Fitzpatrique/stage-f-09-campaign-finance/master/data/new_project_data2.csv"
  df = pd.read_csv(data)

  X = df[['can_off_dis', 'can_zip', 'ind_con', 'net_ope_exp', 'tot_con',
       'tot_dis', 'net_con', 'ope_exp', 'tot_rec', 'can_off_id', 'can_nam_id',
       'can_off_sta_id', 'can_par_aff_id', 'can_inc_cha_ope_sea_id',
       'can_cit_id', 'can_sta_id', 'cov_dur']]
  y = df[['winner_id']]

  #Perform train test split on the data
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=921)


  #Load saved model
  classifier = tf.keras.models.load_model('/content/gdrive/My Drive/saved_model')
  test_loss, test_acc = classifier.evaluate(X_test,  y_test, verbose=0)
  print('Test accuracy:', test_acc)
  from tensorflow.python.lib.io import file_io
  import json
    
    # Exports a sample tensorboard:
  metadata = {
      'outputs' : [{
        'type': 'tensorboard',
        'source': 'gs://ml-pipeline-dataset/tensorboard-train',
      }]
    }

    # Exports two sample metrics:
  metrics = {
      'metrics': [{
          'name': 'Test_Accuracy',
          'numberValue':  float(test_acc),
        }]}

  from collections import namedtuple
  predict_output = namedtuple('Test_Accuracy', ['Test_Accuracy', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])
  return predict_output(test_acc, json.dumps(metadata), json.dumps(metrics))

In [24]:
predict(out_dir, "model")

Test accuracy: 0.9035369753837585


Test_Accuracy(Test_Accuracy=0.9035369753837585, mlpipeline_ui_metadata='{"outputs": [{"type": "tensorboard", "source": "gs://ml-pipeline-dataset/tensorboard-train"}]}', mlpipeline_metrics='{"metrics": [{"name": "Test_Accuracy", "numberValue": 0.9035369753837585}]}')

In [25]:
# Create train and predict lightweight components.
train_op = comp.func_to_container_op(train , base_image = "tensorflow/tensorflow:latest-gpu-py3")
predict_op = comp.func_to_container_op(predict , base_image = "tensorflow/tensorflow:latest-gpu-py3")

#Build a Kubeflow Pipeline

In [26]:
import kfp.dsl as dsl
@dsl.pipeline(
   name='Campaign finance pipeline',
   description='A classification pipeline that performs predictions on electoral results.'
)
def camp_pipeline(
  data_path: str,
  model_file: str
):
    #Passing pipeline parameter and a constant value as operation arguments
    train_task = train_op(data_path, model_file) #Returns a dsl.ContainerOp class instance. 
    
    #Passing a task output reference as operation arguments
    #For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax
    predict_task = predict_op(data_path, model_file)
    

In [27]:
pipeline_func = camp_pipeline

In [29]:
DATA_PATH = '/mnt'
MODEL_PATH='churn_classifier.h5'

In [30]:
experiment_name = 'campaign_finance_kubeflow'
run_name = pipeline_func.__name__ + ' run'

arguments = {"data_path":DATA_PATH,
             "model_file":MODEL_PATH}

# Compile pipeline to generate compressed YAML definition of the pipeline.
kfp.compiler.Compiler().compile(pipeline_func,  
  '{}.zip'.format(experiment_name))