# Data Journey Day 1: Streaming Data Transformation Pipeline

<table align="left">

  <td>
    <a href="https://github.com/AmritRaj23/data-journey/blob/main/day-1/ETL%20(Extract%20Transform%20Load)/Dataflow/DataJourneyStreamingTransformation.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://github.com/AmritRaj23/data-journey/blob/main/day-1/ETL%20(Extract%20Transform%20Load)/Dataflow/DataJourneyStreamingTransformation.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
     </a>
  </td>
</table>
<br/><br/><br/>

This notebook demonstrates how to use the Apache Beam framework to build a streaming pipeline that ingests data, transforms it, and writes it to a sink.

In [1]:
import logging
import json
import time
import traceback

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options import pipeline_options
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io import WriteToText

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners import DataflowRunner

import google.auth

## Recap: Apache Beam Programming Model & Syntax

The Apache Beam data processing framework supports various "runners".

The "Interactive Runner" comes in handy for demonstrating the basic Apache Beam concepts.

Let's ingest data from Pub/Sub and apply some simple transformations using Apache Beam.
The Interactive Runner allows us to peek at the result.

![image.png](attachment:4b77c33d-5f91-43fc-94e4-a74acd1f51d6.png)

At the highest level, Apache Beam abstracts data processing into Transforms and PCollections.

The important Python syntax here is:<br>
`Resulting PCollection` = (`Initial PCollection` | `Transformation Name` >> `Transformation User Code`)<br>
<br>
The `|` pipe operator corresponds to `.apply()`.<br>
`>>` separates an optional transformation name string from the actual transformation.
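
To make the two operators less magical, here is a minimal plain-Python sketch of how `|` and `>>` can be overloaded. `ToyCollection`, `ToyTransform`, and `toy_map` are hypothetical names invented for this illustration; they only mimic the shape of the syntax and are not how Beam itself is implemented.

```python
class ToyTransform:
    """Hypothetical stand-in for a Beam transform: a function over a list."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label

    def __rrshift__(self, label):
        # Evaluates `"some label" >> transform`. str does not implement >>,
        # so Python falls back to the right operand's __rrshift__.
        return ToyTransform(self.fn, label)


class ToyCollection:
    """Hypothetical stand-in for a PCollection: a list of elements."""

    def __init__(self, elements):
        self.elements = list(elements)

    def apply(self, transform):
        # Produces a new collection; the input collection is left untouched.
        return ToyCollection(transform.fn(self.elements))

    def __or__(self, transform):
        # `collection | transform` is sugar for collection.apply(transform).
        return self.apply(transform)


def toy_map(fn):
    """Builds a transform that applies fn to every element."""
    return ToyTransform(lambda elements: [fn(e) for e in elements])


numbers = ToyCollection([1, 2, 3])
named_step = "Double" >> toy_map(lambda x: x * 2)

# `>>` binds more tightly than `|`, so the label is attached first.
doubled = numbers | "Double" >> toy_map(lambda x: x * 2)

print(named_step.label, doubled.elements)
```

Because `>>` has higher precedence than `|` in Python, no extra parentheses are needed around the named transform.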

## Implementing an E-Commerce Analytics Pipeline

![image.png](attachment:4a809a67-5fbd-430d-8bfe-3aaa5837d782.png)

The JSON messages we send to the endpoint may look like the following:


### Viewed item:

```
{
  "event_datetime":"2020-11-16 22:59:59",
  "event": "view_item",
  "user_id": "UID00003",
  "client_id": "CID00003",
  "page":"/product-67890",
  "page_previous": "/category-tshirts",
  "ecommerce": {
    "items": [{
      "item_name": "Donut Friday Scented T-Shirt",
      "item_id": "67890",
      "price": 33.75,
      "item_brand": "Google",
      "item_category": "Apparel",
      "item_category_2": "Mens",
      "item_category_3": "Shirts",
      "item_category_4": "Tshirts",
      "item_variant": "Black",
      "item_list_name": "Search Results",
      "item_list_id": "SR123",
      "index": 1,
      "quantity": 1
    }]
  }
}
```

### Item added to cart:
```
{
  "event_datetime":"2020-11-16 20:59:59",
  "event": "add_to_cart",
  "user_id": "UID00003",
  "client_id": "CID00003",
  "page":"/product-67890",
  "page_previous": "/category-tshirts",
  "ecommerce": {
    "items": [{
      "item_name": "Donut Friday Scented T-Shirt",
      "item_id": "67890",
      "price": 33.75,
      "item_brand": "Google",
      "item_category": "Apparel",
      "item_category_2": "Mens",
      "item_category_3": "Shirts",
      "item_category_4": "Tshirts",
      "item_variant": "Black",
      "item_list_name": "Search Results",
      "item_list_id": "SR123",
      "index": 1,
      "quantity": 2
    }]
  }
}
```
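
The pipelines later in this notebook flatten each message into a single row by combining the top-level fields with the first entry of `ecommerce.items`. A minimal sketch of that flattening in plain Python, without Beam, is shown here. The helper name `to_row` and the two field lists are assumptions made for this illustration; the sample message is the "add_to_cart" example above.

```python
import json

# Top-level fields that are copied into the row unchanged.
EVENT_FIELDS = ["event_datetime", "event", "user_id", "client_id",
                "page", "page_previous"]

# Fields taken from the first item; "index" is deliberately left out,
# matching the rows built by the pipelines in this notebook.
ITEM_FIELDS = ["item_name", "item_id", "price", "item_brand",
               "item_category", "item_category_2", "item_category_3",
               "item_category_4", "item_variant", "item_list_name",
               "item_list_id", "quantity"]


def to_row(event):
    """Flatten one event dict into a single-level row (hypothetical helper)."""
    item = event["ecommerce"]["items"][0]
    row = {field: event[field] for field in EVENT_FIELDS}
    row.update({field: item[field] for field in ITEM_FIELDS})
    return row


sample = json.loads("""
{
  "event_datetime": "2020-11-16 20:59:59",
  "event": "add_to_cart",
  "user_id": "UID00003",
  "client_id": "CID00003",
  "page": "/product-67890",
  "page_previous": "/category-tshirts",
  "ecommerce": {
    "items": [{
      "item_name": "Donut Friday Scented T-Shirt",
      "item_id": "67890",
      "price": 33.75,
      "item_brand": "Google",
      "item_category": "Apparel",
      "item_category_2": "Mens",
      "item_category_3": "Shirts",
      "item_category_4": "Tshirts",
      "item_variant": "Black",
      "item_list_name": "Search Results",
      "item_list_id": "SR123",
      "index": 1,
      "quantity": 2
    }]
  }
}
""")

row = to_row(sample)
print(row["event"], row["item_name"], row["quantity"])
```

Note that only the first item of each message is used, so a message carrying several items would lose all but the first in this sketch.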

In [2]:
# Setting some basic pipeline options.
project = google.auth.default()[1]

options = pipeline_options.PipelineOptions(
    streaming=True,
    project=project
)

In [3]:
# Telling the interactive runner to listen for events for one minute.
ib.options.recording_duration = '1m'

The pipeline listens to the given Pub/Sub subscription, parses the JSON content of each received message, and applies a simple transformation to every record. Once we send messages to the corresponding Pub/Sub topic, the resulting PCollection is displayed in the notebook.
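
By default, `ReadFromPubSub` hands each message payload to the next step as `bytes`, which `json.loads` accepts directly. A malformed message would raise an exception inside the parsing step, so one possible safeguard is a tolerant parser. The sketch below is plain Python; the helper name `parse_event` is an assumption made for this illustration and does not appear in the notebook.

```python
import json


def parse_event(payload):
    """Decode a message payload; return None if it is not a JSON object.

    Hypothetical helper: accepts the bytes (or str) payload of one message.
    """
    try:
        event = json.loads(payload)
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
        return None
    # Events are expected to be JSON objects, not arrays or scalars.
    return event if isinstance(event, dict) else None


good = parse_event(b'{"event": "view_item", "user_id": "UID00003"}')
truncated = parse_event(b'{"event": "view_item", ')
not_an_object = parse_event(b'[1, 2, 3]')

print(good, truncated, not_an_object)
```

In a Beam pipeline, a helper like this could stand in for `json.loads` in the "Parse json" step, followed by a filter that drops `None` values. That wiring is a suggestion rather than part of the original notebook.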

In [None]:
# Defining a pipeline object.
p = beam.Pipeline(InteractiveRunner(), options=options)

# Defining the Pub/Sub Subscription to read from.
subscription = "projects/jp-sandbox-359611/subscriptions/ecommerce_interactive_beam"

# Defining the pipeline.
pubsub = (p | "Read Topic" >> ReadFromPubSub(subscription=subscription) # Listening to Pub/Sub.
            | "Parse json" >> beam.Map(json.loads) # Parsing JSON from the message payload.
            | "Drop Col" >> beam.Map(lambda input: {'event_datetime': input['event_datetime'], # Keeping only the top-level columns.
                                                    'event': input['event'],
                                                    'user_id':  input['user_id'],
                                                    'client_id': input['client_id'],
                                                    'page': input['page'],
                                                    'page_previous': input['page_previous']})
         )

ib.show(pubsub) # Display resulting PCollection in Notebook.

In [12]:
# Defining a pipeline object.
p = beam.Pipeline(InteractiveRunner(), options=options)

# Defining the Pub/Sub Subscription to read from.
subscription = "projects/jp-sandbox-359611/subscriptions/ecommerce_interactive_beam"

# Receiving message from Pub/Sub & parsing json from string.
events = (p
                | "Read Topic" >> ReadFromPubSub(subscription=subscription) # Listening to Pub/Sub.
                | "Parse json" >> beam.Map(json.loads) # Parsing JSON from the message payload.
                | "item view row" >> beam.Map(lambda input: {'event_datetime': input['event_datetime'], # Flattening the first item into the row.
                                                  'event': input['event'],
                                                  'user_id':  input['user_id'],
                                                  'client_id': input['client_id'],
                                                  'page': input['page'],
                                                  'page_previous': input['page_previous'],
                                                  "item_name": input['ecommerce']['items'][0]["item_name"],
                                                  "item_id": input['ecommerce']['items'][0]["item_id"],
                                                  "price": input['ecommerce']['items'][0]["price"],
                                                  "item_brand": input['ecommerce']['items'][0]["item_brand"],
                                                  "item_category": input['ecommerce']['items'][0]["item_category"],
                                                  "item_category_2": input['ecommerce']['items'][0]["item_category_2"],
                                                  "item_category_3": input['ecommerce']['items'][0]["item_category_3"],
                                                  "item_category_4": input['ecommerce']['items'][0]["item_category_4"],
                                                  "item_variant": input['ecommerce']['items'][0]["item_variant"],
                                                  "item_list_name": input['ecommerce']['items'][0]["item_list_name"],
                                                  "item_list_id": input['ecommerce']['items'][0]["item_list_id"],
                                                  "quantity": input['ecommerce']['items'][0]["quantity"]
                                                 })
         )

# Display resulting PCollection in Notebook.
ib.show(events) 




## Serverless Beam Data Processing using the Dataflow Runner

The Dataflow Runner executes an Apache Beam pipeline on Google Cloud Dataflow, a fully managed service, so there are no servers or clusters to provision.
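
The pipeline in this section routes each parsed event to one of two BigQuery tables, depending on its `event` field. The branching logic can be tried out in plain Python first. In the sketch below, the predicates use `.get()` so that an event without an `event` key is simply skipped instead of raising a `KeyError`; that variation and the sample events are assumptions made for this illustration.

```python
def is_item_view(event):
    # .get() returns None for a missing key instead of raising KeyError.
    return event.get("event") == "view_item"


def is_add_to_cart(event):
    return event.get("event") == "add_to_cart"


# Hypothetical, heavily shortened sample events.
events = [
    {"event": "view_item", "user_id": "UID00001"},
    {"event": "add_to_cart", "user_id": "UID00001"},
    {"event": "view_item", "user_id": "UID00002"},
    {"event": "purchase", "user_id": "UID00002"},  # matches neither branch
    {"user_id": "UID00003"},                       # no "event" key at all
]

# Each branch filters the same input independently, as two Filter steps
# applied to one shared collection would.
item_views = [e for e in events if is_item_view(e)]
add_to_carts = [e for e in events if is_add_to_cart(e)]

print(len(item_views), len(add_to_carts))
```

Events that match neither predicate are silently dropped, which is worth keeping in mind when comparing message counts in Pub/Sub with row counts in BigQuery.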

In [10]:
# Defining event filter functions.
def is_item_view(event):
    return event['event'] == 'view_item'

def is_add_to_cart(event):
    return event['event'] == 'add_to_cart'


def streaming_pipeline(project, region="us-central1"):
    topic = "projects/{}/topics/ecommerce_interactive_beam".format(project)
    subscription = "projects/jp-sandbox-359611/subscriptions/ecommerce_interactive_beam"
    item_views_table = "{}:retail_dataset.item_views_sink".format(project)
    add_to_carts_table = "{}:retail_dataset.add_to_carts_sink".format(project) 
    schema = "event_datetime:DATETIME, event:STRING, user_id:STRING, client_id:STRING, page:STRING, page_previous:STRING, " \
    "item_name:STRING, item_id:STRING, price:STRING, item_brand:STRING, item_category:STRING, item_category_2:STRING, item_category_3:STRING, " \
    "item_category_4:STRING, item_variant:STRING, item_list_name:STRING, item_list_id:STRING, quantity:STRING"
    bucket = "gs://interactive-df"
    
    # Defining pipeline options.
    options = PipelineOptions(
        streaming=True,
        project=project,
        region=region,
        staging_location="%s/staging" % bucket,
        temp_location="%s/temp" % bucket,
        subnetwork='regions/us-central1/subnetworks/terraform-network'
    )
        
    # Defining pipeline.
    p = beam.Pipeline(DataflowRunner(), options=options)
    
    # Receiving message from Pub/Sub & parsing json from string.
    json_message = (p
                    | "Read Topic" >> ReadFromPubSub(subscription=subscription) # Listening to Pub/Sub.
                    | "Parse json" >> beam.Map(json.loads) # Parsing JSON from the message payload.
             )

    # Extracting Item Views.
    item_views = (json_message
                | 'Filter for item views' >> beam.Filter(is_item_view)
                | "item view row" >> beam.Map(lambda input: {'event_datetime': input['event_datetime'], # Flattening the first item into the row.
                                                      'event': input['event'],
                                                      'user_id':  input['user_id'],
                                                      'client_id': input['client_id'],
                                                      'page': input['page'],
                                                      'page_previous': input['page_previous'],
                                                      "item_name": input['ecommerce']['items'][0]["item_name"],
                                                      "item_id": input['ecommerce']['items'][0]["item_id"],
                                                      "price": input['ecommerce']['items'][0]["price"],
                                                      "item_brand": input['ecommerce']['items'][0]["item_brand"],
                                                      "item_category": input['ecommerce']['items'][0]["item_category"],
                                                      "item_category_2": input['ecommerce']['items'][0]["item_category_2"],
                                                      "item_category_3": input['ecommerce']['items'][0]["item_category_3"],
                                                      "item_category_4": input['ecommerce']['items'][0]["item_category_4"],
                                                      "item_variant": input['ecommerce']['items'][0]["item_variant"],
                                                      "item_list_name": input['ecommerce']['items'][0]["item_list_name"],
                                                      "item_list_id": input['ecommerce']['items'][0]["item_list_id"],
                                                      "quantity": input['ecommerce']['items'][0]["quantity"]
                                                     })
             )

    # Extracting Add To Carts.
    add_to_carts = (json_message
                | 'Filter for add to cart' >> beam.Filter(is_add_to_cart)
                | "add to cart row" >> beam.Map(lambda input: {'event_datetime': input['event_datetime'], # Flattening the first item into the row.
                                                      'event': input['event'],
                                                      'user_id':  input['user_id'],
                                                      'client_id': input['client_id'],
                                                      'page': input['page'],
                                                      'page_previous': input['page_previous'],
                                                      "item_name": input['ecommerce']['items'][0]["item_name"],
                                                      "item_id": input['ecommerce']['items'][0]["item_id"],
                                                      "price": input['ecommerce']['items'][0]["price"],
                                                      "item_brand": input['ecommerce']['items'][0]["item_brand"],
                                                      "item_category": input['ecommerce']['items'][0]["item_category"],
                                                      "item_category_2": input['ecommerce']['items'][0]["item_category_2"],
                                                      "item_category_3": input['ecommerce']['items'][0]["item_category_3"],
                                                      "item_category_4": input['ecommerce']['items'][0]["item_category_4"],
                                                      "item_variant": input['ecommerce']['items'][0]["item_variant"],
                                                      "item_list_name": input['ecommerce']['items'][0]["item_list_name"],
                                                      "item_list_id": input['ecommerce']['items'][0]["item_list_id"],
                                                      "quantity": input['ecommerce']['items'][0]["quantity"]
                                                     })
             )
    
    # Writing the PCollections to two different BigQuery tables.
    item_views | "Write Items Views To BigQuery" >> WriteToBigQuery(table=item_views_table, schema=schema,
                                  create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                  write_disposition=BigQueryDisposition.WRITE_APPEND)
    
    add_to_carts | "Write Add To Carts To BigQuery" >> WriteToBigQuery(table=add_to_carts_table, schema=schema,
                                  create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                                  write_disposition=BigQueryDisposition.WRITE_APPEND)
        
    return p.run()
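
The `schema` passed to `WriteToBigQuery` in the function above is a single comma-separated string of `name:TYPE` pairs. As a possible alternative to concatenating long string literals, the same string can be assembled from a list of pairs, as in this minimal sketch. The names `FIELDS` and `schema_string` are assumptions made for this illustration; the field names and types are copied from the function above.

```python
# (column name, BigQuery type) pairs, in the same order as the schema above.
FIELDS = [
    ("event_datetime", "DATETIME"),
    ("event", "STRING"),
    ("user_id", "STRING"),
    ("client_id", "STRING"),
    ("page", "STRING"),
    ("page_previous", "STRING"),
    ("item_name", "STRING"),
    ("item_id", "STRING"),
    ("price", "STRING"),
    ("item_brand", "STRING"),
    ("item_category", "STRING"),
    ("item_category_2", "STRING"),
    ("item_category_3", "STRING"),
    ("item_category_4", "STRING"),
    ("item_variant", "STRING"),
    ("item_list_name", "STRING"),
    ("item_list_id", "STRING"),
    ("quantity", "STRING"),
]


def schema_string(fields):
    """Join (name, type) pairs into a 'name:TYPE, name:TYPE' string."""
    return ", ".join(f"{name}:{field_type}" for name, field_type in fields)


schema = schema_string(FIELDS)
print(schema)
```

Keeping the columns in a list also makes it easier to see that every key produced by the row-building steps has a matching column.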

In [11]:
pipeline = streaming_pipeline(project)
print("\n PIPELINE RUNNING \n")




 PIPELINE RUNNING 

