#### Script: Dataflow Basics

Description: Notebook where we will see the functioning of each transformation discussed during the theory.

EDEM. Master Data Analytics<br>
Professor: Javi Briones

### Setup

In [1]:
# GCP Auth

# Local
!gcloud auth application-default login

# Google Colab
# from google.colab import auth
# auth.authenticate_user()

Your browser has been opened to visit:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=764086051850-6qr4p6gpi6hn506pt8ejuq83di341hur.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8085%2F&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fsqlservice.login&state=mCpasZV8J1H7tDJdENWLHkpkk5woyt&access_type=offline&code_challenge=h8hBD1wJiU7ga0yeP7hsVHYrufhAiPuYYcZeRObTEP8&code_challenge_method=S256


Credentials saved to file: [C:\Users\DELL\AppData\Roaming\gcloud\application_default_credentials.json]

These credentials will be used by any library that requests Application Default Credentials (ADC).

Quota project "dataflow-1-411618" was added to ADC which can be used by Google client libraries for billing and quota. Note that some services may still bill the project owning the resource.


In [2]:
# Install requirements
#!pip3 install "apache_beam[interactive]"

In [3]:
# Import Python Libraries
import logging
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

### Beam Basics

<img src="../00_DocAux/.images/Beam_Pipeline.png" width="1000"/>

##### 01 Understanding basic concepts: PCollection, PTransform & Pipeline Object

In [4]:
with beam.Pipeline(InteractiveRunner()) as p:

    (p   
        | "Read Text from a File" >> beam.io.ReadFromText('../00_DocAux/input_text.txt')
        | "FlatMap" >> beam.FlatMap(lambda z: z.split())
        | "map" >> beam.Map(lambda x: (x,1))
        | "resultado" >> beam.CombinePerKey(sum)
        | "Show content" >> beam.Map(print))

('En', 1)
('un', 2)
('lugar', 1)
('de', 12)
('la', 1)
('Mancha,', 1)
('cuyo', 1)
('nombre', 1)
('no', 2)
('quiero', 1)
('acordarme,', 1)
('ha', 1)
('mucho', 1)
('tiempo', 1)
('que', 2)
('vivía', 1)
('hidalgo', 1)
('los', 5)
('lanza', 1)
('en', 1)
('astillero,', 1)
('adarga', 1)
('antigua,', 1)
('rocín', 1)
('flaco', 1)
('y', 2)
('galgo', 1)
('corredor.', 1)
('Una', 1)
('olla', 1)
('algo', 1)
('más', 3)
('vaca', 1)
('carnero,', 1)
('salpicón', 1)
('las', 3)
('noches,', 1)
('duelos', 1)
('quebrantos', 1)
('sábados,', 1)
('lantejas', 1)
('viernes,', 1)
('algún', 1)
('palomino', 1)
('añadidura', 1)
('domingos,', 1)
('consumían', 1)
('tres', 1)
('partes', 1)
('su', 2)
('hacienda.', 1)
('El', 1)
('resto', 1)
('della', 1)
('concluían', 1)
('sayo', 1)
('velarte,', 1)
('calzas', 1)
('velludo', 1)
('para', 1)
('fiestas', 1)
('con', 2)
('sus', 1)
('pantuflos', 1)
('lo', 2)
('mismo,', 1)
('días', 1)
('entre', 1)
('semana', 1)
('se', 1)
('honraba', 1)
('vellorí', 1)
('fino.', 1)


##### 02 Understanding Core Transformations: DoFn & Map

In [5]:
# Map
def edem_map(element, num):
    return element * num

# DoFn
class edemDoFn(beam.DoFn):

    def __init__(self, num):
        self.num_ = num

    def process(self, element):
        yield element * self.num_

# Pipeline
with beam.Pipeline(InteractiveRunner()) as p:
  data = (
      p 
        | "Create a PCollection" >> beam.Create([1,2,3,4,5])
        | "Map" >> beam.Map(edem_map, num=2)
        | "DoFn" >> beam.ParDo(edemDoFn(4))
        | "Print" >> beam.Map(print)
  )

8
16
24
32
40


