In [1]:
from google.cloud import aiplatform

In [2]:
# Set the default project and region used by subsequent aiplatform SDK calls.
aiplatform.init(
    project='sashaproject-1',
    location='us-central1',
)

In [3]:
import kfp.v2.components as comp

In [None]:
#             if key in init_arg_names:
#                 serialized_args['init'][key] = value
#                 #runner_args.append(f'--{INIT_KEY}.{key}={value}')
#             elif key in method_arg_names:
#                 serialized_args['method'][key] = value
#                 runner_args.append(f'--{METHOD_KEY}.{key}={value}')
#             elif key == 'staging_bucket':
#                 pass
                #runner_args.append(f'--staging_bucket={value}')
        

In [505]:
import importlib
import subprocess
import inspect
import json
from kfp.v2 import dsl
from kfp import components
import kfp
import re

from google.cloud import aiplatform

# Names of the two argument groups ("init" for constructor arguments, "method"
# for method-call arguments). NOTE(review): in the code visible in this notebook
# these constants are only referenced from commented-out lines; method_converter
# spells the same strings as literals.
INIT_KEY = 'init'
METHOD_KEY = 'method'

# CSV passed as `gcs_source` when creating the abalone dataset in the pipeline below.
gcs_csv_path = 'gs://ucaip-mb-sasha-dev/data/abalone_train.csv'
    
def _resolve_owner_class(method):
    """Return the class on which ``method`` is defined.

    Args:
        method: A bound method (classmethod or instance method) or a plain
            function accessed through its class (e.g. ``SomeClass.run``).

    Returns:
        The owning class object.

    Raises:
        ValueError: If ``method`` cannot be traced back to a class, e.g. it is
            a module-level function or is defined inside another function.
    """
    if inspect.ismethod(method):
        # A bound classmethod carries the class itself in __self__; a bound
        # instance method carries an instance, so fall back to its type.
        bound_to = method.__self__
        return bound_to if inspect.isclass(bound_to) else type(bound_to)

    # Plain function: drop the trailing ".<method name>" from the qualified
    # name and walk the remaining dotted path from the defining module, so
    # nested classes ("Outer.Inner.method") resolve as well.
    owner_path = method.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]
    owner = inspect.getmodule(method)
    for attribute in owner_path.split('.'):
        owner = getattr(owner, attribute, None)
        if owner is None:
            break

    if not inspect.isclass(owner):
        raise ValueError(
            f'Could not determine the class that defines {method.__qualname__!r}; '
            'pass a method accessed through its class, e.g. SomeClass.method.')
    return owner


def _format_literal_args(serialized_args):
    """Render literal arguments as YAML list items for the container command.

    Each item is emitted as a double-quoted scalar (a JSON string is also a
    valid YAML double-quoted scalar), so values containing ``: ``, ``#``,
    quotes or braces cannot change the structure of the component spec.

    Args:
        serialized_args: Mapping of group name ("init" / "method") to a
            mapping of argument name to literal value.

    Returns:
        The YAML lines joined with newlines; an empty string if there are none.
    """
    return '\n'.join(
        '    - ' + json.dumps(f'--{group}.{arg_name}={value}', ensure_ascii=False)
        for group, group_args in serialized_args.items()
        for arg_name, value in group_args.items()
    )


