TFX Bug bash

In [1]:
# Endpoint of the Kubeflow Pipelines cluster. It is passed as `host` to
# kfp.Client when the compiled pipeline is submitted in the last cell.
cluster_endpoint = None

# NOTE: when working from GCP notebooks (or local notebooks), set this to the
# cluster URL so that the client is created as
# kfp.Client(host='https://xxxxx.notebooks.googleusercontent.com/').
# This assignment overrides the None default above.
cluster_endpoint = 'https://34c40cdd21e49f0a-dot-us-central1.notebooks.googleusercontent.com'

In [2]:
import os

# Name shared by the pipeline, its storage locations and the compiled package.
_pipeline_name = 'tfx_container_pipeline'

# Root location for everything the pipeline writes.
# Local alternative: os.path.join(os.environ['HOME'], 'tfx')
_tfx_root = 'gs://avolkov/tmp/tfx/{}'.format(_pipeline_name)

# Location handed to the pipeline as its `pipeline_root`.
_pipeline_root = os.path.join(
    _tfx_root, 'pipelines', _pipeline_name)

# Sqlite ML-metadata db path.
_metadata_path = os.path.join(
    _tfx_root, 'metadata', _pipeline_name, 'metadata.db')

In [3]:
# Install (or upgrade) TFX into the user site-packages before it is imported below.
# NOTE(review): only a lower bound is given; this notebook uses TFX APIs that its
# own comments call deprecated (`type_name=`), so a newer release may break it —
# consider pinning an exact version.
!python3 -m pip install --upgrade 'tfx>=0.15.0' --user --quiet

In [4]:
from typing import Optional

import tfx
#import tfx.types

from tfx.components.base import base_component, executor_spec
from tfx.types import component_spec

In [5]:
# TFX cannot use types that are not part of the base image: AttributeError: module '__main__' has no attribute 'CsvArtifact'
class CsvArtifact(component_spec.Artifact):
    """Artifact type for CSV data, declared with the type name 'CsvPath'."""
    TYPE_NAME = 'CsvPath'

class QueryBigQueryComponent(base_component.BaseComponent):
    """Container-based component that runs a BigQuery query and uploads the result as CSV.

    The executor is a bash script run in the `google/cloud-sdk:latest` image.
    It receives the query as `$0` and the URI of the `results` artifact as `$1`.
    If `GOOGLE_APPLICATION_CREDENTIALS` is set it activates that service
    account, runs `bq query` (standard SQL, CSV format) into a temporary file,
    and copies that file with `gsutil cp` to `<results uri>data`.
    """

    class Spec(component_spec.ComponentSpec):
        """Interface: one string parameter (`query`) and one output channel (`results`)."""
        PARAMETERS = {
            'query': component_spec.ExecutionParameter(type=str),
        }
        INPUTS = {}
        OUTPUTS = {
            # `ChannelParameter(type=CsvArtifact)` does not work with custom types,
            # so the deprecated `type_name` form is used as a workaround.
            'results': component_spec.ChannelParameter(type_name='CsvPath'),
        }

    SPEC_CLASS = Spec
    EXECUTOR_SPEC = executor_spec.ExecutorContainerSpec(
        image='google/cloud-sdk:latest',
        command=[
            'bash', '-e', '-x', '-c',
            '''
            if [ -n "$GOOGLE_APPLICATION_CREDENTIALS" ]; then
                gcloud auth activate-service-account --key-file=$GOOGLE_APPLICATION_CREDENTIALS
            fi
            query="$0"
            #results_uri=$1
            results_uri="${1}data"  # The passed path is a directory path. This is problematic since it forces people to add file name to that directory which will bring incompatibilities between components (e.g. components A and B write to /a.txt and /b.txt - the consumer does not know which file will exist) This problem has been fixed in KFP and passed paths are file paths (that can be used as directories though.)
            results_path=$(mktemp)
            
            echo 'y' | bq init
            bq query --nouse_legacy_sql --format csv -q "$query" > "$results_path"
            if [ -n "$results_uri" ]; then
                gsutil cp "$results_path" "$results_uri" #  The results_uri must not be a "directory" URI. Otherwise the uploaded blob name will be random: gs://avolkov/tmp/tfx/tfx_container_pipeline/pipelines/tfx_container_pipeline/QueryBigQueryComponent/results/87/tmp.Qh7VDPYFgx
            fi
            ''',
            # Positional arguments of the script (do not forget the commas!).
            # $0: the query text.
            '{{exec_properties.query}}',
            # $1: URI of the first `results` artifact. Placeholders that were tried
            # and do not work:
            #   '{{output_dict.results}}'     renders the repr of the artifact list
            #   '{{output_dict.results.uri}}' renders an empty string
            # The rendered URI is a directory path, e.g.
            # gs://avolkov/tmp/tfx/tfx_container_pipeline/pipelines/tfx_container_pipeline/QueryBigQueryComponent/results/87/
            # which forces every component to pick a file name inside it; producers and
            # consumers then have to agree on that name (this notebook uses "data").
            '{{output_dict.results[0].uri}}',
        ])

    def __init__(
        self,
        query: str,
        results: Optional[tfx.types.Channel] = None,
        instance_name: str = 'QueryBigQueryComponent'
    ):
        """Create the component.

        Args:
            query: SQL text passed to `bq query`.
            results: Output channel for the CSV artifact. When omitted (or falsy),
                a channel holding a single 'CsvPath' artifact is created.
            instance_name: Forwarded to the component spec together with the
                other arguments.
        """
        if not results:
            # `CsvArtifact()` cannot be instantiated here (custom types are not
            # supported), so the generic Artifact with an explicit type name is used.
            results = tfx.types.Channel(
                type=CsvArtifact,
                artifacts=[component_spec.Artifact(type_name='CsvPath')],
            )

        # NOTE(review): `instance_name` is handed to the spec, not to
        # BaseComponent.__init__ — confirm that this is intended.
        spec = QueryBigQueryComponent.SPEC_CLASS(
            query=query,
            results=results,
            instance_name=instance_name,
        )
        super().__init__(spec)

