In [2]:
import kfp
import kfp.dsl as dsl
import kfp.gcp as gcp
from kfp import components
import json
from string import Template

ModuleNotFoundError: No module named 'kfp'

In [3]:
def convert_result(result) -> str:
    """Turn a JSON list of hyperparameters into a command-line style string.

    Args:
        result: JSON text encoding a list of objects, each of which has a
            ``name`` and a ``value`` key.

    Returns:
        One ``name=value`` token per entry, in input order, separated by
        single spaces. An empty list yields an empty string.
    """
    # Kept as a function-local import: this function is handed to
    # components.func_to_container_op in this notebook, which needs the
    # function body to carry its own imports.
    import json

    hyperparameters = json.loads(result)
    # Join after visiting every entry; returning from inside the loop would
    # emit only the first hyperparameter.
    return " ".join(
        f"{param['name']}={param['value']}" for param in hyperparameters
    )

In [None]:
# Default for the `katib_namespace` parameter of `used_cars_pipeline`, which
# forwards it to the Katib launcher as `experiment_namespace`.
# NOTE(review): left empty here — presumably must be set to a real namespace
# before the pipeline is compiled/run; confirm the intended value.
KATIB_NAMESPACE = ""

In [None]:
@dsl.pipeline(
    name="Used-Cars",
    description="A pipeline to train and serve the used cars price prediction model"
)
def used_cars_pipeline(
    # NOTE(review): `{{workflow.id}}` does not look like a standard Argo
    # workflow variable (`{{workflow.uid}}` / `{{workflow.name}}` are) —
    # confirm before relying on this default.
    name="used-cars-{{workflow.id}}",
    katib_namespace=KATIB_NAMESPACE,
    goal=0.9,
    preprocess_image="gcr.io/<PROJECT_ID>/kubeflow-used-cars-preprocess:latest",
    training_image="gcr.io/<PROJECT_ID>/kubeflow-used-cars-train:latest",
    training_namespace="kubeflow",
    export_dir="gs://<BUCKET_NAME>/export",
    parallel_trial_count=5,
    max_trial_count=3
):
    """Define the used-cars pipeline: Katib hyperparameter search, then result conversion.

    The steps below assemble the pieces of a Katib experiment (objective,
    algorithm, metrics collector, parameter search space and trial template),
    launch it through the Katib launcher component, and pass the launcher's
    output to ``convert_result``.

    Args:
        name: Experiment name given to the Katib launcher.
        katib_namespace: Namespace given to the Katib launcher.
        goal: Target value placed in the objective's ``goal`` field.
        preprocess_image: Not referenced by the steps in this cell.
        training_image: Container image used by the trial job template.
        training_namespace: Not referenced by the steps in this cell.
        export_dir: Not referenced by the steps in this cell.
        parallel_trial_count: Forwarded to the launcher unchanged.
        max_trial_count: Forwarded to the launcher unchanged.
    """

    objectiveConfig = {
        "type": "maximize",
        "goal": goal,
        "objectiveMetricName": "val_accuracy",
        "additionalMetricNames": ["loss", "accuracy"],
    }

    algorithmConfig = {"algorithmName": "random"}

    # Named to match the reference in the launcher call below.
    metricsCollectorSpec = {
        "collector": {
            "kind": "StdOut"
        }
    }

    # Search space: a single double-valued hyperparameter.
    parameters = [
        {"name": "--tf-learning-rate", "parameterType": "double", "feasibleSpace": {"min": "0.001", "max":"0.05"}},
    ]

    # Job manifest for one trial. The `{{...}}` tokens are left verbatim for
    # whatever renders the "goTemplate" below; they are not Python formatting.
    rawTemplate = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": "{{.Trial}}",
            "namespace": "{{.Namespace}}"
        },

        "spec": {
            "template": {
                "spec": {
                    "restartPolicy": "Never",
                    "containers": [
                        {
                            "name": "{{.Trial}}",
                            "image": str(training_image),
                            "imagePullPolicy": "Always",
                            "command": [
                                "python",
                                "/opt/model.py",
                                "--tf-mode=local",
                                "{{- with .Hyperparameters}}",
                                "{{- range}}",
                                "{{.Name}}={{.Value}}",
                                "{{- end}}",
                                "{{- end}}"
                            ]
                        }
                    ]
                }
            }
        }
    }

    trialTemplate = {
        "goTemplate": {
            "rawTemplate": json.dumps(rawTemplate)
        }
    }

    # NOTE(review): this URL tracks `master`, so the component's inputs can
    # change underneath this notebook — consider pinning to a commit.
    katib_experiment_launcher_op = components.load_component_from_url(
        'https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml'
    )

    # NOTE(review): str() produces the Python repr of each dict/list (single
    # quotes), not strict JSON — confirm the launcher accepts that form.
    katib_op = katib_experiment_launcher_op(
        experiment_name=name,
        experiment_namespace=katib_namespace,
        parallel_trial_count=parallel_trial_count,
        max_trial_count=max_trial_count,
        objective=str(objectiveConfig),
        algorithm=str(algorithmConfig),
        trial_template=str(trialTemplate),
        parameters=str(parameters),
        metrics_collector=str(metricsCollectorSpec),
        delete_finished_experiment=False
    )

    # Wrap the plain Python function as a pipeline step and feed it the
    # launcher's output.
    convert_op = components.func_to_container_op(convert_result)
    op2 = convert_op(katib_op.output)