def method_converter(method, should_serialize_init=False):
    """Build a pipeline-op factory that runs ``method`` inside a container.

    The returned callable accepts keyword arguments only. Arguments that are
    ``PipelineParam`` objects (outputs of upstream steps) become ``Artifact``
    inputs of the generated component and are passed as input URIs; every
    other argument is written into the container command as a literal
    ``--<group>.<name>=<value>`` flag. The component always declares a single
    ``resource_name_output`` artifact output.

    Args:
        method: Method to wrap, accessed through its class
            (e.g. ``SomeClass.create``) or through an instance.
        should_serialize_init: When True, keyword arguments whose names match
            parameters of the owning class's ``__init__`` are placed in the
            "init" group; all remaining arguments go to the "method" group.
            When False, every argument goes to the "method" group.

    Returns:
        A function ``f(**kwargs)`` that generates the component YAML, prints
        it, loads it with ``components.load_component_from_text`` and returns
        the instantiated op.

    Raises:
        ValueError: If the class that defines ``method`` cannot be determined.
    """
    method_name = method.__name__
    cls = _resolve_owner_class(method)
    cls_name = cls.__name__

    if should_serialize_init:
        init_arg_names = set(inspect.signature(cls.__init__).parameters)
    else:
        init_arg_names = set()

    def f(**kwargs):
        inputs = ["inputs:"]
        input_args = []
        input_kwargs = {}
        serialized_args = {"init": {}, "method": {}}

        for key, value in kwargs.items():
            prefix_key = "init" if key in init_arg_names else "method"
            if isinstance(value, kfp.dsl._pipeline_param.PipelineParam):
                # Upstream output: declare an artifact input and forward its URI.
                inputs.append("- {name: %s, type: Artifact}" % key)
                input_args.append(
                    "\n    - --%s.%s\n    - {inputUri: %s}\n" % (prefix_key, key, key))
                input_kwargs[key] = value
            else:
                # Literal value: baked into the component command.
                serialized_args[prefix_key][key] = value

        # Omit the "inputs:" section entirely when no artifact input was declared.
        inputs_section = "\n".join(inputs) if len(inputs) > 1 else ''
        input_args_section = "\n".join(input_args)

        component_text = """
name: %s-%s
%s
outputs:
- {name: resource_name_output, type: Artifact}
implementation:
  container:
    image: gcr.io/sashaproject-1/mb_sdk_component:latest
    command:
    - python3
    - remote_runner.py
    - --cls_name=%s
    - --method_name=%s
%s
    args:
    - --resource_name_output_uri
    - {outputUri: resource_name_output}
%s
""" % (cls_name,
       method_name,
       inputs_section,
       cls_name,
       method_name,
       _format_literal_args(serialized_args),
       input_args_section)

        # Echo the generated spec so it can be inspected in the notebook output.
        print(component_text)

        return components.load_component_from_text(component_text)(**input_kwargs)

    return f
    

In [491]:
from google.cloud import aiplatform
from kfp import dsl

In [506]:
from kfp.v2 import dsl
from kfp.v2 import compiler



@kfp.v2.dsl.pipeline(name='datasetcreatetest')
def pipeline():
    # Single-step pipeline: create the "abalone" dataset from the CSV at
    # gcs_csv_path, then print the resulting op for inspection.
    dataset_create_op = DatasetCreateOp(
        project='sashaproject-1',
        display_name='abalone',
        metadata_schema_uri=aiplatform.schema.dataset.metadata.tabular,
        gcs_source=gcs_csv_path,
    )
    print(dataset_create_op)
    
# Compile the pipeline function above into a job spec written to pipeline.json.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    pipeline_root='gs://ucaip-mb-sasha-dev/pipeline-dev',
    output_path='pipeline.json',
)



['python3', 'auto_runner.py', '--cls_name=Dataset', '--method_name=create']
[]
{'init': {}, 'method': {'project': 'sashaproject-1', 'display_name': 'abalone', 'metadata_schema_uri': 'gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml', 'gcs_source': 'gs://ucaip-mb-sasha-dev/data/abalone_train.csv'}}

name: Dataset-create

outputs:
- {name: resource_name_output, type: Artifact}
implementation:
  container:
    image: gcr.io/sashaproject-1/mb_sdk_component:latest
    command:
    - python3
    - remote_runner.py
    - --cls_name=Dataset
    - --method_name=create
    - --method.project=sashaproject-1
    - --method.display_name=abalone
    - --method.metadata_schema_uri=gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml
    - --method.gcs_source=gs://ucaip-mb-sasha-dev/data/abalone_train.csv
    args:
    - --resource_name_output_uri
    - {outputUri: resource_name_output}


