In [40]:
import tensorflow as tf
print('TensorFlow version: {}'.format(tf.__version__))
from tfx import v1 as tfx
print('TFX version: {}'.format(tfx.__version__))
import kfp
print('KFP version: {}'.format(kfp.__version__))

TensorFlow version: 2.9.0
TFX version: 1.9.1
KFP version: 1.8.13


In [13]:
!find / -name $GOOGLE_APPLICATION_CREDENTIALS

/tfx/src/secrets/sfeir-data-394b11f86ed4.json


In [2]:
!gcloud auth activate-service-account --key-file='../secrets/sfeir-data-394b11f86ed4.json'

Activated service account credentials for: [devfest-2022@sfeir-data.iam.gserviceaccount.com]


In [3]:
import os
import pprint
pp = pprint.PrettyPrinter()

In [4]:
GOOGLE_CLOUD_PROJECT = 'sfeir-data'       
GOOGLE_CLOUD_REGION = 'europe-west1'         
GCS_BUCKET_NAME = 'devfest_mlops_2022'

In [5]:
PIPELINE_NAME = 'wine-quality-csv'

# Path to various pipeline artifact.
PIPELINE_ROOT = '/content/{}/pipeline_root/'.format(PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = '/content/{}/pipeline_module/'.format(PIPELINE_NAME)

ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

PIPELINE_ROOT: /content/wine-quality-csv/pipeline_root/


In [6]:
!rm -r /content/interactive_pipeline

rm: cannot remove '/content/interactive_pipeline': No such file or directory


In [7]:
QUERY = f"SELECT * FROM `{GOOGLE_CLOUD_PROJECT}.devfest_mlops.winequality_white_binary`"

In [8]:
_transformer_module_file = 'wine_quality_transformer.py'

In [9]:
%%writefile {_transformer_module_file}

import tensorflow as tf
import tensorflow_transform as tft

_NUMERIC_FEATURE_KEYS = [
    'fixed_acidity',	'volatile_acidity',	'citric_acid',	'residual_sugar',
    'chlorides',	'free_sulfur_dioxide',	'total_sulfur_dioxide',	'density',
    'ph',	'sulphates',	'alcohol'
]
_LABEL_KEY = 'quality'

def preprocessing_fn(inputs):
    """tf.transform's callback function for preprocessing inputs.
    Args:
        inputs: map from feature keys to raw not-yet-transformed features.
    Returns:
        Map from string feature key to transformed feature operations.
    """

    outputs = {}

    # scale features to [0,1]
    for key in _NUMERIC_FEATURE_KEYS:
        scaled = tft.scale_to_0_1(inputs[key])
        outputs[key] = tf.reshape(scaled, [-1])

    # transform the output
    outputs[_LABEL_KEY] = inputs[_LABEL_KEY]

    return outputs

Overwriting wine_quality_transformer.py


In [10]:
!gsutil cp {_transformer_module_file} {MODULE_ROOT}/

Copying file://wine_quality_transformer.py...
/ [0 files][    0.0 B/  843.0 B]                                                / [1 files][  843.0 B/  843.0 B]                                                
Operation completed over 1 objects/843.0 B.                                      


In [11]:
_trainer_module_file = 'wine_quality_trainer.py'

In [12]:
%%writefile {_trainer_module_file}

import tensorflow as tf
import tensorflow_transform as tft

from typing import List
from absl import logging

from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio
from keras_tuner.engine import base_tuner

_FEATURE_KEYS = [
    'fixed_acidity',	'volatile_acidity',	'citric_acid',	'residual_sugar',
    'chlorides',	'free_sulfur_dioxide',	'total_sulfur_dioxide',	'density',
    'ph',	'sulphates',	'alcohol'
]
_LABEL_KEY = 'quality'

_TRAIN_BATCH_SIZE = 100
_EVAL_BATCH_SIZE = 20


def _input_fn(file_pattern: List[str],
              tf_transform_output,
              data_accessor: tfx.components.DataAccessor,
              batch_size: int) -> tf.data.Dataset:

  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(batch_size=batch_size, label_key=_LABEL_KEY),
      schema=tf_transform_output.transformed_metadata.schema).repeat()


