<a href="https://colab.research.google.com/github/epadam/production-level-data-analysis/blob/master/pipeline/exp/pipeline_bike_gcp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#### Utility

In [None]:
import os
import pickle
from typing import Tuple

import absl
import numpy as np
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

from tfx.components.trainer.fn_args_utils import DataAccessor
from tfx.components.trainer.fn_args_utils import FnArgs
from tfx.dsl.io import fileio
from tfx.utils import io_utils
from tfx_bsl.tfxio import dataset_options

from tensorflow_metadata.proto.v0 import schema_pb2
import tensorflow as tf
from tensorflow import keras
import tensorflow_transform as tft

# Input columns used as model features. `_input_fn` stacks them in exactly
# this order, so the order defines the column layout of the feature matrix.
_FEATURE_KEYS = [
    'season',
    'yr',
    'mnth',
    'hr',
    'holiday',
    'weekday',
    'workingday',
    'weathersit',
    'temp',
    'atemp',
    'hum',
    'windspeed',
]

# Column that holds the training target.
_LABEL_KEY = 'cnt'

In [None]:
# Number of training examples assumed when converting the Trainer's step budget
# into epochs (see `run_fn`).
# NOTE(review): confirm this matches the size of the bike training split.
_TRAIN_DATA_SIZE = 658
# Mini-batch size handed to the scikit-learn estimator in `run_fn`.
_TRAIN_BATCH_SIZE = 20

def _input_fn(
    file_pattern: str,
    data_accessor: DataAccessor,
    schema: schema_pb2.Schema,
    batch_size: int = 20,
) -> Tuple[np.ndarray, np.ndarray]:
  """Reads the input once and returns it as feature and label arrays.

  Args:
    file_pattern: input tfrecord file pattern.
    data_accessor: DataAccessor whose record batch factory turns the input
      files into an iterable of record batches.
    schema: schema of the input data.
    batch_size: number of records requested per record batch.

  Returns:
    A (features, labels) tuple. For every record batch the `_FEATURE_KEYS`
    columns are stacked along the last axis, in `_FEATURE_KEYS` order, and the
    per-batch results are concatenated; labels are the concatenated
    `_LABEL_KEY` column.
  """
  # A single epoch: every record is read exactly once.
  options = dataset_options.RecordBatchesOptions(
      batch_size=batch_size, num_epochs=1)
  batches = data_accessor.record_batch_factory(file_pattern, options, schema)

  feature_chunks = []
  label_chunks = []
  for batch in batches:
    # Index the flattened columns of this batch by their field name.
    columns = {
        field.name: column.flatten()
        for column, field in zip(batch, batch.schema)
    }

    label_chunks.append(columns[_LABEL_KEY])
    feature_chunks.append(
        np.stack([columns[key] for key in _FEATURE_KEYS], axis=-1))

  return np.concatenate(feature_chunks), np.concatenate(label_chunks)


# TFX Trainer will call this function.
def run_fn(fn_args: FnArgs):
  """Trains a scikit-learn pipeline and exports it as `model.pkl`.

  The training and evaluation examples are loaded fully into memory, a
  StandardScaler + MLPClassifier pipeline is fit on the training data, its
  score on the evaluation data is logged, and the fitted pipeline is pickled
  into the serving model directory.

  Args:
    fn_args: Holds args used to train the model as name/value pairs. This
      function reads `schema_file`, `train_files`, `eval_files`,
      `data_accessor`, `train_steps` and `serving_model_dir`.
  """
  schema = io_utils.parse_pbtxt_file(fn_args.schema_file, schema_pb2.Schema())

  x_train, y_train = _input_fn(fn_args.train_files, fn_args.data_accessor,
                               schema)
  x_eval, y_eval = _input_fn(fn_args.eval_files, fn_args.data_accessor, schema)

  # scikit-learn counts epochs, TFX counts steps: convert the step budget into
  # whole epochs. A small step budget would truncate to 0, which scikit-learn
  # rejects, so always train for at least one epoch.
  steps_per_epoch = _TRAIN_DATA_SIZE / _TRAIN_BATCH_SIZE
  max_iter = max(1, int(fn_args.train_steps / steps_per_epoch))

  # NOTE(review): the label column is 'cnt'; a classifier treats every distinct
  # label value as its own class — confirm a classifier (not a regressor) is
  # intended here.
  estimator = MLPClassifier(
      hidden_layer_sizes=[8, 8, 8],
      activation='relu',
      solver='adam',
      batch_size=_TRAIN_BATCH_SIZE,
      learning_rate_init=0.0005,
      max_iter=max_iter,
      verbose=True)

  # Create a pipeline that standardizes the input data before passing it to an
  # estimator. Once the scaler is fit, it will use the same mean and stdev to
  # transform inputs at both training and serving time.
  model = Pipeline([
      ('scaler', StandardScaler()),
      ('estimator', estimator),
  ])
  # Attach the column names to the model object so they are pickled with it.
  model.feature_keys = _FEATURE_KEYS
  model.label_key = _LABEL_KEY
  model.fit(x_train, y_train)
  absl.logging.info(model)

  score = model.score(x_eval, y_eval)
  absl.logging.info('Accuracy: %f', score)

  # Export the model as a pickle named model.pkl. AI Platform Prediction expects
  # sklearn model artifacts to follow this naming convention.
  # Use fileio rather than os.makedirs: the serving directory may live on a
  # remote filesystem such as GCS, and it may already exist.
  fileio.makedirs(fn_args.serving_model_dir)

  model_path = os.path.join(fn_args.serving_model_dir, 'model.pkl')
  with fileio.open(model_path, 'wb+') as f:
    pickle.dump(model, f)

