<a href="https://colab.research.google.com/github/BasuSam/gcp-dataflow/blob/main/GCP_Dataflow_Beam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install apache-beam[gcp]
import apache_beam as beam

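Before starting, enable the App Engine, Cloud Scheduler and Cloud Build APIs in the project, as well as the Dataflow, Pub/Sub, BigQuery and Cloud Storage APIs that the pipeline itself uses.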

In [None]:
# Upload the service account key (JSON file) from the local machine
from google.colab import files
uploaded = files.upload()
!pwd
!ls

Saving sandpit-samrat-basu-179x-660313590649.json to sandpit-samrat-basu-179x-660313590649.json
/content
sample_data  sandpit-samrat-basu-179x-660313590649.json


In [None]:
# Set Service Account key in the environment variable
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = '/content/sandpit-samrat-basu-179x-660313590649.json'
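Before going further, it can help to confirm that the uploaded file really is a service account key and to see which project it belongs to. The sketch below assumes the standard key layout (`"type": "service_account"` plus `project_id`, `client_email` and `private_key`); the function name `service_account_project` is hypothetical and not part of any Google library.

```python
import json


def service_account_project(key_json: str) -> str:
    """Return the project_id from the text of a service account key file.

    Raises ValueError when the JSON does not look like a service account key.
    """
    data = json.loads(key_json)
    # Keys created for a service account carry "type": "service_account".
    if data.get("type") != "service_account":
        raise ValueError("not a service account key")
    # Fields a service account key normally contains; checked as a sanity test only.
    missing = [k for k in ("project_id", "client_email", "private_key") if k not in data]
    if missing:
        raise ValueError(f"key is missing fields: {missing}")
    return data["project_id"]


# Usage in the notebook (reads the key uploaded above):
# import os
# with open(os.environ["GOOGLE_APPLICATION_CREDENTIALS"]) as f:
#     print(service_account_project(f.read()))
```

Taking the file contents as a string rather than a path keeps the check independent of where Colab saved the upload.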

In [None]:
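# Authenticate the Colab user so that gcloud and gsutil can access the project.
from google.colab import auth
auth.authenticate_user()
# Colab normally ships with the Cloud SDK; the installer below is only needed if gcloud is missing.
!curl https://sdk.cloud.google.com | bash
# Interactive: select the account and the default project.
!gcloud init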

In [None]:
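# Create the GCS bucket.
# `!export` runs in a throwaway subshell, so the variable would be lost immediately.
# Setting it in os.environ makes it visible to every later `!` command.
os.environ["BUCKET"] = "179x-dataflow"
!gsutil mb -c nearline -l europe-west2 gs://$BUCKET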
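`gsutil mb` rejects names that break the Cloud Storage naming rules, so a quick local check can save a round trip. This is a simplified subset of those rules, assuming a bucket name without dots; `is_valid_bucket_name` is a hypothetical helper, and the full rules (dotted names, restricted look-alike words) are in the Cloud Storage documentation.

```python
import re

# Simplified subset of the Cloud Storage bucket naming rules for names WITHOUT dots:
# 3-63 characters; lowercase letters, digits, dashes and underscores;
# must start and end with a letter or digit.
_BUCKET_NAME = re.compile(r"[a-z0-9][a-z0-9_-]{1,61}[a-z0-9]")


def is_valid_bucket_name(name: str) -> bool:
    if not _BUCKET_NAME.fullmatch(name):
        return False
    # Names may not start with "goog" or contain "google".
    return not name.startswith("goog") and "google" not in name


print(is_valid_bucket_name("179x-dataflow"))  # the bucket used in this notebook
```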

In [None]:
# Create a Pub/Sub topic and a subscription to that topic. This is the streaming source of data for the sample.
# For simplicity, the subscription uses the same name as the topic.
os.environ["TOPIC"] = "messages"
os.environ["SUBSCRIPTION"] = os.environ["TOPIC"]

!gcloud pubsub topics create $TOPIC
!gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION
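The pipeline later receives the subscription as a fully qualified name (`projects/<PROJECT>/subscriptions/<SUBSCRIPTION>`), not the short name created here. The hypothetical helpers below build and split those names; `my-project` is a placeholder project ID, not the notebook's.

```python
def topic_path(project: str, topic: str) -> str:
    """Fully qualified Pub/Sub topic name."""
    return f"projects/{project}/topics/{topic}"


def subscription_path(project: str, subscription: str) -> str:
    """Fully qualified Pub/Sub subscription name, as passed to input_subscription."""
    return f"projects/{project}/subscriptions/{subscription}"


def split_subscription_path(path: str) -> tuple:
    """Inverse of subscription_path: return (project, subscription)."""
    parts = path.split("/")
    if len(parts) != 4 or parts[0] != "projects" or parts[2] != "subscriptions":
        raise ValueError(f"not a subscription path: {path!r}")
    return parts[1], parts[3]


print(subscription_path("my-project", "messages"))
```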

In [None]:
# Create Cloud Scheduler jobs that publish "positive" and "negative" ratings every minute and every two minutes respectively.
# These jobs publish messages to the Pub/Sub source topic.

# Create a publisher for "positive" ratings that publishes 1 message per minute.
# If the project has no App Engine app yet, this step may prompt to create one:
# Cloud Scheduler has used the App Engine app's location as the region for its jobs.
!gcloud scheduler jobs create pubsub positive-ratings-publisher \
  --schedule="*/1 * * * *" \
  --topic="$TOPIC" \
  --message-body='{"url": "https://www.amazon.com/", "review": "positive"}'

# Trigger one run immediately instead of waiting for the first scheduled run.
!gcloud scheduler jobs run positive-ratings-publisher

# Create and run another similar publisher for "negative ratings" that
# publishes 1 message every 2 minutes.
!gcloud scheduler jobs create pubsub negative-ratings-publisher \
  --schedule="*/2 * * * *" \
  --topic="$TOPIC" \
  --message-body='{"url": "https://www.amazon.com/", "review": "negative"}'

!gcloud scheduler jobs run negative-ratings-publisher
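The two jobs differ only in the `review` value and the cron minute field. The sketch below (hypothetical helpers `rating_message` and `runs_per_hour`) rebuilds the same JSON bodies and counts how often a `*/N` minute schedule fires, which shows the message mix the pipeline should see.

```python
import json


def rating_message(url: str, review: str) -> str:
    """Build a message body in the same shape as the scheduler jobs above."""
    if review not in ("positive", "negative"):
        raise ValueError(f"unexpected review value: {review!r}")
    return json.dumps({"url": url, "review": review})


def runs_per_hour(minute_field: str) -> int:
    """Runs per hour for a cron minute field of the form '*/N' (other forms not handled)."""
    if not minute_field.startswith("*/"):
        raise ValueError("only '*/N' minute fields are handled")
    step = int(minute_field[2:])
    # Cron fires at minutes 0, N, 2N, ... up to 59.
    return len(range(0, 60, step))


print(rating_message("https://www.amazon.com/", "positive"))
print(runs_per_hour("*/1"), runs_per_hour("*/2"))
```

With these schedules, about 60 positive and 30 negative messages arrive per hour, so roughly two thirds of the messages for the URL are positive.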

In [None]:
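# Create a dataset in BigQuery.
# Capture the project ID in Python, because `!export` would not survive past its own line.
import subprocess
os.environ["PROJECT"] = subprocess.check_output(["gcloud", "config", "get-value", "project"], text=True).strip()
os.environ["DATASET"] = "beam_samples"
os.environ["TABLE"] = "streaming_beam"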

!bq mk --location "europe-west2" --dataset "$PROJECT:$DATASET"
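The notebook refers to the output table in two forms: `PROJECT:DATASET.TABLE` (passed as `output_table` and to `bq rm`) and `PROJECT.DATASET.TABLE` (inside the standard SQL query). A small sketch of the conversion, with hypothetical helper names:

```python
def legacy_table_spec(project: str, dataset: str, table: str) -> str:
    """'project:dataset.table', the form passed as output_table and to `bq rm`."""
    return f"{project}:{dataset}.{table}"


def standard_sql_table_id(spec: str) -> str:
    """Convert 'project:dataset.table' to 'project.dataset.table' for standard SQL queries."""
    # rsplit keeps the colon inside domain-scoped project IDs such as 'example.com:proj'.
    project, rest = spec.rsplit(":", 1)
    dataset, table = rest.split(".", 1)
    return f"{project}.{dataset}.{table}"


spec = legacy_table_spec("my-project", "beam_samples", "streaming_beam")  # placeholder project
print(spec, "->", standard_sql_table_id(spec))
```

In the query the result still needs to be wrapped in backticks, as the `bq query` cell does.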

In [None]:
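# Clone the python-docs-samples repository and navigate to the code sample.
!git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
# `!cd` only changes directory inside a subshell; `%cd` changes the notebook's working directory,
# which the later `gcloud builds submit .` and `metadata.json` references rely on.
%cd python-docs-samples/dataflow/flex-templates/streaming_beam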

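The sample is an Apache Beam streaming pipeline that reads JSON-encoded messages from Pub/Sub, transforms the message data, and writes the results to a BigQuery table.

The key files in the cloned sample directory are:

Dockerfile: builds the container image that packages the pipeline.

streaming_beam.py: the Apache Beam pipeline code.

metadata.json: the template metadata (name, description and parameters).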

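To see the shape of the transformation without any cloud resources, the sketch below reproduces in plain Python the kind of per-URL aggregation a windowed pipeline like this performs: decode each message, parse the JSON, turn the review into a score, group by URL and average. The scoring (positive = 1.0, otherwise 0.0) and the output fields are assumptions for illustration; check `streaming_beam.py` for what the sample actually computes.

```python
import json
from collections import defaultdict
from statistics import mean


def parse_message(raw: bytes) -> dict:
    """Decode one Pub/Sub payload. Assumption: positive -> 1.0, anything else -> 0.0."""
    row = json.loads(raw.decode("utf-8"))
    return {"url": row["url"], "score": 1.0 if row["review"] == "positive" else 0.0}


def summarize_window(raw_messages) -> dict:
    """Group the messages of one window by URL and compute count and mean score."""
    scores_by_url = defaultdict(list)
    for raw in raw_messages:
        msg = parse_message(raw)
        scores_by_url[msg["url"]].append(msg["score"])
    return {
        url: {"num_reviews": len(scores), "score": mean(scores)}
        for url, scores in scores_by_url.items()
    }


window = [
    b'{"url": "https://www.amazon.com/", "review": "positive"}',
    b'{"url": "https://www.amazon.com/", "review": "negative"}',
]
print(summarize_window(window))
```

In the real pipeline the grouping happens per fixed time window on Dataflow workers; here a single list stands in for one window.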
In [None]:
# Building a container image
# Build the Docker image for the Apache Beam pipeline with Cloud Build, so no local Docker installation is needed.

# (Optional) Use Kaniko caching by default to speed up repeated builds.
!gcloud config set builds/use_kaniko True

In [None]:
# Cloud Build builds a Docker image from the Dockerfile and saves it in Container Registry, where other Google Cloud products can access it.
os.environ["TEMPLATE_IMAGE"] = f"eu.gcr.io/{os.environ['PROJECT']}/samples/dataflow/streaming-beam:latest"

# Build the image into Container Registry, this is roughly equivalent to:
#   gcloud auth configure-docker
#   docker image build -t $TEMPLATE_IMAGE .
#   docker push $TEMPLATE_IMAGE
!gcloud builds submit --tag "$TEMPLATE_IMAGE" .

Creating a Flex Template

To run a template, we need to create a template spec file containing all the necessary information to run the job, such as the SDK information and metadata.

The metadata.json file contains additional information for the template, such as the "name", "description" and input "parameters" fields.

The template spec file must be stored in Cloud Storage; each new Dataflow job is launched from it.
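As an illustration of what metadata.json can describe, the sketch below builds a metadata dictionary for the two parameters this template is run with (`input_subscription` and `output_table`) and checks candidate values against each parameter's regexes. The field names (`name`, `label`, `helpText`, `isOptional`, `regexes`) follow the Flex Template metadata format, but the labels, help texts and regexes here are assumptions, not a copy of the sample's file, and `re.fullmatch` is this sketch's choice rather than a statement of how Dataflow applies the regexes.

```python
import json
import re


def streaming_beam_metadata() -> dict:
    """Hypothetical metadata.json content for the two parameters this template uses."""
    return {
        "name": "Streaming Beam",
        "description": "Reads JSON messages from Pub/Sub and writes per-URL results to BigQuery.",
        "parameters": [
            {
                "name": "input_subscription",
                "label": "Pub/Sub input subscription",
                "helpText": "Format: projects/<PROJECT>/subscriptions/<SUBSCRIPTION>",
                "regexes": [r"projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"],
            },
            {
                "name": "output_table",
                "label": "BigQuery output table",
                "helpText": "Format: <PROJECT>:<DATASET>.<TABLE>",
                "regexes": [r"([^:]+:)?[^.]+[.].+"],
            },
        ],
    }


def invalid_parameters(metadata: dict, values: dict) -> list:
    """Names of required parameters that are missing or do not fully match their regexes."""
    bad = []
    for param in metadata["parameters"]:
        value = values.get(param["name"])
        if value is None:
            if not param.get("isOptional", False):
                bad.append(param["name"])
            continue
        if not all(re.fullmatch(rx, value) for rx in param.get("regexes", [])):
            bad.append(param["name"])
    return bad


print(json.dumps(streaming_beam_metadata(), indent=2))
```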



In [None]:
os.environ["TEMPLATE_PATH"] = f"gs://{os.environ['BUCKET']}/samples/dataflow/templates/streaming-beam.json"

# Build the Flex Template spec file from the container image built above.
!gcloud dataflow flex-template build $TEMPLATE_PATH \
  --image "$TEMPLATE_IMAGE" \
  --sdk-language "PYTHON" \
  --metadata-file "metadata.json"


Running a Dataflow Flex Template pipeline

Now run the Apache Beam pipeline in Dataflow by referring to the template file and passing the template parameters the pipeline requires.
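The same run command can also be built from Python as an argument list, which avoids shell quoting and makes the timestamped job name explicit. This is a sketch using only the flags shown in the next cell; `flex_template_run_args` is a hypothetical helper, and the values in the test are placeholders.

```python
from datetime import datetime


def flex_template_run_args(template_path, project, subscription, dataset, table, region, now=None):
    """Argument list for `gcloud dataflow flex-template run` with a timestamped job name."""
    if now is None:
        now = datetime.now()
    job_name = f"streaming-beam-{now:%Y%m%d-%H%M%S}"
    return [
        "gcloud", "dataflow", "flex-template", "run", job_name,
        "--template-file-gcs-location", template_path,
        "--parameters", f"input_subscription=projects/{project}/subscriptions/{subscription}",
        "--parameters", f"output_table={project}:{dataset}.{table}",
        "--region", region,
    ]


# Usage in the notebook (not executed here):
# import os, subprocess
# e = os.environ
# subprocess.run(flex_template_run_args(e["TEMPLATE_PATH"], e["PROJECT"], e["SUBSCRIPTION"],
#                                       e["DATASET"], e["TABLE"], e["REGION"]), check=True)
```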

In [None]:
os.environ["REGION"] = "europe-west2"

# Run the Flex Template.
!gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --parameters input_subscription="projects/$PROJECT/subscriptions/$SUBSCRIPTION" \
    --parameters output_table="$PROJECT:$DATASET.$TABLE" \
    --region "$REGION"


In [None]:
# Check the results in BigQuery by running the following query:

!bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'

While this pipeline is running, we can see new rows appended into the BigQuery table every minute.

We can manually publish more messages from the Cloud Scheduler page to see how that affects the page review scores.

We can also publish messages directly to the topic from the Pub/Sub topics page: select the topic, then click the "Publish message" button at the top. This lets us test the pipeline with different URLs. Make sure the message is valid JSON, because this sample does no error handling, to keep the code simple.

Try sending the following message and check the BigQuery table again about a minute later.

{"url": "https://cloud.google.com/bigquery/", "review": "positive"}
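Because the pipeline does no error handling, it is worth checking a hand-written message before publishing it. The hypothetical validator below checks that the text is a JSON object with `url` and `review` keys; restricting `review` to "positive" and "negative" is an assumption based on the messages used in this notebook.

```python
import json

VALID_REVIEWS = {"positive", "negative"}


def validate_review_message(text: str) -> dict:
    """Parse a message and raise ValueError if it is not the expected shape."""
    try:
        row = json.loads(text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"not valid JSON: {exc}") from exc
    if not isinstance(row, dict):
        raise ValueError("message must be a JSON object")
    missing = {"url", "review"} - set(row)
    if missing:
        raise ValueError(f"missing keys: {sorted(missing)}")
    if row["review"] not in VALID_REVIEWS:
        raise ValueError(f"review must be one of {sorted(VALID_REVIEWS)}")
    return row


print(validate_review_message('{"url": "https://cloud.google.com/bigquery/", "review": "positive"}'))
```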

In [None]:
# Cleaning up

# Clean up the Flex Template resources.
# Stop the Dataflow pipeline (`xargs -r` skips the cancel call when no running job matches).
!gcloud dataflow jobs list \
    --filter 'NAME:streaming-beam AND STATE=Running' \
    --format 'value(JOB_ID)' \
    --region "$REGION" \
  | xargs -r gcloud dataflow jobs cancel --region "$REGION"
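The same cancel step can be done from Python. The sketch below turns the output of `gcloud dataflow jobs list --format 'value(JOB_ID)'` (one job ID per line) into a single cancel command and returns `None` when nothing is running, mirroring `xargs -r`. `cancel_command` is a hypothetical helper and the job ID in the test is a placeholder.

```python
def cancel_command(job_list_output: str, region: str):
    """Build a `gcloud dataflow jobs cancel` argument list, or None if no job IDs are listed."""
    job_ids = [line.strip() for line in job_list_output.splitlines() if line.strip()]
    if not job_ids:
        return None
    return ["gcloud", "dataflow", "jobs", "cancel", *job_ids, "--region", region]


# Usage in the notebook (not executed here):
# import os, subprocess
# listing = subprocess.run(
#     ["gcloud", "dataflow", "jobs", "list",
#      "--filter", "NAME:streaming-beam AND STATE=Running",
#      "--format", "value(JOB_ID)", "--region", os.environ["REGION"]],
#     capture_output=True, text=True, check=True).stdout
# cmd = cancel_command(listing, os.environ["REGION"])
# if cmd:
#     subprocess.run(cmd, check=True)
```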

#Delete the template spec file from Cloud Storage.
!gsutil rm $TEMPLATE_PATH

# Delete the Flex Template container image from Container Registry (--quiet skips the confirmation prompt).
!gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags --quiet


#Clean up Google Cloud project resources
# Delete the Cloud Scheduler jobs (--quiet skips the confirmation prompt).
!gcloud scheduler jobs delete negative-ratings-publisher --quiet
!gcloud scheduler jobs delete positive-ratings-publisher --quiet

#Delete the Pub/Sub subscription and topic.
!gcloud pubsub subscriptions delete $SUBSCRIPTION
!gcloud pubsub topics delete $TOPIC

#Delete the BigQuery items.
!bq rm -f -t $PROJECT:$DATASET.$TABLE
!bq rm -r -f -d $PROJECT:$DATASET

#Delete GCS Bucket
!gsutil rm -r gs://$BUCKET

Limitations

At the time this notebook was written, Flex Templates had the following limitations:

A Google-provided base image must be used to package the container with Docker.

Streaming jobs cannot be updated using a Flex Template.

FlexRS cannot be used for Flex Template jobs.