# Building an AutoML Kubeflow Pipeline

!python -m pip install --upgrade pip
!pip install kfp==1.4.0
!pip install kubeflow-katib==0.11.0
!pip install kfp-tekton==0.7.0
!pip install kubernetes==12.0.0

In [1]:
import kfp
import json
import kfp.dsl as dsl
from kfp import components

from kubeflow.katib import ApiClient
from kubeflow.katib import V1beta1ExperimentSpec
from kubeflow.katib import V1beta1AlgorithmSpec
from kubeflow.katib import V1beta1EarlyStoppingSpec
from kubeflow.katib import V1beta1EarlyStoppingSetting
from kubeflow.katib import V1beta1ObjectiveSpec
from kubeflow.katib import V1beta1ParameterSpec
from kubeflow.katib import V1beta1FeasibleSpace
from kubeflow.katib import V1beta1TrialTemplate
from kubeflow.katib import V1beta1TrialParameterSpec

from kubeflow.katib import KatibClient
from kubernetes.client import V1ObjectMeta
from kubeflow.katib import V1beta1Experiment
from kubeflow.katib import V1beta1AlgorithmSpec
from kubeflow.katib import V1beta1AlgorithmSetting
from kubeflow.katib import V1beta1ObjectiveSpec
from kubeflow.katib import V1beta1MetricsCollectorSpec
from kubeflow.katib import V1beta1CollectorSpec
from kubeflow.katib import V1beta1SourceSpec
from kubeflow.katib import V1beta1FilterSpec
from kubeflow.katib import V1beta1FeasibleSpace
from kubeflow.katib import V1beta1ExperimentSpec
from kubeflow.katib import V1beta1NasConfig
from kubeflow.katib import V1beta1GraphConfig
from kubeflow.katib import V1beta1Operation
from kubeflow.katib import V1beta1ParameterSpec
from kubeflow.katib import V1beta1TrialTemplate
from kubeflow.katib import V1beta1TrialParameterSpec

In [2]:


# Trial budget: total trials, tolerated failed trials, and trials run at once.
max_trial_count, max_failed_trial_count, parallel_trial_count = 24, 3, 3

# Objective: maximize "val_accuracy" with a goal of 0.95; "accuracy" is
# collected as an additional metric.
objective = V1beta1ObjectiveSpec(
    objective_metric_name="val_accuracy",
    type="maximize",
    goal=0.95,
    additional_metric_names=["accuracy"],
)

# Search algorithm used to propose new trials.
algorithm = V1beta1AlgorithmSpec(algorithm_name="bayesianoptimization")

# Early stopping with the median-stop rule (min_trials_required=2).
early_stopping = V1beta1EarlyStoppingSpec(
    algorithm_name="medianstop",
    algorithm_settings=[
        V1beta1EarlyStoppingSetting(name="min_trials_required", value="2"),
    ],
)

# Metrics collector of kind "StdOut": metrics are read from the trial
# container's standard output.
metrics_collector_spec = V1beta1MetricsCollectorSpec(
    collector=V1beta1CollectorSpec(kind="StdOut"),
)

# Experiment search space.
# Tunes learning rate, optimizer, units, number of layers, weight initializer
# and dropout rate.
# The learning-rate feasible space is intentionally poor so that more trials
# get early-stopped; widen it for real tuning.
# Katib's categorical feasible-space `list` holds strings, so the numeric
# categories (units, layers) are given as strings too. The trial receives every
# value as text on its command line anyway.
parameters = [
    V1beta1ParameterSpec(
        name="learning-rate",
        parameter_type="double",
        feasible_space=V1beta1FeasibleSpace(min="0.0001", max="0.00015"),
    ),
    V1beta1ParameterSpec(
        name="optimizer",
        parameter_type="categorical",
        feasible_space=V1beta1FeasibleSpace(list=["SGD", "ADAM", "RMSprop"]),
    ),
    V1beta1ParameterSpec(
        name="units",
        parameter_type="categorical",
        feasible_space=V1beta1FeasibleSpace(list=["8", "16", "32"]),
    ),
    V1beta1ParameterSpec(
        name="layers",
        parameter_type="categorical",
        feasible_space=V1beta1FeasibleSpace(list=["1", "3"]),
    ),
    # NOTE: "intializer" (sic) must stay in sync with the trial template's
    # reference="intializer" and the --intializer flag passed to model.py.
    V1beta1ParameterSpec(
        name="intializer",
        parameter_type="categorical",
        feasible_space=V1beta1FeasibleSpace(
            list=["glorot_uniform", "random_uniform"]
        ),
    ),
    V1beta1ParameterSpec(
        name="drop-out",
        parameter_type="double",
        feasible_space=V1beta1FeasibleSpace(min="0.15", max="0.25"),
    ),
]