{'ContainerOp': {'is_exit_handler': False, 'human_name': 'Dataset-create

In [527]:
# Op factories built from SDK methods with method_converter. The stray
# `aiplatform.pipelines.Datacr` expression that used to open this cell was
# tab-completion residue: it raises AttributeError on a fresh kernel and
# prevented the assignments below from running.
DatasetCreateOp = method_converter(aiplatform.Dataset.create)

# should_serialize_init=True routes keyword arguments that match the class's
# __init__ parameters to the "init" group instead of the "method" group.
CustomContainerTrainingJobRunOp = method_converter(
    aiplatform.CustomContainerTrainingJob.run,
    should_serialize_init=True)

ModelDeployOp = method_converter(aiplatform.Model.deploy, should_serialize_init=True)




In [537]:
@kfp.v2.dsl.pipeline(name='customtrainingtest')
def pipeline():
    # Three chained steps: create a dataset from BigQuery, run a custom
    # container training job on it, then deploy the resulting model. Each
    # step consumes the previous step's `resource_name_output` artifact.
    dataset_create_op = DatasetCreateOp(
        project='sashaproject-1',
        display_name='bq_iris_dataset',
        bq_source='bq://sashaproject-1.ml_datasets.iris',
        metadata_schema_uri=aiplatform.schema.dataset.metadata.tabular)

    custom_training_op = CustomContainerTrainingJobRunOp(
        project='sashaproject-1',
        container_uri='gcr.io/sashaproject-1/test-custom-container:latest',
        model_serving_container_image_uri='gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest',
        dataset=dataset_create_op.outputs['resource_name_output'],
        replica_count=1,
        model_display_name='bq-iris-model',
        bigquery_destination='bq://sashaproject-1',
        display_name='my-training-job',
        staging_bucket='gs://ucaip-mb-sasha-dev',
    )

    # The deploy op is the last step; nothing consumes its output, so the
    # result is not bound to a name.
    ModelDeployOp(
        project='sashaproject-1',
        model_name=custom_training_op.outputs['resource_name_output'],
        machine_type='n1-standard-4')
    
# Compile the three-step pipeline above; this overwrites pipeline.json.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    pipeline_root='gs://ucaip-mb-sasha-dev/pipeline-dev',
    output_path='pipeline.json',
)


name: Dataset-create

outputs:
- {name: resource_name_output, type: Artifact}
implementation:
  container:
    image: gcr.io/sashaproject-1/mb_sdk_component:latest
    command:
    - python3
    - remote_runner.py
    - --cls_name=Dataset
    - --method_name=create
    - --method.project=sashaproject-1
    - --method.display_name=bq_iris_dataset
    - --method.bq_source=bq://sashaproject-1.ml_datasets.iris
    - --method.metadata_schema_uri=gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml
    args:
    - --resource_name_output_uri
    - {outputUri: resource_name_output}



name: CustomContainerTrainingJob-run
inputs:
- {name: dataset, type: Artifact}
outputs:
- {name: resource_name_output, type: Artifact}
implementation:
  container:
    image: gcr.io/sashaproject-1/mb_sdk_component:latest
    command:
    - python3
    - remote_runner.py
    - --cls_name=CustomContainerTrainingJob
    - --method_name=run
    - --init.project=sashaproject-1
    - --init.containe

In [538]:
!cat pipeline.json

{
  "pipelineSpec": {
    "pipelineInfo": {
      "name": "customtrainingtest"
    },
    "schemaVersion": "2.0.0",
    "components": {
      "comp-model-deploy": {
        "outputDefinitions": {
          "artifacts": {
            "resource_name_output": {
              "artifactType": {
                "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n"
              }
            }
          }
        },
        "inputDefinitions": {
          "artifacts": {
            "model_name": {
              "artifactType": {
                "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n"
              }
            }
          }
        },
        "executorLabel": "exec-model-deploy"
      },
      "comp-dataset-create": {
        "outputDefinitions": {
          "artifacts": {
            "resource_name_output": {
              "artifactType": {
                "instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n"
              }
      

In [539]:
import getpass
import os

from aiplatform.pipelines import client

# The API key must never be stored in the notebook. It is read from the
# AIPLATFORM_PIPELINES_API_KEY environment variable, with an interactive prompt
# as a fallback. NOTE(review): the key that was previously hard-coded here has
# been exposed in the notebook history and should be revoked and rotated.
api_client = client.Client(
    project_id='sashaproject-1',
    region='us-central1',
    api_key=(os.environ.get('AIPLATFORM_PIPELINES_API_KEY')
             or getpass.getpass('AI Platform Pipelines API key: ')),
)

In [540]:
# Submit the compiled job spec (pipeline.json) as a new pipeline run.
# NOTE(review): pipeline_root here is the bucket root, while the compile step
# used 'gs://ucaip-mb-sasha-dev/pipeline-dev' — confirm which one is intended.
api_client.create_run_from_job_spec('pipeline.json', pipeline_root='gs://ucaip-mb-sasha-dev')

INFO:absl:Compiled JSON request: {"pipelineSpec": {"pipelineInfo": {"name": "customtrainingtest"}, "schemaVersion": "2.0.0", "components": {"comp-model-deploy": {"outputDefinitions": {"artifacts": {"resource_name_output": {"artifactType": {"instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n"}}}}, "inputDefinitions": {"artifacts": {"model_name": {"artifactType": {"instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n"}}}}, "executorLabel": "exec-model-deploy"}, "comp-dataset-create": {"outputDefinitions": {"artifacts": {"resource_name_output": {"artifactType": {"instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n"}}}}, "executorLabel": "exec-dataset-create"}, "comp-customcontainertrainingjob-run": {"inputDefinitions": {"artifacts": {"dataset": {"artifactType": {"instanceSchema": "properties:\ntitle: kfp.Artifact\ntype: object\n"}}}}, "outputDefinitions": {"artifacts": {"resource_name_output": {"artifactType": {"instanceSchema": "properties:\nt

{'name': 'projects/40556267596/locations/us-central1/pipelineJobs/customtrainingtest-20210317200636',
 'displayName': 'customtrainingtest-20210317200636',
 'createTime': '2021-03-18T00:06:36.250265Z',
 'updateTime': '2021-03-18T00:06:36.250265Z',
 'pipelineSpec': {'deploymentConfig': {'@type': 'type.googleapis.com/ml_pipelines.PipelineDeploymentConfig',
   'executors': {'exec-dataset-create': {'container': {'image': 'gcr.io/sashaproject-1/mb_sdk_component:latest',
      'command': ['python3',
       'remote_runner.py',
       '--cls_name=Dataset',
       '--method_name=create',
       '--method.project=sashaproject-1',
       '--method.display_name=bq_iris_dataset',
       '--method.bq_source=bq://sashaproject-1.ml_datasets.iris',
       '--method.metadata_schema_uri=gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml'],
      'args': ['--resource_name_output_uri',
       "{{$.outputs.artifacts['resource_name_output'].uri}}"]}},
    'exec-customcontainertrainingjob-

In [512]:
# Look up a pipeline job by id. The id is hard-coded from an earlier submission,
# and this cell's execution count (512) predates the submission cell above
# (540), so it does not refer to the run created in this notebook pass.
api_client.get_job('customtrainingtest-20210317152513')

{'name': 'projects/40556267596/locations/us-central1/pipelineJobs/customtrainingtest-20210317152513',
 'displayName': 'customtrainingtest-20210317152513',
 'createTime': '2021-03-17T19:25:13.705213Z',
 'startTime': '2021-03-17T19:25:44.872367Z',
 'endTime': '2021-03-17T19:27:32.591102Z',
 'updateTime': '2021-03-17T19:27:32.591102Z',
 'pipelineSpec': {'deploymentConfig': {'@type': 'type.googleapis.com/ml_pipelines.PipelineDeploymentConfig',
   'executors': {'exec-dataset-create': {'container': {'image': 'gcr.io/sashaproject-1/mb_sdk_component:latest',
      'command': ['python3',
       'remote_runner.py',
       '--cls_name=Dataset',
       '--method_name=create',
       '--method.project=sashaproject-1',
       '--method.display_name=abalone',
       '--method.metadata_schema_uri=gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml',
       '--method.gcs_source=gs://ucaip-mb-sasha-dev/data/abalone_train.csv'],
      'args': ['--resource_name_output_uri',
       "{{$