<a href="https://colab.research.google.com/github/AnandInguva/beam/blob/notebook/beam/examples/notebooks/beam-ml/side_Input_model_updates.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# @title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the "License")

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License

# Update ML models in running pipelines

The pipeline in this notebook uses a RunInference `PTransform` to run inference on images using TensorFlow models. To update the model, it uses a side input `PCollection` that emits `ModelMetadata`.

You can use side inputs to update your model in real-time, even while the Apache Beam pipeline is running. The side input is passed in a `ModelHandler` configuration object. You can update the model either by leveraging one of Apache Beam's provided patterns, such as the `WatchFilePattern`, or by configuring a custom side input `PCollection` that defines the logic for the model update.

For more information about side inputs, see the [Side inputs](https://beam.apache.org/documentation/programming-guide/#side-inputs) section in the Apache Beam Programming Guide.

This example uses `WatchFilePattern` as a side input. `WatchFilePattern` watches for files that match the `file_pattern` and uses their timestamps to detect updates. It emits the latest `ModelMetadata`, which the RunInference `PTransform` uses to update the ML model automatically without stopping the Apache Beam pipeline.
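To make the "latest file that matches a pattern" idea concrete, the following is a minimal sketch in plain Python. It is not how `WatchFilePattern` is implemented. `FileInfo` and `latest_match` are hypothetical names introduced only for illustration, and the file listing is made-up sample data.

```python
import fnmatch
from typing import Iterable, NamedTuple, Optional


class FileInfo(NamedTuple):
  """Hypothetical stand-in for one entry of a file listing (not a Beam class)."""
  path: str
  updated: float  # Last-modified time, in seconds since the epoch.


def latest_match(files: Iterable[FileInfo],
                 file_pattern: str) -> Optional[FileInfo]:
  """Returns the most recently updated file that matches file_pattern."""
  matches = [f for f in files if fnmatch.fnmatchcase(f.path, file_pattern)]
  return max(matches, key=lambda f: f.updated, default=None)


# Made-up listing: two model files and one unrelated file.
listing = [
    FileInfo('gs://your-bucket/resnet101.h5', updated=100.0),
    FileInfo('gs://your-bucket/notes.txt', updated=300.0),
    FileInfo('gs://your-bucket/resnet152.h5', updated=200.0),
]
newest = latest_match(listing, 'gs://your-bucket/*.h5')
print(newest.path)  # gs://your-bucket/resnet152.h5
```

The unrelated `notes.txt` file is newer than both models, but it is ignored because it does not match the pattern.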


## Before you begin
Install the dependencies required to run this notebook.

To use RunInference with side inputs for automatic model updates, install `Apache Beam` version `2.46.0` or later.

In [None]:
# Quote the requirement so that the shell does not treat ">" as an output redirect.
!pip install "apache_beam[gcp]>=2.46.0" --quiet
!pip install tensorflow
!pip install tensorflow_hub

In [None]:
# Imports required for the notebook.
import logging
import time
from typing import Iterable
from typing import Tuple

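import apache_beam as beam
# numpy, tensorflow, and PIL are used by preprocess_image and PostProcessor.
import numpy
import tensorflow as tf
from PIL import Image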
from apache_beam.examples.inference.tensorflow_imagenet_segmentation import PostProcessor
from apache_beam.examples.inference.tensorflow_imagenet_segmentation import read_image
from apache_beam.ml.inference.base import PredictionResult
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor
from apache_beam.ml.inference.utils import WatchFilePattern
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse

In [None]:
# Authenticate to your Google Cloud account.
from google.colab import auth
auth.authenticate_user()

## Runner

This pipeline runs on the Dataflow Runner. Ensure that you have all the required permissions to run the pipeline on Dataflow.

Configure the pipeline options for the pipeline to run on Dataflow. Make sure the pipeline is using streaming mode.

In [None]:
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

# Provide required pipeline options for the Dataflow Runner.
options.view_as(StandardOptions).runner = "DataflowRunner"

# Set the project to the default project in your current Google Cloud environment.
options.view_as(GoogleCloudOptions).project = 'your-project'

# Set the Google Cloud region that you want to run Dataflow in.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# IMPORTANT: Update the following line to choose a Cloud Storage location.
# Omit the trailing slash, because the paths below append '/staging' and '/temp'.
dataflow_gcs_location = "gs://your-bucket/tmp"

# The Dataflow staging location. This location is used to stage the Dataflow pipeline and the SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# The Dataflow temp location. This location is used to store temporary files or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
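When Cloud Storage paths are built with string formatting, a base location that ends in `/` produces a doubled slash in the result. The following is a small sketch of a helper that normalizes the separators. `gcs_join` is a hypothetical name introduced here and is not part of Apache Beam.

```python
def gcs_join(base: str, *parts: str) -> str:
  """Joins path segments with exactly one '/' between them."""
  segments = [base.rstrip('/')] + [part.strip('/') for part in parts]
  return '/'.join(segments)


# The result is the same whether or not the base ends with a slash.
staging = gcs_join('gs://your-bucket/tmp/', 'staging')
temp = gcs_join('gs://your-bucket/tmp', 'temp')
print(staging)  # gs://your-bucket/tmp/staging
print(temp)     # gs://your-bucket/tmp/temp
```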



Install the `tensorflow` and `tensorflow_hub` dependencies on Dataflow. Use the `requirements_file` pipeline option to pass these dependencies.

In [None]:
# In a requirements file, define the dependencies required for the pipeline.
deps_required_for_pipeline = ['tensorflow>=2.12.0', 'tensorflow-hub>=0.10.0', 'Pillow>=9.0.0']
requirements_file_path = './requirements.txt'
# Write the dependencies to the requirements file.
with open(requirements_file_path, 'w') as f:
  for dep in deps_required_for_pipeline:
    f.write(dep + '\n')

# Install the pipeline dependencies on Dataflow.
options.view_as(SetupOptions).requirements_file = requirements_file_path

## TensorFlow ModelHandler
This example uses `TFModelHandlerTensor` as the model handler and the `resnet_101` model trained on ImageNet.

Download the model from [Google Cloud Storage](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet101_weights_tf_dim_ordering_tf_kernels.h5) (the link downloads the model), and place it in the directory that you want to use to update your model.

In [None]:
model_handler = TFModelHandlerTensor(
    model_uri="gs://your-bucket/resnet101_weights_tf_dim_ordering_tf_kernels.h5")

## Pre-process images

Before running inference, use `preprocess_image` to read the image, resize it, and convert it to a TensorFlow tensor.

In [None]:
def preprocess_image(image_name, image_dir):
  img = tf.keras.utils.get_file(image_name, image_dir + image_name)
  img = Image.open(img).resize((224, 224))
  img = numpy.array(img) / 255.0
  img_tensor = tf.cast(tf.convert_to_tensor(img[...]), dtype=tf.float32)
  return img_tensor

In [None]:
class PostProcessor(beam.DoFn):
  """Process the PredictionResult to get the predicted label.
  Returns predicted label.
  """
  def process(self, element: PredictionResult) -> Iterable[Tuple[str, str]]:
    predicted_class = numpy.argmax(element.inference, axis=-1)
    labels_path = tf.keras.utils.get_file(
        'ImageNetLabels.txt',
        'https://storage.googleapis.com/download.tensorflow.org/data/ImageNetLabels.txt'  # pylint: disable=line-too-long
    )
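    # Use a context manager so that the labels file is closed after reading.
    with open(labels_path) as labels_file:
      imagenet_labels = numpy.array(labels_file.read().splitlines())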
    predicted_class_name = imagenet_labels[predicted_class]
    yield predicted_class_name.title(), element.model_id

In [None]:
# Define the pipeline object.
pipeline = beam.Pipeline(options=options)

Next, review the pipeline steps and examine the code.

### Pipeline steps


1. Create a `PeriodicImpulse` transform, which emits output every `n` seconds. The `PeriodicImpulse` transform generates a sequence of elements at a given runtime interval. The sequence is infinite unless you set a stop timestamp.

  In this example, `PeriodicImpulse` mimics the Pub/Sub source. Because the inputs in a streaming pipeline arrive at intervals, use `PeriodicImpulse` to output elements every `n` seconds.

  To learn more about `PeriodicImpulse`, see the [`PeriodicImpulse` code](https://github.com/apache/beam/blob/9c52e0594d6f0e59cd17ee005acfb41da508e0d5/sdks/python/apache_beam/transforms/periodicsequence.py#L150).

In [None]:
start_timestamp = time.time() # start timestamp of the periodic impulse
end_timestamp = start_timestamp + 60 * 20 # end timestamp of the periodic impulse.
main_input_fire_interval = 60 # interval at which the main input PCollection is emitted.
side_input_fire_interval = 60 # interval at which the side input PCollection is emitted.

periodic_impulse = (
    pipeline
    | "MainInputPcoll" >> PeriodicImpulse(
        start_timestamp=start_timestamp,
        stop_timestamp=end_timestamp,
        fire_interval=main_input_fire_interval))
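To get a feel for how many elements these settings produce, the following sketch computes the firing times for a start timestamp, a stop timestamp, and an interval. It is plain Python and not Beam code. `fire_times` is a hypothetical helper, and treating the stop timestamp as exclusive is an assumption; check the `PeriodicImpulse` source for the exact boundary behavior.

```python
from typing import List


def fire_times(start: float, stop: float, interval: float) -> List[float]:
  """Lists firing times in [start, stop), spaced by interval seconds.

  Assumption: the stop timestamp is exclusive.
  """
  if interval <= 0:
    raise ValueError('interval must be positive')
  times = []
  n = 0
  while start + n * interval < stop:
    times.append(start + n * interval)
    n += 1
  return times


# A 20-minute run with one element per minute, starting at time 0.
times = fire_times(start=0.0, stop=60 * 20, interval=60)
print(len(times))  # 20
print(times[:3])   # [0.0, 60.0, 120.0]
```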

2. To read and pre-process the images, use the `read_image` function. This example uses `Cat-with-beanie.jpg` for all inferences.

In [None]:
image_data = (periodic_impulse | beam.Map(lambda x: "Cat-with-beanie.jpg")
      | "ReadImage" >> beam.Map(lambda image_name: read_image(
          image_name=image_name, image_dir='https://storage.googleapis.com/apache-beam-samples/image_captioning/')))

3. Pass the images to the RunInference `PTransform`. RunInference takes `model_handler` and `model_metadata_pcoll` as input parameters.
  * `model_metadata_pcoll` is a [side input](https://beam.apache.org/documentation/programming-guide/#side-inputs) `PCollection` to the RunInference `PTransform`. This side input updates the `model_uri` in the `model_handler` without stopping the Apache Beam pipeline. Use `WatchFilePattern` as a side input to watch a `file_pattern` that matches `.h5` files. In this case, the `file_pattern` is `'gs://your-bucket/*.h5'`.

  **How to watch for the automatic model update**

  After the pipeline starts processing data and you see output emitted from the RunInference `PTransform`, upload a `.h5` TensorFlow model (for example, [resnet_152](https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet152_weights_tf_dim_ordering_tf_kernels.h5)) that matches the `file_pattern` to the Google Cloud Storage bucket. RunInference uses `WatchFilePattern` as a side input to update the `model_uri` of `TFModelHandlerTensor`.


In [None]:
# The side input used to watch for the .h5 file and update the model_uri of the TFModelHandlerTensor.
file_pattern = 'gs://your-bucket/*.h5'
side_input_pcoll = (
    pipeline
    | "WatchFilePattern" >> WatchFilePattern(file_pattern=file_pattern,
                                             interval=side_input_fire_interval,
                                             stop_timestamp=end_timestamp))
inferences = (
    image_data
    | "ApplyWindowing" >> beam.WindowInto(beam.window.FixedWindows(10))
    | "RunInference" >> RunInference(model_handler=model_handler,
                                     model_metadata_pcoll=side_input_pcoll))
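Conceptually, the side input lets the inference step swap models between elements. The following sketch simulates that behavior in plain Python. It is an illustration of the idea, not RunInference internals. `ModelUpdate`, `infer_with_updates`, and `fake_load` are hypothetical names, and the "models" are dummy functions.

```python
from typing import Callable, Iterable, List, NamedTuple, Tuple, Union


class ModelUpdate(NamedTuple):
  """Hypothetical stand-in for the metadata emitted by the side input."""
  model_id: str
  model_path: str


def infer_with_updates(
    events: Iterable[Union[ModelUpdate, str]],
    initial: ModelUpdate,
    load_model: Callable[[str], Callable[[str], str]],
) -> List[Tuple[str, str]]:
  """Runs each element through the newest model seen so far."""
  current = initial
  model = load_model(current.model_path)
  results = []
  for event in events:
    if isinstance(event, ModelUpdate):
      # Reload only when the metadata points at a different model.
      if event.model_path != current.model_path:
        current = event
        model = load_model(current.model_path)
      continue
    results.append((model(event), current.model_id))
  return results


loads = []


def fake_load(path: str) -> Callable[[str], str]:
  """Pretend loader that records each load and returns a dummy model."""
  loads.append(path)
  return lambda element: f'{element} scored by {path}'


v1 = ModelUpdate('resnet101', 'gs://your-bucket/resnet101.h5')
v2 = ModelUpdate('resnet152', 'gs://your-bucket/resnet152.h5')
results = infer_with_updates(['img-1', v1, 'img-2', v2, 'img-3'], v1, fake_load)
print([model_id for _, model_id in results])
# ['resnet101', 'resnet101', 'resnet152']
print(len(loads))  # 2
```

The repeated `v1` update does not trigger a reload, so the loader runs only twice: once at startup and once when `v2` arrives.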

4. Post-process the `PredictionResult` object.

  When the inference is complete, RunInference outputs a `PredictionResult` object that contains the fields `example`, `inference`, and `model_id`. The `model_id` field identifies which model ran the inference. The `PostProcessor` returns the predicted label and the ID of the model that produced it.

In [None]:
post_processor = (
    inferences
    | "PostProcessResults" >> beam.ParDo(PostProcessor())
    | "LogResults" >> beam.Map(logging.info))
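One way to confirm that a model update took effect is to count outputs per `model_id`. The following sketch uses a stand-in tuple with the three fields named above. `FakePredictionResult` is hypothetical and is not the Beam `PredictionResult` class, and the sample values are made up.

```python
from collections import Counter
from typing import Any, NamedTuple, Optional


class FakePredictionResult(NamedTuple):
  """Stand-in with the fields example, inference, and model_id."""
  example: Any
  inference: Any
  model_id: Optional[str] = None


# Made-up results: two from the original model, one from the updated model.
sample_results = [
    FakePredictionResult('img-1', [0.1, 0.9], 'gs://your-bucket/resnet101.h5'),
    FakePredictionResult('img-2', [0.2, 0.8], 'gs://your-bucket/resnet101.h5'),
    FakePredictionResult('img-3', [0.3, 0.7], 'gs://your-bucket/resnet152.h5'),
]

# Count how many inferences each model served.
per_model = Counter(result.model_id for result in sample_results)
for model_id, count in per_model.items():
  print(model_id, count)
```

A count that moves from the old model ID to the new one after an upload suggests that the side input delivered the update.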

## Run the pipeline

In [None]:
# Run the pipeline.
result = pipeline.run().wait_until_finish()