In [3]:
# JSON template for each Trial's worker: a Kubernetes batch/v1 Job.
# Replace <PROJECT_ID> in the image below with your GCP project ID.
# Each ${trialParameters.<name>} placeholder is filled from the trial parameter
# of that name declared in the trial template.
trial_spec = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "spec": {
        "template": {
            "metadata": {
                "annotations": {
                    "sidecar.istio.io/inject": "false"
                }
            },
            "spec": {
                "containers": [
                    {
                        "name": "training-container",
                        "image": "gcr.io/<PROJECT_ID>/mlops_world/katibfeature:v3",
                        # One list element per argv entry. The commas matter:
                        # adjacent string literals without them are silently
                        # concatenated into a single argument.
                        "command": [
                            "python3",
                            "/opt/model.py",
                            "--learning-rate=${trialParameters.learningRate}",
                            "--optimizer=${trialParameters.optimizer}",
                            "--drop-out=${trialParameters.dropOut}",
                            "--units=${trialParameters.units}",
                            "--layers=${trialParameters.layers}",
                            "--intializer=${trialParameters.intializer}",
                            "--data=gs://featurestore_artifacts/train.csv",
                            "--target=fare_statistics__target",
                        ],
                    }
                ],
                "restartPolicy": "Never",
            },
        }
    },
}

# Trial template configuration.
# retain=True keeps each Trial Job's Kubernetes Pods instead of cleaning them up.
# Each tuple below is (name used as ${trialParameters.<name>} in trial_spec,
# human-readable description, search-space parameter it references).
trial_template = V1beta1TrialTemplate(
    retain=True,
    primary_container_name="training-container",
    trial_parameters=[
        V1beta1TrialParameterSpec(
            name=param_name, description=param_desc, reference=param_ref
        )
        for param_name, param_desc, param_ref in (
            ("learningRate", "Learning rate for the training model", "learning-rate"),
            ("units", "units for the training model", "units"),
            ("layers", "layers  for the training model", "layers"),
            ("intializer", "intializer  for the training model", "intializer"),
            ("dropOut", "Drop out of training model layers", "drop-out"),
            ("optimizer", "Training model optimizer (sdg, adam or rmsprop)", "optimizer"),
        )
    ],
    trial_spec=trial_spec,
)

In [4]:
# Assemble the Katib Experiment spec from the pieces configured above.
experiment_spec = V1beta1ExperimentSpec(
    # What to optimize and how trials are searched / stopped / measured.
    objective=objective,
    algorithm=algorithm,
    early_stopping=early_stopping,
    metrics_collector_spec=metrics_collector_spec,
    # Search space and the Job template each trial runs.
    parameters=parameters,
    trial_template=trial_template,
    # Trial budget.
    max_trial_count=max_trial_count,
    max_failed_trial_count=max_failed_trial_count,
    parallel_trial_count=parallel_trial_count,
)

In [5]:
# Katib launcher component, loaded from its component.yaml.
# NOTE(review): this URL tracks the kubeflow/pipelines `master` branch, so the
# component definition can change between runs; consider pinning a release tag.
katib_experiment_launcher_op = components.load_component_from_url(
    "https://raw.githubusercontent.com/kubeflow/pipelines/master/"
    "components/kubeflow/katib-launcher/component.yaml"
)
import kfp.gcp as gcp
from kfp import components
import kfp.dsl as dsl
from string import Template

