#### Script: Dataflow Basics

Description: In this notebook we run each of the transformations covered in the theory sessions and look at how they behave.

EDEM. Master Data Analytics<br>
Professor: Javi Briones

In [1]:
# Import Python Libraries
import logging
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

## Apache Beam Basics

<img src="../00_DocAux/.images/beam_pipeline.png" width="1000"/>

##### Apache Beam is a unified programming model for parallel data processing, providing a set of transformations that enable efficient manipulation and processing of data.

#### Pipeline, PCollection & PTransform

- **PCollection**: In Apache Beam, *PCollection* represents an immutable collection of data flowing through a processing pipeline.

- **Pipeline**: A *Pipeline* in Apache Beam defines a data processing flow, specifying the sequence of transformations to be applied to the PCollections.

- **PTransform**: *PTransform* is an abstraction in Apache Beam that encapsulates a data transformation operation. It defines how an input PCollection is transformed into an output PCollection within the pipeline.

In [5]:
with beam.Pipeline(InteractiveRunner()) as p:

    (p   
        | "Read Text from a File" >> beam.io.ReadFromText('../00_DocAux/input_text.txt')
        | "FlatMap" >> beam.FlatMap(lambda z: z.split())
        | "Map" >> beam.Map(lambda x: (x,1))
        | "Combine" >> beam.CombinePerKey(sum)
        | "Print" >> beam.Map(print))

('En', 1)
('un', 2)
('lugar', 1)
('de', 12)
('la', 1)
('Mancha,', 1)
('cuyo', 1)
('nombre', 1)
('no', 2)
('quiero', 1)
('acordarme,', 1)
('ha', 1)
('mucho', 1)
('tiempo', 1)
('que', 2)
('vivía', 1)
('hidalgo', 1)
('los', 5)
('lanza', 1)
('en', 1)
('astillero,', 1)
('adarga', 1)
('antigua,', 1)
('rocín', 1)
('flaco', 1)
('y', 2)
('galgo', 1)
('corredor.', 1)
('Una', 1)
('olla', 1)
('algo', 1)
('más', 3)
('vaca', 1)
('carnero,', 1)
('salpicón', 1)
('las', 3)
('noches,', 1)
('duelos', 1)
('quebrantos', 1)
('sábados,', 1)
('lantejas', 1)
('viernes,', 1)
('algún', 1)
('palomino', 1)
('añadidura', 1)
('domingos,', 1)
('consumían', 1)
('tres', 1)
('partes', 1)
('su', 2)
('hacienda.', 1)
('El', 1)
('resto', 1)
('della', 1)
('concluían', 1)
('sayo', 1)
('velarte,', 1)
('calzas', 1)
('velludo', 1)
('para', 1)
('fiestas', 1)
('con', 2)
('sus', 1)
('pantuflos', 1)
('lo', 2)
('mismo,', 1)
('días', 1)
('entre', 1)
('semana', 1)
('se', 1)
('honraba', 1)
('vellorí', 1)
('fino.', 1)
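
To see what each step of the pipeline above contributes, the same word count can be traced in plain Python. This is only a sketch of the logic, not how Beam executes it, and the two input lines are a hypothetical stand-in for `input_text.txt`.

```python
from collections import Counter

# Hypothetical input: two lines standing in for the contents of input_text.txt.
lines = ["en un lugar de la Mancha", "de cuyo nombre no quiero acordarme"]

# FlatMap: each line produces several words, flattened into one collection.
words = [word for line in lines for word in line.split()]

# Map: each word becomes exactly one (word, 1) pair.
pairs = [(word, 1) for word in words]

# CombinePerKey(sum): add up the values that share the same key.
counts = Counter()
for word, n in pairs:
    counts[word] += n

print(dict(counts))
```

Note that the split is on whitespace only, which is why tokens such as `'Mancha,'` keep their punctuation in the notebook output.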


#### ParDo vs Map

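ParDo: Applies a user-defined function to each element of a PCollection. Unlike Map, which produces exactly one output per input element, ParDo can emit zero, one, or many outputs per element, which makes it suitable for more complex and flexible operations.

DoFn: The class that holds the processing logic passed to a ParDo transformation. Its *process* method receives one element at a time and yields the outputs, and the class can also keep configuration or state (such as the multiplier in the example below).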

In [6]:
# Map
def edem_map(element, num):
    return element * num

# DoFn
class edemDoFn(beam.DoFn):

    def __init__(self, num):
        self.num_ = num

    def process(self, element):
        yield element * self.num_

# Pipeline
with beam.Pipeline(InteractiveRunner()) as p:
  data = (
      p 
        | "Create a PCollection" >> beam.Create([1,2,3,4,5])
        | "Map" >> beam.Map(edem_map, num=2)
        | "DoFn" >> beam.ParDo(edemDoFn(4))
        | "Print" >> beam.Map(print)
  )

8
16
24
32
40
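
The difference in output cardinality between Map and ParDo can be sketched in plain Python. The helpers `apply_map` and `apply_par_do` are hypothetical names used only for this illustration; they are not Beam functions.

```python
def double(element):
    # Map-style function: returns exactly one output per input.
    return element * 2


def split_even(element):
    # DoFn.process-style generator: may yield zero, one, or many outputs.
    if element % 2 == 0:
        yield element // 2
        yield element // 2


def apply_map(fn, elements):
    # One output for every input element.
    return [fn(e) for e in elements]


def apply_par_do(process, elements):
    # Collect however many outputs each element yields.
    return [out for e in elements for out in process(e)]


mapped = apply_map(double, [1, 2, 3, 4, 5])
expanded = apply_par_do(split_even, [1, 2, 3, 4, 5])
print(mapped)    # [2, 4, 6, 8, 10]
print(expanded)  # [1, 1, 2, 2]
```

Odd numbers produce no output at all in `split_even`, while even numbers produce two, something a Map-style function cannot express.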


##### DoFn Life Cycle

The life cycle of a **DoFn** in Apache Beam refers to the phases that an instance of the DoFn class goes through from its initialization to its completion within the context of a *ParDo* transformation. Below, I describe the main stages of the DoFn life cycle:

1. **Initialization: Setup**:

   - Goal: This phase occurs once for each instance of DoFn before the transformation is executed.
   - Process: Initialization tasks, such as configuring resources and preparing data that will be used during execution, are performed here.

2. **Processing Elements (Process)**:

   - Goal: This phase is executed for each input element in our data bundle.
   - Process: The core processing logic is implemented in the *process(self,element)* method of the DoFn class. This method is called for each element and defines how each input is processed.

3. **Start Bundle / Finish Bundle**:

   - Goal: *start_bundle* runs once before the first element of each bundle (a portion of data processed together), and *finish_bundle* runs once after the last element of that bundle.
   - Process: Per-bundle preparation, such as opening a batch buffer, goes in *start_bundle*; flushing and clean-up of the resources used during bundle processing go in *finish_bundle*.

4. **Teardown**: 

   - Goal: This phase occurs once per DoFn instance, when the runner discards the instance after it has finished processing its bundles. Runners call it on a best-effort basis.
   - Process: Final cleaning and resource release tasks are executed here, such as closing connections opened in *setup*.

The life cycle of a **DoFn** provides structured control over the execution of processing logic in *ParDo* transformations. **Each instance of DoFn is created, initialized, processes logic for each element, and is closed in a controlled manner**. This approach ensures proper resource management and facilitates the implementation of custom and clean data processing operations.
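
The call order described above can be sketched with a small stand-in class. `LoggingFn` and the driver `run_bundles` are hypothetical and only mimic the sequence of calls; a real runner decides how elements are split into bundles and when instances are created or discarded.

```python
class LoggingFn:
    """Stand-in for a DoFn that records the order of its method calls."""

    def __init__(self):
        self.calls = ["__init__"]

    def setup(self):
        self.calls.append("setup")

    def start_bundle(self):
        self.calls.append("start_bundle")

    def process(self, element):
        self.calls.append(f"process({element})")
        yield element.upper()

    def finish_bundle(self):
        self.calls.append("finish_bundle")

    def teardown(self):
        self.calls.append("teardown")


def run_bundles(fn, bundles):
    """Hypothetical driver: setup once, then each bundle, then teardown."""
    outputs = []
    fn.setup()
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            outputs.extend(fn.process(element))
        fn.finish_bundle()
    fn.teardown()
    return outputs


fn = LoggingFn()
outputs = run_bundles(fn, [["a", "b"], ["c"]])
print(fn.calls)
print(outputs)  # ['A', 'B', 'C']
```

With two bundles, `start_bundle` and `finish_bundle` appear twice in the log, while `setup` and `teardown` appear only once.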

In [38]:
from datetime import datetime
class DoFnLifeCycle(beam.DoFn):

  def teardown(self):
    print("worker finished at: %s" % self.now())

  def now(self):
    self._now = datetime.now()
    return self._now

  def __init__(self):
    print("Constructor started at: %s" % self.now())

  def setup(self):
    print("worker started at: %s" % self.now())

  def start_bundle(self):
    print("bundle started at: %s" % self.now())

  def process(self, element):
    words = element.split()
    for word in words:
      print("Processing element: %s" % word.upper())
      yield word.upper()

  def finish_bundle(self):
    print("bundle finished at: %s" % self.now())

  

with beam.Pipeline(InteractiveRunner()) as p:
  input_data = (
      p 
        | "Reading the input file" >> beam.io.ReadFromText('../00_DocAux/input_text.txt')
        | "DoFn Life Cycle" >> beam.ParDo(DoFnLifeCycle())
  )

Constructor started at: 2024-01-30 16:40:53.148358


INFO:apache_beam.runners.worker.statecache:Creating state cache with size 104857600
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worker handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.EmbeddedWorkerHandler object at 0x0000021162BB7D30> for environment ref_Environment_default_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shards: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.00 seconds.


worker started at: 2024-01-30 16:40:53.800535
bundle started at: 2024-01-30 16:40:53.803185
Processing element: EN
Processing element: UN
Processing element: LUGAR
Processing element: DE
Processing element: LA
Processing element: MANCHA,
Processing element: DE
Processing element: CUYO
Processing element: NOMBRE
Processing element: NO
Processing element: QUIERO
Processing element: ACORDARME,
Processing element: NO
Processing element: HA
Processing element: MUCHO
Processing element: TIEMPO
Processing element: QUE
Processing element: VIVÍA
Processing element: UN
Processing element: HIDALGO
Processing element: DE
Processing element: LOS
Processing element: DE
Processing element: LANZA
Processing element: EN
Processing element: ASTILLERO,
Processing element: ADARGA
Processing element: ANTIGUA,
Processing element: ROCÍN
Processing element: FLACO
Processing element: Y
Processing element: GALGO
Processing element: CORREDOR.
Processing element: UNA
Processing element: OLLA
Processing element: D

#### Transformations

- GroupByKey

It groups the elements of a PCollection of key-value pairs by key, producing one element per unique key whose value is an iterable of all the values associated with that key.

In [None]:
with beam.Pipeline(InteractiveRunner()) as p:

    data = (p | "PCollection" >> beam.Create([('Spain', 'Valencia'), ('Spain','Barcelona'), ('France', 'Paris')]))

    (data 
        | "Combined" >> beam.GroupByKey()
        | "Print" >> beam.Map(print))
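
As a rough guide to the shape of the result, the grouping can be modelled in plain Python. This is a sketch of the semantics only; in Beam the order of keys and of the values within a key is not guaranteed.

```python
from collections import defaultdict

pairs = [('Spain', 'Valencia'), ('Spain', 'Barcelona'), ('France', 'Paris')]

# Collect every value under its key, as GroupByKey does conceptually.
grouped = defaultdict(list)
for key, value in pairs:
    grouped[key].append(value)

# Sorted here only to make the printed result deterministic.
result = sorted(grouped.items())
print(result)  # [('France', ['Paris']), ('Spain', ['Valencia', 'Barcelona'])]
```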

- CoGroupByKey

It joins two or more PCollections of key-value pairs by key. For each key, the output contains one collection of values per input PCollection. It is used to relate data that comes from different sources.

In [None]:
with beam.Pipeline(InteractiveRunner()) as p:

    p1 = p | "PCollection 01" >> beam.Create([('Spain', 'Valencia'), ('Spain','Barcelona'), ('France', 'Paris')])
    p2 = p | "PCollection 02" >> beam.Create([('Spain', 'Madrid'), ('Spain','Alicante'), ('France', 'Lyon')])

    data = ((p1,p2) | beam.CoGroupByKey())

    data | "Print" >> beam.Map(print)
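
The output shape of the join can be sketched in plain Python. `co_group` is a hypothetical helper written for this illustration, and the `('Italy', 'Rome')` pair is an extra, made-up element added to show what happens when a key is missing from one input.

```python
def co_group(*collections):
    """For each key, return one list of values per input collection."""
    keys = {key for collection in collections for key, _ in collection}
    return {
        key: tuple([v for k, v in collection if k == key] for collection in collections)
        for key in keys
    }


p1 = [('Spain', 'Valencia'), ('Spain', 'Barcelona'), ('France', 'Paris')]
# ('Italy', 'Rome') is a hypothetical extra element, not in the notebook data.
p2 = [('Spain', 'Madrid'), ('Spain', 'Alicante'), ('France', 'Lyon'), ('Italy', 'Rome')]

joined = co_group(p1, p2)
print(joined['Spain'])  # (['Valencia', 'Barcelona'], ['Madrid', 'Alicante'])
print(joined['Italy'])  # ([], ['Rome'])
```

A key that appears in only one input still shows up in the result, with an empty list for the input that lacks it.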

- Flatten

It merges multiple PCollections of the same type into a single PCollection. The elements themselves are not modified: Flatten does not unnest lists stored inside elements.

In [None]:
with beam.Pipeline(InteractiveRunner()) as p:

    p1 = p | "PCollection 01" >> beam.Create(['New York', 'Los Angeles', 'Miami', 'Chicago'])
    p2 = p | "Pcollection 02" >> beam.Create(['Madrid', 'Barcelona', 'Valencia', 'Malaga'])
    p3 = p | "Pcollection 03" >> beam.Create(['London','Manchester', 'Liverpool'])

    merged = ((p1,p2,p3)| beam.Flatten())

    merged | beam.Map(print)
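
In plain Python terms, Flatten behaves like chaining several collections together. The sketch below uses `itertools.chain` as a stand-in and also shows that nested elements are left untouched; in Beam the order of the merged elements is not guaranteed.

```python
from itertools import chain

p1 = ['New York', 'Los Angeles', 'Miami', 'Chicago']
p2 = ['Madrid', 'Barcelona', 'Valencia', 'Malaga']
p3 = ['London', 'Manchester', 'Liverpool']

# Every element of every input ends up in one collection.
merged = list(chain(p1, p2, p3))
print(len(merged))  # 11

# Elements that are themselves lists stay as they are.
nested = list(chain([[1, 2]], [[3]]))
print(nested)  # [[1, 2], [3]]
```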

- Partition

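It splits a PCollection into a fixed number of smaller PCollections. A partition function returns, for each element, the index of the partition that element belongs to, so each partition can then be processed by its own branch of the pipeline.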

In [None]:
countries = ['Spain', 'USA', 'Switzerland']

def partition_fn(country,num_countries):
    return countries.index(country['country'])

with beam.Pipeline(InteractiveRunner()) as p:

        p1,p2,p3 = (
                p 
                | "PCollection" >> beam.Create([
                        {'country': 'Spain', 'city': 'Valencia'},
                        {'country': 'Spain', 'city': 'Barcelona'},
                        {'country': 'USA', 'city': 'New York'},
                        {'country': 'Switzerland', 'city': 'Zurich'},
                        {'country': 'Switzerland', 'city': 'Geneva'}  
                ])
                | "partition" >> beam.Partition(partition_fn, len(countries))
        )

        p3 | "PCollection for Switzerland" >> beam.Map(print)
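
# The routing done by Partition can be sketched in plain Python.
# `partition` is a hypothetical helper that mimics the behaviour; it is not Beam code.

```python
countries = ['Spain', 'USA', 'Switzerland']


def partition_fn(element, num_partitions):
    # Return the index of the partition this element belongs to.
    return countries.index(element['country'])


def partition(elements, fn, num_partitions):
    """Hypothetical helper: route each element to the list chosen by fn."""
    parts = [[] for _ in range(num_partitions)]
    for element in elements:
        parts[fn(element, num_partitions)].append(element)
    return parts


rows = [
    {'country': 'Spain', 'city': 'Valencia'},
    {'country': 'Spain', 'city': 'Barcelona'},
    {'country': 'USA', 'city': 'New York'},
    {'country': 'Switzerland', 'city': 'Zurich'},
    {'country': 'Switzerland', 'city': 'Geneva'},
]

spain, usa, switzerland = partition(rows, partition_fn, len(countries))
print([row['city'] for row in switzerland])  # ['Zurich', 'Geneva']
```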
        

- Combine

It combines values associated with the same key using a specific combining function, useful for performing key-based aggregate operations.

In [None]:
with beam.Pipeline(InteractiveRunner()) as p:

    data = (p | "PCollection" >> beam.Create([('User1', 1), ('User2', 5), ('User1', 7)]))

    (data 
        | "Combined" >> beam.CombinePerKey(sum)
        | "Print" >> beam.Map(print))
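
The per-key sum can be modelled in plain Python, which also shows why the combining function should be associative and commutative: combining partial results gives the same answer as combining everything at once. `sum_per_key` is a hypothetical helper for this sketch, and the split into two parts is an arbitrary assumption.

```python
from collections import defaultdict


def sum_per_key(pairs):
    """Add up the values that share a key."""
    totals = defaultdict(int)
    for key, value in pairs:
        totals[key] += value
    return dict(totals)


pairs = [('User1', 1), ('User2', 5), ('User1', 7)]

# Combining all the pairs in one go.
direct = sum_per_key(pairs)

# Combining two parts separately, then combining the partial results.
partial_a = sum_per_key(pairs[:2])
partial_b = sum_per_key(pairs[2:])
merged = sum_per_key(list(partial_a.items()) + list(partial_b.items()))

print(direct)  # {'User1': 8, 'User2': 5}
print(merged)  # {'User1': 8, 'User2': 5}
```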

#### Streaming

##### gcloud commands:

- PubSub Topics

```
gcloud pubsub topics create <TOPIC_NAME>
```

- PubSub Subscriptions

```
gcloud pubsub subscriptions create <SUBSCRIPTION_NAME> --topic=<TOPIC_NAME>
```

- Google Cloud Storage Bucket  

```
gcloud storage buckets create gs://<BUCKET_NAME> --location=<REGION_ID>
```


In [33]:
# Variables
project_id = "active-road-412714"
subscription_name = "entorno1-sub"
bq_dataset = "entorno1"
bq_table = "entorno1"
bucket_name = "entorno1dpf"

#### PubSub - Dataflow - BigQuery

- Local

In [28]:
import apache_beam as beam
import json
import logging
from apache_beam.options.pipeline_options import PipelineOptions
import sys

sys.argv = ['--flexrs_goal', 'SPEED_OPTIMIZED']

def decode_message(msg):

    output = msg.decode('utf-8')

    logging.info("New PubSub Message: %s", output)

    return json.loads(output)

def run():
    with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
        (
            p 
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
            | "decode msg" >> beam.Map(decode_message)
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table = f"{project_id}:{bq_dataset}.{bq_table}", # Required Format: PROJECT_ID:DATASET.TABLE
                schema='nombre:STRING', # Required Format: field:TYPE
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )

if __name__ == '__main__':

    # Set Logs
    logging.getLogger().setLevel(logging.INFO)

    logging.info("The process started")
    
    # Run Process
    run()

 

INFO:root:The process started
INFO:root:Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
INFO:apache_beam.runners.direct.direct_runner:Running pipeline with DirectRunner.
INFO:root:New PubSub Message: {"nombre" : "diego"}
INFO:apache_beam.internal.metrics.metric:[Locally aggregated metrics since 2024-01-30 15:20:19.004000]
MetricName(namespace=apache_beam.io.gcp.bigquery_tools.BigQueryWrapper, name=latency_histogram_ms): HistogramData(Total count: 1, P99: 840, P90: 838, P50: 830)
INFO:root:New PubSub Message: {"nombre" : "balma"}


KeyboardInterrupt: 
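
The decoding step can be checked on its own, without Pub/Sub or BigQuery. The sketch below reuses the same `decode_message` logic (minus the logging) on a hand-built payload, which is an assumption modelled on the messages shown in the log output above.

```python
import json


def decode_message(msg):
    # Pub/Sub delivers the payload as bytes; decode it and parse the JSON.
    output = msg.decode('utf-8')
    return json.loads(output)


# Hand-built payload that imitates a message published to the topic.
payload = '{"nombre" : "diego"}'.encode('utf-8')

row = decode_message(payload)
print(row)  # {'nombre': 'diego'}
```

The resulting dictionary has a single `nombre` key, which matches the `nombre:STRING` schema passed to `WriteToBigQuery`.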

- Dataflow (Google Cloud)

In [37]:
import apache_beam as beam
import json
import logging
from apache_beam.options.pipeline_options import PipelineOptions

#bucket_name = "entorno1dpf"

def decode_message(msg):

    output = msg.decode('utf-8')

    logging.info("New PubSub Message: %s", output)

    return json.loads(output)

def run():
    with beam.Pipeline(options=PipelineOptions(
        streaming=True,
        # save_main_session=True
        project=project_id,
        runner="DataflowRunner",
        temp_location=f"gs://{bucket_name}/tmp",
        staging_location=f"gs://{bucket_name}/staging",
        region="europe-west4"
    )) as p:
        (
            p 
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
            | "decode msg" >> beam.Map(decode_message)
            | "Write to BigQuery" >> beam.io.WriteToBigQuery(
                table = f"{project_id}:{bq_dataset}.{bq_table}",
                schema='nombre:STRING',
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
            )
        )

if __name__ == '__main__':

    # Set Logs
    logging.getLogger().setLevel(logging.INFO)

    logging.info("The process started")
    
    # Run Process
    run()

INFO:root:The process started
INFO:root:Using provided Python SDK container image: gcr.io/cloud-dataflow/v1beta3/beam_python3.10_sdk:2.51.0
INFO:root:Python SDK container image set to "gcr.io/cloud-dataflow/v1beta3/beam_python3.10_sdk:2.51.0" for Docker environment
INFO:apache_beam.runners.dataflow.internal.apiclient:Starting GCS upload to gs://entorno1dpf/staging/beamapp-diego-0130151406-463804-3z95y93g.1706627646.463804/pipeline.pb...
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:apache_beam.runners.dataflow.internal.apiclient:Completed GCS upload to gs://entorno1dpf/staging/beamapp-diego-0130151406-463804-3z95y93g.1706627646.463804/pipeline.pb in 0 seconds.
INFO:apache_beam.runners.dataflow.internal.apiclient:Create job: <Job
 clientRequestId: '20240130151406463804-3594'
 c

AssertionError: Job did not reach to a terminal state after waiting indefinitely. Console URL: https://console.cloud.google.com/dataflow/jobs/<RegionId>/2024-01-30_07_14_07-6903491149117288438?project=<ProjectId>

**Window in Apache Beam:**
A window in Apache Beam defines a time frame for **organizing and grouping data elements** by timestamp, so that aggregations such as sums or counts can be computed per interval even on an unbounded stream.

**Types of Windows:**

- **Fixed Window:** Groups elements into fixed time intervals, dividing the PCollection into time-based windows.

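- **Sliding Window:** Allows overlapping windows, specified by a size (how long each window lasts) and a period (how often a new window starts). An element can therefore belong to several windows, which makes it easy to analyze data in continuous intervals over time.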

- **Session Window:** Groups data elements that share a contiguous temporal relationship, where continuity is defined by the **inactivity gap between elements**. This dynamic window is formed based on the inactivity time between events, allowing the capture of logical sessions in data streams.
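
How each window type assigns elements can be sketched with plain integer timestamps (in seconds). The three functions below are hypothetical helpers that model the assignment rules under the assumption of a zero offset; they are not Beam's implementation.

```python
def fixed_window(ts, size):
    """Return the single [start, end) window that contains ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts, size, period):
    """Return every [start, end) window of the given size that contains ts."""
    windows = []
    start = ts - ts % period  # latest window start that is <= ts
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


def session_windows(timestamps, gap):
    """Merge elements into sessions separated by at least `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts < sessions[-1][1]:
            # Still inside the current session: extend its end.
            sessions[-1] = (sessions[-1][0], ts + gap)
        else:
            sessions.append((ts, ts + gap))
    return sessions


print(fixed_window(70, size=10))                    # (70, 80)
print(sliding_windows(70, size=60, period=20))      # [(20, 80), (40, 100), (60, 120)]
print(session_windows([0, 3, 5, 20, 22], gap=10))   # [(0, 15), (20, 32)]
```

With a size of 60 and a period of 20, every element falls into three sliding windows, whereas a fixed window always assigns exactly one.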

- Fixed Window

<img src="../00_DocAux/.images/fixed_window.png" width="500"/>

In [None]:
import apache_beam as beam
import json
import logging
from apache_beam.options.pipeline_options import PipelineOptions

def decode_message(msg):

    output = msg.decode('utf-8')

    logging.info("New PubSub Message: %s", output)

    return json.loads(output)['temp']

class OutputDoFn(beam.DoFn):

    def process(self, element):
        yield element

def run():
    with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
        (
            p 
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
            | "Decode msg" >> beam.Map(decode_message)
            | "Fixed Window" >> beam.WindowInto(beam.window.FixedWindows(10))
            | "Combine" >> beam.CombineGlobally(sum).without_defaults()
            | "Print" >> beam.Map(print)
        )

if __name__ == '__main__':

    # Set Logs
    logging.getLogger().setLevel(logging.INFO)

    logging.info("The process started")
    
    # Run Process
    run()
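
What the windowed sum in the cell above computes can be modelled offline. In this sketch the `(timestamp, temp)` events are made-up values and `sum_per_fixed_window` is a hypothetical helper; in the real pipeline the timestamps are attached by the Pub/Sub source.

```python
from collections import defaultdict


def sum_per_fixed_window(events, size):
    """Sum the values of all events that fall into the same fixed window."""
    totals = defaultdict(int)
    for ts, temp in events:
        start = ts - ts % size
        totals[(start, start + size)] += temp
    return dict(sorted(totals.items()))


# Hypothetical (timestamp in seconds, temperature) events.
events = [(1, 20), (4, 22), (12, 19), (27, 25)]

sums = sum_per_fixed_window(events, size=10)
print(sums)  # {(0, 10): 42, (10, 20): 19, (20, 30): 25}
```

Only windows that received at least one element appear in the result, which mirrors the effect of `.without_defaults()` in the pipeline.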

- Sliding Window

<img src="../00_DocAux/.images/sliding_window.png" width="500"/>

In [None]:
import apache_beam as beam
import json
import logging
from apache_beam.options.pipeline_options import PipelineOptions

def decode_message(msg):

    output = msg.decode('utf-8')

    logging.info("New PubSub Message: %s", output)

    return json.loads(output)['temp']

class OutputDoFn(beam.DoFn):

    def process(self, element):
        yield element

def run():
    with beam.Pipeline(options=PipelineOptions(streaming=True, save_main_session=True)) as p:
        (
            p 
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=f'projects/{project_id}/subscriptions/{subscription_name}')
            | "Decode msg" >> beam.Map(decode_message)
            | "Sliding Window" >> beam.WindowInto(beam.window.SlidingWindows(size=60, period=20))
            | "Combine" >> beam.CombineGlobally(sum).without_defaults()
            | "Print" >> beam.Map(print)
        )

if __name__ == '__main__':

    # Set Logs
    logging.getLogger().setLevel(logging.INFO)

    logging.info("The process started")
    
    # Run Process
    run()