In [None]:

import kfp.dsl as dsl
import yaml
from kubernetes import client as k8s
import kfp.gcp as gcp
from kfp import components
from string import Template
import json
from kubernetes import client as k8s_client


@dsl.pipeline(
    name='Brain tensorflow pipeline',
    description='End to End pipeline for Tensorflow Brain MRI '
)
def brain_tensorflow_pipeline(
        dataextraction_step_image="gcr.io/<PROJECT_ID>/brain_tumor_scan1/step1_download_data:v1",
        dataprocessing_step_image="gcr.io/<PROJECT_ID>/brain_tumor_scan4/step2_dataprocessing:v1",
        trainmodel_step_image="gcr.io/<PROJECT_ID>/brain_tumor_scan1/step3_training_model:v1",
        evaluator_step_image="gcr.io/<PROJECT_ID>/brain_tumor_scan1/step4_evaluation_model:V1",
        root="/mnt/",
        data_file="/mnt/BrainScan_Data/",
        kaggle_api_data="navoneel/brain-mri-images-for-brain-tumor-detection",
        train_file='/mnt/training.data',
        test_file='/mnt/test.data',
        validation_file="/mnt/validation.data",
        label="/mnt/labels.data",
        activation="sigmoid",
        image_size=224,
        train_target="/mnt/trainingtarget.data",
        test_target="/mnt/testtarget.data",
        validation_target="/mnt/validationtarget.data",
        epochs=10,
        learning_rate=.001,
        shuffle_size=1000,
        tensorboard_logs="/mnt/logs/",
        tensorboard_gcs_logs="gs://<BUCKET_NAME>/brain/logs",
        model_output_base_path="/mnt/saved_model",
        gcs_path="gs://<BUCKET_NAME>/brain/model",
        gcs_path_confusion="gs://<BUCKET_NAME>/brain",
        mode="gcs",
        probability=0.5,
        serving_name="kfserving-braintumor",
        serving_namespace="kubeflow",
        image="gcr.io/<PROJECT_ID>/brain_tumor_scan/kf_serving_braintest:v1"
    ):
    """Define the end-to-end brain MRI pipeline.

    The pipeline creates one persistent volume claim, mounts it at ``/mnt``
    in every container step, and chains the steps by handing each step the
    previous step's volume:

    1. ``data_extraction``  - runs ``/app/dataextract.py``
    2. ``data_processing``  - runs ``/app/preprocessing.py``
    3. ``train_model``      - runs ``/app/train.py``
    4. ``evaluation_model`` - runs ``/app/evaluator.py``
    5. ``serve``            - applies a KFServing ``InferenceService`` once
       the evaluation step has finished

    Every argument is forwarded verbatim as a command-line flag (or manifest
    field); what the scripts do with the values is not visible from here.

    Args:
        dataextraction_step_image: Container image of ``data_extraction``.
        dataprocessing_step_image: Container image of ``data_processing``.
        trainmodel_step_image: Container image of ``train_model``.
        evaluator_step_image: Container image of ``evaluation_model``.
        root: ``--root`` for extraction and preprocessing.
        data_file: ``--data-file`` for extraction.
        kaggle_api_data: ``--kaggle-api-data`` for extraction.
        train_file: ``--train-file`` for preprocessing and training.
        test_file: ``--test-file`` for preprocessing, training and evaluation.
        validation_file: ``--validation-file`` for preprocessing and training.
        label: ``--label`` for preprocessing, training and evaluation.
        activation: ``--activation`` for training.
        image_size: ``--image-size`` for preprocessing and training.
        train_target: ``--train-target`` for preprocessing and training.
        test_target: ``--test-target`` for preprocessing, training and
            evaluation.
        validation_target: ``--validation-target`` for preprocessing and
            training.
        epochs: ``--epochs`` for training.
        learning_rate: ``--learning-rate`` for training.
        shuffle_size: Accepted but currently not forwarded to any step.
        tensorboard_logs: ``--tensorboard-logs`` for training.
        tensorboard_gcs_logs: ``--tensorboard-gcs-logs`` for training.
        model_output_base_path: ``--model-output-base-path`` for training and
            evaluation.
        gcs_path: ``--gcs-path`` for training and evaluation.
        gcs_path_confusion: ``--gcs-path-confusion`` for evaluation.
        mode: ``--mode`` for training.
        probability: ``--probability`` for evaluation.
        serving_name: ``metadata.name`` of the InferenceService.
        serving_namespace: ``metadata.namespace`` of the InferenceService.
        image: Custom predictor container image of the InferenceService.

    Review notes:
        * NOTE(review): ``shuffle_size`` is never passed to ``train.py`` -
          confirm whether a ``--shuffle-size`` flag was intended.
        * NOTE(review): the default ``dataprocessing_step_image`` lives under
          ``brain_tumor_scan4`` and the default ``evaluator_step_image`` uses
          the tag ``V1`` (upper case), unlike the other images - confirm
          these are intentional.
    """
    # The same GCP service-account secret is applied to every container step.
    with_gcp_secret = gcp.use_gcp_secret("user-gcp-sa")

    # PVC: PersistentVolumeClaim shared by all steps through /mnt.
    vop = dsl.VolumeOp(
        name='my-pvc',
        resource_name="my-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="1Gi"
    )

    # Step 1: data extraction.
    data_extraction_step = dsl.ContainerOp(
        name='data_extraction',
        image=dataextraction_step_image,
        command="python",
        arguments=[
            "/app/dataextract.py",
            "--root", root,
            "--data-file", data_file,
            "--kaggle-api-data", kaggle_api_data,
        ],
        pvolumes={"/mnt": vop.volume}
    ).apply(with_gcp_secret)

    # Step 2: preprocessing. Consuming the previous step's pvolume both
    # mounts the same claim and orders this step after extraction.
    data_processing_step = dsl.ContainerOp(
        name='data_processing',
        image=dataprocessing_step_image,
        command="python",
        arguments=[
            "/app/preprocessing.py",
            "--train-file", train_file,
            "--test-file", test_file,
            "--validation-file", validation_file,
            "--root", root,
            "--image-size", image_size,
            "--train-target", train_target,
            "--test-target", test_target,
            "--validation-target", validation_target,
            "--label", label,
        ],
        pvolumes={"/mnt": data_extraction_step.pvolume}
    ).apply(with_gcp_secret)

    # Step 3: model training. The container is expected to write
    # /mlpipeline-ui-metadata.json, which is collected as a step output.
    train_model_step = dsl.ContainerOp(
        name='train_model',
        image=trainmodel_step_image,
        command="python",
        arguments=[
            "/app/train.py",
            "--train-file", train_file,
            "--test-file", test_file,
            "--label", label,
            "--activation", activation,
            "--validation-file", validation_file,
            "--train-target", train_target,
            "--test-target", test_target,
            "--validation-target", validation_target,
            "--epochs", epochs,
            "--image-size", image_size,
            "--learning-rate", learning_rate,
            "--tensorboard-logs", tensorboard_logs,
            "--tensorboard-gcs-logs", tensorboard_gcs_logs,
            "--model-output-base-path", model_output_base_path,
            "--gcs-path", gcs_path,
            "--mode", mode,
        ],
        file_outputs={
            "mlpipeline-ui-metadata": "/mlpipeline-ui-metadata.json",
        },
        pvolumes={"/mnt": data_processing_step.pvolume}
    ).apply(with_gcp_secret)

    # Step 4: model evaluation. Collects both the metrics file and the UI
    # metadata file written by the container.
    evaluation_model_step = dsl.ContainerOp(
        name='evaluation_model',
        image=evaluator_step_image,
        command="python",
        arguments=[
            "/app/evaluator.py",
            "--test-file", test_file,
            "--test-target", test_target,
            "--probability", probability,
            "--model-output-base-path", model_output_base_path,
            "--gcs-path", gcs_path,
            "--label", label,
            "--gcs-path-confusion", gcs_path_confusion,
        ],
        file_outputs={
            "mlpipeline-metrics": "/mlpipeline-metrics.json",
            "mlpipeline-ui-metadata": "/mlpipeline-ui-metadata.json",
        },
        pvolumes={"/mnt": train_model_step.pvolume}
    ).apply(with_gcp_secret)

    # Step 5: serving. The manifest is built as a dict rather than by
    # substituting into JSON text, so values containing quotes or
    # backslashes cannot corrupt it. str() is kept because at compile time
    # the arguments are pipeline-parameter placeholders, not plain strings.
    inference_service = {
        "apiVersion": "serving.kubeflow.org/v1alpha2",
        "kind": "InferenceService",
        "metadata": {
            "labels": {
                "controller-tools.k8s.io": "1.0",
            },
            "name": str(serving_name),
            "namespace": str(serving_namespace),
        },
        "spec": {
            "default": {
                "predictor": {
                    "custom": {
                        "container": {
                            "image": str(image),
                        },
                    },
                },
            },
        },
    }

    serve = dsl.ResourceOp(
        name="serve",
        k8s_resource=inference_service,
        action="apply",
        # The step succeeds once the resource's status.url field is set.
        success_condition="status.url"
    )
    # Only deploy after the model has been evaluated.
    serve.after(evaluation_model_step)
    