def _get_serve_tf_examples_fn(model, tf_transform_output):
  """Returns a function that parses a serialized tf.Example and applies TFT."""

  # Get transformation graph
  model.tft_layer = tf_transform_output.transform_features_layer()

  @tf.function
  def serve_tf_examples_fn(serialized_tf_examples):
    """Returns the output to be used in the serving signature."""
    feature_spec = tf_transform_output.raw_feature_spec()
    feature_spec.pop(_LABEL_KEY)

    parsed_features = tf.io.parse_example(serialized_tf_examples, feature_spec)
    transformed_features = model.tft_layer(parsed_features)

    return model(transformed_features)

  return serve_tf_examples_fn


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]

  model_layers = keras.layers.concatenate(inputs)
  model_layers = keras.layers.Dropout(0.2)(model_layers)
  model_layers = keras.layers.BatchNormalization()(model_layers)
  model_layers = keras.layers.Dense(units=8, activation='relu')(model_layers)
  model_layers = keras.layers.Dropout(0.2)(model_layers)
  model_layers = keras.layers.BatchNormalization()(model_layers)
  model_layers = keras.layers.Dense(units=14, activation='relu')(model_layers)
  model_layers = keras.layers.Dropout(0.2)(model_layers)
  model_layers = keras.layers.BatchNormalization()(model_layers)
  model_layers = keras.layers.Dense(units=6, activation='relu')(model_layers)
  model_layers = keras.layers.Dropout(0.2)(model_layers)
  model_layers = keras.layers.BatchNormalization()(model_layers)

  outputs = keras.layers.Dense(1)(model_layers)

  model = keras.Model(inputs=inputs, outputs=outputs)

  rms_prop_optimizer = tf.keras.optimizers.RMSprop(learning_rate=10e-5)
  model.compile(
      optimizer=rms_prop_optimizer,
      loss=tf.keras.losses.binary_crossentropy,
      metrics=['accuracy']
  )

  model.summary(print_fn=logging.info)
  return model


TunerFnResult = NamedTuple('TunerFnResult', [('tuner', base_tuner.BaseTuner),
                                             ('fit_kwargs', Dict[Text, Any])])

def tuner_fn(fn_args: FnArgs) -> TunerFnResult:
  """Build the tuner using the KerasTuner API.
  Args:
    fn_args: Holds args as name/value pairs.
      - working_dir: working dir for tuning.
      - train_files: List of file paths containing training tf.Example data.
      - eval_files: List of file paths containing eval tf.Example data.
      - train_steps: number of train steps.
      - eval_steps: number of eval steps.
      - schema_path: optional schema of the input data.
      - transform_graph_path: optional transform graph produced by TFT.
  Returns:
    A namedtuple contains the following:
      - tuner: A BaseTuner that will be used for tuning.
      - fit_kwargs: Args to pass to tuner's run_trial function for fitting the
                    model , e.g., the training and validation dataset. Required
                    args depend on the above tuner's implementation.
  """
  tuner = Tuner(
    module_file=module_file,  # Contains `tuner_fn`.
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    train_args=trainer_pb2.TrainArgs(num_steps=20),
    eval_args=trainer_pb2.EvalArgs(num_steps=5))

    trainer = Trainer(
    module_file=module_file,  # Contains `run_fn`.
    examples=transform.outputs['transformed_examples'],
    transform_graph=transform.outputs['transform_graph'],
    schema=schema_gen.outputs['schema'],
    # This will be passed to `run_fn`.
    hyperparameters=tuner.outputs['best_hyperparameters'],
    train_args=trainer_pb2.TrainArgs(num_steps=100),
    eval_args=trainer_pb2.EvalArgs(num_steps=5))


