##### Copyright 2020 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License").
<!--
    Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements.  See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership.  The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied.  See the License for the
    specific language governing permissions and limitations
    under the License.
-->


# Using Apache Beam on Google Cloud Dataflow to process streaming data

This example demonstrates how to set up a streaming pipeline that processes a stream that contains NYC taxi ride data.

It's a modified version of an example from Google's example repository. It reads data from a public Google Cloud Pub/Sub topic that Google provides for testing streaming data processing use cases.

Each element in the stream is a JSON object that contains the taxi's location, a timestamp, the meter reading, the meter increment, the passenger count, and the ride status.

We will use Apache Beam to define the pipeline, run it on Google Cloud Dataflow, and store the results in Google Cloud BigQuery.
To get a better understanding of the Apache Beam constructs we use in this example, see [Apache Beam Basics](https://beam.apache.org/documentation/basics/), and the [Apache Beam Programming Guide](https://beam.apache.org/documentation/programming-guide/).


Start with the necessary imports:

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
import json
```

The following is only needed in order to avoid pip version warnings later in the notebook:

```
!/jupyter/.kernels/apache-beam-2.46.0/bin/python -m pip install --upgrade pip
```



Specify the Google Cloud Pub/Sub topic to read from, and the Google Cloud Storage (GCS) location that Dataflow will use for staging files and storing temporary data.

**IMPORTANT:
Replace GCS-BUCKET-NAME with your own GCS bucket name (you created a GCS bucket earlier in this book).**

```python
# Pub/Sub source topic
topic = "projects/pubsub-public-data/topics/taxirides-realtime"

# Google Cloud Storage location.
dataflow_gcs_location = 'gs://GCS-BUCKET-NAME/dataflow'
```

Specify the options for the streaming pipeline.
For more information, see [Apache Beam Pipeline Options](https://beam.apache.org/releases/pydoc/2.4.0/apache_beam.options.pipeline_options.html).

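```python
# Set up the Beam pipeline options. `flags` takes a list of command-line
# arguments; we pass an empty list and set the options programmatically.
options = beam.options.pipeline_options.PipelineOptions(flags=[])

# Set the pipeline mode to streaming, so we can stream the data from Pub/Sub.
options.view_as(beam.options.pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# google.auth.default() returns a (credentials, project_id) tuple; the credentials
# are discarded and the project ID is assigned to the options.
# The project is used for creating a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
```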
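The last line of the cell above uses tuple unpacking to assign directly to an attribute. The sketch below shows the same pattern without needing Google Cloud credentials; `fake_default` is a hypothetical stand-in that only mimics the `(credentials, project_id)` shape returned by `google.auth.default()`, and a `SimpleNamespace` stands in for the options view.

```python
from types import SimpleNamespace


def fake_default():
    """Hypothetical stand-in for google.auth.default().

    Returns a (credentials, project_id) tuple with placeholder values.
    """
    return object(), "my-project-id"


# Stands in for options.view_as(GoogleCloudOptions).
options_view = SimpleNamespace(project=None)

# The credentials go to `_`; the project ID goes straight into the attribute.
_, options_view.project = fake_default()
print(options_view.project)  # my-project-id
```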

Create a pipeline with the options we just created:

```python
p = beam.Pipeline(options=options)
```

The following code applies a `ReadFromPubSub` transform, which creates a subscription to the given Pub/Sub topic and reads from that subscription.
The messages arrive as JSON-encoded bytes, so we add a `Map` `PTransform` that parses each message with `json.loads`.

```python
data = p | "read" >> beam.io.ReadFromPubSub(topic=topic) | beam.Map(json.loads)
```
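The parsing step can be tried on a single message in plain Python. The payload below is made up for illustration (the real messages may contain different or additional fields); it shows that `json.loads` accepts the raw bytes that `ReadFromPubSub` yields and returns a dictionary.

```python
import json

# A made-up Pub/Sub payload for illustration only; real messages on the
# public topic may contain different or additional fields.
raw_message = b'{"ride_status": "enroute", "meter_increment": 0.0309, "passenger_count": 1}'

# json.loads accepts UTF-8 encoded bytes (Python 3.6+) and returns a dict.
element = json.loads(raw_message)
print(element["meter_increment"])  # 0.0309
```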

Because we are reading from an unbounded source, we need to apply a windowing scheme before aggregating: Beam requires a non-global windowing function (or a non-default trigger) before a grouping or combining transform is applied to an unbounded `PCollection`.
We will use sliding windows that are 10 seconds long and start every second (a 1-second period).
For more information about windowing in Apache Beam, see [Windowing Basics](https://beam.apache.org/documentation/programming-guide/#windowing-basics).


```python
windowed_data = data | "window" >> beam.WindowInto(beam.window.SlidingWindows(10, 1))
```

Note that sliding windows duplicate data: each element is assigned to every window that contains its timestamp. With a 10-second window size and a 1-second period, each element appears in 10 windows.
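To see where the factor of 10 comes from, here is a minimal plain-Python sketch of sliding-window assignment. It follows the same idea as `SlidingWindows(size, period)` but is not Beam's implementation; the function name `assign_sliding_windows` is hypothetical, and integer seconds are assumed to keep the arithmetic exact.

```python
def assign_sliding_windows(timestamp, size=10, period=1, offset=0):
    """Return every [start, end) sliding window that contains `timestamp`.

    Hypothetical helper; integer seconds keep the arithmetic exact.
    """
    # Start of the latest window that begins at or before the timestamp.
    start = timestamp - (timestamp - offset) % period
    windows = []
    # Step back one period at a time while the window still covers the timestamp.
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return windows


windows = assign_sliding_windows(25)
print(len(windows))             # 10
print(windows[0], windows[-1])  # (25, 35) (16, 26)
```

Each element is therefore counted once in each of the 10 overlapping windows, which is why each per-window sum covers the most recent 10 seconds of data.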

Now let's calculate the 10-second dollar run rate, updated every second, by summing the `meter_increment` JSON field within each window.

First, extract the `meter_increment` field from the JSON object.

```python
# Default to 0 so that an element without the field doesn't break the sum.
meter_increments = windowed_data | beam.Map(lambda e: e.get('meter_increment', 0))
```

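Now sum all elements in each window. The `without_defaults()` call is required here: `CombineGlobally` can only produce a default value for empty input in the global window, so with any other windowing, windows that receive no elements simply produce no output.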

```python
run_rates = meter_increments | beam.CombineGlobally(sum).without_defaults()
```
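Conceptually, once elements have been assigned to windows, `CombineGlobally(sum).without_defaults()` groups the values by window and sums each group. The plain-Python sketch below illustrates only that semantics, not how Beam executes it; `combine_per_window` is a hypothetical name, and windows are represented as `(start, end)` tuples.

```python
from collections import defaultdict


def combine_per_window(windowed_values, combine_fn=sum):
    """Group (window, value) pairs by window and combine each group.

    Hypothetical illustration of per-window combining semantics.
    """
    groups = defaultdict(list)
    for window, value in windowed_values:
        groups[window].append(value)
    # Only windows that received elements appear in the output, mirroring
    # without_defaults(): no default 0 is emitted for empty windows.
    return {window: combine_fn(values) for window, values in groups.items()}


windowed = [((100, 110), 0.5), ((100, 110), 0.25), ((101, 111), 0.25), ((102, 112), 1.0)]
result = combine_per_window(windowed)
print(result[(100, 110)])  # 0.75
```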

The core of the pipeline is now defined. Next, we specify some additional options that are needed to run this pipeline on Google Cloud Dataflow.

```python
# Set the Google Cloud region to run Dataflow.
options.view_as(GoogleCloudOptions).region = 'us-central1'

# Set the staging location. This location is used to stage the
# Dataflow pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location

# Set the temporary location. This location is used to store temporary files
# or intermediate results before outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
```
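Because a leftover `GCS-BUCKET-NAME` placeholder only surfaces as an error when the job is submitted, it can help to derive and check both paths up front. The helper below is a hypothetical sketch, not part of Beam; it builds the same `staging` and `temp` subpaths as the cell above and rejects paths that are not `gs://` URLs or that still contain the placeholder.

```python
def dataflow_locations(gcs_location):
    """Hypothetical helper: derive staging and temp paths from a base GCS path."""
    if not gcs_location.startswith("gs://"):
        raise ValueError("expected a gs:// path, got %r" % gcs_location)
    if "GCS-BUCKET-NAME" in gcs_location:
        raise ValueError("replace GCS-BUCKET-NAME with your bucket name")
    # Drop a trailing slash so we don't produce paths like gs://b/dataflow//temp.
    base = gcs_location.rstrip("/")
    return {"staging_location": f"{base}/staging", "temp_location": f"{base}/temp"}


print(dataflow_locations("gs://my-bucket/dataflow"))
```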


Next, we specify the schema for our BigQuery table. If the table doesn't exist yet, Dataflow creates it with this schema. The output consists only of the aggregated `run_rates` values, so the schema has a single field.

```python
table_schema = {
    'fields': [{
        'name': 'run_rates', 'type': 'NUMERIC', 'mode': 'NULLABLE'
    }]
}
```
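The schema determines what each output row must look like: a dictionary keyed by field name. The check below is a simplistic, hypothetical sketch (not a BigQuery or Beam API) that accepts a row only if it is a dict whose keys are all fields of the schema; since `run_rates` is `NULLABLE`, omitting it would still be allowed.

```python
table_schema = {
    'fields': [{'name': 'run_rates', 'type': 'NUMERIC', 'mode': 'NULLABLE'}]
}


def row_matches_schema(row, schema):
    """Hypothetical check: is `row` a dict whose keys are all schema fields?"""
    field_names = {field['name'] for field in schema['fields']}
    return isinstance(row, dict) and set(row) <= field_names


print(row_matches_schema({'run_rates': 12.5}, table_schema))  # True
print(row_matches_schema(12.5, table_schema))                 # False
```

The second call shows why the bare numbers in `run_rates` need to be wrapped in a dictionary before they can be written.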

The final step is to write the results (the contents of the `run_rates` `PCollection` we created above) to BigQuery. However, each element of `run_rates` is a bare number, whereas the `WriteToBigQuery` transform expects each element to be a dictionary whose keys match the schema's field names. In addition, BigQuery's `NUMERIC` type allows at most 9 digits after the decimal point, and summing floating-point values can produce long rounding tails (for example, `0.1 + 0.2` evaluates to `0.30000000000000004`), so we round each rate to 5 decimal places to stay safely within that limit. The following code performs the necessary conversion:

```python
# Convert run_rates to a dictionary
def to_dict(rate):
    return {'run_rates': round(rate, 5)}

run_rates_dict = run_rates | 'ConvertToDict' >> beam.Map(to_dict)
```
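The effect of the rounding can be checked in plain Python. The sketch below repeats the notebook's `to_dict` and counts the digits after the decimal point using `decimal.Decimal`; `decimal_places` and `NUMERIC_MAX_SCALE` are hypothetical names introduced here, with the limit of 9 taken from the scale of BigQuery's `NUMERIC` type.

```python
from decimal import Decimal

# Assumption: BigQuery NUMERIC allows at most 9 digits after the decimal point.
NUMERIC_MAX_SCALE = 9


def decimal_places(value):
    """Number of digits after the decimal point in the float's shortest repr."""
    exponent = Decimal(repr(value)).as_tuple().exponent
    return max(0, -exponent)


def to_dict(rate):
    return {'run_rates': round(rate, 5)}


raw = 0.1 + 0.2
print(raw, decimal_places(raw))                # 0.30000000000000004 17
row = to_dict(raw)
print(row, decimal_places(row['run_rates']))   # {'run_rates': 0.3} 1
```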

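Now the data is ready to be written to BigQuery, and the following code creates the BigQuery sink for that purpose. `CREATE_IF_NEEDED` creates the table if it doesn't already exist, but the `taxirides` dataset must already exist in your project; `WRITE_APPEND` appends new rows to the table.
**IMPORTANT:
Replace GOOGLE-CLOUD-PROJECT with your Google Cloud project ID.**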

```python
# Write results to BigQuery
run_rates_dict | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
    table='GOOGLE-CLOUD-PROJECT.taxirides.run_rates_table',
    schema=table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
```
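The `table` argument packs the project, dataset, and table names into one string. The hypothetical helper below (not a Beam API) splits a spec written as `project.dataset.table` or `project:dataset.table` and rejects the unreplaced `GOOGLE-CLOUD-PROJECT` placeholder, which makes it easy to confirm which dataset has to exist before the job starts. It assumes a plain project ID and does not handle domain-scoped project IDs.

```python
def parse_table_spec(spec):
    """Hypothetical helper: split a table spec into (project, dataset, table).

    Accepts 'project.dataset.table' or 'project:dataset.table'.
    Domain-scoped project IDs are not handled.
    """
    project, sep, rest = spec.partition(':')
    if not sep:
        project, _, rest = spec.partition('.')
    dataset, _, table = rest.partition('.')
    if not (project and dataset and table):
        raise ValueError(f'malformed table spec: {spec!r}')
    if project == 'GOOGLE-CLOUD-PROJECT':
        raise ValueError('replace GOOGLE-CLOUD-PROJECT with your project ID')
    return project, dataset, table


print(parse_table_spec('my-project.taxirides.run_rates_table'))
# ('my-project', 'taxirides', 'run_rates_table')
```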


We have now defined all of the steps in our data processing pipeline, so let's run it on Google Cloud Dataflow. Running this cell submits the job and displays details about the pipeline result; you can then go to the [Dataflow Jobs Console](https://console.cloud.google.com/dataflow/jobs/) to view additional job details and the execution status. A streaming job keeps running, and incurring charges, until you stop it, so cancel the job from the console when you are done.

```python
runner = beam.runners.DataflowRunner()
runner.run_pipeline(p, options=options)
```

```
<DataflowPipelineResult <Job
 clientRequestId: '20230507205544023415-5911'
 createTime: '2023-05-07T20:55:46.191755Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2023-05-07_13_55_45-17532039298532056386'
 location: 'us-central1'
 name: 'beamapp-root-0507205544-022088-qxs2zhy3'
 projectId: 'still-sight-352221'
 stageStates: []
 startTime: '2023-05-07T20:55:46.191755Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7f55fae7bdc0>
```