### tfx pipeline

In [None]:
import os
from typing import Dict
from typing import List
from typing import Optional

import absl
import tensorflow_model_analysis as tfma
from tfx import v1 as tfx
from tfx.components import CsvExampleGen
from tfx.components import Evaluator
from tfx.components import ExampleValidator
from tfx.components import Pusher
from tfx.components import SchemaGen
from tfx.components import StatisticsGen
from tfx.components import Trainer
from tfx.components import Transform
from tfx.dsl.components.common import resolver
from tfx.dsl.experimental import latest_blessed_model_resolver
from tfx.orchestration import metadata
from tfx.orchestration import pipeline
from tfx.orchestration.beam.beam_dag_runner import BeamDagRunner
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
from tfx.types import Channel
from tfx.types.standard_artifacts import Model
from tfx.types.standard_artifacts import ModelBlessing

# Identifier for the pipeline. It doubles as the model name on AI Platform, so
# it should begin with a letter and only consist of letters, numbers, and
# underscores.
_pipeline_name = 'bike_sklearn_gcp'

# Google Cloud Platform project id to use when deploying this pipeline. Leave
# blank to run locally.
_project_id = 'PROJECT_ID'

# Google Cloud Storage bucket under which data, code and outputs are located.
_bucket = 'gs://BUCKET'

# Custom container image in Google Container Registry (GCR) to use for training
# on Google Cloud AI Platform.
_tfx_image = 'gcr.io/{}/tfx-example-sklearn'.format(_project_id)

# Region to use for Dataflow jobs and AI Platform jobs.
#   Dataflow: https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
#   AI Platform: https://cloud.google.com/ml-engine/docs/tensorflow/regions
_gcp_region = 'us-central1'

# Training job parameters passed to Google Cloud AI Platform. For the full set
# of supported parameters, refer to
# https://cloud.google.com/ml-engine/reference/rest/v1/projects.jobs#Job
_ai_platform_training_args = dict(
    project=_project_id,
    region=_gcp_region,
    # Override the default TFX image used for training with one with the correct
    # scikit-learn version.
    masterConfig=dict(imageUri=_tfx_image),
)

# Serving job parameters passed to Google Cloud AI Platform. For the full set
# of supported parameters, refer to
# https://cloud.google.com/ml-engine/reference/rest/v1/projects.models
_ai_platform_serving_args = dict(
    model_name=_pipeline_name,
    project_id=_project_id,
    # The region to use when serving the model. See available regions here:
    # https://cloud.google.com/ml-engine/docs/regions
    # Note that serving currently only supports a single region:
    # https://cloud.google.com/ml-engine/reference/rest/v1/projects.models#Model
    regions=[_gcp_region],
    # TODO(b/176256164): Update to runtime version 2.4 once that is available
    # to align with the version of TF supported by TFX.
    # LINT.IfChange
    runtime_version='2.3',
    # LINT.ThenChange(../../../dependencies.py)
)

# Example data is read from <bucket>/penguin/data and the user modules from
# <bucket>/penguin/experimental. Feel free to customize as needed.
# NOTE(review): these locations still use a 'penguin' folder although the
# pipeline is named 'bike_sklearn_gcp' — confirm the bucket layout.
_penguin_root = os.path.join(_bucket, 'penguin')
_data_root = os.path.join(_penguin_root, 'data')

# Python module file to inject customized logic into the TFX components.
# Trainer requires user-defined functions to run successfully.
_trainer_module_file = os.path.join(
    _penguin_root, 'experimental', 'penguin_utils_sklearn.py')

# Python module file to inject customized logic into the TFX components. The
# Evaluator component needs a custom extractor in order to make predictions
# using the scikit-learn model.
_evaluator_module_file = os.path.join(
    _penguin_root, 'experimental', 'sklearn_predict_extractor.py')

