In [5]:
!pip install -U --user pip

Collecting pip
  Using cached https://files.pythonhosted.org/packages/00/b6/9cfa56b4081ad13874b0c6f96af8ce16cfbc1cb06bedf8e9164ce5551ec1/pip-19.3.1-py2.py3-none-any.whl
Installing collected packages: pip
Successfully installed pip-19.3.1


In [7]:
!pip install -U --user tfx

Requirement already up-to-date: tfx in ./.local/lib/python3.5/site-packages (0.15.0)


In [None]:
!pip install -U --user kfp

In [6]:
from __future__ import absolute_import                                                       
from __future__ import division                                                              
from __future__ import print_function                                                        
                                              
import os                                                                                    
from typing import Text              
                                                                                             
import absl                                                                                  
                                                                                             
from tfx.components.base import base_component                                               
from tfx.components.base import executor_spec
from tfx.types import component_spec                                                         
                                                                                             
from tfx.orchestration import metadata                                                       
from tfx.orchestration import pipeline                                                       
from tfx.orchestration.beam import beam_dag_runner
from tfx.orchestration.config import docker_component_config                                 
from tfx.orchestration.config import pipeline_config                                         
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.orchestration.launcher import docker_component_launcher              
from tfx.orchestration.launcher import in_process_component_launcher
from tfx.orchestration.launcher import kubernetes_component_launcher 

In [7]:
_pipeline_name = 'bigquery_pipeline'
_tfx_root = os.path.join(os.environ['HOME'], 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)
# Sqlite ML-metadata db path.
_metadata_path = os.path.join(_tfx_root, 'metadata', _pipeline_name,
                              'metadata.db')

# The rate at which to sample rows from the Chicago Taxi dataset using BigQuery.
# The full taxi dataset is > 120M record.  In the interest of resource
# savings and time, we've set the default for this example to be much smaller.
# Feel free to crank it up and process the full dataset!
_query_sample_rate = 0.001  # Generate a 0.1% random sample.

# This is the upper bound of FARM_FINGERPRINT in Bigquery (ie the max value of
# signed int64).
_max_int64 = '0x7FFFFFFFFFFFFFFF'

# The query that extracts the examples from BigQuery.  The Chicago Taxi dataset
# used for this example is a public dataset available on Google AI Platform.
# https://console.cloud.google.com/marketplace/details/city-of-chicago-public-data/chicago-taxi-trips
_query = """
         SELECT
           pickup_community_area,
           fare,
           EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,
           EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,
           EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,
           UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,
           pickup_latitude,
           pickup_longitude,
           dropoff_latitude,
           dropoff_longitude,
           trip_miles,
           pickup_census_tract,
           dropoff_census_tract,
           payment_type,
           company,
           trip_seconds,
           dropoff_community_area,
           tips
         FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
         WHERE (ABS(FARM_FINGERPRINT(unique_key)) / {max_int64})
           < {query_sample_rate}""".format(
               max_int64=_max_int64, query_sample_rate=_query_sample_rate)

In [22]:
class BigQueryViewer(base_component.BaseComponent):

  class Spec(component_spec.ComponentSpec):
    INPUTS = {}
    OUTPUTS = {}
    PARAMETERS = {
        'query': component_spec.ExecutionParameter(type=str),
    }

  SPEC_CLASS = Spec
  EXECUTOR_SPEC = executor_spec.ExecutorContainerSpec(
      image='google/cloud-sdk:latest',
      command=['bash', '-c'],
      args=[
          'echo -e $0;'
          '[ ! -z "$GOOGLE_APPLICATION_CREDENTIALS" ] && gcloud auth activate-service-account'
          ' --key-file=$GOOGLE_APPLICATION_CREDENTIALS;'
          ' bq query --nouse_legacy_sql $0',
          '{{exec_properties.query}}',
      ])

  def __init__(self, query):
    super(BigQueryViewer, self).__init__(BigQueryViewer.SPEC_CLASS(query=query))


