Authenticate with Google Cloud CLI


In [None]:
# First-time setup of the gcloud CLI.
# NOTE(review): this command prompts for input, so it may need to be run in a
# terminal rather than from a notebook cell — confirm.
!gcloud init

Create credentials file

In [None]:
# Log in and create the application-default credentials file.
# NOTE(review): presumably these are the credentials the Beam pipeline below
# picks up when it talks to Google Cloud — confirm.
!gcloud auth application-default login

Create resources on Google Cloud

In [None]:
# Create the Cloud Storage bucket that the pipeline uses for its temp, staging
# and template files. Replace <name> with your own suffix and use the same
# bucket name in the cells below.
!gsutil mb gs://dataflow_temp_storage_<name>

In [None]:
# Create the destination BigQuery dataset "nyctaxi" (the data warehouse) in the
# US location. Replace <project_id> with your Google Cloud project ID.
!bq --location=US mk --dataset "<project_id>:nyctaxi"

In [None]:
# Create the "taxirides" subscription on the public taxirides-realtime topic.
# The pipeline pulls the streaming messages from this subscription
# (see subscription_name below).
!gcloud pubsub subscriptions create "taxirides" --topic="projects/pubsub-public-data/topics/taxirides-realtime"

Install Python packages

In [None]:
!pip install pandas
!pip install fsspec
!pip install gcsfs
!pip install 'apache-beam[gcp]'

Import libraries

In [None]:
import json
import apache_beam as beam
from datetime import date
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions

Initialising variables

In [None]:
# Cloud Storage bucket handed to the BigQuery sink as its temp location.
custom_gcs_temp_location = "gs://dataflow_temp_storage_<name>"

# Fully qualified Pub/Sub subscription the pipeline reads from.
subscription_name = "projects/<project_id>/subscriptions/taxirides"

# Destination table, written as "<project>:<dataset>.<table>".
output_table = "<project_id>:nyctaxi.taxirides-realtime"

# BigQuery schema as comma-separated "name:TYPE" pairs, one column per line.
schema = (
    "ride_id:STRING,\n"
    "point_idx:INTEGER,\n"
    "latitude:FLOAT64,\n"
    "longitude:FLOAT64,\n"
    "timestamp:TIMESTAMP,\n"
    "meter_reading:FLOAT64,\n"
    "meter_increment:FLOAT64,\n"
    "ride_status:STRING,\n"
    "passenger_count:INTEGER"
)

Data processing function

In [None]:
def process_columns(element):
    """Convert one raw message payload into a Python object.

    Args:
        element: UTF-8 encoded bytes containing a JSON document.

    Returns:
        The value parsed from the JSON text (a dict for a JSON object).
    """
    return json.loads(element.decode("utf-8"))

Run the Apache Beam pipeline

In [None]:
# Dataflow job configuration. The staging, temp and template paths all live in
# the bucket named by custom_gcs_temp_location, so they are derived from that
# variable instead of repeating the bucket literal in three places.
beam_options = PipelineOptions(
    streaming=True,
    runner='DataflowRunner',
    project='<project_id>',
    job_name='<job_name>',
    staging_location=f"{custom_gcs_temp_location}/staging",
    temp_location=f"{custom_gcs_temp_location}/temp",
    template_location=f"{custom_gcs_temp_location}/templates/streamingpipeline",
    region='us-central1')

with beam.Pipeline(options=beam_options) as pipeline:

    # Source: messages pulled from the Pub/Sub subscription.
    input_data = pipeline | "Read from Pub/Sub" >> ReadFromPubSub(subscription=subscription_name)

    # Decode each payload from UTF-8 JSON into a Python object.
    processed_data = input_data | 'ProcessColumns' >> beam.Map(process_columns)

    # Sink: append rows to the destination table, creating it from `schema`
    # if it does not exist yet.
    processed_data | 'WriteToBigQuery' >> WriteToBigQuery(
        table=output_table,
        schema=schema,
        custom_gcs_temp_location=custom_gcs_temp_location,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)

Save all of the code above in a Python file named `streamingpipeline.py`, then create the Dataflow template

In [None]:
!python -m streamingpipeline \
  --runner DataflowRunner \
  --project <project_id> \
  --staging_location gs://dataflow_temp_storage_<name>/staging \
  --temp_location gs://dataflow_temp_storage_<name>/temp \
  --template_location gs://dataflow_temp_storage_<name>/templates/dataproc-job \
  --region us-central1 \

In the Google Cloud console, enable the Dataflow, Data Pipelines, and Cloud Scheduler APIs.
Then run the Dataflow job.

In [None]:
!gcloud dataflow jobs run tripdata --gcs-location gs://dataflow_temp_storage_909090/templates/streamingpipeline --region us-central1