In [9]:
# PTransform
class edem_PTransform(beam.PTransform):

    # Map
    def edem_map(element):
        return element * 2

    # DoFn
    class edemDoFn(beam.DoFn):

        def process(self, element, num):
            yield element * num
    
    def expand(self,PColl):
        
        PColl_ = (PColl 
            | "Map" >> beam.Map(lambda x: x * 2)
            | "ParDo" >> beam.ParDo(edemDoFn(), num=4)
            | "Print" >> beam.Map(print))
        
        yield PColl_

# Pipeline
with beam.Pipeline(InteractiveRunner()) as p:
    data = (
        p 
            | "Create a PCollection" >> beam.Create([1,2,3,4,5])
    )
    
    data | edem_PTransform() | "Print" >> beam.Map(print)

usage: ipykernel_launcher.py [-h] [--dataflow_endpoint DATAFLOW_ENDPOINT]
                             [--project PROJECT] [--job_name JOB_NAME]
                             [--staging_location STAGING_LOCATION]
                             [--temp_location TEMP_LOCATION] [--region REGION]
                             [--service_account_email SERVICE_ACCOUNT_EMAIL]
                             [--no_auth]
                             [--template_location TEMPLATE_LOCATION]
                             [--label LABELS] [--update]
                             [--transform_name_mapping TRANSFORM_NAME_MAPPING]
                             [--enable_streaming_engine]
                             [--dataflow_kms_key DATAFLOW_KMS_KEY]
                             [--create_from_snapshot CREATE_FROM_SNAPSHOT]
                             [--flexrs_goal {COST_OPTIMIZED,SPEED_OPTIMIZED}]
                             [--dataflow_service_option DATAFLOW_SERVICE_OPTIONS]
                           

AttributeError: 'tuple' object has no attribute 'tb_frame'

##### 03 DoFn Lifecycle

In [6]:
from datetime import datetime
class DoFnLifeCycle(beam.DoFn):

  def now(self):
    self._now = datetime.now()
    return self._now

  def __init__(self):
    print("Constructor started at: %s" % self.now())

  def setup(self):
    print("worker started at: %s" % self.now())

  def start_bundle(self):
    print("bundle started at: %s" % self.now())

  def process(self, element):
    words = element.split()
    for word in words:
      print("Processing element: %s" % word)
      yield word.upper()

  def finish_bundle(self):
    print("bundle finished at: %s" % self.now())

  def teardown(self):
    print("worker finished at: %s" % self.now())

with beam.Pipeline(InteractiveRunner()) as p:
  input_data = (
      p 
        | "Reading the input file" >> beam.io.ReadFromText('../00_DocAux/input_text.txt')
        | "DoFn Life Cycle" >> beam.ParDo(DoFnLifeCycle())
  )

Constructor started at: 2024-01-20 11:20:02.363277
worker started at: 2024-01-20 11:20:03.772674
bundle started at: 2024-01-20 11:20:03.777725
Processing element: En
Processing element: un
Processing element: lugar
Processing element: de
Processing element: la
Processing element: Mancha,
Processing element: de
Processing element: cuyo
Processing element: nombre
Processing element: no
Processing element: quiero
Processing element: acordarme,
Processing element: no
Processing element: ha
Processing element: mucho
Processing element: tiempo
Processing element: que
Processing element: vivía
Processing element: un
Processing element: hidalgo
Processing element: de
Processing element: los
Processing element: de
Processing element: lanza
Processing element: en
Processing element: astillero,
Processing element: adarga
Processing element: antigua,
Processing element: rocín
Processing element: flaco
Processing element: y
Processing element: galgo
Processing element: corredor.
Processing element:

##### 04 Transformations

In [7]:
# GroupByKey
with beam.Pipeline(InteractiveRunner()) as p:

    data = (p | "PCollection" >> beam.Create([('Spain', 'Valencia'), ('Spain','Barcelona'), ('France', 'Paris')]))

    (data 
        | "Combined" >> beam.GroupByKey()
        | "Print" >> beam.Map(print))

('Spain', ['Valencia', 'Barcelona'])
('France', ['Paris'])


In [8]:
# CoGroupByKey
with beam.Pipeline(InteractiveRunner()) as p:

    p1 = p | "PCollection 01" >> beam.Create([('Spain', 'Valencia'), ('Spain','Barcelona'), ('France', 'Paris')])
    p2 = p | "PCollection 02" >> beam.Create([('Spain', 'Madrid'), ('Spain','Alicante'), ('France', 'Lyon')])

    data = ((p1,p2) | beam.CoGroupByKey())

    data | "Print" >> beam.Map(print)