def convert_result(experiment_result) -> str:
    """Extract the best trial's hyperparameters from a Katib experiment result.

    This function is turned into a pipeline step with
    ``components.func_to_container_op``, so it must stay self-contained:
    ``json`` is imported inside the body, and the parameter is deliberately left
    unannotated so it accepts the launcher's output as-is.

    Args:
        experiment_result: JSON text whose ``currentOptimalTrial`` holds a
            ``parameterAssignments`` list of ``{"name": ..., "value": ...}``
            objects.

    Returns:
        ``str()`` of a dict mapping each parameter name to its value rendered as
        a string, e.g. ``"{'learning-rate': '0.0001'}"`` (a Python repr, not
        JSON).

    Raises:
        ValueError: if there is no optimal trial or its parameter assignments
            are missing/null (e.g. no trial produced a result).
    """
    import json

    result = json.loads(experiment_result)
    print(result)  # keep the raw result in the step's logs

    optimal_trial = result.get("currentOptimalTrial") or {}
    assignments = optimal_trial.get("parameterAssignments")
    if assignments is None:
        raise ValueError(
            "Katib experiment result has no currentOptimalTrial.parameterAssignments; "
            "there is no best trial to take hyperparameters from."
        )

    best_params = {str(hp["name"]): str(hp["value"]) for hp in assignments}
    return str(best_params)

def set_resources(memory_req=None, memory_lim=None, cpu_req=None, cpu_lim=None, gpus="0", container=None):
    """Apply CPU / memory / GPU requests and limits to a pipeline container.

    Args:
        memory_req: memory request, e.g. "4Gi"; not set when None.
        memory_lim: memory limit, e.g. "6Gi"; not set when None.
        cpu_req: CPU request, e.g. "2"; not set when None.
        cpu_lim: CPU limit, e.g. "4"; not set when None.
        gpus: GPU limit (str or int); only applied when ``int(gpus) > 0``.
        container: the ``.container`` of a ``dsl.ContainerOp``. When falsy,
            nothing is changed.

    Returns:
        The same container (so calls can be chained), or None when no
        container was given.
    """
    if not container:
        return None

    # Only forward values that were provided: kfp's setters validate their
    # argument and do not treat None as "unset".
    if memory_req is not None:
        container.set_memory_request(memory_req)
    if memory_lim is not None:
        container.set_memory_limit(memory_lim)
    if cpu_req is not None:
        container.set_cpu_request(cpu_req)
    if cpu_lim is not None:
        container.set_cpu_limit(cpu_lim)
    if int(gpus) > 0:
        container.set_gpu_limit(gpus)
    return container

def _inference_service_manifest(name, namespace, image, storage_uri):
    """Build the KFServing v1alpha2 InferenceService manifest used by the serve step.

    Values go through ``str()`` exactly as the former string-template version
    did (for pipeline parameters this yields KFP's placeholder text). Building
    the dict directly cannot produce malformed JSON when a value contains
    quotes or backslashes.
    """
    return {
        "apiVersion": "serving.kubeflow.org/v1alpha2",
        "kind": "InferenceService",
        "metadata": {
            "labels": {"controller-tools.k8s.io": "1.0"},
            "name": str(name),
            "namespace": str(namespace),
            "annotations": {"sidecar.istio.io/inject": "false"},
        },
        "spec": {
            "default": {
                "predictor": {
                    "minReplicas": 1,
                    "custom": {
                        "container": {
                            "name": "kfserving-container",
                            "image": str(image),
                            "env": [
                                {"name": "STORAGE_URI", "value": str(storage_uri)}
                            ],
                            "imagePullPolicy": "Always",
                        }
                    },
                }
            }
        },
    }