In [3]:
import kfp.dsl as dsl
import yaml
import kfp
from kubernetes import client as k8s
import kfp.gcp as gcp
# Display the installed KFP SDK version; the output recorded for this cell is '1.0.4'.
kfp.__version__


'1.0.4'

In [None]:
if __name__ == '__main__':
    # Compile the pipeline function into a workflow file named after it.
    # `pipeline_func` and `pipeline_filename` stay at module level because
    # the run-submission cell further down reads them.
    from kfp import compiler

    pipeline_func = brain_tensorflow_pipeline
    pipeline_filename = '{}.pipeline.yaml'.format(pipeline_func.__name__)
    compiler.Compiler().compile(
        pipeline_func,
        pipeline_filename,
    )

In [None]:
import kfp
from kfp import compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp import gcp
EXPERIMENT_NAME = 'Brain_experiment'
client = kfp.Client()

# Reuse the experiment when it already exists; create it otherwise.
try:
    experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except ValueError:
    # The KFP client signals "no experiment with this name" with ValueError.
    # Anything else (connection or auth failures, KeyboardInterrupt) is left
    # to propagate instead of being hidden behind a create attempt.
    experiment = client.create_experiment(EXPERIMENT_NAME)

print(experiment)

In [None]:
# Empty dict: no pipeline parameter is overridden for this run.
arguments = {}

# Name the run after the pipeline function. The previous 'heart_run' suffix
# was a leftover from a different pipeline and was joined without a separator.
run_name = pipeline_func.__name__ + '_run'
run_result = client.run_pipeline(
    experiment.id,
    run_name,
    pipeline_filename,
    arguments,
)

# Echo what was submitted.
print(experiment.id)
print(run_name)
print(pipeline_filename)
print(arguments)