# How to run the examples on Dataflow

This notebook illustrates a pipeline that streams raw data from Pub/Sub to BigQuery using the Dataflow runner and the interactive runner.

The pipeline reads the raw data from Pub/Sub and loads it into BigQuery. In parallel, it groups the raw data into fixed 5-second windows and calculates the mean sensor value for each window.


Note that running this example incurs a small [charge](https://cloud.google.com/dataflow/pricing) from Dataflow.

Let's make sure the dependencies are installed. The `db-dtypes` package lets you load BigQuery query results into a dataframe to plot the anomalies.



In [None]:
pip install db-dtypes

After running `pip install db-dtypes`, restart the kernel by clicking the reload icon near the navigation menu at the top. Once the kernel has restarted, proceed with the rest of the steps below.

Let's make sure the Dataflow API is enabled. This [allows](https://cloud.google.com/apis/docs/getting-started#enabling_apis) your project to access the Dataflow service:

In [None]:
!gcloud services enable dataflow.googleapis.com

### 1. Start with necessary imports


In [None]:
import re
import json
from datetime import datetime
import apache_beam as beam
import random
import time
from google.cloud import pubsub_v1,bigquery
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.runners import DataflowRunner
from apache_beam.runners.interactive import interactive_runner
from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys,Create,Map , CombineGlobally ,dataframe
import apache_beam.runners.interactive.interactive_beam as ib
import google.auth
import matplotlib.pyplot as plt

publisher  = pubsub_v1.PublisherClient() #Pubsub publisher client
subscriber = pubsub_v1.SubscriberClient() #Pubsub subscriber client
client     = bigquery.Client() #bigquery client

### 2. Set the variables

These variables are referenced in later sections.


In [None]:
dest_project=!gcloud config get-value project
# The project ID is the last line of the output; any earlier lines are gcloud status messages.
project_id=dest_project[-1]
print(project_id)

pubsub_topic                       = project_id + "-" + "topic" 
pubsub_subscription                = pubsub_topic + "-" + "sub"
pubsub_topic_path                  = publisher.topic_path(project_id, pubsub_topic)
pubsub_subscription_path           = subscriber.subscription_path(project_id, pubsub_subscription)

bq_dataset                         = "anomaly_detection_demo"
bigquery_agg_schema                = "sensorID:STRING,sensorValue:FLOAT,windowStart:DATETIME,windowEnd:DATETIME"
bigquery_raw_schema                = "sensorID:STRING,timeStamp:DATETIME,sensorValue:FLOAT"
bigquery_raw_table                 = bq_dataset + ".anomaly_raw_table" 
bigquery_agg_table                 = bq_dataset + ".anomaly_windowed_table" 
region                             = "us-central1"
bucket_name                        = project_id 

### 3. Create Pub/Sub topic

In [None]:
!gcloud pubsub topics create {pubsub_topic}

If you get an error that says `Run client channel backup poller: UNKNOWN:pollset_`, don't be alarmed. It won't affect the job; it is just a formatting issue.

### 4. Create Pub/Sub subscription

In [None]:
!gcloud pubsub subscriptions create {pubsub_subscription} --topic={pubsub_topic}

### 5. Create BigQuery Dataset


In [None]:
!bq --location={region} mk --dataset {project_id}:{bq_dataset}

### 6. Create BigQuery Tables

Raw BigQuery schema

![raw-schema](Images/raw-schema.png)

Aggregated BigQuery schema

![agg-schema](Images/agg-schema.png)

In [None]:
!bq mk --schema {bigquery_raw_schema} -t {bigquery_raw_table}
!bq mk --schema {bigquery_agg_schema} -t {bigquery_agg_table}

If you get an error that says `Run client channel backup poller: UNKNOWN:pollset_`, don't be alarmed. It won't affect the job; it is just a formatting issue.
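
The schema variables use the compact `name:TYPE,name:TYPE` form that is passed to `bq mk --schema`. As a minimal sketch, the snippet below splits such a string into a list of field dictionaries so you can check the column names and types before creating the tables. The helper name `parse_schema` is hypothetical and is not part of the `bq` tool or any Google library.

```python
def parse_schema(schema):
    """Split a 'name:TYPE,name:TYPE' string into a list of field dicts.

    Hypothetical helper for inspection only; it does not call BigQuery.
    """
    fields = []
    for part in schema.split(","):
        name, sep, field_type = part.strip().partition(":")
        if not name or not sep or not field_type:
            raise ValueError("Malformed schema entry: %r" % part)
        fields.append({"name": name, "type": field_type})
    return fields


# Same string as bigquery_raw_schema in step 2.
raw_fields = parse_schema("sensorID:STRING,timeStamp:DATETIME,sensorValue:FLOAT")
for field in raw_fields:
    print(field["name"], field["type"])
```

The aggregated schema string can be checked the same way.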

### 7. Create GCS Bucket  

In [None]:
!gsutil mb -c standard -l {region} gs://{bucket_name}

### 8. IMPORTANT! Open the GCS bucket from the console and create a folder called dataflow
The path should be gs://project_id/dataflow


### 9. Set the pipeline options

In [None]:
# Setting up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions(flags={})

# Sets the pipeline mode to streaming, so we can stream the data from PubSub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Sets the project to the default project in your current Google Cloud environment.
options.view_as(GoogleCloudOptions).project = project_id

# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = region

### 10. Create the functions to format the raw data and the processed data

In [None]:
# Adds the window start and end datetimes to each aggregated (sensorID, mean) element.
class FormatDoFn(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # Imported inside the method so the name is available on Dataflow workers.
        from datetime import datetime
        # window.start and window.end are Beam Timestamp objects; convert them to float seconds first.
        window_start = datetime.fromtimestamp(float(window.start))
        window_end = datetime.fromtimestamp(float(window.end))
        return [{
            'sensorID': element[0],
            'sensorValue': element[1],
            'windowStart': window_start,
            'windowEnd': window_end
        }]

In [None]:
# Passes each raw element through unchanged to produce the raw PCollection.
class ProcessDoFn(beam.DoFn):
    def process(self, element):
        yield element 

### 11. Construct the pipeline 
This step reads the messages from the Pub/Sub subscription and processes them. It turns the raw data into a raw PCollection and the windowed, aggregated data into an aggregated PCollection.

For the aggregated branch, the pipeline groups the data read from Pub/Sub into 5-second intervals and then calculates the mean sensor value for each window.

![fixed-window](Images/fixed-window.png)
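
To make the fixed-window idea concrete, here is a minimal plain-Python sketch of the same computation, written without Beam. It assigns each event to the 5-second window that contains its timestamp, then averages the values per sensor and window. The function name `mean_per_key_and_window` and the sample events are illustrative assumptions, and the sketch uses UTC datetimes for determinism, whereas the notebook's `FormatDoFn` uses local time.

```python
from collections import defaultdict
from datetime import datetime, timezone

WINDOW_SIZE = 5  # seconds, the same size passed to FixedWindows in the pipeline


def mean_per_key_and_window(events, size=WINDOW_SIZE):
    """events: iterable of (event_time_seconds, sensor_id, value) tuples."""
    buckets = defaultdict(list)
    for ts, sensor_id, value in events:
        # A fixed window covers [start, start + size); start is ts rounded down.
        start = int(ts // size) * size
        buckets[(sensor_id, start)].append(value)

    rows = []
    for (sensor_id, start), values in sorted(buckets.items()):
        rows.append({
            "sensorID": sensor_id,
            "sensorValue": sum(values) / len(values),
            "windowStart": datetime.fromtimestamp(start, tz=timezone.utc),
            "windowEnd": datetime.fromtimestamp(start + size, tz=timezone.utc),
        })
    return rows


# Illustrative events: two fall in window [0, 5), one in window [5, 10).
events = [(0.5, "s1", 100.0), (4.9, "s1", 200.0), (5.0, "s1", 300.0)]
for row in mean_per_key_and_window(events):
    print(row["sensorID"], row["windowStart"], row["sensorValue"])
```

In the real pipeline Beam does this grouping incrementally as messages arrive, so the sketch only shows which values end up averaged together.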

In [None]:
# Create the pipeline with the InteractiveRunner and the options set in step 9.
p = beam.Pipeline(interactive_runner.InteractiveRunner(), options=options)

# Pub/Sub subscription => mapped (PCollection of parsed JSON dicts)
mapped   = (p  | "ReadFromPubSub" >> beam.io.gcp.pubsub.ReadFromPubSub(subscription=pubsub_subscription_path)
               | "Json Loads" >> Map(json.loads))

# mapped (input PCollection) => raw_data (output PCollection)
raw_data = (mapped
               | 'Format' >> beam.ParDo(ProcessDoFn()))

# mapped (input PCollection) => agg_data (output PCollection)
agg_data = (mapped
               | "Map Keys" >> Map(lambda x: (x["SensorID"],x["SensorValue"]))
               | "ApplyFixedWindow" >> beam.WindowInto(beam.window.FixedWindows(5))
               # Despite the label, this step computes the mean value per sensor and window.
               | "Total Per Key" >> beam.combiners.Mean.PerKey()
               | 'Final Format' >> beam.ParDo(FormatDoFn()))

Note that the `Pipeline` is constructed with an `InteractiveRunner`, so you can use operations such as `ib.collect` or `ib.show`.
### Important 
Run steps 1-4 of the simulator script (PythonSimulator.ipynb) in a separate tab. The simulator generates data and writes it to the Pub/Sub topic to test the interactive runner (1 message per millisecond until it reaches 100 messages).

Remember to run **only** steps 1-4 for now. We will come back to this script to run step 5 later.
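
The simulator notebook is not reproduced here, so the following is only a sketch of what one simulated message might look like. The keys `SensorID` and `SensorValue` are the ones the pipeline reads in its "Map Keys" step; the `TimeStamp` key, the sensor naming, and the value range are assumptions made for illustration.

```python
import json
import random
from datetime import datetime


def make_message(sensor_id, value, timestamp):
    """Encode one reading as the UTF-8 JSON bytes that Pub/Sub carries."""
    payload = {
        "SensorID": sensor_id,      # read by the pipeline's "Map Keys" step
        "SensorValue": value,       # read by the pipeline's "Map Keys" step
        "TimeStamp": timestamp,     # assumed key, modeled on the raw table schema
    }
    return json.dumps(payload).encode("utf-8")


# Build 100 illustrative messages; publishing them is left to the simulator notebook.
messages = [
    make_message(
        "sensor-%d" % random.randint(1, 3),
        round(random.uniform(0, 250), 2),
        datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
    )
    for _ in range(100)
]

# The pipeline's first two steps amount to json.loads followed by key extraction.
first = json.loads(messages[0])
print(first["SensorID"], first["SensorValue"])
```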
 

In [None]:
ib.show(agg_data)

### 12. Dataflow Additions

Now, for something a bit different. Because Dataflow executes in the cloud, you need to output to a cloud sink. In this case, you are loading the transformed data into BigQuery.

First, set up the `PipelineOptions` to tell the Dataflow service which Google Cloud project to use, where to store temporary files, and what to name the job. The region was already set in step 9.

In [None]:
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = "gs://<add your project id>/dataflow"

# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
# options.view_as(GoogleCloudOptions).staging_location = dataflow_gcs_location

# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location

# Dataflow job name, used when the pipeline runs with the DataflowRunner.
options.view_as(GoogleCloudOptions).job_name = project_id
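
Beam's option validation expects a job name made only of the characters `[-a-z0-9]`, starting with a letter and ending with a letter or number. Most project IDs already satisfy this, but domain-scoped IDs such as `example.com:my-project` do not. The sketch below shows one way to derive a safe name; the helper `to_job_name` and its fallback value are hypothetical and not part of Beam.

```python
import re

# Pattern for job names: lowercase letters, digits and hyphens,
# starting with a letter and ending with a letter or digit.
JOB_NAME_PATTERN = re.compile(r"[a-z]([-a-z0-9]*[a-z0-9])?")


def to_job_name(raw, fallback="anomaly-pipeline"):
    """Turn an arbitrary string into a job name matching JOB_NAME_PATTERN."""
    name = re.sub(r"[^-a-z0-9]", "-", raw.lower())  # replace disallowed characters
    name = re.sub(r"^[^a-z]+", "", name)            # must start with a letter
    name = name.rstrip("-")                         # must not end with a hyphen
    return name if JOB_NAME_PATTERN.fullmatch(name) else fallback


print(to_job_name("example.com:my-project"))
```

You could then assign the result to `job_name` instead of the raw project ID.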


In [None]:
# Specify the BigQuery table to write `raw_data` to,
# based on the `bigquery_raw_table` variable set earlier.
(raw_data | 'Write raw data to Bigquery' 
 >> beam.io.WriteToBigQuery(
                bigquery_raw_table,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
# Specify the BigQuery table to write `agg_data` to,
# based on the `bigquery_agg_table` variable set earlier.
(agg_data | 'Write windowed aggregated data to Bigquery' 
 >> beam.io.WriteToBigQuery(
                bigquery_agg_table,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

In [None]:
# IMPORTANT! Ensure that the graph is correct before sending it out to Dataflow.
# Because this is a notebook environment, unintended additions to the graph may have occurred when rerunning cells. 
ib.show_graph(p)

### 13. Running the pipeline

Now you are ready to run the pipeline on Dataflow. `run_pipeline()` runs the pipeline and returns a pipeline result object.

In [None]:
pipeline_result = DataflowRunner().run_pipeline(p, options=options)

### Important 
![dataflowStatus](Images/dataflowFailed.png)

Before moving forward, check whether the Dataflow job is running (Hamburger menu->Dataflow->Jobs). If the status shows as `failed`, **rerun** the cell above, `pipeline_result = DataflowRunner().run_pipeline(p, options=options)`, one more time. This failure happens when the Dataflow API is not yet fully enabled; it takes a minute or so for the change to propagate.




Using the `pipeline_result` handle, the following code builds a link to the Google Cloud Console web page that shows you details of the Dataflow job you just started:

In [None]:
# IPython.display is the public import path; IPython.core.display is deprecated for this use.
from IPython.display import display, HTML
url = ('https://console.cloud.google.com/dataflow/jobs/%s/%s?project=%s' % 
      (pipeline_result._job.location, pipeline_result._job.id, pipeline_result._job.projectId))
display(HTML('Click <a href="%s" target="_new">here</a> for the details of your Dataflow job!' % url))


Dataflow job
![dataflow-job](Images/DataflowJob.png)

### Important 
Run step 5 of the simulator script (PythonSimulator.ipynb) in the separate tab. The simulator generates data and writes it to the Pub/Sub topic to test the Dataflow runner (1 message per millisecond until it reaches 5000 messages).

### 14. Checking the raw table results

Note: the first rows take about 90 seconds to appear in the table because of Dataflow warm-up time.

Raw table results
![raw-table-results](Images/raw-data-results.png)

In [None]:
#check the raw data in BQ raw Table
sql = 'SELECT * FROM `{}` '.format(bigquery_raw_table)
query_job = client.query(sql)  # API request
raw_df = query_job.to_dataframe()
raw_df

### 15. Checking the aggregated table results
Aggregated table results
![agg-table-results](Images/agg-data-results.png)

In [None]:
# Check the aggregated data in the BigQuery aggregated table and label each window as "Anomaly" or "Normal".
sql = 'SELECT   sensorID , case when sensorValue >= 200 then "Anomaly" else "Normal" end as type, sensorValue,row_number() over (order by windowStart) as cycle FROM `{}` '.format(bigquery_agg_table)
query_job = client.query(sql)  # API request
agg_df = query_job.to_dataframe()
agg_df
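
The query labels a window as an anomaly when its mean sensor value is 200 or higher, and numbers the windows in `windowStart` order. As a rough plain-Python equivalent of that `CASE` expression and `row_number()` call, consider the sketch below; the function name `label_rows` and the sample rows are illustrative assumptions.

```python
THRESHOLD = 200  # same cut-off as the CASE expression in the query


def label_rows(rows, threshold=THRESHOLD):
    """Label each aggregated row and number the rows by window start."""
    ordered = sorted(rows, key=lambda r: r["windowStart"])
    return [
        {
            "sensorID": r["sensorID"],
            "type": "Anomaly" if r["sensorValue"] >= threshold else "Normal",
            "sensorValue": r["sensorValue"],
            "cycle": cycle,  # 1-based position, like row_number()
        }
        for cycle, r in enumerate(ordered, start=1)
    ]


# Illustrative rows; real values come from the aggregated BigQuery table.
sample = [
    {"sensorID": "s1", "sensorValue": 215.0, "windowStart": "2024-01-01T00:00:05"},
    {"sensorID": "s1", "sensorValue": 120.5, "windowStart": "2024-01-01T00:00:00"},
]
for row in label_rows(sample):
    print(row["cycle"], row["type"], row["sensorValue"])
```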

### 16. Plot the results in a simple scatter plot

The chart displays anomalies in red and normal values in green. The black horizontal line marks the anomaly threshold of 200.
![plot](Images/plot.png)

In [None]:
c=['green' if g=='Normal' else 'red' for g in agg_df['type']]
agg_df.plot(
    kind="scatter",
    x="cycle",
    y="sensorValue"  , c = c, s = 150,
    figsize=(20, 10)    
    )
plt.axhline(y=200, color='black', linestyle='-',linewidth=3)


# Congratulations!!!
End of lab