@dsl.pipeline(
    name='Feature Store Service Kubeflow pipeline',
    description='End to End pipeline for Feature Store training and serving'
)
def feature_pipeline(experiment_name="katib-pipes",
                     experiment_namespace="aniruddha-choudhury",
                     staging_bucket="gs://featurestore/",
                     target_name="fare_statistics__target",
                     data_id="gs://featurestore_artifacts/driver_id.csv",
                     data_source="batch",
                     data_features="gs://featurestore_artifacts/features.json",
                     save_features="gs://featurestore_artifacts/train.csv",
                     epochs=5,
                     batch_size=32,
                     tensorboard_gcs_logs="gs://featurestore_artifacts/taxi/logs",
                     mode="GCS",
                     gcs_path="gs://featurestore_artifacts/taxi/model",
                     gcs_path_confusion="gs://featurestore_artifacts/taxi/",
                     probability=0.5,
                     feature_step_image="gcr.io/<PROJECT_ID>/mlops_world/featureingest:v2",
                     evaluator_step_image="gcr.io/<PROJECT_ID>/mlops_world/modeleval:v1",
                     serving_name='seldon-serving',
                     serving_namespace="aniruddha-choudhury",
                     serving_step_image="gcr.io/<PROJECT_ID>/feature/kfservingcustom:latest",
                     model_storage_path="gs://featurestore_artifacts/taxi/model"):
    """Feature ingestion -> Katib search -> best-HP extraction -> evaluation -> serving.

    Steps, in dependency order:
      1. "Feature Store Service": runs /app/featurestore_service.py with a 1Gi
         PVC mounted at /mnt (presumably writing features to ``save_features``).
      2. Katib launcher: runs the module-level ``experiment_spec`` after step 1.
      3. ``convert_result``: turns the experiment result into the best
         hyperparameters.
      4. "evaluation_model": runs /app/evaluator.py with those hyperparameters.
      5. "serve": applies a KFServing InferenceService after evaluation.

    NOTE(review): ``epochs``, ``batch_size``, ``tensorboard_gcs_logs`` and
    ``model_storage_path`` are accepted but not used by any step. The
    InferenceService's STORAGE_URI comes from ``gcs_path``; both it and
    ``model_storage_path`` default to the same bucket path.
    """
    # 1Gi ReadWriteOnce volume shared (mounted at /mnt) by steps 1 and 4.
    vop = dsl.VolumeOp(
        name='my-pvc',
        resource_name="my-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size="1Gi"
    )

    # Step 1: feature ingestion.
    feature_store_step = dsl.ContainerOp(
        name='Feature Store Service',
        image=feature_step_image,
        command="python",
        arguments=[
            "/app/featurestore_service.py",
            "--staging-bucket", staging_bucket,
            "--target-name", target_name,
            "--data-id", data_id,
            "--data-source", data_source,
            "--data-features", data_features,
            "--save-features", save_features,
        ],
        pvolumes={"/mnt": vop.volume}
    )
    set_resources(memory_req="4Gi", memory_lim="6Gi", cpu_req="4", cpu_lim="6",
                  container=feature_store_step.container)

    # Step 2: Katib hyperparameter search, started once step 1 has finished.
    # The experiment spec must be serialized to a plain Kubernetes object.
    op = katib_experiment_launcher_op(
        experiment_name=experiment_name,
        experiment_namespace=experiment_namespace,
        experiment_spec=ApiClient().sanitize_for_serialization(experiment_spec),
        experiment_timeout_minutes=240,
        delete_finished_experiment=False).after(feature_store_step)
    set_resources(memory_req="4Gi", memory_lim="6Gi", cpu_req="2", cpu_lim="4",
                  container=op.container)

    # Step 3: extract the best trial's hyperparameters as a lightweight component.
    convert_op = components.func_to_container_op(convert_result)
    op2 = convert_op(op.output)

    # Step 4: evaluation with the best hyperparameters. file_outputs expose the
    # evaluator's metrics / UI-metadata JSON files to the KFP UI.
    evaluation_model_step = dsl.ContainerOp(
        name='evaluation_model',
        image=evaluator_step_image,
        command="python",
        arguments=[
            "/app/evaluator.py",
            "--probability", probability,
            "--gcs-path", gcs_path,
            "--gcs-path-confusion", gcs_path_confusion,
            "--mode", mode,
            "--artifacts", op2.output,
        ],
        file_outputs={
            "mlpipeline-metrics": "/mlpipeline-metrics.json",
            "mlpipeline-ui-metadata": "/mlpipeline-ui-metadata.json",
        },
        pvolumes={"/mnt": feature_store_step.pvolume}
    ).after(op)

    # Step 5: apply the KFServing InferenceService after evaluation; the step
    # succeeds once the resource reports status.url.
    serve = dsl.ResourceOp(
        name="serve",
        k8s_resource=_inference_service_manifest(
            serving_name, serving_namespace, serving_step_image, gcs_path),
        action="apply",
        success_condition="status.url"
    )
    serve.after(evaluation_model_step)
    
    
    
    