In [6]:
# TFX cannot use types that are not part of the base image: AttributeError: module '__main__' has no attribute 'CsvArtifact'
class SKLearnSvmSvrModelArtifact(component_spec.Artifact):
    """Artifact type for a scikit-learn SVR model, declared with the type name 'SKLearnSvmSvrModelPath'."""
    TYPE_NAME = 'SKLearnSvmSvrModelPath'

class SKLearnTransformerArtifact(component_spec.Artifact):
    """Artifact type for a scikit-learn transformer, declared with the type name 'SKLearnTransformerPath'."""
    TYPE_NAME = 'SKLearnTransformerPath'

#class FloatArtifact(component_spec.Artifact):
#    TYPE_NAME = 'FloatPath'
    

class TrainSklearnSvmCsrComponent(base_component.BaseComponent):
    """Container-based component that trains a scikit-learn SVR model on CSV data.

    The executor is an inline Python script run in the
    `tensorflow/tensorflow:1.15.0-py3` image. The script:

    1. installs scikit-learn, pandas and gcsfs with pip (and fails immediately
       if the installation fails);
    2. reads the CSV at the `training_data` URI, keeps only numeric columns and
       replaces missing values with 0;
    3. splits the rows 50/50 into a training and a testing set;
    4. fits a `StandardScaler` on the training features and applies it to both
       the training and the testing features;
    5. fits `svm.SVR` on the scaled training set and prints the mean squared
       error of its predictions on the scaled testing set;
    6. pickles the transformer and the model to their output URIs (GCS or local).

    Every URI that ends with "/" gets "data" appended, because TFX passes
    directory URIs while the script reads and writes single files.
    """

    class Spec(component_spec.ComponentSpec):
        """Interface: CSV input, model and transformer outputs, target column parameter."""
        INPUTS = {
            #'training_data': component_spec.ChannelParameter(type=CsvArtifact), # Does not work with custom types
            'training_data': component_spec.ChannelParameter(type_name='CsvPath'), # Using this deprecated method to workaround TFX deficiencies
        }
        OUTPUTS = {
            #'model': component_spec.ChannelParameter(type=SKLearnSvmSvrModel), # Does not work with custom types
            'model': component_spec.ChannelParameter(type_name='SKLearnSvmSvrModelPath'), # Using this deprecated method to workaround TFX deficiencies
            'transformer': component_spec.ChannelParameter(type_name='SKLearnTransformerPath'),
            #'mean_square_error': component_spec.ChannelParameter(type_name='FloatPath'),
        }
        PARAMETERS = {
            'target_column_name': component_spec.ExecutionParameter(type=str),
        }

    SPEC_CLASS = Spec
    EXECUTOR_SPEC = executor_spec.ExecutorContainerSpec(
        image='tensorflow/tensorflow:1.15.0-py3',
        command=[
            #'sh',
            #'-e',
            #'-x',
            #'-c',
            #'python3 -m pip install --upgrade --user pandas gcsfs && echo "0=$0" && echo "1=$1" && echo "2=$2" && echo "3=$3" && echo "4=$4" && "$0" "$*"',
            'python3',
            '-c',
            # Changes to the script compared with the first version:
            #  * installs `scikit-learn` (the `sklearn` PyPI name is a deprecated alias)
            #    and uses check=True so that a failed install stops the step at once;
            #  * scales the *testing* features for evaluation (the training features
            #    were scaled twice before, so the reported error was meaningless).
            '''
import sys
import subprocess
subprocess.run([sys.executable, '-m', 'pip', 'install', '--upgrade', '--quiet', 'scikit-learn', 'pandas', 'gcsfs'], check=True)


training_data_uri = sys.argv[1]
output_model_uri = sys.argv[2]
output_transformer_uri = sys.argv[3]
target_column_name = sys.argv[4]

# The passed path is a directory path. This is problematic since it forces people to add file name to that directory which will bring incompatibilities between components (e.g. components A and B write to /a.txt and /b.txt - the consumer does not know which file will exist) This problem has been fixed in KFP and passed paths are file paths (that can be used as directories though.)
if training_data_uri.endswith("/"):
    training_data_uri = training_data_uri + "data"
if output_transformer_uri.endswith("/"):
    output_transformer_uri = output_transformer_uri + "data"
if output_model_uri.endswith("/"):
    output_model_uri = output_model_uri + "data"

import pandas
data = pandas.read_csv(training_data_uri)
cleaned_data = data.select_dtypes("number").fillna(0)

from sklearn.model_selection import train_test_split
training_data, testing_data = train_test_split(cleaned_data, test_size=0.5)

training_features = training_data.drop(target_column_name, axis=1).values
training_labels = training_data[target_column_name].values
testing_features = testing_data.drop(target_column_name, axis=1).values
testing_labels = testing_data[target_column_name].values

from sklearn import preprocessing
transformer = preprocessing.StandardScaler()
transformer.fit(training_features)

scaled_training_features = transformer.transform(training_features)
# Scale the held-out features with the transformer fitted on the training split.
scaled_testing_features = transformer.transform(testing_features)

from sklearn import svm
model = svm.SVR().fit(scaled_training_features, training_labels)

predictions = model.predict(scaled_testing_features)

from sklearn import metrics
mean_squared_error = metrics.mean_squared_error(testing_labels, predictions)
print("mean_squared_error=" + str(mean_squared_error))

import pickle
def pickle_to_local_or_gcs(obj, path):
    if path.startswith("gs:/"):
        import gcsfs
        file = gcsfs.GCSFileSystem().open(path, "wb")
    else:
        file = open(path, "wb")

    with file as f:
        pickle.dump(obj, f)

pickle_to_local_or_gcs(transformer, output_transformer_uri)
pickle_to_local_or_gcs(model, output_model_uri)
''',
            # sys.argv[1]: URI of the training CSV artifact. It is a directory path, e.g.
            # gs://avolkov/tmp/tfx/tfx_container_pipeline/pipelines/tfx_container_pipeline/QueryBigQueryComponent/results/87/
            # which forces components to agree on a file name inside it ("data" here).
            '{{input_dict.training_data[0].uri}}',
            # sys.argv[2] and sys.argv[3]: URIs of the model and transformer artifacts.
            '{{output_dict.model[0].uri}}',
            '{{output_dict.transformer[0].uri}}',
            # sys.argv[4]: name of the label column. Do not forget the commas!
            '{{exec_properties.target_column_name}}',
        ])

    def __init__(
        self,
        target_column_name: str,
        training_data: Optional[tfx.types.Channel] = None,
        model: Optional[tfx.types.Channel] = None,
        transformer: Optional[tfx.types.Channel] = None,
        instance_name: str = 'TrainSklearnSvmCsr'
    ):
        """Create the component.

        Args:
            target_column_name: Name of the CSV column to predict.
            training_data: Channel with the training CSV artifact. When omitted
                (or falsy), a channel holding a single 'CsvPath' artifact is created.
            model: Output channel for the pickled SVR model; created when omitted.
            transformer: Output channel for the pickled scaler; created when omitted.
            instance_name: Forwarded to the component spec together with the
                other arguments.
        """
        # Custom artifact classes cannot be instantiated here (see the commented-out
        # lines), so generic Artifacts with explicit type names are used instead.
        training_data = training_data or tfx.types.Channel(
            type=CsvArtifact,
            artifacts=[
        #        CsvArtifact(),
                component_spec.Artifact(type_name='CsvPath'),
            ],
        )
        model = model or tfx.types.Channel(
            type=SKLearnSvmSvrModelArtifact,
            artifacts=[
        #        SKLearnSvmSvrModelArtifact(),
                component_spec.Artifact(type_name='SKLearnSvmSvrModelPath'),
            ],
        )
        transformer = transformer or tfx.types.Channel(
            type=SKLearnTransformerArtifact,
            artifacts=[
        #        SKLearnTransformerArtifact(),
                component_spec.Artifact(type_name='SKLearnTransformerPath'),
            ],
        )
        #mean_square_error = mean_square_error or tfx.types.Channel(
        #    type=FloatArtifact,
        #    artifacts=[
        ##        FloatArtifact(),
        #        component_spec.Artifact(type_name='FloatPath'),
        #    ],
        #)
        # NOTE(review): `instance_name` is handed to the spec, not to
        # BaseComponent.__init__ — confirm that this is intended.
        super(TrainSklearnSvmCsrComponent, self).__init__(
            TrainSklearnSvmCsrComponent.SPEC_CLASS(
                target_column_name=target_column_name,
                training_data=training_data,
                model=model,
                transformer=transformer,
                #mean_square_error=mean_square_error,
                instance_name=instance_name,
            ),
        )