('Spain', (['Valencia', 'Barcelona'], ['Madrid', 'Alicante']))
('France', (['Paris'], ['Lyon']))


In [9]:
# Combine
with beam.Pipeline(InteractiveRunner()) as p:

    data = (p | "PCollection" >> beam.Create([('User1', 1), ('User2', 5), ('User1', 7)]))

    (data 
        | "Combined" >> beam.CombinePerKey(sum)
        | "Print" >> beam.Map(print))

('User1', 8)
('User2', 5)


In [10]:
# Flatten
with beam.Pipeline(InteractiveRunner()) as p:

    p1 = p | "PCollection 01" >> beam.Create(['New York', 'Los Angeles', 'Miami', 'Chicago'])
    p2 = p | "Pcollection 02" >> beam.Create(['Madrid', 'Barcelona', 'Valencia', 'Malaga'])
    p3 = p | "Pcollection 03" >> beam.Create(['London','Manchester', 'Liverpool'])

    merged = ((p1,p2,p3)| beam.Flatten())

    merged | beam.Map(print)

New York
Los Angeles
Miami
Chicago
Madrid
Barcelona
Valencia
Malaga
London
Manchester
Liverpool


In [11]:
# Partition
countries = ['Spain', 'USA', 'Switzerland']

def partition_fn(country,num_countries):
    return countries.index(country['country'])

with beam.Pipeline(InteractiveRunner()) as p:

        p1,p2,p3 = (
                p 
                | "PCollection" >> beam.Create([
                        {'country': 'Spain', 'city': 'Valencia'},
                        {'country': 'Spain', 'city': 'Barcelona'},
                        {'country': 'USA', 'city': 'New York'},
                        {'country': 'Switzerland', 'city': 'Zurich'},
                        {'country': 'Switzerland', 'city': 'Geneva'}  
                ])
                | "partition" >> beam.Partition(partition_fn, len(countries))
        )

        p3 | "PCollection for Spain" >> beam.Map(print)
        

{'country': 'Switzerland', 'city': 'Zurich'}
{'country': 'Switzerland', 'city': 'Geneva'}


##### 05 Streaming

In [1]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.bigquery import Schema, Field
import json

# Definir la función decode_message
def decode_message(msg):
    # Lógica para decodificar el mensaje y cargarlo como JSON
    output = msg.decode('utf-8')
    return json.loads(output)

# Definir el esquema de BigQuery
table_schema = Schema()
field_schema = Field('nombre', 'STRING', mode='NULLABLE')
table_schema.fields.append(field_schema)

with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
    data = (
        p
        | "LeerDesdePubSub" >> beam.io.ReadFromPubSub(subscription='projects/dataflow-1-411618/subscriptions/new_topic-sub')
        | "decodificar_msg" >> beam.Map(decode_message)
        | "escribir" >> WriteToBigQuery(
              table="dataflow-1-411618:data_test.tabla",
              schema=table_schema,
              create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
              write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
        )
    )


usage: ipykernel_launcher.py [-h] [--dataflow_endpoint DATAFLOW_ENDPOINT]
                             [--project PROJECT] [--job_name JOB_NAME]
                             [--staging_location STAGING_LOCATION]
                             [--temp_location TEMP_LOCATION] [--region REGION]
                             [--service_account_email SERVICE_ACCOUNT_EMAIL]
                             [--no_auth]
                             [--template_location TEMPLATE_LOCATION]
                             [--label LABELS] [--update]
                             [--transform_name_mapping TRANSFORM_NAME_MAPPING]
                             [--enable_streaming_engine]
                             [--dataflow_kms_key DATAFLOW_KMS_KEY]
                             [--create_from_snapshot CREATE_FROM_SNAPSHOT]
                             [--flexrs_goal {COST_OPTIMIZED,SPEED_OPTIMIZED}]
                             [--dataflow_service_option DATAFLOW_SERVICE_OPTIONS]
                           

AttributeError: 'tuple' object has no attribute 'tb_frame'

In [12]:
#!pip install google-cloud-PubSub


In [2]:
!pip install requitements.txt

ERROR: Could not find a version that satisfies the requirement requitements.txt (from versions: none)
ERROR: No matching distribution found for requitements.txt