if __name__ == '__main__':
    import kfp.compiler as compiler

    # Compile the pipeline into a package file named
    # "feature_pipeline.pipeline.yaml" in the current working directory.
    pipeline_func = feature_pipeline
    pipeline_filename = pipeline_func.__name__ + '.pipeline.yaml'
    compiler.Compiler().compile(pipeline_func, pipeline_filename)


## Kubeflow Pipelines API: create an experiment, upload and run the pipeline

In [7]:
import kfp
# Kubeflow Pipelines API client, created with its default connection settings.
client = kfp.Client()

In [20]:
# Create (or reuse) the KFP experiment that groups this pipeline's runs.
experiment = client.create_experiment(
    name='RealtimeKubeflow',
    description='Run the Pipeline',
    namespace='aniruddha-choudhury',
)

In [21]:
# Display the experiment record returned by create_experiment.
experiment

{'created_at': datetime.datetime(2021, 7, 20, 17, 41, 19, tzinfo=tzlocal()),
 'description': 'Run the Pipeline',
 'id': 'cae8a4ee-5cb4-4c74-b77f-cb9a48d92268',
 'name': 'RealtimeKubeflow',
 'resource_references': [{'key': {'id': 'aniruddha-choudhury',
                                  'type': 'NAMESPACE'},
                          'name': None,
                          'relationship': 'OWNER'}],
 'storage_state': 'STORAGESTATE_ARCHIVED'}

In [8]:
# Upload the compiled pipeline package.
# The compile cell writes "feature_pipeline.pipeline.yaml" relative to the
# notebook's working directory, so read it from that same relative path rather
# than a hard-coded absolute location.
filepath = 'feature_pipeline.pipeline.yaml'
name = 'automlfeaturepipelines'
pipeline = client.pipeline_uploads.upload_pipeline(filepath, name=name)

In [9]:
# Display the uploaded pipeline record (id, default version, parameters).
pipeline

{'created_at': datetime.datetime(2021, 7, 21, 11, 17, 33, tzinfo=tzlocal()),
 'default_version': {'code_source_url': None,
                     'created_at': datetime.datetime(2021, 7, 21, 11, 17, 33, tzinfo=tzlocal()),
                     'id': '67af83be-1747-45fb-b47f-17b8d2c795c2',
                     'name': 'automlfeaturepipelines',
                     'package_url': None,
                     'parameters': [{'name': 'experiment_name',
                                     'value': 'katib-pipes'},
                                    {'name': 'experiment_namespace',
                                     'value': 'aniruddha-choudhury'},
                                    {'name': 'staging_bucket',
                                     'value': 'gs://featurestore/'},
                                    {'name': 'target_name',
                                     'value': 'fare_statistics__target'},
                                    {'name': 'data_id',
                             

In [10]:
# Run-time arguments for feature_pipeline, keyed by its parameter names.
# NOTE: evaluator_step_image here (pipelineval:v1) differs from the pipeline's
# default (modeleval:v1).
params = dict(
    experiment_name="katib-run",
    experiment_namespace="aniruddha-choudhury",
    staging_bucket="gs://featurestore/",
    target_name="fare_statistics__target",
    batch_size=32,
    tensorboard_gcs_logs="gs://featurestore_artifacts/taxi/logs",
    save_features="gs://featurestore_artifacts/train.csv",
    epochs=5,
    data_source="batch",
    data_features="gs://featurestore_artifacts/features.json",
    mode="GCS",
    data_id="gs://featurestore_artifacts/driver_id.csv",
    feature_step_image="gcr.io/<PROJECT_ID>/mlops_world/featureingest:v2",
    evaluator_step_image="gcr.io/<PROJECT_ID>/mlops_world/pipelineval:v1",
    gcs_path="gs://featurestore_artifacts/taxi/model",
    gcs_path_confusion="gs://featurestore_artifacts/taxi/",
    probability=0.5,
    serving_name='seldon-serving',
    serving_namespace="aniruddha-choudhury",
    serving_step_image="gcr.io/<PROJECT_ID>/feature/kfservingcustom:latest",
    model_storage_path="gs://featurestore_artifacts/taxi/model",
)

In [31]:
 

# Start a run of the uploaded pipeline inside the experiment created above.
run_pipeline = client.run_pipeline(
    experiment_id=experiment.id,
    job_name='AutoMLfeatures',
    pipeline_id=pipeline.id,
    params=params,
)
