In [1]:
#installing kfp in notebook enviroment
!python -m pip install --user --upgrade pip
!pip3 install kfp --upgrade --user

Collecting pip
  Downloading pip-21.3.1-py3-none-any.whl (1.7 MB)
[K     |████████████████████████████████| 1.7 MB 32.0 MB/s 
[?25hInstalling collected packages: pip
Successfully installed pip-21.3.1
Collecting kfp
  Downloading kfp-1.8.6.tar.gz (266 kB)
     |████████████████████████████████| 266 kB 27.5 MB/s            
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting absl-py<=0.11,>=0.9
  Downloading absl_py-0.11.0-py3-none-any.whl (127 kB)
     |████████████████████████████████| 127 kB 49.2 MB/s            
[?25hCollecting PyYAML<6,>=5.3
  Downloading PyYAML-5.4.1-cp37-cp37m-manylinux1_x86_64.whl (636 kB)
     |████████████████████████████████| 636 kB 40.4 MB/s            
[?25hCollecting google-cloud-storage<2,>=1.20.0
  Downloading google_cloud_storage-1.42.3-py2.py3-none-any.whl (105 kB)
     |████████████████████████████████| 105 kB 54.8 MB/s            
[?25hCollecting kubernetes<19,>=8.0.0
  Downloading kubernetes-18.20.0-py2.py3-none-any.whl (1.6 MB

In [1]:
import kfp
from kfp import dsl
import kfp.components as comp

In [2]:
def obtain_data(data_path):
  """Download the churn dataset and pickle it for the preprocessing component.

  This function is turned into a KFP lightweight component, so all of its
  imports are done inside the body (only the function source is shipped).

  Args:
    data_path: Directory in which the pickled DataFrame is written as
      ``working_data``.
  """
  import pickle
  import subprocess
  import sys

  # The dash in `-m` is required: without it Python tries to run a script
  # named "m", pip never executes, and the failure goes unnoticed.
  # check=True makes a failed install stop the step immediately.
  subprocess.run(
      [sys.executable, '-m', 'pip', 'install', 'pandas==0.23.4'],
      check=True,
  )
  import pandas as pd

  # Read the data from its source.
  data = pd.read_csv(
      "https://raw.githubusercontent.com/MavenCode/KubeflowTraining/master/Data/Telco/Churn_Modelling.csv"
  )
  # Save the data as a pickle file to be used by the preprocessing component.
  with open(f'{data_path}/working_data', 'wb') as f:
    pickle.dump(data, f)


# Backward-compatible alias for the original (misspelled) function name.
optain_data = obtain_data
  

In [3]:
def preprocessing(data_path):
  """Encode, split and scale the churn data for training.

  Reads the pickled DataFrame ``working_data`` from ``data_path`` and writes
  two pickles back to the same directory: ``train_data`` holding
  ``(X_train, y_train)`` and ``test_data`` holding ``(X_test, y_test)``.
  Imports are local because this function becomes a KFP lightweight component.

  Args:
    data_path: Directory holding ``working_data``; outputs are written there too.
  """
  import pickle
  import subprocess
  import sys

  # `-m` (with the dash) runs pip as a module; check=True surfaces install failures.
  for requirement in ('pandas==0.23.4', 'scikit-learn==0.22'):
    subprocess.run([sys.executable, '-m', 'pip', 'install', requirement], check=True)
  import pandas as pd
  from sklearn.model_selection import train_test_split
  from sklearn.preprocessing import LabelEncoder
  from sklearn.preprocessing import MinMaxScaler
  from sklearn.preprocessing import OneHotEncoder
  from sklearn.preprocessing import StandardScaler

  # Load the working data.
  with open(f'{data_path}/working_data', 'rb') as f:
    data = pickle.load(f)

  # Drop identifier columns that are not needed as features.
  data = data.drop(columns=['CustomerId', 'Surname', 'RowNumber'])

  # Independent features: everything but the last column. Take a copy so the
  # column assignments below modify X itself, not a slice of `data`
  # (avoids SettingWithCopyWarning / silently lost writes).
  X = data.iloc[:, :-1].copy()

  # Dependent feature: the last column (kept 2-D as a DataFrame).
  y = data.iloc[:, -1:]

  # Label-encode Gender; one-hot encode Geography.
  le = LabelEncoder()
  ohe = OneHotEncoder()
  X['Gender'] = le.fit_transform(X['Gender'])
  geo_encoded = ohe.fit_transform(X[['Geography']]).toarray()
  # Reuse X's index so join() aligns rows by position regardless of the index.
  geo_df = pd.DataFrame(
      geo_encoded,
      columns=ohe.get_feature_names(['Geography']),
      index=X.index,
  )

  # Merge the encoded columns and drop the now-redundant source column.
  X = X.join(geo_df).drop(columns=['Geography'])

  # Split the data.
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

  # Scale the features. Scalers are fitted on the training split only and
  # then applied to the test split; refitting on X_test would leak test
  # statistics and scale the two splits inconsistently.
  min_max_scaler = MinMaxScaler(feature_range=(0, 1))
  X_train = min_max_scaler.fit_transform(X_train)
  X_test = min_max_scaler.transform(X_test)

  sc = StandardScaler()
  X_train = sc.fit_transform(X_train)
  X_test = sc.transform(X_test)

  # Save the train data as a pickle file for the training component.
  with open(f'{data_path}/train_data', 'wb') as f:
    pickle.dump((X_train, y_train), f)
  # Save the test data as a pickle file for the prediction component.
  with open(f'{data_path}/test_data', 'wb') as f:
    pickle.dump((X_test, y_test), f)


# Alias matching the name used when the component is created.
preprocess = preprocessing

In [4]:
def train_tensorflow(data_path, train_data, model):
  """Train a small Keras binary classifier and save it.

  Args:
    data_path: Directory holding the training pickle; the trained model is
      saved there as well.
    train_data: File name (inside ``data_path``) of a pickled
      ``(X_train, y_train)`` tuple.
    model: File name (inside ``data_path``) under which the model is saved.
  """
  import pickle
  from tensorflow.keras.models import Sequential
  from tensorflow.keras.layers import Dense

  # Load the training data; the pickle holds an (X_train, y_train) tuple.
  with open(f'{data_path}/{train_data}', 'rb') as f:
    X_train, y_train = pickle.load(f)

  # Keras sequential model: input layer, one hidden layer, sigmoid output.
  # The input width is taken from the data rather than hard-coded, so the
  # model keeps working if the feature set changes.
  classifier = Sequential()
  classifier.add(Dense(units=16, activation='relu', input_dim=X_train.shape[1]))
  classifier.add(Dense(units=8, activation='relu'))
  classifier.add(Dense(units=1, activation='sigmoid'))
  # Compile with the Adam optimizer and binary cross-entropy loss.
  classifier.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
  # Fit the model.
  classifier.fit(X_train, y_train, batch_size=10, epochs=150)
  # Save the model.
  classifier.save(f'{data_path}/{model}')

In [5]:
def predict_tensorflow(data_path, test_data, model):
  """Evaluate the saved model on the test set and write the results.

  Writes ``performance.txt`` (test loss and accuracy) and ``results.txt``
  (thresholded predictions alongside the actual labels) into ``data_path``.

  Args:
    data_path: Directory holding the test pickle and the saved model.
    test_data: File name (inside ``data_path``) of a pickled
      ``(X_test, y_test)`` tuple.
    model: File name (inside ``data_path``) of the saved Keras model.
  """
  import pickle
  from tensorflow.keras.models import load_model

  # Load the test data; the pickle holds an (X_test, y_test) tuple.
  with open(f'{data_path}/{test_data}', 'rb') as f:
    X_test, y_test = pickle.load(f)

  # Load the model.
  classifier = load_model(f'{data_path}/{model}')

  # Evaluate the model (Keras exposes `evaluate`; there is no `evaluation`).
  test_loss, test_acc = classifier.evaluate(X_test, y_test, verbose=0)

  # Predict on the test data and threshold the sigmoid outputs at 0.5.
  y_pred = classifier.predict(X_test) > 0.5

  # Save the test loss and accuracy.
  with open(f'{data_path}/performance.txt', 'w') as f:
    f.write("Test_loss: {}, Test_accuracy: {} ".format(test_loss, test_acc))

  # Save the predictions. The builtin `bool` is used because the `np.bool`
  # alias was deprecated in NumPy 1.20 and removed in 1.24.
  with open(f'{data_path}/results.txt', 'w') as result:
    result.write(" Prediction: {}, Actual: {} ".format(y_pred, y_test.astype(bool)))


In [None]:
# Create lightweight components from the functions defined above.
# create_component_from_func ships only each function's source to its base
# image, which is why every function performs its own imports.
PYTHON_IMAGE = "python:3.7.1"
# NOTE(review): consider pinning an explicit TensorFlow tag; `latest-*` tags
# make pipeline runs non-reproducible.
TENSORFLOW_IMAGE = "tensorflow/tensorflow:latest-gpu-py3"

obtain_data_op = kfp.components.create_component_from_func(optain_data, base_image=PYTHON_IMAGE)
preprocess_op = kfp.components.create_component_from_func(preprocessing, base_image=PYTHON_IMAGE)
train_op = kfp.components.create_component_from_func(train_tensorflow, base_image=TENSORFLOW_IMAGE)
predict_op = kfp.components.create_component_from_func(predict_tensorflow, base_image=TENSORFLOW_IMAGE)


**Define the TensorFlow pipeline**

In [None]:
from kfp.components import create_component_from_func
# Create a client that communicates with the Kubeflow Pipelines API.
# NOTE(review): no host is passed, so this presumably relies on the default /
# in-cluster configuration — confirm for deployments outside the cluster.
client = kfp.Client()
# Define pipeline
@dsl.pipeline(name="Churn Pipeline", description="Performs Preprocessing, training and prediction of churn rate")
#Define parameters to be fed into thr pipeline
def churn_lightweight_tensorflow_pipeline(data_path:str,
                                          working_data: str,
                                          train_data: str,
                                          test_data: str,
                                          model:str):
  #define volume to share data between components
  volume_op = dsl.VolumeOp(
      name="data_volume",
      resource_name="data-volume",
      size="1Gi",
      modes=dsl.VOLUME_MODE_RWO
  )