In [7]:

from tfx.orchestration import metadata
#metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path)


# The rate at which to sample rows from the Chicago Taxi dataset using BigQuery.
# The full taxi dataset is > 120M records.  In the interest of resource
# savings and time, we've set the default for this example to be much smaller.
# Feel free to crank it up and process the full dataset!
#_query_sample_rate = 0.001  # Generate a 0.1% random sample.
_query_sample_rate = 0.01  # Generate a 1% random sample.

# This is the upper bound of FARM_FINGERPRINT in BigQuery (i.e. the max value of
# signed int64). It is kept as a string because it is substituted verbatim into
# the SQL text of the query.
_max_int64 = '0x7FFFFFFFFFFFFFFF'

# The query that extracts the examples from BigQuery.  The Chicago Taxi dataset
# used for this example is a public dataset available on Google AI Platform.
# https://console.cloud.google.com/marketplace/details/city-of-chicago-public-data/chicago-taxi-trips
#
# Sampling: the fingerprint of each row's `unique_key` is divided by the int64
# maximum and the row is kept when the result is below `_query_sample_rate`.
_query = """
         SELECT
           pickup_community_area,
           fare,
           EXTRACT(MONTH FROM trip_start_timestamp) AS trip_start_month,
           EXTRACT(HOUR FROM trip_start_timestamp) AS trip_start_hour,
           EXTRACT(DAYOFWEEK FROM trip_start_timestamp) AS trip_start_day,
           UNIX_SECONDS(trip_start_timestamp) AS trip_start_timestamp,
           pickup_latitude,
           pickup_longitude,
           dropoff_latitude,
           dropoff_longitude,
           trip_miles,
           pickup_census_tract,
           dropoff_census_tract,
           payment_type,
           company,
           trip_seconds,
           dropoff_community_area,
           tips
         FROM `bigquery-public-data.chicago_taxi_trips.taxi_trips`
         WHERE (ABS(FARM_FINGERPRINT(unique_key)) / {max_int64})
           < {query_sample_rate}""".format(
               max_int64=_max_int64, query_sample_rate=_query_sample_rate)

