# In [None]:
# Make sure you follow all the steps described in the comments below before running this pipeline.

import kfp
from kfp import dsl
from kfp.components import create_component_from_func
from typing import NamedTuple

def Download_data():
    """Download the dataset archive from a GCS bucket to local storage.

    Prerequisites: the zip file downloaded from Kaggle must already be
    uploaded to the bucket, and a service-account key file must be readable
    at ``key.json``.

    The archive is written to ``/home/jovyan/archive.zip``. An absolute path
    is used so the result does not depend on the process working directory
    and matches the absolute ``/home/jovyan/...`` paths used by later steps.
    """
    # Imports live inside the function because it is turned into a
    # self-contained component by create_component_from_func.
    from google.cloud import storage
    import os

    bucket_name = "pneumonia-classify"
    blob_name = "archive.zip"
    local_path = "/home/jovyan/archive.zip"

    # download_to_filename does not create missing parent directories.
    os.makedirs(os.path.dirname(local_path), exist_ok=True)

    # NOTE(review): 'key.json' is resolved relative to the working directory
    # of the process running this step -- replace with the path of your
    # credentials and confirm the file is available there.
    client = storage.Client.from_service_account_json('key.json')
    blob = client.bucket(bucket_name).blob(blob_name)

    blob.download_to_filename(local_path)
    print("pneumonia-dataset downloaded successfully")

def Load_data(archive: str = '/home/jovyan/archive.zip',
              destination: str = '/home/jovyan/'):
    """Extract the dataset archive into the working folder.

    Args:
        archive: Path of the zip file to extract. Defaults to the absolute
            location used by the rest of the pipeline.
        destination: Directory the archive members are extracted into.
    """
    # Imported locally because the function is packaged as a standalone
    # component.
    import zipfile

    with zipfile.ZipFile(archive, 'r') as zip_file:
        zip_file.extractall(destination)

    print("data loaded successfully")

def Preprocess_data() -> NamedTuple('Outputs',[('preprocessed_path',str)]):
    """Build training/validation image datasets and save them to disk.

    Images are read from ``/home/jovyan/train/``, resized to 227x227, batched
    in groups of 10 and split 85/15 into training and validation subsets.
    The two datasets are saved under ``/home/jovyan/preprocess/train/`` and
    ``/home/jovyan/preprocess/valid/``.

    Returns:
        A named tuple whose ``preprocessed_path`` field is the folder that
        contains the saved ``train/`` and ``valid/`` datasets.
    """
    # Imports are local because the function is packaged as a standalone
    # component.
    from collections import namedtuple
    from tensorflow import keras
    import tensorflow as tf

    print("Preprocessing given data:")
    train_path = '/home/jovyan/train/'
    preprocessed_path = '/home/jovyan/preprocess/'
    image_height = 227
    image_width = 227
    # NOTE(review): these must match the sub-directory names of train_path
    # exactly -- confirm against the extracted archive.
    class_names = ['normal','pneumonia']

    # Both subsets share every option except `subset`; using the same seed and
    # validation_split for both keeps the two subsets from overlapping.
    common_options = dict(
        batch_size=10,
        image_size=(image_height, image_width),
        shuffle=True,
        class_names=class_names,
        seed=1234,
        validation_split=0.15,
    )

    train_data = keras.preprocessing.image_dataset_from_directory(
        train_path, subset='training', **common_options
    )
    valid_data = keras.preprocessing.image_dataset_from_directory(
        train_path, subset='validation', **common_options
    )

    tf.data.experimental.save(train_data, preprocessed_path + 'train/')
    print("Training data is saved successfully!!")
    # Save the validation split itself (previously the training split was
    # written here a second time).
    tf.data.experimental.save(valid_data, preprocessed_path + 'valid/')
    print("Validation data is saved successfully!!")

    # Return a tuple matching the declared NamedTuple output; a bare string
    # would be indexed character-by-character when outputs are read by field.
    outputs = namedtuple('Outputs', ['preprocessed_path'])
    return outputs(preprocessed_path)

