In [1]:
from google_cloud_pipeline_components.experimental.dataflow import DataflowPythonJobOp
from google_cloud_pipeline_components.experimental.wait_gcp_resources import WaitGcpResourcesOp

In [2]:
import kfp.dsl as dsl
import json

In [3]:
PROJECT_ID = 'hd-datascience-np'
LOCATION = "us-central1"
PIPELINE_ROOT = 'gs://hd-datascience-np-data/hotspot/' # No ending slash

# Dataflow sample parameters
PIPELINE_NAME = 'dataflow-pipeline-sample'
OUTPUT_FILE = '{}/wc/wordcount.out'.format(PIPELINE_ROOT)

In [4]:
import kfp.components as comp

dataflow_python_op = comp.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataflow/launch_python/component.yaml')
help(dataflow_python_op)

Help on function Launch Python:

Launch Python(python_file_path: str, project_id: str, region: str, staging_dir: 'GCSPath' = '', requirements_file_path: 'GCSPath' = '', args: list = '[]', wait_interval: int = '30')
    Launch Python
    Launch a self-executing beam python file.



In [5]:
import kfp.dsl as dsl
import json

@dsl.pipeline(
    name=PIPELINE_NAME,
    description='Dataflow launch python pipeline'
)
def pipeline(
    python_file_path:str = 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',
    project_id:str = PROJECT_ID,
    location:str = LOCATION,
    staging_dir:str = PIPELINE_ROOT,
    requirements_file_path:str = 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',
):
    dataflow_python_op = DataflowPythonJobOp(
        project=project_id,
        location=location,
        python_module_path=python_file_path,
        temp_location = staging_dir,
        requirements_file_path = requirements_file_path,
        args = ['--output', OUTPUT_FILE],
    )
    dataflow_wait_op = WaitGcpResourcesOp(
      gcp_resources = dataflow_python_op.outputs["gcp_resources"])

In [7]:
pipeline_func = pipeline
# pipeline_filename = pipeline_func.__name__ + '.zip'
from kfp.v2 import compiler
compiler.Compiler().compile(pipeline_func, package_path='dataflow-pipeline-sample.json')



In [8]:
# #Specify values for the pipeline's arguments
# arguments = {}

# #Get or create an experiment
# import kfp
# client = kfp.Client()
# experiment = client.create_experiment(EXPERIMENT_NAME)

# #Submit a pipeline run
# run_name = pipeline_func.__name__ + ' run'
# run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(
                project_id=PROJECT_ID,
                region=LOCATION
                )

response = api_client.create_run_from_job_spec(
    job_spec_path="dataflow-pipeline-sample.json",
    enable_caching =False,
    service_account='visual-ui-pipeline-sa@hd-datascience-np.iam.gserviceaccount.com',
    pipeline_root=PIPELINE_ROOT  # this argument is necessary if you did not specify PIPELINE_ROOT as part of the pipeline definition.
)



In [9]:
!gsutil cat $OUTPUT_FILE

CommandException: No URLs matched: gs://hd-datascience-np-data/hotspot//wc/wordcount.out