def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """
  tf_transform_output = tft.TFTransformOutput(fn_args.transform_output)

  train_dataset = _input_fn(
      fn_args.train_files,
      tf_transform_output,
      fn_args.data_accessor,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      tf_transform_output,
      fn_args.data_accessor,
      batch_size=_EVAL_BATCH_SIZE)

  model = _make_keras_model()
  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)
  
  # Define default serving signature
  signatures = {
      'serving_default':
          _get_serve_tf_examples_fn(model,
                                    tf_transform_output).get_concrete_function(
                                        tf.TensorSpec(
                                            shape=[None],
                                            dtype=tf.string,
                                            name='examples'))
  }

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf', signatures=signatures)

Overwriting wine_quality_trainer.py


In [13]:
!gsutil cp {_trainer_module_file} {MODULE_ROOT}/

Copying file://wine_quality_trainer.py...
/ [0 files][    0.0 B/  6.2 KiB]                                                / [1 files][  6.2 KiB/  6.2 KiB]                                                
Operation completed over 1 objects/6.2 KiB.                                      


In [14]:
!ls {MODULE_ROOT}/

wine_quality_trainer.py  wine_quality_transformer.py


In [46]:
from tfx.dsl.component.experimental.decorators import component
from tfx.dsl.component.experimental.annotations import InputArtifact

@component
def CreateMonitoringJob(statistics: tfx.dsl.components.InputArtifact[tfx.types.standard_artifacts.ExampleStatistics],
                        pushed_model: tfx.dsl.components.InputArtifact[tfx.types.standard_artifacts.PushedModel],
                        #pusher_config: tfx.dsl.components.Parameter[str],
                        DEFAULT_THRESHOLD_VALUE: tfx.dsl.components.Parameter[float] = 0.03,
                        MONITORING_FREQUENCY: tfx.dsl.components.Parameter[int] = 3600,
                        SAMPLE_RATE: tfx.dsl.components.Parameter[int] = 0,
                        EMAILS: tfx.dsl.components.Parameter[str]= ""
                        ):#-> tfx.dsl.components.OutputArtifact[tfx.types.standard_artifacts.SchemaGen]
    """
    Creates monitoring job for a model deployed on Vertex endpoint
    :param project_id: project id
    :param region: resource region
    :param endpoint_id: vertex endpoint id
    :param deployed_model_id: deployed model id (do not confuse with model id)
    :param model_name: model name
    :param features_file_uri: filepath to json file in Google Cloud Storage bucket
    :return: request response for deploying the monitoring job
    """
    
    #from google.cloud import aiplatform
    #import json
    
    import tensorflow_data_validation as tfdv
    from google.cloud.aiplatform_v1.services.job_service import JobServiceClient
    from google.cloud.aiplatform_v1.types import (SamplingStrategy, ModelMonitoringObjectiveConfig,
                                                  ModelMonitoringAlertConfig, ThresholdConfig,
                                                  ModelDeploymentMonitoringScheduleConfig,
                                                  ModelDeploymentMonitoringObjectiveConfig,
                                                  ModelDeploymentMonitoringJob
                                                  )
    from google.protobuf.duration_pb2 import Duration
    
    #try:
    pushed_destination = pushed_model.get()[0].__dict__['_artifact'].custom_properties['pushed_destination'].string_value.split('/')
    project_id = pushed_destination[1]
    region = pushed_destination[3]
    #except:
    #    project_id=json.loads(pusher_config)['ai_platform_serving_args']['project_id']
    #    region=json.loads(pusher_config)['ai_platform_vertex_region']
    
    #aiplatform.init(project=project_id, location=region)
    
    #endpoint = aiplatform.Endpoint.list(
      #filter=f"display_name={deployed_model_display_name}",
    #  order_by="update_time"
    #  )[-1]
    
    model = aiplatform.Model(pushed_model.get()[0].__dict__['_artifact'].custom_properties['pushed_destination'].string_value)
    #pusher_config = json.loads(pusher.exec_properties['custom_config'])

    model_name=model.display_name
    #endpoint.gca_resource.deployed_models[0].display_name
    endpoint_id = model.to_dict()['deployedModels'][0]['endpoint']
    #endpoint_id=endpoint.gca_resource.name.split('/')[-1]
    deployed_model_id = model.to_dict()['deployedModels'][0]['deployedModelId']
    #deployed_model_id=endpoint.list_models()[0].id
    
    # try:
    features_file_uri=statistics._artifacts[0].uri
    # except:
    #    features_file_uri='/content/interactive_pipeline/StatisticsGen/statistics/2'
    
    
    def get_features_names(features_file_uri: str):
        """
        Reads and parses json file containing features names from GCS bucket
        :param features_file_uri:
        :return: 
        """
        def load_statistics(file_path: str):
          return tfdv.load_stats_binary(file_path)

        return load_statistics(features_file_uri+'/Split-train/FeatureStats.pb')
    
    stats=get_features_names(features_file_uri)
    api_vertex_endpoint = f"{region.upper()}-aiplatform.googleapis.com"

    features = {}
    for feature in stats.datasets[0].features:
      features[feature.path.step[0]]:{'mean':feature.num_stats.mean, 'std_dev':feature.num_stats.std_dev}
    #numerical_features, categorical_features = get_features_names(features_file_uri)

    sampling_config = SamplingStrategy.RandomSampleConfig(sample_rate=SAMPLE_RATE)
    sampling_strategy = SamplingStrategy(random_sample_config=sampling_config)

    monitoring_duration = Duration(seconds=MONITORING_FREQUENCY)
    monitoring_config = ModelDeploymentMonitoringScheduleConfig(monitor_interval=monitoring_duration)

    email_config = ModelMonitoringAlertConfig.EmailAlertConfig(user_emails=[EMAILS])
    alert_config = ModelMonitoringAlertConfig(email_alert_config=email_config)

    # monitoring whether feature data distribution changes significantly over time
    drift_thresholds = {}
    default_threshold = ThresholdConfig(value=DEFAULT_THRESHOLD_VALUE)

    # set thresholds as default for all features
    for feature in features:
        drift_thresholds[feature] = 1,96*features[feature]['std_dev']+features[feature]['mean']

    drift_config = ModelMonitoringObjectiveConfig.PredictionDriftDetectionConfig(
        drift_thresholds=drift_thresholds
    )

    objective_config = ModelMonitoringObjectiveConfig(
        prediction_drift_detection_config=drift_config
    )
    monitoring_objective_configs = ModelDeploymentMonitoringObjectiveConfig(
        objective_config=objective_config
    )
    monitoring_objective_configs.deployed_model_id = deployed_model_id

    # create the monitoring job
    #endpoint = f"projects/{project_id}/locations/{region}/endpoints/{endpoint_id}"
    predict_schema = ""
    analysis_schema = ""

    # monitoring job will  create up to 4 bq tables with names :
    #   bq://<project_id>.model_deployment_monitoring_<endpoint_id>.<tolower(log_source)>_<tolower(log_type)>
    # https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform_v1beta1.types.ModelDeploymentMonitoringJob
    monitoring_job = ModelDeploymentMonitoringJob(
        display_name=f"monitoring_{model_name}",
        endpoint=endpoint_id,
        model_deployment_monitoring_objective_configs=[monitoring_objective_configs],
        logging_sampling_strategy=sampling_strategy,
        model_deployment_monitoring_schedule_config=monitoring_config,
        model_monitoring_alert_config=alert_config,
        predict_instance_schema_uri=predict_schema,
        analysis_instance_schema_uri=analysis_schema,
        enable_monitoring_pipeline_logs=True
    )
    options = dict(api_endpoint=api_vertex_endpoint)
    client = JobServiceClient(client_options=options)
    parent = f"projects/{project_id}/locations/{region}"
    response = client.create_model_deployment_monitoring_job(
        parent=parent, model_deployment_monitoring_job=monitoring_job
    )

#    return response

In [47]:
from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, query: str,
                     transformer_module_file:str,
                     trainer_module_file: str,
                     endpoint_name: str,
                     project_id: str, region: str,
                     beam_pipeline_args: Optional[List[str]],
                     ) -> tfx.dsl.Pipeline:
    """Creates a TFX pipeline using BigQuery."""

    # query data in BigQuery as a data source.
    output = tfx.proto.Output(
             split_config=tfx.proto.SplitConfig(splits=[
                 tfx.proto.SplitConfig.Split(name='train', hash_buckets=4),
                 tfx.proto.SplitConfig.Split(name='eval', hash_buckets=1)
             ]))

    example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(
      query=query, output_config=output)

    # compute the statistics
    statistics_gen = tfx.components.StatisticsGen(
      examples=example_gen.outputs['examples'])

    # generate schema
    schema_gen = tfx.components.SchemaGen(
      statistics=statistics_gen.outputs['statistics'])

    # validation component for inference examples
    validator = tfx.components.ExampleValidator(
      statistics=statistics_gen.outputs['statistics'],
      schema=schema_gen.outputs['schema']
      )

    # pre-processe data
    transformer = tfx.components.Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      module_file=transformer_module_file)

    # use user-provided Python function that trains a model
    trainer = tfx.components.Trainer(
      module_file=trainer_module_file,
      examples=transformer.outputs['transformed_examples'],
      transform_graph=transformer.outputs['transform_graph'],
      schema=schema_gen.outputs['schema'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5))

    # push the model to model registry
    vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      'machine_type': 'n1-standard-4'
    }
    serving_image = 'europe-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest'

    pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
      model=trainer.outputs['model'],
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
              serving_image,
          tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
            vertex_serving_spec,
      })

    # Create the monitoring job
    monitoring = CreateMonitoringJob(statistics=statistics_gen.outputs['statistics'],
                                  pushed_model=pusher.outputs['pushed_model'])
  

    components = [
      example_gen,
      statistics_gen,
      schema_gen,
      validator,
      transformer,
      trainer,
      pusher,
      monitoring
    ]

    return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      beam_pipeline_args=beam_pipeline_args)

In [30]:
%env GOOGLE_APPLICATION_CREDENTIALS ../secrets/sfeir-data-394b11f86ed4.json
!echo $GOOGLE_APPLICATION_CREDENTIALS

env: GOOGLE_APPLICATION_CREDENTIALS=../secrets/sfeir-data-394b11f86ed4.json
../secrets/sfeir-data-394b11f86ed4.json


In [55]:
!gsutil ls gs://devfest_mlops_2022

gs://devfest_mlops_2022/pipeline_module/
gs://devfest_mlops_2022/pipeline_root/
gs://devfest_mlops_2022/winequality/
gs://devfest_mlops_2022/winequality_white_wine/


In [51]:
PIPELINE_ROOT

'/content/wine-quality-csv/pipeline_root/'

In [56]:
import os

BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   ]

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE
)
_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root='gs://devfest_mlops_2022/pipeline_root',
        query=QUERY,
        transformer_module_file=os.path.join(MODULE_ROOT, _transformer_module_file),
        trainer_module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS
    )
)

INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Excluding no splits because exclude_splits is not set.
INFO:absl:Generating ephemeral wheel package for '/content/wine-quality-csv/pipeline_module/wine_quality_transformer.py' (including modules: ['wine_quality_transformer', 'wine_quality_trainer']).
INFO:absl:User module package has hash fingerprint version c1150eedbb85fec0209086ea8cfe31f83b43deb0437299552bc480ac495bccf8.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmpen4bgv_z/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpdfe10qvf', '--dist-dir', '/tmp/tmp36ea0nwj']


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying wine_quality_transformer.py -> build/lib
copying wine_quality_trainer.py -> build/lib
installing to /tmp/tmpdfe10qvf
running install
running install_lib
copying build/lib/wine_quality_transformer.py -> /tmp/tmpdfe10qvf
copying build/lib/wine_quality_trainer.py -> /tmp/tmpdfe10qvf
running install_egg_info
running egg_info
creating tfx_user_code_Transform.egg-info
writing tfx_user_code_Transform.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Transform.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Transform.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Transform.egg-info/SOURCES.txt'
Copying tfx_user_code_Transform.egg-info to /tmp/tmpdfe10qvf/tfx_user_code_Transform-0.0+c1150eedbb85fec0209086ea8cfe31f83b43deb0

INFO:absl:Successfully built user code wheel distribution at 'gs://devfest_mlops_2022/pipeline_root/_wheels/tfx_user_code_Transform-0.0+c1150eedbb85fec0209086ea8cfe31f83b43deb0437299552bc480ac495bccf8-py3-none-any.whl'; target user module is 'wine_quality_transformer'.
INFO:absl:Full user module path is 'wine_quality_transformer@gs://devfest_mlops_2022/pipeline_root/_wheels/tfx_user_code_Transform-0.0+c1150eedbb85fec0209086ea8cfe31f83b43deb0437299552bc480ac495bccf8-py3-none-any.whl'
INFO:absl:Generating ephemeral wheel package for '/content/wine-quality-csv/pipeline_module/wine_quality_trainer.py' (including modules: ['wine_quality_transformer', 'wine_quality_trainer']).
INFO:absl:User module package has hash fingerprint version c1150eedbb85fec0209086ea8cfe31f83b43deb0437299552bc480ac495bccf8.
INFO:absl:Executing: ['/usr/bin/python3', '/tmp/tmpx55xflfu/_tfx_generated_setup.py', 'bdist_wheel', '--bdist-dir', '/tmp/tmpfkwxf5kb', '--dist-dir', '/tmp/tmplvjbtxa9']


running bdist_wheel
running build
running build_py
creating build
creating build/lib
copying wine_quality_transformer.py -> build/lib
copying wine_quality_trainer.py -> build/lib
installing to /tmp/tmpfkwxf5kb
running install
running install_lib
copying build/lib/wine_quality_transformer.py -> /tmp/tmpfkwxf5kb
copying build/lib/wine_quality_trainer.py -> /tmp/tmpfkwxf5kb
running install_egg_info
running egg_info
creating tfx_user_code_Trainer.egg-info
writing tfx_user_code_Trainer.egg-info/PKG-INFO
writing dependency_links to tfx_user_code_Trainer.egg-info/dependency_links.txt
writing top-level names to tfx_user_code_Trainer.egg-info/top_level.txt
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
reading manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
writing manifest file 'tfx_user_code_Trainer.egg-info/SOURCES.txt'
Copying tfx_user_code_Trainer.egg-info to /tmp/tmpfkwxf5kb/tfx_user_code_Trainer-0.0+c1150eedbb85fec0209086ea8cfe31f83b43deb0437299552bc480ac49

INFO:absl:Successfully built user code wheel distribution at 'gs://devfest_mlops_2022/pipeline_root/_wheels/tfx_user_code_Trainer-0.0+c1150eedbb85fec0209086ea8cfe31f83b43deb0437299552bc480ac495bccf8-py3-none-any.whl'; target user module is 'wine_quality_trainer'.
INFO:absl:Full user module path is 'wine_quality_trainer@gs://devfest_mlops_2022/pipeline_root/_wheels/tfx_user_code_Trainer-0.0+c1150eedbb85fec0209086ea8cfe31f83b43deb0437299552bc480ac495bccf8-py3-none-any.whl'


In [57]:
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME)
job.submit()

Creating PipelineJob


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob


InvalidArgument: 400 You do not have permission to act as service_account: 881499317005-compute@developer.gserviceaccount.com. (or it may not exist).

In [15]:
def _float_feature(value):
  """Returns a float_list from a float / double."""
  return tf.train.Feature(float_list=tf.train.FloatList(value=[value]))

def serialize_example(alcohol, chlorides, citric_acid, density, fixed_acidity, free_sulfur_dioxide,
                      ph, residual_sugar, sulphates, total_sulfur_dioxide, volatile_acidity):
  """
  Creates a tf.train.Example message ready to be written to a file.
  """
  # Create a dictionary mapping the feature name to the tf.train.Example-compatible
  # data type.
  feature = {
      'alcohol': _float_feature(alcohol),
      'chlorides': _float_feature(chlorides),
      'citric_acid': _float_feature(citric_acid),
      'density': _float_feature(density),
      'fixed_acidity': _float_feature(fixed_acidity),
      'free_sulfur_dioxide': _float_feature(free_sulfur_dioxide),
      'ph': _float_feature(ph),
      'residual_sugar': _float_feature(residual_sugar),
      'sulphates': _float_feature(sulphates),
      'total_sulfur_dioxide': _float_feature(total_sulfur_dioxide),
      'volatile_acidity': _float_feature(volatile_acidity),
  }

  # Create a Features message using tf.train.Example.

  example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
  return example_proto.SerializeToString()

In [41]:
!ls ../secrets

sfeir-data-394b11f86ed4.json


In [48]:
!cat ../secrets/sfeir-data-394b11f86ed4.json

{
  "type": "service_account",
  "project_id": "sfeir-data",
  "private_key_id": "394b11f86ed402dd6f3216df56a38805086f118c",
  "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCf4L6pnPqw+wlO\nPeq/55DDP4lTv6TDTu/7p31WcARO3X3/W+MEg5SntcH1AWKHxhnza+rhWJzNUOrb\nY/7r0AXeojsEUQ+/+LmTvk6KxBrQgolufobajCP8ryJ3p7zvVO6mFeRWf0uBnepa\nImVRSbKUdCdKlELOYjRVS1IkDjc4WC1Jve/zpAQIkKmyBXOVAEe8Sl7w2DafPK1Z\nJuv4g2Jx5PNZfE/O+sjV8x3X7c9b037Z9YEuoAhau9X0TJ/bgCRsjViiv7PIl6a/\n6MXoAWBICz7I36jwj0SO9+Ps8sdO3oOJByj33WEB4dORSW7qPFU6aAWXXOkXzNGw\nKoHFwgjfAgMBAAECggEAOJlWxjMVqMvmWnWfAnsXC5gVpLlmueHbIYsE2zHixIyz\nC1DbSIDQgqLc3EC2QfzGuR9OUqKNOD5aNnrrB05olY13OSr9WWfTQtOPeJ6+I2zi\n/GP8l6gfIVha6mNOhqfZqKB9aoW8FFH0Yh5lIebmOpn4QvXDxM6NWljE5pDFycMW\nU0E9p97BJ+cCBA/Y10JIQoQSKx1sNizB6uVmyaucSojmp0rtOGOMIfS1Box+hxA6\nQW9e9gPZGK2Yj5g0Mh+INlH3lN+rwQ5H94YviVac0J0qOWQcfzPhehPQjgh+ptWC\nlaRSFHHO/VH4JnEMmcLUewQAxyIsxm/ru1xrHYOQEQKBgQDPkH5beTW7e/Jqk7bU\nHXkc0kwGJNCFVQ3KJuLRz6SeCzc6aIZQ

In [30]:
from google.cloud import aiplatform
# The AI Platform services require regional API endpoints.
client_options = {
    'api_endpoint': GOOGLE_CLOUD_REGION + '-aiplatform.googleapis.com'
    }

# Initialize client that will be used to create and send requests.
client = aiplatform.gapic.PredictionServiceClient(client_options=client_options)

ENDPOINT_ID='7290923176433811456'

endpoint = client.endpoint_path(
        project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION, endpoint=ENDPOINT_ID
    )

In [62]:
import base64
instances = [{
      "examples":{'b64': 
      base64.b64encode(serialize_example(13.8, 0.036, 0.0, 0.98981, 4.7, 
                                         23.0, 3.53, 3.4, 0.92, 134.0, 0.785)).decode()}
      }]
response = client.predict(endpoint=endpoint, instances=instances)

In [63]:
response

predictions {
  list_value {
    values {
      number_value: -0.130293861
    }
  }
}
deployed_model_id: "41464782506688512"
model: "projects/881499317005/locations/europe-west1/models/1821934747689943040"
model_display_name: "v1663106292"
model_version_id: "1"