# Pipeline outputs are written below <bucket>/tfx. The AI Platform Pusher
# requires that pipeline outputs are stored in a GCS bucket.
_tfx_root = os.path.join(_bucket, 'tfx')
_pipeline_root = os.path.join(_tfx_root, 'pipelines', _pipeline_name)

# Pipeline arguments for Beam powered Components.
# TODO(b/171316320): Change direct_running_mode back to multi_processing and set
# direct_num_workers to 0. Additionally, try to use the Dataflow runner instead
# of the direct runner.
_beam_pipeline_args = [
    '--direct_running_mode=multi_threading',
    # A single worker is used for now. Setting this to 0 would auto-detect the
    # worker count based on the number of CPUs available during execution time.
    '--direct_num_workers=1',
]


def _create_pipeline(
    pipeline_name: str,
    pipeline_root: str,
    data_root: str,
    trainer_module_file: str,
    evaluator_module_file: str,
    ai_platform_training_args: Optional[Dict[str, str]],
    ai_platform_serving_args: Optional[Dict[str, str]],
    beam_pipeline_args: List[str],
    label_key: str = 'cnt',
) -> tfx.dsl.Pipeline:
  """Implements the bike-sharing scikit-learn pipeline with TFX.

  The pipeline ingests CSV data, computes statistics, infers a schema,
  validates the examples, trains on Cloud AI Platform, evaluates the candidate
  against the latest blessed model, and pushes it to Cloud AI Platform.

  Args:
    pipeline_name: name of the pipeline.
    pipeline_root: root directory for the pipeline outputs.
    data_root: directory whose `labelled` sub-directory holds the CSV input.
    trainer_module_file: Python module file providing the Trainer's `run_fn`.
    evaluator_module_file: Python module file providing the custom logic used
      by the Evaluator.
    ai_platform_training_args: training job parameters for Cloud AI Platform.
    ai_platform_serving_args: serving parameters for Cloud AI Platform.
    beam_pipeline_args: pipeline arguments for Beam powered components.
    label_key: name of the label column used by the Evaluator. Defaults to
      'cnt', the label the trainer utility in this notebook is written for.

  Returns:
    The assembled `tfx.dsl.Pipeline`.
  """
  # Brings data into the pipeline or otherwise joins/converts training data.
  example_gen = tfx.components.CsvExampleGen(
      input_base=os.path.join(data_root, 'labelled'))

  # Computes statistics over data for visualization and example validation.
  statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

  # Generates schema based on statistics files.
  schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'], infer_feature_shape=True)

  # Performs anomaly detection based on statistics and data schema.
  example_validator = tfx.components.ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema'])

  # TODO(humichael): Handle applying transformation component in Milestone 3.

  # Uses user-provided Python function that trains a model.
  # Num_steps is not provided during evaluation because the scikit-learn model
  # loads and evaluates the entire test set at once.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=trainer_module_file,
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      train_args=tfx.proto.TrainArgs(num_steps=2000),
      eval_args=tfx.proto.EvalArgs(),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
          ai_platform_training_args,
      })

  # Get the latest blessed model for model validation.
  model_resolver = tfx.dsl.Resolver(
      strategy_class=tfx.dsl.experimental.LatestBlessedModelStrategy,
      model=tfx.dsl.Channel(type=tfx.types.standard_artifacts.Model),
      model_blessing=tfx.dsl.Channel(
          type=tfx.types.standard_artifacts.ModelBlessing)).with_id(
              'latest_blessed_model_resolver')

  # Uses TFMA to compute evaluation statistics over features of a model and
  # perform quality validation of a candidate model (compared to a baseline).
  # The label key must name the column the trainer was fit on; it used to be
  # hard-coded to the penguin example's 'species' column.
  eval_config = tfma.EvalConfig(
      model_specs=[tfma.ModelSpec(label_key=label_key)],
      slicing_specs=[tfma.SlicingSpec()],
      metrics_specs=[
          tfma.MetricsSpec(metrics=[
              tfma.MetricConfig(
                  class_name='Accuracy',
                  threshold=tfma.MetricThreshold(
                      # Absolute bar the candidate has to clear.
                      value_threshold=tfma.GenericValueThreshold(
                          lower_bound={'value': 0.6}),
                      # Relative bar against the baseline model.
                      change_threshold=tfma.GenericChangeThreshold(
                          direction=tfma.MetricDirection.HIGHER_IS_BETTER,
                          absolute={'value': -1e-10})))
          ])
      ])

  evaluator = tfx.components.Evaluator(
      module_file=evaluator_module_file,
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config)

  # Pushes the model to Cloud AI Platform, gated on the Evaluator's blessing.
  pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      model_blessing=evaluator.outputs['blessing'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.experimental
          .PUSHER_SERVING_ARGS_KEY: ai_platform_serving_args,
      })

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=[
          example_gen,
          statistics_gen,
          schema_gen,
          example_validator,
          trainer,
          model_resolver,
          evaluator,
          pusher,
      ],
      enable_cache=True,
      beam_pipeline_args=beam_pipeline_args,
  )

