# Module 3: Implementing streaming pipelines
1. Dataflow Streaming
2. Challenges in stream processing
3. Build a stream processing pipeline for live traffic data
4. Handle late data: watermarks, triggers, accumulation
5. Lab: Stream data processing pipeline for live traffic data

## 1. Dataflow Streaming
![](img/3-01.png)
![](img/3-02.png)
![](img/3-03.png)
![](img/3-04.png)
![](img/3-05.png)
![](img/3-06.png)
![](img/3-07.png)
![](img/3-08.png)
![](img/3-09.png)
![](img/3-10.png)

## 2. Challenges in stream processing
![](img/3-11.png)
![](img/3-12.png)
![](img/3-13.png)
![](img/3-14.png)
![](img/3-15.png)
![](img/3-16.png)
![](img/3-17.png)
![](img/3-18.png)

## 3. Build a stream processing pipeline for live traffic data
![](img/3-19.png)
![](img/3-20.png)
![](img/3-21.png)
![](img/3-22.png)
![](img/3-23.png)
![](img/3-24.png)
![](img/3-25.png)

## 4. Handle late data: watermarks, triggers, accumulation
![](img/3-26.png)
![](img/3-27.png)
![](img/3-28.png)
![](img/3-29.png)
![](img/3-30.png)
![](img/3-31.png)
![](img/3-32.png)
![](img/3-33.png)
![](img/3-34.png)
![](img/3-35.png)
![](img/3-36.png)
![](img/3-37.png)
![](img/3-38.png)
![](img/3-39.png)
![](img/3-40.png)

## 5. Lab 2: Stream data processing pipeline for live traffic data
![](img/3-41.png)

### Overview
At the time of writing, streaming pipelines were not available in the Dataflow Python SDK, so the streaming labs are written in Java.

- Launch Dataflow and run a Dataflow job
- Understand how data elements flow through the transformations of a Dataflow pipeline
- Connect Dataflow to Pub/Sub and BigQuery
- Observe and understand how Dataflow autoscaling adjusts compute resources to process input data optimally
- Learn where to find logging information created by Dataflow
- Explore metrics and create alerts and dashboards with Stackdriver Monitoring
### Lab 2: Streaming Data Pipelines
In this lab you will use Dataflow to collect traffic events from simulated traffic sensor data made available through Google Cloud Pub/Sub, process them into an actionable average, and store the raw data in BigQuery for later analysis. You will learn how to start a Dataflow pipeline, monitor it, and, lastly, optimize it.
- Task 1: Preparation
- Task 2: Create a BigQuery Dataset and Cloud Storage Bucket
- Task 3: Simulate traffic sensor data into Pub/Sub
- Task 4: Launch Dataflow Pipeline
- Task 5: Explore the pipeline
- Task 6: Determine throughput rates
- Task 7: Review BigQuery output
- Task 8: Observe and understand autoscaling
- Task 9: Refresh the sensor data simulation script
- Task 10: Stackdriver integration
- Task 11: Explore metrics
- Task 12: Create alerts
- Task 13: Set up dashboards
- Task 14: Launch another streaming pipeline

### Task 1: Preparation
You will be running a sensor simulator from the training VM. In Lab 1 you manually set up the Pub/Sub components. In this lab, several of those processes are automated.

__Open the SSH terminal and connect to the training VM__

1. In the Console, on the Navigation menu, click Compute Engine > VM instances.
2. Locate the line with the instance called training_vm.
3. On the far right, under 'connect', click SSH to open a terminal window.
4. In this lab you will enter CLI commands on the training_vm.<br>
__Verify initialization is complete__
5. The training_vm is installing software in the background. Verify that setup is complete by checking that the following directory exists. If it does not exist, wait a few minutes and try again.<br>
`ls /training`<br>
Wait until setup is complete before proceeding. You can verify the installation of Maven with `mvn -version` and the JDK with `java -version`.<br>
__Copy files__
6. A repository has been downloaded to the VM. Copy the repository to your home directory.<br>
`cp -r /training/training-data-analyst/ .`<br>
__Set environment variables__
7. On the training_vm SSH terminal enter the following:<br>
`source /training/project_env.sh`<br>
This script sets the **DEVSHELL_PROJECT_ID** and **BUCKET** environment variables.

### Task 2: Create a BigQuery Dataset and Cloud Storage Bucket
The Dataflow pipeline will be created later and will write into a table in this dataset.

__Create a BigQuery Dataset__

1. Open the BigQuery web UI. On the Navigation menu, click BigQuery.
2. In the left column, beneath the text box, find your project name. To the right of the project name, click the blue arrow. Choose Create new dataset.
3. In the 'Create Dataset' dialog, for Dataset ID, type demos and click OK.<br>
__Verify the Cloud Storage Bucket__
A bucket should already exist that has the same name as the Project ID.

4. In the Console, on the Navigation menu, click Storage > Browser.

Observe the following values:

| Property | Value |
| -------------------------- | ------------------ |
| Name | Same as the Project ID |
| Default storage class | Regional |
| Multi-Regional location | Your location |


### Task 3: Simulate traffic sensor data into Pub/Sub
1. In the training_vm SSH terminal, start the sensor simulator. The script reads sample data from a CSV file and publishes it to Pub/Sub.<br>
`/training/sensor_magic.sh`<br>
This command will send 1 hour of data in 1 minute. Let the script continue to run in the current terminal.<br>
__Open a second SSH terminal and connect to the training VM__
2. In the upper right corner of the training_vm SSH terminal, click the gear-shaped button and select New Connection to training-vm from the drop-down menu. A new terminal window will open.
3. The new terminal session will not have the required environment variables. In the new training_vm SSH terminal, enter the following command to set them:<br>
`source /training/project_env.sh`<br>
__View subscriptions__
4. In the Console, on the Navigation menu, click Pub/Sub > Topics.
5. Examine the line for the topic named sandiego. Notice that Subscriptions is currently 0.
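Sending 1 hour of recorded data in 1 minute means the simulator replays events 60 times faster than they were recorded (Task 9 restarts it with `--speedFactor=60`). A minimal sketch of that arithmetic, assuming the simulator simply divides the gap between consecutive event timestamps by the speed factor; the class and method names are hypothetical and are not taken from sensor_magic.sh or send_sensor_data.py:

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical sketch: how a speed factor compresses the replay of recorded events.
// Assumption: the wall-clock wait between two events is their event-time gap / speedFactor.
public class SpeedFactorReplay {

    /** Wall-clock time to wait before publishing the next recorded event. */
    static Duration replayDelay(Instant previousEvent, Instant nextEvent, int speedFactor) {
        if (speedFactor <= 0) {
            throw new IllegalArgumentException("speedFactor must be positive");
        }
        Duration eventGap = Duration.between(previousEvent, nextEvent);
        return eventGap.dividedBy(speedFactor);
    }

    public static void main(String[] args) {
        Instant start = Instant.parse("2018-06-08T16:00:00Z");
        Instant oneHourLater = start.plus(Duration.ofHours(1));
        // One hour of event time replayed at a speed factor of 60 takes one minute.
        System.out.println(replayDelay(start, oneHourLater, 60)); // PT1M
    }
}
```

Note that the event timestamps themselves are unchanged by the replay; only the publishing pace is compressed, which is why event time and processing time diverge in this lab.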

### Task 4: Launch Dataflow Pipeline
__Verify that Google Cloud Dataflow API is enabled for this project__
1. Return to the browser tab for Console. In the top search bar, enter Dataflow API. This takes you to the page Navigation > APIs & Services > Dashboard > Google Dataflow API, which either shows status information or gives you the option to enable the API.
2. If necessary, Enable the API.
3. Return to the second training_vm SSH terminal. Change into the directory for this lab.
`cd ~/training-data-analyst/courses/streaming/process/sandiego`
4. Identify the script that creates and runs the Dataflow pipeline.
`cat run_oncloud.sh`
5. Copy-and-paste the following URL into a new browser tab to view the source code on GitHub.
`https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/streaming/process/sandiego/run_oncloud.sh`
6. The script requires three arguments: project id, bucket name, and classname.<br>
**A 4th optional argument is options. The options argument is discussed later in this lab.**

   | Argument | Value |
   | ----------- | -------------------------------- |
   | project id | your Project ID |
   | bucket name | your Bucket Name |
   | classname | Java file that runs aggregations |
   | options | options |

   There are 4 Java files that you can choose from for classname. Each reads the traffic data from Pub/Sub and runs different aggregations/computations.
7. Go into the java directory. Identify the source file AverageSpeeds.java.
`cd ~/training-data-analyst/courses/streaming/process/sandiego/src/main/java/com/google/cloud/training/dataanalyst/sandiego`<br>
`cat AverageSpeeds.java`<br>
What does the script do?<br>
Close the file to continue. You will want to refer to this source code while running the application, so for easy access, open a new browser tab and view the file AverageSpeeds.java on GitHub.
8. Copy-and-paste the following URL into a browser tab to view the source code on GitHub.
`https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/streaming/process/sandiego/src/main/java/com/google/cloud/training/dataanalyst/sandiego/AverageSpeeds.java`
Leave this browser tab open. You will be referring back to the source code in a later step in this lab.
9. Return to the training_vm SSH terminal. Run the Dataflow pipeline to read from Pub/Sub and write into BigQuery.<br>
`cd ~/training-data-analyst/courses/streaming/process/sandiego`<br>
`./run_oncloud.sh $DEVSHELL_PROJECT_ID $BUCKET AverageSpeeds`<br>
This script uses Maven to build a Dataflow streaming pipeline in Java.
Example successful completion:<br>
`[INFO] ------------------------------------------------------------------------`<br>
`[INFO] BUILD SUCCESS`<br>
`[INFO] ------------------------------------------------------------------------`<br>
`[INFO] Total time: 45.542 s`<br>
`[INFO] Finished at: 2018-06-08T16:51:30+00:00`<br>
`[INFO] Final Memory: 56M/216M`<br>
`[INFO] ------------------------------------------------------------------------`

### Task 5: Explore the pipeline
This Dataflow pipeline reads messages from a Pub/Sub topic, parses the JSON of the input message, produces one main output and writes to BigQuery.

1. Return to the browser tab for Console. On the Navigation menu, click Dataflow and click on your job to monitor progress.<br>
__Example:__
![](img/lab2-01.png)
2. After the pipeline is running, on the Navigation menu, click Pub/Sub > Topics.
3. Examine the line for the topic named sandiego. Notice that the Subscriptions field is now 1.<br>
__Example:__
![](img/lab2-02.png)
4. On the Navigation menu, click Dataflow and click on your job.
5. Compare the code in AverageSpeeds.java (in the GitHub browser tab) with the pipeline graph on the page for your Dataflow job.
6. Find the "GetMessages" pipeline step in the graph, and then find the corresponding code in the AverageSpeeds.java file. This is the pipeline step that reads from the Pub/Sub topic. It creates a collection of Strings, which correspond to the Pub/Sub messages that have been read.<br>
**- Do you see a subscription created?**<br>
**- How does the code pull messages from Pub/Sub?**<br>
7. Find the "Time Window" pipeline step in the graph and in code. This step creates a window whose duration is specified in the pipeline parameters (a sliding window in this case). The window accumulates the traffic data from the previous step until the end of the window and then passes it to the next steps for further transforms.<br>
**- What is the window interval?**<br>
**- How often is a new window created?**<br>
8. Find the "BySensor" and "AvgBySensor" pipeline steps in the graph, and then find the corresponding code snippet in the AverageSpeeds.java file. "BySensor" groups all events in the window by sensor ID, and "AvgBySensor" then computes the mean speed for each group.
9. Find the "ToBQRow" pipeline step in the graph and in code. This step simply creates a "row" with the average computed in the previous step, together with the lane information.<br>
`In practice, other actions could be taken in the ToBQRow step. For example, it could compare the calculated mean against a predefined threshold and log the results of the comparison in Stackdriver Logging.`
10. Find the "BigQueryIO.Write" in both the pipeline graph and in the source code. This step writes the row out of the pipeline into a BigQuery table. Because we chose the WriteDisposition.WRITE_APPEND write disposition, new records will be appended to the table.
11. Return to the BigQuery web UI tab, or open it from the Navigation menu by clicking BigQuery. Refresh your browser.
12. In the left column, beneath the text box, find your project name and the demos dataset you created. The small blue arrow to the left should now be active and clicking on it will reveal the average_speeds table.
Example:
![](img/lab2-03.png)
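The "Time Window", "BySensor", and "AvgBySensor" steps together compute a mean speed per sensor over sliding windows. The plain-Java sketch below mimics that logic without Beam so the window assignment is easy to inspect. The 60-second size and 30-second period, the `SpeedReading` class, and the sensor IDs are illustrative assumptions; AverageSpeeds.java reads its real window settings from the pipeline options and uses Beam transforms (in the Beam Java SDK, sliding windows are typically declared with `Window.into(SlidingWindows.of(size).every(period))`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch of sliding-window averaging per sensor (not Beam code).
// Assumptions: illustrative window size/period; timestamps are epoch milliseconds.
public class SlidingWindowAverage {

    static final class SpeedReading {
        final String sensorId;
        final long timestampMillis;
        final double speed;

        SpeedReading(String sensorId, long timestampMillis, double speed) {
            this.sensorId = sensorId;
            this.timestampMillis = timestampMillis;
            this.speed = speed;
        }
    }

    /**
     * Start times of every window [start, start + size) that contains the timestamp.
     * Windows begin at multiples of the period, so with size = 2 * period each event
     * falls into two overlapping windows.
     */
    static List<Long> windowStarts(long timestampMillis, long sizeMillis, long periodMillis) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestampMillis - Math.floorMod(timestampMillis, periodMillis);
        for (long start = lastStart; start > timestampMillis - sizeMillis; start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }

    /** Mean speed keyed by window start, then by sensor ID ("BySensor" + "AvgBySensor"). */
    static Map<Long, Map<String, Double>> averageBySensor(
            List<SpeedReading> readings, long sizeMillis, long periodMillis) {
        // window start -> sensor ID -> {sum of speeds, count}
        Map<Long, Map<String, double[]>> sums = new TreeMap<>();
        for (SpeedReading r : readings) {
            for (long start : windowStarts(r.timestampMillis, sizeMillis, periodMillis)) {
                double[] acc = sums.computeIfAbsent(start, k -> new TreeMap<>())
                        .computeIfAbsent(r.sensorId, k -> new double[2]);
                acc[0] += r.speed;
                acc[1] += 1;
            }
        }
        Map<Long, Map<String, Double>> means = new TreeMap<>();
        sums.forEach((start, bySensor) -> {
            Map<String, Double> m = new TreeMap<>();
            bySensor.forEach((sensor, acc) -> m.put(sensor, acc[0] / acc[1]));
            means.put(start, m);
        });
        return means;
    }

    public static void main(String[] args) {
        List<SpeedReading> readings = Arrays.asList(
                new SpeedReading("sensor-A", 10_000, 60.0),
                new SpeedReading("sensor-A", 40_000, 70.0),
                new SpeedReading("sensor-B", 45_000, 50.0));
        // {-30000={sensor-A=60.0}, 0={sensor-A=65.0, sensor-B=50.0}, 30000={sensor-A=70.0, sensor-B=50.0}}
        System.out.println(averageBySensor(readings, 60_000, 30_000));
    }
}
```

Because the windows overlap, the reading at 40 s contributes to both the window starting at 0 s and the one starting at 30 s; this is why a sliding-window pipeline emits more results than a fixed-window one over the same data.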

### Task 6: Determine throughput rates
One common activity when monitoring and improving Dataflow pipelines is figuring out how many elements the pipeline processes per second, what the system lag is, and how many data elements have been processed so far. In this activity you will learn where in the Cloud Console one can find information about processed elements and time.

1. Return to the browser tab for Console. On the Navigation menu, click Dataflow and click on your job to monitor progress (it will have your username in the pipeline name).
2. Select the "GetMessages" pipeline node in the graph and look at the step metrics on the right.<br>
**- System Lag** is an important metric for streaming pipelines. It represents the amount of time data elements are waiting to be processed since they "arrived" in the input of the transformation step.<br>
**- Elements Added** metric under output collections tells you how many data elements exited this step (for the **"GetMessages"** step of the pipeline, it also represents the number of Pub/Sub messages read from the topic by the Pub/Sub IO connector).<br>
3. Select the "Time Window" node in the graph. Observe how the Elements Added metric under the Input Collections of the "Time Window" step matches the Elements Added metric under the Output Collections of the previous step "GetMessages".
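Elements Added is a cumulative count, so a per-second throughput rate has to be derived from two readings taken some time apart. A minimal sketch, assuming you note the Elements Added value of the GetMessages step at two moments; the class name and the sample readings are hypothetical:

```java
import java.time.Duration;

// Sketch: derive elements/second from two readings of a cumulative counter
// such as "Elements Added". The readings used in main() are hypothetical.
public class ThroughputRate {

    static double elementsPerSecond(long earlierCount, long laterCount, Duration interval) {
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("interval must be positive");
        }
        if (laterCount < earlierCount) {
            throw new IllegalArgumentException("a cumulative counter must not decrease");
        }
        return (laterCount - earlierCount) / (interval.toMillis() / 1000.0);
    }

    public static void main(String[] args) {
        // e.g. 120,000 elements added, then 150,000 one minute later
        System.out.println(elementsPerSecond(120_000, 150_000, Duration.ofMinutes(1))); // 500.0
    }
}
```

Comparing such rates for the input and output collections of consecutive steps is another way to confirm the observation above that no elements are lost between "GetMessages" and "Time Window".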

### Task 7: Review BigQuery output
1. Return to the BigQuery web UI, or on the Navigation menu, click BigQuery.<br>
`Streaming data and tables may not show up immediately, and the Preview feature may not be available for data that is still in the streaming buffer. If you click on Preview you will see the message "This table has records in the streaming buffer that may not be visible in the preview." You can still run queries to view the data.`
2. Use the following query to observe the output from the Dataflow job. Replace PROJECTID with your Project ID. It is listed under connection details in Qwiklabs.
`SELECT * 
FROM [<PROJECTID>:demos.average_speeds] 
ORDER BY timestamp DESC
LIMIT 100`
3. Find the last update to the table by running the following SQL.
`SELECT
  MAX(timestamp)
FROM
  [<PROJECTID>:demos.average_speeds]`
4. Use a BigQuery table decorator to query a snapshot of the table as it was 10 minutes ago (`@-600000` is a relative offset in milliseconds).
`SELECT
  *
FROM
  [<PROJECTID>:demos.average_speeds@-600000]
ORDER BY
  timestamp DESC`

If BigQuery returns an "Invalid snapshot time" error, try reducing 600000 to 100000.
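In the legacy SQL queries above, the table decorator `@-600000` is a relative offset in milliseconds: 600000 ms is 10 minutes and 100000 ms is 100 seconds. A small helper that builds such a decorated table reference from a `Duration`; the class and method names are hypothetical, `my-project` is a placeholder for your Project ID, and the bracketed `[project:dataset.table@-offset]` form applies to legacy SQL only:

```java
import java.time.Duration;

// Sketch: build a legacy SQL table reference with a relative snapshot decorator.
// The helper name is hypothetical; table decorators are a legacy SQL feature.
public class TableDecorator {

    /** Returns e.g. "[my-project:demos.average_speeds@-600000]" for a 10-minute offset. */
    static String snapshotReference(String project, String dataset, String table, Duration ago) {
        long offsetMillis = ago.toMillis();
        if (offsetMillis <= 0) {
            throw new IllegalArgumentException("offset must be positive");
        }
        return String.format("[%s:%s.%s@-%d]", project, dataset, table, offsetMillis);
    }

    public static void main(String[] args) {
        // [my-project:demos.average_speeds@-600000]
        System.out.println(snapshotReference("my-project", "demos", "average_speeds", Duration.ofMinutes(10)));
        // The fallback offset: [my-project:demos.average_speeds@-100000]
        System.out.println(snapshotReference("my-project", "demos", "average_speeds", Duration.ofSeconds(100)));
    }
}
```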

### Task 8: Observe and understand autoscaling
Observe how Dataflow scales the number of workers to process the backlog of incoming Pub/Sub messages.

1. Return to the browser tab for Console. On the Navigation menu, click Dataflow and click on your pipeline job.
2. Examine the **Job summary** panel on the right, and review the **Autoscaling section**. How many workers are currently being used to process messages in the Pub/Sub topic?
3. The traffic sensor simulator started at the beginning of the lab creates hundreds of messages per second in the Pub/Sub topic. This causes Dataflow to increase the number of workers to keep the system lag of the pipeline at optimal levels.
4. Click **See more history** and review how many workers were used at different points in time during pipeline execution. In the Worker History pop-up, notice the **Rationale** column, which explains the reason for each change.

### Task 9: Refresh the sensor data simulation script
`The training lab environment has quota limits. If the sensor data simulation script runs too long it will pass a quota limit, causing the session credentials to be suspended.`

1. Return to the training_vm SSH terminal where the sensor data script is running.
2. If you see messages that say "INFO: Publishing", the script is still running. Press CTRL-C to stop it. Then issue the following commands to start the script again:<br>
`cd ~/training-data-analyst/courses/streaming/publish`<br>
`./send_sensor_data.py --speedFactor=60 --project $DEVSHELL_PROJECT_ID`
3. If the script has passed the quota limit, you will see repeating error messages that "credentials could not be refreshed" and you may not be able to use CTRL-C to stop the script. Simply close the SSH terminal. Open a new SSH terminal. The new session will have a fresh quota.
4. In the Console, on the Navigation menu, click Compute Engine > VM instances.
5. Locate the line with the instance called training_vm.
6. On the far right, under 'connect', click SSH to open a new terminal window.
7. In the training_vm SSH terminal, enter the following to create environment variables.<br>
`source /training/project_env.sh`
8. Use the following commands to start a new sensor simulator.<br>
`cd ~/training-data-analyst/courses/streaming/publish`<br>
`./send_sensor_data.py --speedFactor=60 --project $DEVSHELL_PROJECT_ID`

### Task 10: Stackdriver integration
Stackdriver Monitoring integration with Dataflow allows users to access Dataflow job metrics such as System Lag (for streaming jobs), Job Status (Failed, Successful), Element Counts, and User Counters from within Stackdriver.

#### Monitoring Integration features of Stackdriver
- **Explore Dataflow Metrics**: Browse through available Dataflow pipeline metrics and visualize them in charts. Some common Dataflow metrics:

  | Metric | Description |
  | -------------------- | ------------------------------------------------------------------ |
  | Job status | Job status (Failed, Successful), reported as an enum every 30 seconds and on update. |
  | Elapsed time | Job elapsed time (in seconds), reported every 30 seconds. |
  | System lag | Maximum lag across the entire pipeline, in seconds. |
  | Current vCPU count | Current number of virtual CPUs used by the job, updated on value change. |
  | Estimated byte count | Number of bytes processed per PCollection. |

- **Chart Dataflow metrics in Stackdriver Dashboards:** Create Dashboards and chart time series of Dataflow metrics.
- **Configure Alerts:** Define thresholds on job or resource group-level metrics and alert when these metrics reach specified values. Stackdriver alerting can notify on a variety of conditions such as long streaming system lag or failed jobs.
- **Monitor User-Defined Metrics:** In addition to Dataflow metrics, Dataflow exposes user-defined metrics (SDK Aggregators) as Stackdriver custom counters in the Monitoring UI, available for charting and alerting. Any Aggregator defined in a Dataflow pipeline will be reported to Stackdriver as a custom metric. Dataflow will define a new custom metric on behalf of the user and report incremental updates to Stackdriver approximately every 30 seconds.

### Task 11: Explore metrics
Stackdriver Monitoring is a separate service in Google Cloud Platform, so you will need to go through some setup steps to initialize the service for your lab account.

#### Setup Stackdriver account
1. Return to the browser tab for Console. On the Navigation menu, click Stackdriver > Monitoring.
2. Click Log in with Google.
3. Click Create Account.
4. Click Continue.
5. Click Skip AWS Setup.
6. Click Continue.
7. Select No Reports and click Continue.
8. It may take a few minutes for Stackdriver to import project information about your lab account and the resources already being used. Once the Launch monitoring button becomes active, click Launch monitoring.
9. The trial version of Stackdriver provides the Premium Tier of service, so upgrading simply sets up billing so that the account will not revert to Basic Tier at the end of 30 days.
10. Click on Continue with the trial. (You can also click on 'Dismiss' on the message bar at the top asking if you want to upgrade).
**Explore Stackdriver Metrics**
11. In the panel to the left, click Resources > Metrics Explorer.
12. In the Metrics Explorer, find and select the Dataflow_job resource type. You should see a list of available Dataflow-related metrics.
![](img/lab2-04.png)
13. Select the resource Dataflow Job and the metric Data watermark lag.
14. Stackdriver will draw a graph on the right side of the page.
15. Under Find resource type and metric, click on the (x) to remove the Data watermark lag metric. Select a new metric, System Lag.<br>
`The metrics that Dataflow provides to Stackdriver are listed here:
https://cloud.google.com/monitoring/api/metrics_gcp
(Search on the page for Dataflow).
The metrics you have viewed are useful indicators of pipeline performance.
Data watermark age: The age (time since event timestamp) of the most recent item of data that has been fully processed by the pipeline.
System lag: The current maximum duration that an item of data has been awaiting processing, in seconds.`
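The two definitions above measure different clocks: data watermark age is measured from event timestamps, while system lag is measured from the time an item started waiting for processing. A minimal sketch that computes both from hypothetical in-memory values, only to make the definitions concrete; it is not how Dataflow computes these metrics internally:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Illustration of the two metric definitions, using hypothetical values.
public class LagMetrics {

    /** Data watermark age: now minus the event timestamp of the most recent fully processed item. */
    static Duration dataWatermarkAge(Instant now, Instant latestFullyProcessedEventTime) {
        return Duration.between(latestFullyProcessedEventTime, now);
    }

    /** System lag: the longest time any still-pending item has been waiting for processing. */
    static Duration systemLag(Instant now, List<Instant> pendingArrivalTimes) {
        Duration max = Duration.ZERO;
        for (Instant arrived : pendingArrivalTimes) {
            Duration waiting = Duration.between(arrived, now);
            if (waiting.compareTo(max) > 0) {
                max = waiting;
            }
        }
        return max;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2018-06-08T17:00:00Z");
        System.out.println(dataWatermarkAge(now, Instant.parse("2018-06-08T16:59:30Z"))); // PT30S
        System.out.println(systemLag(now, Arrays.asList(
                Instant.parse("2018-06-08T16:59:55Z"),
                Instant.parse("2018-06-08T16:59:48Z")))); // PT12S
    }
}
```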

### Task 12: Create alerts
If you want to be notified when a certain metric crosses a specified threshold (for example, when System Lag of our lab streaming pipeline increases above a predefined value), you could use the Alerting mechanisms of Stackdriver to accomplish that.

#### Create an alert
1. In Stackdriver Monitoring, click Alerting > Policies Overview.
2. Click on Add Policy.
3. On the Create new Alerting Policy page click on Add Condition.
4. On the Metric Threshold row, click Select.
5. In the Target section, set the RESOURCE TYPE to Dataflow Job.
6. Under APPLIES TO, select Single.
7. Select the resource you used in the previous task.
8. In the Configuration section, set IF METRIC to System Lag.
9. Set CONDITION to above.
10. Set THRESHOLD to 5.
11. Set FOR to 1 minute.
12. Click on Save Condition to save the alert.<br>
**Add a notification**
13. Under Notification, click on the pulldown menu to view the options for notification channel. You can set up a notification policy if you would like, using your email address.
14. In the Name this policy section, give the policy a name such as MyAlertPolicy.
15. Click on Save Policy.<br>
**View events**
16. On the Stackdriver tab, click on Alerting > Events.
17. Every time an alert is triggered by a Metric Threshold condition, an Incident and a corresponding Event are created in Stackdriver. If you specified a notification mechanism in the alert (email, SMS, pager, etc.), you will also receive a notification.
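The condition configured above (System Lag above 5 for 1 minute) is meant to fire only when the metric stays over the threshold for the whole duration, not on a single spike. A simplified sketch of that evaluation over timestamped samples, assuming samples arrive in time order, that "above" means strictly greater than, and that the threshold is in seconds for System Lag; the class and method names are hypothetical, and Stackdriver's actual alignment and evaluation logic is more involved:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of a "metric above threshold FOR duration" condition.
// Assumptions: samples are in time order; "above" means strictly greater than.
public class ThresholdCondition {

    static final class Sample {
        final Instant time;
        final double value;

        Sample(Instant time, double value) {
            this.time = time;
            this.value = value;
        }
    }

    /** True if the current (most recent) run of above-threshold samples spans at least forDuration. */
    static boolean isViolating(List<Sample> samples, double threshold, Duration forDuration) {
        Instant runStart = null; // first sample of the current above-threshold run
        Instant runEnd = null;   // latest sample of that run
        for (Sample s : samples) {
            if (s.value > threshold) {
                if (runStart == null) {
                    runStart = s.time;
                }
                runEnd = s.time;
            } else {
                // A sample at or below the threshold resets the run.
                runStart = null;
                runEnd = null;
            }
        }
        return runStart != null
                && Duration.between(runStart, runEnd).compareTo(forDuration) >= 0;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2018-06-08T17:00:00Z");
        List<Sample> systemLag = Arrays.asList(
                new Sample(t0, 3),
                new Sample(t0.plusSeconds(30), 7),
                new Sample(t0.plusSeconds(60), 8),
                new Sample(t0.plusSeconds(90), 9));
        // Above 5 from t0+30s through t0+90s: 60 seconds, so the condition is met.
        System.out.println(isViolating(systemLag, 5, Duration.ofMinutes(1))); // true
    }
}
```

Requiring a sustained violation is what keeps a brief lag spike, such as the one caused by restarting the simulator in Task 9, from creating an incident.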

### Task 13: Set up dashboards
You can easily build dashboards with the most relevant Dataflow-related charts with Stackdriver Monitoring Dashboards.

1. On the Stackdriver tab, click on Dashboards > Create dashboard.
2. Click on Add Chart.
3. On the Add Chart page:
4. In the Find resource type and metric box, start typing Dataflow Job and then select it as the Resource Type.
5. After you select a Resource Type, the Metric field menu will appear. Select a metric to chart, such as System Lag.
6. In the Filter panel, select project, then the equals sign '=', then your Project ID.
7. Click Save.
Example:
![](img/lab2-05.png)


You can add more charts to the dashboard, if you would like, for example, Pub/Sub publish rates on the topic, or subscription backlog (which is a signal to the Dataflow auto-scaler).

### Task 14: Launch another streaming pipeline
1. In the training_vm SSH terminal, examine the CurrentConditions.java application. Do not make any changes to the code.<br>
`cd ~/training-data-analyst/courses/streaming/process/sandiego/src/main/java/com/google/cloud/training/dataanalyst/sandiego`<br>
`cat CurrentConditions.java`
2. Copy-and-paste the following URL into a browser tab to view the source code on GitHub.
`https://github.com/GoogleCloudPlatform/training-data-analyst/blob/master/courses/streaming/process/sandiego/src/main/java/com/google/cloud/training/dataanalyst/sandiego/CurrentConditions.java`<br>
What does the script do?
3. Run the CurrentConditions.java code in a new Dataflow pipeline. This script is simpler than AverageSpeeds because it does not perform as many transforms. The results will be used in the next lab to build dashboards and run some transforms (functions) while retrieving data from BigQuery.
4. In the training_vm SSH terminal, enter the following:<br>
`cd ~/training-data-analyst/courses/streaming/process/sandiego`<br>
`./run_oncloud.sh $DEVSHELL_PROJECT_ID $BUCKET CurrentConditions`
5. Return to the browser tab for Console. On the Navigation menu, click Dataflow and click on the new pipeline job. Confirm that the pipeline job is listed and verify that it is running without errors.
6. It will take several minutes before the current_conditions table appears in BigQuery.

#### Completion
#### Cleanup
In the Cloud Platform Console, sign out of the Google account.

Close the browser tab.

End your lab

## Quiz
### Question 1
The Dataflow model provides constructs that map to the four questions that are relevant in any out-of-order data processing pipeline:

| Questions | Constructs|
| ------------------------------------------------ | ----------------------------------------------|
| __ 1. What results are calculated? | A. Answered via Event-time windowing |
| __ 2. Where in event time are results calculated?	| B. Answered via transformations |
| __ 3. When in processing time are results materialized? | C. Answered via Accumulation modes |
| __ 4. How do refinements of results relate? | D. Answered via Watermarks, triggers, and allowed lateness. |

1. 1-A, 2-D, 3-C, 4-B
2. **1-B, 2-A, 3-D, 4-C**
3. 1-C, 2-A, 3-D, 4-B
4. 1-D, 2-B, 3-A, 4-C