<a href="https://colab.research.google.com/github/KUrushi/toy_code/blob/main/tfx_kfp_component.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
# Upgrade pip itself first, then install TFX, the KFP SDK and the Vertex AI SDK.
! pip install -U pip
!pip install tfx kfp google-cloud-aiplatform


[0m

In [1]:
%%writefile component.yaml
# Kubeflow Pipelines component spec: writes the `message` input to the `Output` path.
name: echo
inputs:
- {name: message, type: String}
outputs:
- {name: Output, type: String}
implementation:
  container:
    image: python:3.7
    command:
    - sh
    - -c
    - |
      mkdir -p "$(dirname "$1")"
      echo "$0" > "$1"
    args:
    # With `sh -c <script> a b`, `a` is bound to $0 and `b` to $1 in the script,
    # so $0 is the message value and $1 is the output path.
    - {inputValue: message}
    - {outputPath: Output}

Overwriting component.yaml


# Change load_kfp_yaml_container_component
The `inputs` of `container_component.create_container_component` are assumed to be TFX standard artifacts. Thus, each entry in `inputs` must be supplied as an artifact, i.e. the output of another component.

If we want to pass a primitive value (such as a string), we must declare it in the `parameters` argument of `create_container_component` instead.

In the current TFX version, `load_kfp_yaml_container_component` offers no way to declare parameters, and TFX does not treat `inputValue` inputs as parameters.

So I modified this function as follows:
```python
def load_kfp_yaml_container_component(
    path: str) -> Callable[..., base_component.BaseComponent]:
  """Creates a container-based component from a Kubeflow component spec.

  Inputs that the container command consumes through an `inputValue`
  placeholder are declared as `str` parameters; every other input is declared
  as a `standard_artifacts.String` artifact.

  See
  https://www.kubeflow.org/docs/pipelines/reference/component-spec/

  Example:
    component = load_kfp_yaml_container_component(
      "kfp_pipelines_root/components/datasets/Chicago_Taxi_Trips/component.yaml"
    )

  Args:
    path: local file path of a Kubeflow Pipelines component YAML file.

  Returns:
    Container component that can be instantiated in a TFX pipeline.
  """
  with open(path) as component_file:
    data = yaml.load(component_file, Loader=yaml.SafeLoader)
  # Wrap bare strings in command/args as {constantValue: ...} before parsing
  # the dictionary into the ComponentSpec proto.
  _convert_target_fields_to_kv_pair(data)
  component_spec = json_format.ParseDict(data,
                                         kfp_component_spec_pb2.ComponentSpec())
  container = component_spec.implementation.container
  # The final command line is the spec's `command` followed by its `args`.
  command = (
      list(map(_get_command_line_argument_type, container.command)) +
      list(map(_get_command_line_argument_type, container.args)))
  # TODO(ericlege): Support classname to class translation in inputs.type
  # Names of the inputs that the command reads by value (inputValue).
  inputs_value_placeholder = [c.input_name for c in command if isinstance(c, placeholders.InputValuePlaceholder)]
  # Inputs that are NOT read by value stay artifact inputs.
  inputs = {
      item.name: standard_artifacts.String for item in component_spec.inputs
      if item.name not in inputs_value_placeholder
  }
  outputs = {
      item.name: standard_artifacts.String for item in component_spec.outputs
  }
  # Inputs read by value become string parameters instead of artifacts.
  parameters = {
    item.name: str for item in component_spec.inputs
    if item.name in inputs_value_placeholder
  }

  return container_component.create_container_component(
      name=component_spec.name,
      image=container.image,
      command=command,
      inputs=inputs,
      outputs=outputs,
      parameters=parameters,
  )
```

In [2]:
# Copyright 2020 Google LLC. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Functions for creating container components from kubeflow components."""

from typing import Any, Callable, Dict

from tfx.dsl.component.experimental import container_component
from tfx.dsl.component.experimental import placeholders
from tfx.dsl.components.base import base_component
from tfx.extensions.experimental.kfp_compatibility.proto import kfp_component_spec_pb2
from tfx.types import standard_artifacts
import yaml

from google.protobuf import json_format


def load_kfp_yaml_container_component(
    path: str) -> Callable[..., base_component.BaseComponent]:
  """Builds a TFX container component from a Kubeflow Pipelines component YAML.

  The YAML file is parsed into a `ComponentSpec` proto and its container
  `command` and `args` are translated into TFX command-line placeholders.
  Inputs consumed through an `inputValue` placeholder are declared as `str`
  parameters; all remaining inputs, and every output, are declared as
  `standard_artifacts.String` artifacts.

  See
  https://www.kubeflow.org/docs/pipelines/reference/component-spec/

  Example:
    component = load_kfp_yaml_container_component(
      "kfp_pipelines_root/components/datasets/Chicago_Taxi_Trips/component.yaml"
    )

  Args:
    path: local file path of a Kubeflow Pipelines component YAML file.

  Returns:
    Container component that can be instantiated in a TFX pipeline.
  """
  with open(path) as spec_file:
    spec_dict = yaml.load(spec_file, Loader=yaml.SafeLoader)
  # Bare strings in command/args have to be wrapped as {constantValue: ...}
  # before the dictionary can be parsed into the proto.
  _convert_target_fields_to_kv_pair(spec_dict)
  component_spec = json_format.ParseDict(
      spec_dict, kfp_component_spec_pb2.ComponentSpec())
  container = component_spec.implementation.container

  # The full command line is `command` followed by `args`.
  command = [
      _get_command_line_argument_type(entry)
      for entry in (*container.command, *container.args)
  ]

  # TODO(ericlege): Support classname to class translation in inputs.type
  # Names of the inputs that the command line reads by value.
  value_input_names = {
      argument.input_name
      for argument in command
      if isinstance(argument, placeholders.InputValuePlaceholder)
  }

  # Split the declared inputs: by-value inputs become string parameters, the
  # rest stay String artifacts.
  inputs = {}
  parameters = {}
  for input_spec in component_spec.inputs:
    if input_spec.name in value_input_names:
      parameters[input_spec.name] = str
    else:
      inputs[input_spec.name] = standard_artifacts.String

  outputs = {}
  for output_spec in component_spec.outputs:
    outputs[output_spec.name] = standard_artifacts.String

  return container_component.create_container_component(
      name=component_spec.name,
      image=container.image,
      command=command,
      inputs=inputs,
      outputs=outputs,
      parameters=parameters,
  )

def _convert_target_fields_to_kv_pair(parsed_dict: Dict[str, Any]) -> None:
  """Converts in place specific string fields to key value pairs of {constantValue: [Text]} for proto3 compatibility.

  Args:
    parsed_dict: dictionary obtained from parsing a Kubeflow component spec.
      This argument is modified in place.

  Returns:
    None
  """
  conversion_string_paths = [
      ['implementation', 'container', 'command'],
      ['implementation', 'container', 'args'],
  ]
  for path in conversion_string_paths:
    parsed_dict_location = parsed_dict
    for label in path:
      parsed_dict_location = parsed_dict_location.get(label, {})
    if isinstance(parsed_dict_location, list):
      for ind, value in enumerate(parsed_dict_location):
        if isinstance(value, str):
          parsed_dict_location[ind] = {'constantValue': value}


def _get_command_line_argument_type(
    command: kfp_component_spec_pb2.StringOrPlaceholder
) -> placeholders.CommandlineArgumentType:
  """Translates one command/args entry of a component spec for TFX.

  Args:
    command: StringOrPlaceholder which encodes a container command.

  Returns:
    The constant string itself when `constantValue` is set; otherwise the TFX
    placeholder matching the field that is set (`inputValue`, `inputPath` or
    `outputPath`), ready to be passed into create_container_component.

  Raises:
    ValueError: if none of the recognized fields is set on `command`.
  """
  # Literal strings are passed through without wrapping.
  if command.HasField('constantValue'):
    return command.constantValue

  # Checked in order; the first field that is set decides the placeholder.
  placeholder_factories = (
      ('inputValue', placeholders.InputValuePlaceholder),
      ('inputPath', placeholders.InputUriPlaceholder),
      ('outputPath', placeholders.OutputUriPlaceholder),
  )
  for field_name, make_placeholder in placeholder_factories:
    if command.HasField(field_name):
      return make_placeholder(getattr(command, field_name))

  raise ValueError('Unrecognized command %s' % command)


# Run the TFX pipeline on Vertex AI Pipelines

In [5]:
# Authenticate the Colab user; presumably this is what lets the later Google
# Cloud calls in this notebook run with the user's credentials.
from google.colab import auth
auth.authenticate_user()

In [8]:
# Notebook form fields; fill these in before running the cells below.
# PROJECT is passed as `project` when initializing the Vertex AI SDK.
PROJECT = "" # @param {type: "string"}
# PIPELINE_ROOT is passed as `pipeline_root` of the TFX pipeline.
PIPELINE_ROOT = "" # @param {type: "string"}
# location is passed as `location` when initializing the Vertex AI SDK.
location = "us-central1" # @param {type: "string"}

In [None]:
from tfx import v1 as tfx
from tfx.dsl.component.experimental.annotations import OutputDict
from tfx.dsl.component.experimental.decorators import component
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner



# Turn the component.yaml spec into a TFX component class.
TfxEchoComponent = load_kfp_yaml_container_component("component.yaml")
# `message` is read through an inputValue placeholder in component.yaml, so the
# loader declares it as a string parameter and a plain value can be passed here.
echo = TfxEchoComponent(message="hello, world").with_id("echo")
tfx_pipeline = tfx.dsl.Pipeline(pipeline_name="hello-world-tfx", pipeline_root=PIPELINE_ROOT, 
        components=[echo])

# Run the pipeline through the Kubeflow V2 runner; the output is directed to
# ./decorator.json, the same file name later given to the Vertex PipelineJob.
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(),
output_dir=".", output_filename="decorator.json")
runner.run(tfx_pipeline)


In [11]:
from google.cloud import aiplatform as vertex
# Point the Vertex AI SDK at the project and region chosen in the form cell.
vertex.init(project=PROJECT, location=location)
# Create a pipeline job from the decorator.json template, with caching disabled.
job = vertex.PipelineJob(display_name="tfx_pipeline_sample", 
                         template_path="decorator.json", enable_caching=False)
job.run()