#### compile kubeflow pipeline

In [None]:
import os
from absl import logging

from pipeline import configs
from pipeline import pipeline
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner
from tfx.proto import trainer_pb2

# TFX pipeline produces many output files and metadata. All output data will be
# stored under this OUTPUT_DIR.
# NOTE: It is recommended to have a separated OUTPUT_DIR which is *outside* of
#       the source code structure. Please change OUTPUT_DIR to other location
#       where we can store outputs of the pipeline.
# Root location, inside the configured GCS bucket, for everything the pipeline
# writes.
_OUTPUT_DIR = os.path.join('gs://', configs.GCS_BUCKET_NAME)

# TFX produces two types of outputs, files and metadata.
# - Files will be created under PIPELINE_ROOT directory.
# - Metadata will be written to metadata service backend.
_PIPELINE_ROOT = os.path.join(
    _OUTPUT_DIR, 'tfx_pipeline_output', configs.PIPELINE_NAME)

# The last component of the pipeline, "Pusher" will produce serving model under
# SERVING_MODEL_DIR.
_SERVING_MODEL_DIR = os.path.join(_PIPELINE_ROOT, 'serving_model')

# Location of the input data inside the bucket.
# NOTE(review): this still points at the template's taxi data — confirm it is
# the intended input for this pipeline.
_DATA_PATH = f'gs://{configs.GCS_BUCKET_NAME}/tfx-template/data/taxi/'

def run():
  """Builds the pipeline from `configs` and runs it with the Kubeflow V2 runner."""
  config = kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
      default_image=configs.PIPELINE_IMAGE)

  # Collect the pipeline arguments first so optional ones are easy to toggle.
  pipeline_kwargs = dict(
      pipeline_name=configs.PIPELINE_NAME,
      pipeline_root=_PIPELINE_ROOT,
      data_path=_DATA_PATH,
      # TODO(step 7): (Optional) Uncomment here to use BigQueryExampleGen.
      # query=configs.BIG_QUERY_QUERY,
      preprocessing_fn=configs.PREPROCESSING_FN,
      run_fn=configs.RUN_FN,
      train_args=trainer_pb2.TrainArgs(num_steps=configs.TRAIN_NUM_STEPS),
      eval_args=trainer_pb2.EvalArgs(num_steps=configs.EVAL_NUM_STEPS),
      eval_accuracy_threshold=configs.EVAL_ACCURACY_THRESHOLD,
      serving_model_dir=_SERVING_MODEL_DIR,
      # Uncomment below to use Dataflow.
      # beam_pipeline_args=configs.DATAFLOW_BEAM_PIPELINE_ARGS,
      ai_platform_training_args=configs.GCP_AI_PLATFORM_TRAINING_ARGS,
      # Uncomment below to use Cloud AI Platform.
      # ai_platform_serving_args=configs.GCP_AI_PLATFORM_SERVING_ARGS,
  )
  dsl_pipeline = pipeline.create_pipeline(**pipeline_kwargs)

  kubeflow_v2_dag_runner.KubeflowV2DagRunner(config=config).run(
      pipeline=dsl_pipeline)

In [None]:
# Log at INFO level while the pipeline is handed to the runner.
absl.logging.set_verbosity(absl.logging.INFO)

# The runner uses the custom TFX image defined in the configuration cell.
runner_config = tfx.orchestration.experimental.KubeflowDagRunnerConfig(
    tfx_image=_tfx_image)
_kubeflow_runner = tfx.orchestration.experimental.KubeflowDagRunner(
    config=runner_config)

# Assemble the pipeline from the module-level configuration, then run it.
_kubeflow_pipeline = _create_pipeline(
    pipeline_name=_pipeline_name,
    pipeline_root=_pipeline_root,
    data_root=_data_root,
    trainer_module_file=_trainer_module_file,
    evaluator_module_file=_evaluator_module_file,
    ai_platform_training_args=_ai_platform_training_args,
    ai_platform_serving_args=_ai_platform_serving_args,
    beam_pipeline_args=_beam_pipeline_args)
_kubeflow_runner.run(_kubeflow_pipeline)

#### Or use the TFX command-line interface

In [None]:
!TFX

#### kfp also has its own pipeline-building **functions**

In [None]:
import kfp
import kfp.dsl as dsl
import kfp.components as comp
from kfp import compiler

#### Seldon core 

In [None]:
!s2i