In [8]:
from tfx.orchestration import pipeline

def _create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    metadata_path: str,
    query: str,
) -> pipeline.Pipeline:
    """Assemble the two-step pipeline: BigQuery export followed by SVR training.

    Args:
        pipeline_name: Name given to the pipeline.
        pipeline_root: Root location passed to the pipeline.
        metadata_path: Path of the sqlite ML-metadata database.
        query: SQL text executed by the BigQuery component.

    Returns:
        A pipeline with caching disabled in which the trainer consumes the
        `results` output of the query component and predicts the 'tips' column.
    """
    query_step = QueryBigQueryComponent(query=query)
    training_step = TrainSklearnSvmCsrComponent(
        training_data=query_step.outputs['results'],
        target_column_name='tips',
    )
    steps = [query_step, training_step]

    return pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=steps,
        enable_cache=False,
        metadata_connection_config=metadata.sqlite_metadata_connection_config(metadata_path),
        additional_pipeline_args={},
    )

In [9]:
# Import the submodule explicitly: a bare `import absl` does not load
# `absl.logging`, so the attribute access below would only work if some other
# package had already imported it.
import absl.logging
absl.logging.set_verbosity(absl.logging.INFO)

from tfx.orchestration.beam import beam_dag_runner
from tfx.orchestration.config import pipeline_config, docker_component_config
from tfx.orchestration.launcher import docker_component_launcher, in_process_component_launcher, kubernetes_component_launcher

