<a href="https://colab.research.google.com/github/deep-diver/Continuous-Adaptation-for-Machine-Learning-System-to-Data-Changes/blob/main/notebooks/04_Cloud_Scheduler_Trigger.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Outline

1. Create a Pub/Sub topic ([reference](https://github.com/sayakpaul/CI-CD-for-Model-Training/blob/main/cloud_function_trigger.ipynb))
2. Deploy a Cloud Function ([reference](https://github.com/sayakpaul/CI-CD-for-Model-Training/blob/main/cloud_function_trigger.ipynb))
    - checks whether a specific GCS bucket contains enough images
3. Create a Cloud Scheduler job that publishes to the Pub/Sub topic to trigger the batch prediction pipeline ([reference](https://github.com/sayakpaul/CI-CD-for-Model-Training/blob/main/cloud_scheduler_trigger.ipynb))
    - requires the pipeline JSON spec to be stored in GCS

# Setup

In [39]:
!pip install --upgrade -q google-cloud-scheduler

In [None]:
!gcloud init

In [1]:
from google.colab import auth
auth.authenticate_user()

In [2]:
GOOGLE_CLOUD_PROJECT = "gcp-ml-172005" #@param {type:"string"}
GOOGLE_CLOUD_REGION = "us-central1"

GCS_BUCKET_NAME = 'cifar10-experimental-csp2'    #@param {type:"string"}
PIPELINE_NAME = "continuous-adaptation-for-data-changes-batch" #@param {type:"string"}
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME) 
PIPELINE_LOCATION = f"{PIPELINE_ROOT}/{PIPELINE_NAME}.json" 

PUBSUB_TOPIC = f"trigger-{PIPELINE_NAME}" 

SCHEDULER_JOB_NAME = f"scheduler-job-{PUBSUB_TOPIC}"

IMAGE_LOCATION = 'gs://batch-prediction-collection-3' #@param {type:"string"}

In [3]:
IMAGE_LOCATION

'gs://batch-prediction-collection-3'

# Create Pub/Sub Topic

In [8]:
!gcloud pubsub topics create {PUBSUB_TOPIC}

Created topic [projects/gcp-ml-172005/topics/trigger-continuous-adaptation-for-data-changes-batch].


# Deploy Cloud Function

### Create Cloud Function Directory

In [9]:
!mkdir -p cloud_function
!touch cloud_function/__init__.py

### Create Requirements.txt

In [10]:
_cloud_function_dep = 'cloud_function/requirements.txt'

In [11]:
%%writefile {_cloud_function_dep}

kfp==1.6.2
google-cloud-aiplatform
google-cloud-storage

Writing cloud_function/requirements.txt


### Create Cloud Function Module

In [4]:
_cloud_function_file = 'cloud_function/main.py'

In [5]:
%%writefile {_cloud_function_file}

import os
import json
import logging
import base64

from kfp.v2.google.client import AIPlatformClient
from google.cloud import storage

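def get_number_of_images(storage_client, gcs_image_file_location):
    # Only the bucket name is used; any path after it is ignored,
    # so every object in the bucket is scanned.
    path_parts = gcs_image_file_location.replace("gs://", "").split("/")
    bucket_name = path_parts[0]

    blobs = storage_client.list_blobs(bucket_name)

    # Count objects whose extension is exactly "jpg" (case-sensitive).
    count = 0
    for blob in blobs:
        if blob.name.split('.')[-1] == "jpg":
            count += 1

    return count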

def is_there_enough_images(storage_client, gcs_image_file_location, threshold):
    number_of_images = get_number_of_images(storage_client, gcs_image_file_location)
    print(f'number of images = {number_of_images}')
    return number_of_images >= threshold

def trigger_pipeline(event, context):
    # Parse the environment variables.
    project = os.getenv("PROJECT")
    region = os.getenv("REGION")
    gcs_pipeline_file_location = os.getenv("GCS_PIPELINE_FILE_LOCATION")
    gcs_image_file_location = os.getenv("GCS_IMAGE_FILE_LOCATION")

    print(project)
    print(region)
    print(gcs_pipeline_file_location)
    print(gcs_image_file_location)
    
    # Minimum number of images required before the pipeline is triggered.
    threshold = 100

    storage_client = storage.Client()

    if is_there_enough_images(storage_client, gcs_image_file_location, threshold):
        # Check that the pipeline spec exists in the provided GCS bucket.
        path_parts = gcs_pipeline_file_location.replace("gs://", "").split("/")
        bucket_name = path_parts[0]
        blob_name = "/".join(path_parts[1:])

        bucket = storage_client.bucket(bucket_name)
        blob = storage.Blob(bucket=bucket, name=blob_name)

        if not blob.exists(storage_client):
            raise ValueError(f"{gcs_pipeline_file_location} does not exist.")

        # Parse the data from the Pub/Sub trigger message.
        # data = base64.b64decode(event["data"]).decode("utf-8")
        # logging.info(f"Event data: {data}")

        # parameter_values = json.loads(data)

        # Initialize Vertex AI API client and submit for pipeline execution.
        api_client = AIPlatformClient(project_id=project, region=region)

        response = api_client.create_run_from_job_spec(
            job_spec_path=gcs_pipeline_file_location,
            # parameter_values=parameter_values,
            enable_caching=True,
        )

        logging.info(response)

Overwriting cloud_function/main.py
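
The gating logic in `main.py` can be exercised locally without touching GCS. The sketch below is an illustration only: `split_gcs_uri`, `count_jpgs`, `has_enough_images`, and the sample object names are hypothetical and not part of the deployed function. They mirror the string handling in `get_number_of_images` and `trigger_pipeline`, but operate on a plain list of names instead of `storage_client.list_blobs`.

```python
def split_gcs_uri(uri):
    """Split 'gs://bucket/path/to/blob' into (bucket_name, blob_name)."""
    path_parts = uri.replace("gs://", "").split("/")
    return path_parts[0], "/".join(path_parts[1:])


def count_jpgs(blob_names):
    """Count names whose final extension is exactly 'jpg', as main.py does."""
    return sum(1 for name in blob_names if name.split(".")[-1] == "jpg")


def has_enough_images(blob_names, threshold):
    """Return True when the number of .jpg names reaches the threshold."""
    return count_jpgs(blob_names) >= threshold


# Made-up object names for illustration.
sample_names = ["a.jpg", "b.jpg", "notes.txt", "c.JPG"]

print(split_gcs_uri("gs://my-bucket/pipeline_root/spec.json"))
# ('my-bucket', 'pipeline_root/spec.json')
print(count_jpgs(sample_names))                      # 2
print(has_enough_images(sample_names, threshold=2))  # True
```

Note that the extension check is case-sensitive, so `c.JPG` is not counted, and neither would a `.jpeg` file be.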


### Deploy Cloud Function

In [6]:
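# `!cd` runs in a subshell, so it would not change the notebook's working directory.
# No directory change is needed: the deploy command passes --source=cloud_function.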

In [7]:
ENV_VARS=f"""\
PROJECT={GOOGLE_CLOUD_PROJECT},\
REGION={GOOGLE_CLOUD_REGION},\
GCS_PIPELINE_FILE_LOCATION={PIPELINE_LOCATION},\
GCS_IMAGE_FILE_LOCATION={IMAGE_LOCATION}
"""

!echo {ENV_VARS}

PROJECT=gcp-ml-172005,REGION=us-central1,GCS_PIPELINE_FILE_LOCATION=gs://cifar10-experimental-csp2/pipeline_root/continuous-adaptation-for-data-changes-batch/continuous-adaptation-for-data-changes-batch.json,GCS_IMAGE_FILE_LOCATION=gs://batch-prediction-collection-3
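
`--update-env-vars` takes a comma-separated list of `KEY=VALUE` pairs. As an alternative to the multi-line f-string, the same string could be assembled from a dictionary. This is a sketch only: `format_env_vars` is a hypothetical helper, the values are placeholders, and it assumes no value contains a comma.

```python
def format_env_vars(env):
    """Join a dict into the KEY=VALUE,KEY=VALUE form, without a trailing newline."""
    for key, value in env.items():
        if "," in value:
            # A comma inside a value would be read as a pair separator.
            raise ValueError(f"{key} contains a comma")
    return ",".join(f"{key}={value}" for key, value in env.items())


# Placeholder values for illustration.
example_env = {
    "PROJECT": "my-project",
    "REGION": "us-central1",
    "GCS_IMAGE_FILE_LOCATION": "gs://my-image-bucket",
}

print(format_env_vars(example_env))
# PROJECT=my-project,REGION=us-central1,GCS_IMAGE_FILE_LOCATION=gs://my-image-bucket
```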


In [8]:
BUCKET = f'gs://{GCS_BUCKET_NAME}'
CLOUD_FUNCTION_NAME = f'trigger-{PIPELINE_NAME}-fn'

!gcloud functions deploy {CLOUD_FUNCTION_NAME} \
    --region={GOOGLE_CLOUD_REGION} \
    --trigger-topic={PUBSUB_TOPIC} \
    --runtime=python37 \
    --source=cloud_function \
    --entry-point=trigger_pipeline \
    --stage-bucket={BUCKET} \
    --update-env-vars={ENV_VARS}


For Cloud Build Logs, visit: https://console.cloud.google.com/cloud-build/builds;region=us-central1/dcc7b7aa-780f-423c-93a3-ff5abce7b5f0?project=874401645461
availableMemoryMb: 256
buildId: e7093ad3-2aa2-4d46-899b-844118c226ea
buildName: projects/874401645461/locations/us-central1/builds/e7093ad3-2aa2-4d46-899b-844118c226ea
entryPoint: trigger_pipeline
environmentVariables:
  GCS_IMAGE_FILE_LOCATION: gs://batch-prediction-collection-3
  GCS_PIPELINE_FILE_LOCATION: gs://cifar10-experimental-csp2/pipeline_root/continuous-adaptation-for-data-changes-batch/continuous-adaptation-for-data-changes-batch.json
  PROJECT: gcp-ml-172005
  REGION: us-central1
eventTrigger:
  eventType: google.pubsub.topic.publish
  failurePolicy: {}
  resource: projects/gcp-ml-172005/topics/trigger-continuous-adaptation-for-data-changes-batch
  service: pubsub.googleapis.com
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-gcloud
name: projects/gcp-ml-172005/locations/us-central1/functions/trigger-contin
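
Because the function is triggered by `google.pubsub.topic.publish`, each invocation receives the published message as the `event` dict. The commented-out lines in `main.py` decode `event["data"]` into pipeline parameters. The sketch below shows one way that decoding could be made tolerant of messages that carry only attributes and no body. `parse_parameter_values` and the `num_epochs` parameter are hypothetical, and the assumption that `data` may be absent from such messages should be checked against the actual events.

```python
import base64
import json


def parse_parameter_values(event):
    """Decode a base64-encoded JSON payload from a Pub/Sub event, if present."""
    data = event.get("data")
    if not data:
        # Assumption: attribute-only messages carry no "data" field.
        return {}
    return json.loads(base64.b64decode(data).decode("utf-8"))


# Build a fake event the way a publisher would encode a JSON body.
payload = base64.b64encode(json.dumps({"num_epochs": 3}).encode("utf-8")).decode("utf-8")

print(parse_parameter_values({"data": payload}))                      # {'num_epochs': 3}
print(parse_parameter_values({"attributes": {"name": "scheduler"}}))  # {}
```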

### See the Progress

In [25]:
import IPython

cloud_fn_url = f"https://console.cloud.google.com/functions/details/{GOOGLE_CLOUD_REGION}/{CLOUD_FUNCTION_NAME}"
html = (
    f'See the Cloud Function details <a href="{cloud_fn_url}" target="_blank">here</a>.'
)
IPython.display.display(IPython.display.HTML(html))

# Create Cloud Scheduler Job

In [49]:
!gcloud scheduler jobs create pubsub $SCHEDULER_JOB_NAME --schedule "*/3 * * * *" --topic $PUBSUB_TOPIC --attributes name=scheduler # every 3 minutes

name: projects/gcp-ml-172005/locations/us-central1/jobs/scheduler-job-trigger-continuous-adaptation-for-data-changes-batch
pubsubTarget:
  attributes:
    name: scheduler
  topicName: projects/gcp-ml-172005/topics/trigger-continuous-adaptation-for-data-changes-batch
retryConfig:
  maxBackoffDuration: 3600s
  maxDoublings: 16
  maxRetryDuration: 0s
  minBackoffDuration: 5s
schedule: '*/3 * * * *'
state: ENABLED
timeZone: Etc/UTC
userUpdateTime: '2021-10-18T01:10:04Z'
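
In a cron expression the first field is the minute, so `*/3` in that position means every third minute rather than every hour. The sketch below expands a minute field into the minutes of the hour at which a job would fire. `expand_minute_field` is a hypothetical helper written for illustration; it handles only `*`, `*/N`, and a single number, not full cron syntax.

```python
def expand_minute_field(field):
    """Return the minutes (0-59) matched by a simple cron minute field."""
    if field == "*":
        return list(range(60))
    if field.startswith("*/"):
        step = int(field[2:])
        if step <= 0:
            raise ValueError("step must be positive")
        return list(range(0, 60, step))
    minute = int(field)
    if not 0 <= minute < 60:
        raise ValueError("minute must be between 0 and 59")
    return [minute]


every_three = expand_minute_field("*/3")
print(len(every_three), every_three[:5])  # 20 [0, 3, 6, 9, 12]
print(expand_minute_field("0"))           # [0]
```

With these rules, the schedule `*/3 * * * *` fires 20 times per hour, while an hourly schedule would be written `0 * * * *`.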