def Modelling():
    """Train a ResNet50-based binary classifier and save it as HDF5.

    Loads the datasets saved under ``/home/jovyan/preprocess/``, stacks a
    small dense head on top of a frozen ImageNet-pretrained ResNet50, trains
    for 10 epochs with SGD and writes the model to ``/home/jovyan/model.h5``.
    """
    # Imports are local because the function is packaged as a standalone
    # component.
    import tensorflow as tf
    from tensorflow import keras

    train_data = tf.data.experimental.load('/home/jovyan/preprocess/train/')
    valid_data = tf.data.experimental.load("/home/jovyan/preprocess/valid/")

    # Backbone without its classification head; global average pooling turns
    # the final feature map into one vector per image.
    # NOTE(review): no ResNet-specific input preprocessing is applied to the
    # images here -- confirm whether that is intended.
    pretrained_model = keras.applications.ResNet50(
        include_top=False,
        input_shape=(227,227,3),
        pooling='avg',
        weights='imagenet'
    )

    # Freeze the backbone so only the new dense head is trained.
    for backbone_layer in pretrained_model.layers:
        backbone_layer.trainable = False

    resnet = keras.Sequential()
    resnet.add(pretrained_model)
    resnet.add(keras.layers.Flatten())
    resnet.add(keras.layers.Dense(512, activation='relu'))
    # Two output units, one per class; paired with the sparse categorical
    # loss below.
    resnet.add(keras.layers.Dense(2, activation='softmax'))

    resnet.compile(
        loss='sparse_categorical_crossentropy',
        # `learning_rate` replaces the deprecated `lr` keyword.
        optimizer=keras.optimizers.SGD(learning_rate=0.0009),
        metrics=["accuracy"],
    )

    resnet.fit(
        train_data,
        validation_data=valid_data,
        epochs=10
    )
    resnet.save('/home/jovyan/model.h5')
    print("Model trained succesfully")

def Prediction():
    """Evaluate the saved model on the validation set and print a confusion matrix.

    Loads ``/home/jovyan/model.h5`` and the dataset saved at
    ``/home/jovyan/preprocess/valid/``, predicts a class for every image and
    prints the confusion matrix of true versus predicted class indices.
    """
    # Imports are local because the function is packaged as a standalone
    # component.
    from tensorflow.keras.models import load_model
    import tensorflow as tf
    import numpy as np
    from sklearn.metrics import confusion_matrix

    saved_model = load_model('/home/jovyan/model.h5')
    valid_data = tf.data.experimental.load("/home/jovyan/preprocess/valid/")

    # A dataset restored with tf.data.experimental.load carries no
    # `class_names` attribute, and class names are not per-sample labels
    # anyway. Collect labels and predictions batch by batch in one pass so
    # the two lists are guaranteed to line up.
    # Assumes every element is an (images, integer_labels) batch, as written
    # by Preprocess_data -- TODO confirm if the preprocessing step changes.
    true_labels = []
    predicted_classes = []
    for images, labels in valid_data:
        probabilities = saved_model.predict_on_batch(images)
        predicted_classes.extend(np.argmax(probabilities, axis=1).tolist())
        true_labels.extend(labels.numpy().tolist())

    print("Confusion matrix")
    # Fix the label set so the matrix is always 2x2, even when one class is
    # missing from the validation data.
    cm = confusion_matrix(true_labels, predicted_classes, labels=[0, 1])
    print(cm)
    
#components creation
# Each Python function above is wrapped into a pipeline component.
# `packages_to_install` must be a list of package names; a plain string would
# be iterated one character at a time.
download = create_component_from_func(Download_data, packages_to_install=['google-cloud-storage'])
load = create_component_from_func(Load_data)
preprocess = create_component_from_func(Preprocess_data, base_image='tensorflow/tensorflow:2.8')
model_v = create_component_from_func(Modelling, base_image='tensorflow/tensorflow:2.8')
# Prediction imports TensorFlow, so it needs the same TensorFlow base image as
# the training step in addition to the extra packages.
prediction = create_component_from_func(
    Prediction,
    base_image='tensorflow/tensorflow:2.8',
    packages_to_install=['numpy', 'scikit-learn'],
)

@dsl.pipeline(
    name = 'ml-pipeline',
    description = 'pneumonia classifier'
)
def ml_pipeline():
    """Run download -> load -> preprocess -> train -> predict sequentially.

    Every step mounts the same existing persistent volume claim so files
    written by one step are visible to the next.
    """
    #attach an existing pvc
    pvc = dsl.PipelineVolume(pvc='pneumonia-classifier-volume')

    # add_pvolumes expects a {mount_path: volume} mapping. All steps read and
    # write under /home/jovyan, so the volume is mounted there.
    mounts = {'/home/jovyan': pvc}

    #defining pipeline steps
    step_1 = download().add_pvolumes(mounts)

    step_2 = load().add_pvolumes(mounts)
    step_2.after(step_1)

    step_3 = preprocess().add_pvolumes(mounts)
    step_3.after(step_2)

    step_4 = model_v().add_pvolumes(mounts)
    step_4.after(step_3)

    step_5 = prediction().add_pvolumes(mounts)
    step_5.after(step_4)
    
if __name__ == "__main__":
    # Connect to the Kubeflow Pipelines API and submit one run of the
    # pipeline, with no arguments, under the 'test' experiment.
    client = kfp.Client()
    client.create_run_from_pipeline_func(
        ml_pipeline,
        arguments=None,
        experiment_name='test',
    )