# Local execution with the Beam runner and Docker-launched containers.
# Disabled by default; change the condition to True to run the pipeline here
# instead of submitting it to Kubeflow.
if False:
    local_run_config = pipeline_config.PipelineConfig(
        supported_launcher_classes=[
            docker_component_launcher.DockerComponentLauncher,
        ],
        default_component_configs=[
            docker_component_config.DockerComponentConfig(
                # Volume mapping for the gcloud configuration directory
                # (presumably host path : container path — verify).
                volumes=['/home/jupyter/.config/gcloud:/root/.config/gcloud'],
            ),
        ],
    )
    local_runner = beam_dag_runner.BeamDagRunner(config=local_run_config)
    local_runner.run(
        _create_pipeline(
            pipeline_name=_pipeline_name,
            pipeline_root=_pipeline_root,
            metadata_path=_metadata_path,
            query=_query,
        )
    )

In [10]:
from kubernetes import client as k8s_client
def use_gcp_secret():
    """Build a pod definition that exposes the `user-gcp-sa` secret to the main container.

    The secret is attached as a volume, mounted at `/secret/gcp-credentials`
    in the container named `main`, and both `GOOGLE_APPLICATION_CREDENTIALS`
    and `CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE` are set to
    `/secret/gcp-credentials/user-gcp-sa.json`.

    Returns:
        A `V1Pod` whose spec holds that single container and the secret volume.
    """
    secret_name = 'user-gcp-sa'
    mount_dir = '/secret/gcp-credentials'
    volume_name = 'gcp-credentials-' + secret_name
    key_file = mount_dir + '/' + secret_name + '.json'

    secret_volume = k8s_client.V1Volume(
        name=volume_name,
        secret=k8s_client.V1SecretVolumeSource(secret_name=secret_name),
    )
    secret_mount = k8s_client.V1VolumeMount(
        name=volume_name,
        mount_path=mount_dir,
    )
    # Both variables point at the same key file.
    credential_env = [
        k8s_client.V1EnvVar(name=variable, value=key_file)
        for variable in (
            'GOOGLE_APPLICATION_CREDENTIALS',
            'CLOUDSDK_AUTH_CREDENTIAL_FILE_OVERRIDE',
        )
    ]
    main_container = k8s_client.V1Container(
        name='main',
        volume_mounts=[secret_mount],
        env=credential_env,
    )

    pod_spec = k8s_client.V1PodSpec(
        containers=[main_container],
        volumes=[secret_volume],
    )
    return k8s_client.V1Pod(spec=pod_spec)

In [11]:
from tfx.orchestration.config import kubernetes_component_config
from tfx.orchestration.kubeflow import kubeflow_dag_runner

metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()

# This pipeline automatically injects the Kubeflow TFX image if the
# environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
# cli tool exports the environment variable to pass to the pipelines.
tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE')

# Default component configuration built from the pod definition that carries
# the GCP secret.
k8s_config = kubernetes_component_config.KubernetesComponentConfig(
    use_gcp_secret(),
)

launcher_classes = [
    in_process_component_launcher.InProcessComponentLauncher,
    kubernetes_component_launcher.KubernetesComponentLauncher,
]
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=metadata_config,
    # Specify custom docker image to use.
    tfx_image=tfx_image,
    supported_launcher_classes=launcher_classes,
    default_component_configs=[k8s_config],
)

# Run the pipeline through the Kubeflow runner. The package submitted below,
# `<pipeline name>.tar.gz`, is expected to be produced by this step.
kubeflow_runner = kubeflow_dag_runner.KubeflowDagRunner(config=runner_config)
kubeflow_pipeline = _create_pipeline(
    pipeline_name=_pipeline_name,
    pipeline_root=_pipeline_root,
    metadata_path=_metadata_path,
    query=_query,
)
kubeflow_runner.run(kubeflow_pipeline)

import kfp

# Submit the package to the cluster. This call stays the last expression of the
# cell so that its result is displayed as the cell output.
pipeline_package = _pipeline_name + '.tar.gz'
kfp.Client(host=cluster_endpoint).create_run_from_pipeline_package(
    pipeline_file=pipeline_package,
    arguments={},
)

INFO:absl:Adding upstream dependencies for component QueryBigQueryComponent
INFO:absl:Adding upstream dependencies for component TrainSklearnSvmCsrComponent
INFO:absl:   ->  Component: QueryBigQueryComponent


RunPipelineResult(run_id=386d35c7-8228-4f16-b8d4-71cf7c8c6dc7)