def _create_pipeline(pipeline_name: Text, pipeline_root: Text,
                     metadata_path: Text, query: Text) -> pipeline.Pipeline:
  bq_viewer = BigQueryViewer(query=query)

  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[bq_viewer],
      enable_cache=False,
      metadata_connection_config=metadata.sqlite_metadata_connection_config(
          metadata_path),
      additional_pipeline_args={},
  )

In [23]:

absl.logging.set_verbosity(absl.logging.INFO)
beam_dag_runner.BeamDagRunner(
      config=pipeline_config.PipelineConfig(
          supported_launcher_classes=[
              docker_component_launcher.DockerComponentLauncher
          ],
          default_component_configs=[
              docker_component_config.DockerComponentConfig(volumes=[
                  '/usr/local/google/home/hongyes/.config/gcloud:/root/.config/gcloud'
              ])
          ])).run(
              _create_pipeline(
                  pipeline_name=_pipeline_name,
                  pipeline_root=_pipeline_root,
                  metadata_path=_metadata_path,
                  query=_query))

INFO:absl:Component BigQueryViewer depends on [].
INFO:absl:Component BigQueryViewer is scheduled.
INFO:absl:Component BigQueryViewer is running.
INFO:absl:Running driver for BigQueryViewer
INFO:absl:MetadataStore with DB connection initialized
INFO:absl:Running executor for BigQueryViewer
INFO:absl:Container spec: {'command': ['bash', '-c'], 'args': ['echo -e $0;[ ! -z "$GOOGLE_APPLICATION_CREDENTIALS" ] && gcloud auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS; bq query --nouse_legacy_sql $0', '\n         SELECT\n           pickup_community_area,\n           fare,\n           EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,\n           EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,\n           EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,\n           UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,\n           pickup_latitude,\n           pickup_longitude,\n           dropoff_latitude,\n           dropoff

In [24]:
from kubernetes import client
def use_gcp_secret():
  secret_name = 'user-gcp-sa'
  secret_volume_mount_path = '/secret/gcp-credentials'
  secret_file_path_in_volume = '/' + secret_name + '.json'
  volume_name = 'gcp-credentials-' + secret_name
  volumes = [
      client.V1Volume(
          name=volume_name,
          secret=client.V1SecretVolumeSource(secret_name=secret_name))
  ]
  containers = [
      client.V1Container(
          name='main',
          volume_mounts=[
              client.V1VolumeMount(
                  name=volume_name,
                  mount_path=secret_volume_mount_path,
              )
          ],
          env=[
              client.V1EnvVar(
                  name='GOOGLE_APPLICATION_CREDENTIALS',
                  value=secret_volume_mount_path + secret_file_path_in_volume,
              ),
              client.V1EnvVar(
                  name='CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE',
                  value=secret_volume_mount_path + secret_file_path_in_volume,
              )
          ])
  ]

  return client.V1Pod(
      spec=client.V1PodSpec(containers=containers, volumes=volumes))

In [25]:

from tfx.orchestration.config import kubernetes_component_config

metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()
# This pipeline automatically injects the Kubeflow TFX image if the
# environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
# cli tool exports the environment variable to pass to the pipelines.
tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)
k8s_config = kubernetes_component_config.KubernetesComponentConfig(
      use_gcp_secret())
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
  kubeflow_metadata_config=metadata_config,
  # Specify custom docker image to use.
  tfx_image=tfx_image,
  supported_launcher_classes=[
      in_process_component_launcher.InProcessComponentLauncher,
      kubernetes_component_launcher.KubernetesComponentLauncher,
  ],
  default_component_configs=[k8s_config])
kubeflow_dag_runner.KubeflowDagRunner(config=runner_config).run(
  _create_pipeline(
      pipeline_name=_pipeline_name,
      pipeline_root=_pipeline_root,
      metadata_path=_metadata_path,
      query=_query))

INFO:absl:Adding upstream dependencies for component BigQueryViewer


In [26]:
import kfp
client = kfp.Client(host='36a59b5371d6ab2-dot-us-west1.notebooks.googleusercontent.com')

In [27]:
client.create_run_from_pipeline_package(pipeline_file='bigquery_pipeline.tar.gz', arguments={})

RunPipelineResult(run_id=8edae657-262c-4dcd-bbb9-b60821645604)