Here is the code for the streaming part of the architecture diagram for the cryptocurrency data analysis use case.

The goal is to retrieve data published in real time to a Pub/Sub topic through a previously created subscription.

Each message received by the subscription is then prepared by a data pipeline (the same processing as for the data retrieved in batch, since both come from the same data source).

As with the batch, we first launch and run the data pipeline locally on the VM (with InteractiveRunner()), then run the pipeline in the cloud with DataflowRunner().

The Dataflow job exports all processed data to a BigQuery table.
This data is then ready to be explored in the data visualization tool of your choice (Data Studio, Grafana, etc.).
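
Pub/Sub identifies topics and subscriptions by full resource paths. The sketch below builds both paths with plain string formatting. The topic name `bigquery-to-pubsub-test0` is the one used later in this notebook, while the subscription name `crypto-streaming-sub` is a hypothetical placeholder for whatever subscription was actually created.

```python
def topic_path(project, topic):
    """Return the full Pub/Sub resource path of a topic."""
    return "projects/{}/topics/{}".format(project, topic)


def subscription_path(project, subscription):
    """Return the full Pub/Sub resource path of a subscription."""
    return "projects/{}/subscriptions/{}".format(project, subscription)


# Project ID taken from the notebook output; the subscription name is an assumption.
print(topic_path("verdant-cargo-321713", "bigquery-to-pubsub-test0"))
print(subscription_path("verdant-cargo-321713", "crypto-streaming-sub"))
```

`ReadFromPubSub` accepts either `topic=` or `subscription=` (not both); the pipelines in this notebook pass the topic path.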

In [1]:
import logging
import json
import time
import traceback

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options import pipeline_options
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io import WriteToText

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners import DataflowRunner

import google.auth

#from utils.utils import publish_to_topic
from IPython.display import display, HTML

import datetime

In [2]:
# The project will be used for creating a subscription to the Pub/Sub topic and for the Dataflow pipeline
project = google.auth.default()[1]
project

'verdant-cargo-321713'

In [3]:
options = pipeline_options.PipelineOptions(
    streaming=True,
    project=project
)

If the batch has already been run, there is no need to execute the following line, because the dataset will already have been created.

In [None]:
#!bq mk --location US --dataset temp_crypto_batch

2. Interactive Runner 

Let's launch and run the data pipeline locally from the VM.

In [4]:
logging.getLogger().setLevel(logging.WARNING)

In [5]:
# Duration during which the subscription stays active locally and feeds the pipeline
ib.options.recording_duration = '10m'

In [6]:
import pandas as pd

class KeepDoFn(beam.DoFn):
    """Keep only the fields written to BigQuery and add a compact date string."""
    def process(self, element):
        return [{
            'timestamp': element['block_timestamp_truncated'],
            'request': str(element['oracle_request_id']),
            'symbol': element['symbol'],
            'rate': element['rate'],
            # Reformat the ISO timestamp as YYYYMMDDHHMMSS
            'date_str': pd.to_datetime(element['block_timestamp'], format='%Y-%m-%dT%H:%M:%S.%fZ').strftime("%Y%m%d%H%M%S")
        }]
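
To check the field mapping and the date format without starting Beam, the same transformation can be sketched with the standard library only. `format_row` is a hypothetical helper, `datetime.strptime` stands in for `pd.to_datetime`, and the sample message values are invented for illustration (only the field names come from `KeepDoFn`).

```python
from datetime import datetime


def format_row(element):
    """Build the BigQuery row for one decoded message (stdlib-only sketch)."""
    parsed = datetime.strptime(element['block_timestamp'], '%Y-%m-%dT%H:%M:%S.%fZ')
    return {
        'timestamp': element['block_timestamp_truncated'],
        'request': str(element['oracle_request_id']),
        'symbol': element['symbol'],
        'rate': element['rate'],
        'date_str': parsed.strftime("%Y%m%d%H%M%S"),
    }


# Invented sample message; real messages come from the Pub/Sub topic.
sample = {
    'block_timestamp': '2021-08-03T09:36:43.995136Z',
    'block_timestamp_truncated': '2021-08-03T09:36:00Z',
    'oracle_request_id': 12345,
    'symbol': 'BTC',
    'rate': 38000.5,
}
row = format_row(sample)
print(row['date_str'])
```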

Run the following command only for the first streaming insertion.

In [7]:
from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# Fully qualified ID of the table to create.
table_id = "{}.temp_crypto_batch.streaming_timestamp".format(project)

schema = [
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("request", "STRING"),
    bigquery.SchemaField("symbol", "STRING"),
    bigquery.SchemaField("rate", "FLOAT"),
    bigquery.SchemaField("date_str", "STRING"),
]

table = bigquery.Table(table_id, schema=schema)
table = client.create_table(table)  # Make an API request.
print(
    "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
)

Created table verdant-cargo-321713.temp_crypto_batch.streaming_timestamp
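
The schema is written twice in this notebook: as a list of `bigquery.SchemaField` objects here and as a `name:TYPE` string in the pipeline cells. Below is a minimal sketch of keeping a single definition and deriving the string form from it. `FIELDS` and `schema_string` are hypothetical names, not part of the original notebook.

```python
# Single source of truth for the table schema: (column name, BigQuery type).
FIELDS = [
    ("timestamp", "TIMESTAMP"),
    ("request", "STRING"),
    ("symbol", "STRING"),
    ("rate", "FLOAT"),
    ("date_str", "STRING"),
]


def schema_string(fields):
    """Render the fields in the 'name:TYPE, name:TYPE' form used by WriteToBigQuery."""
    return ", ".join("{}:{}".format(name, field_type) for name, field_type in fields)


print(schema_string(FIELDS))
```

The `SchemaField` list could be derived from the same pairs with `[bigquery.SchemaField(name, field_type) for name, field_type in FIELDS]`. Passing `exists_ok=True` to `client.create_table` should also make the creation cell safe to re-run.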


In [8]:
bucket = "gs://{project}_bigquery_to_pubsub_temp".format(project=project.replace("-", "_"))
bucket

'gs://verdant_cargo_321713_bigquery_to_pubsub_temp'

In [9]:
def streaming_pipeline(project, region="us-central1"):
    
    table = "{}:temp_crypto_batch.streaming_timestamp".format(project)
    schema = "timestamp:TIMESTAMP, request:STRING, symbol:STRING, rate:FLOAT, date_str:STRING"
    
    topic = "projects/{}/topics/bigquery-to-pubsub-test0".format(project)
    bucket = "gs://{project}_bigquery_to_pubsub_temp".format(project=project.replace("-", "_"))
    
    options = PipelineOptions(
        streaming=True,
        project=project,
        region=region,
        staging_location="%s/staging" % bucket,
        temp_location="%s/temp" % bucket
    )

    p = beam.Pipeline(InteractiveRunner(), options=options)

    pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
                | "To Dict" >> beam.Map(json.loads)
                | "Format" >> beam.ParDo(KeepDoFn())
                | "Write To BigQuery" >> WriteToBigQuery(table=table, schema=schema,
                                  create_disposition=BigQueryDisposition.CREATE_NEVER,
                                  write_disposition=BigQueryDisposition.WRITE_APPEND))
    
    return ib.show(pubsub)
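
The "To Dict" step applies `json.loads` to every message, so a single malformed payload raises inside the pipeline. A hedged alternative is a parser that returns a list with zero or one element, which could be plugged in with `beam.FlatMap` instead of `beam.Map`. `parse_message` is a hypothetical helper, and skipping bad messages (rather than routing them elsewhere) is an assumption about the desired behavior.

```python
import json
import logging


def parse_message(data):
    """Decode one Pub/Sub payload; return [dict] on success, [] otherwise."""
    try:
        parsed = json.loads(data)
    except (ValueError, TypeError):
        # Covers invalid JSON, invalid UTF-8 and non-string payloads.
        logging.warning("Skipping message that is not valid JSON")
        return []
    if not isinstance(parsed, dict):
        logging.warning("Skipping JSON message that is not an object")
        return []
    return [parsed]


print(parse_message(b'{"symbol": "BTC", "rate": 38000.5}'))
print(parse_message(b'not json'))
```

In the pipeline, this would replace the mapping step as `"To Dict" >> beam.FlatMap(parse_message)`.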

In [11]:
streaming_pipeline(project)

ERROR:apache_beam.runners.direct.executor:Exception at bundle <apache_beam.runners.direct.bundle_factory._Bundle object at 0x7fc4654838c0>, due to an exception.
 Traceback (most recent call last):
  File "/root/apache-beam-2.25.0/packages/beam/sdks/python/apache_beam/runners/direct/executor.py", line 382, in call
    finish_state)
  File "/root/apache-beam-2.25.0/packages/beam/sdks/python/apache_beam/runners/direct/executor.py", line 420, in attempt_call
    evaluator.process_element(value)
  File "/root/apache-beam-2.25.0/packages/beam/sdks/python/apache_beam/runners/direct/transform_evaluator.py", line 545, in process_element
    events.append(next(self.event_stream))
  File "/root/apache-beam-2.25.0/packages/beam/sdks/python/apache_beam/runners/direct/test_stream_impl.py", line 339, in events_from_rpc
    raise e
  File "/root/apache-beam-2.25.0/packages/beam/sdks/python/apache_beam/runners/direct/test_stream_impl.py", line 330, in events_from_rpc
    event = channel.get(timeout=30)

IMPORTANT: Stop here and go back to the instructions in the Markdown README.md.
The rest of this notebook is run afterwards.

3. DataflowRunner

This time, let's test the pipeline directly in the cloud with DataflowRunner(). The code is launched from the VM and runs on Dataflow.

In [17]:
logging.getLogger().setLevel(logging.INFO)

In [18]:
from setuptools import setup, find_packages

In [19]:
%%writefile setup.py

from setuptools import setup, find_packages

setup(
    name="dataflow_pipeline_dependencies",
    version="0.1",
    install_requires=[
        'pandas==0.25.2',
    ],
    packages=find_packages()
)

Overwriting setup.py


In [20]:
bucket = "gs://{project}_bigquery_to_pubsub_temp".format(project=project.replace("-", "_"))
pipeline_parameters = [
    '--streaming',  # boolean flag: takes no value
    '--project', project,
    '--staging_location', "%s/staging" % bucket,
    '--temp_location', "%s/temp" % bucket,
    "--setup_file", './setup.py', 
    "--region", "us-central1",
]
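
Pipeline options passed as a flag list are parsed like command-line arguments, so every entry has to be a string and a boolean switch such as `--streaming` is given without a value. Below is a minimal sketch of building such a list. `build_pipeline_args` is a hypothetical helper, and its defaults mirror the values used in this notebook.

```python
def build_pipeline_args(project, region="us-central1", setup_file="./setup.py"):
    """Return Dataflow pipeline flags as a list of strings."""
    bucket = "gs://{}_bigquery_to_pubsub_temp".format(project.replace("-", "_"))
    return [
        "--streaming",  # boolean switch, no value
        "--project", project,
        "--region", region,
        "--staging_location", bucket + "/staging",
        "--temp_location", bucket + "/temp",
        "--setup_file", setup_file,
    ]


args = build_pipeline_args("verdant-cargo-321713")
print(args)
```

Such a list can be handed to `PipelineOptions(args)` in place of keyword arguments.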

In [21]:
class KeepDoFn(beam.DoFn):
    """Keep only the fields written to BigQuery and add a compact date string."""
    def process(self, element):
        # Imported here so that pandas is resolved on the Dataflow workers
        import pandas as pd

        return [{
            'timestamp': element['block_timestamp_truncated'],
            'request': str(element['oracle_request_id']),
            'symbol': element['symbol'],
            'rate': element['rate'],
            'date_str': pd.to_datetime(element['block_timestamp'], format='%Y-%m-%dT%H:%M:%S.%fZ').strftime("%Y%m%d%H%M%S")
        }]

project = google.auth.default()[1]
table = "{}:temp_crypto_batch.streaming_timestamp".format(project)
schema = "timestamp:TIMESTAMP, request:STRING, symbol:STRING, rate:FLOAT, date_str:STRING"
topic = "projects/{}/topics/bigquery-to-pubsub-test0".format(project)
bucket = "gs://{project}_bigquery_to_pubsub_temp".format(project=project.replace("-", "_"))
streaming = True

options = PipelineOptions(
        streaming=True,
        project=project,
        region="us-central1",
        staging_location="%s/staging" % bucket,
        temp_location="%s/temp" % bucket, 
        setup_file="./setup.py" 
)

#p = beam.Pipeline("DataFlowRunner", argv=pipeline_parameters)
p = beam.Pipeline(DataflowRunner(), options=options)

pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
                | "To Dict" >> beam.Map(json.loads)
                | "Format" >> beam.ParDo(KeepDoFn())
                | "Write To BigQuery" >> WriteToBigQuery(table=table, schema=schema,
                                  create_disposition=BigQueryDisposition.CREATE_NEVER,
                                  write_disposition=BigQueryDisposition.WRITE_APPEND))
    
p.run()

INFO:apache_beam.runners.portability.stager:Executing command: ['/root/apache-beam-2.25.0/bin/python', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmp5y89uotj']
INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/root/apache-beam-2.25.0/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmp5y89uotj', 'apache-beam==2.25.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/root/apache-beam-2.25.0/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmp5y89uotj', 'apache-beam==2.25.0', '--no-deps', '--only-binary', ':all:', '--python-version', '37', '--implementation', 'cp', '--abi', 'cp37m', '--platform', 'manylinux1_x86_64']
INFO:a

<DataflowPipelineResult <Job
 createTime: '2021-08-03T09:36:43.995136Z'
 currentStateTime: '1970-01-01T00:00:00Z'
 id: '2021-08-03_02_36_42-9512594698141357029'
 location: 'us-central1'
 name: 'beamapp-root-0803093639-841908'
 projectId: 'verdant-cargo-321713'
 stageStates: []
 startTime: '2021-08-03T09:36:43.995136Z'
 steps: []
 tempFiles: []
 type: TypeValueValuesEnum(JOB_TYPE_STREAMING, 2)> at 0x7fc464925690>

Remember to stop the pipeline to reduce costs once streaming is finished (from the console, on the job page, choose STOP, then DRAIN).
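
Draining can also be requested from the command line with `gcloud dataflow jobs drain`. The sketch below only assembles that command as an argument list. `build_drain_command` is a hypothetical helper, and the job ID shown is the one printed by the run above.

```python
import subprocess


def build_drain_command(job_id, region="us-central1"):
    """Return the gcloud command that drains a Dataflow job."""
    return ["gcloud", "dataflow", "jobs", "drain", job_id, "--region", region]


command = build_drain_command("2021-08-03_02_36_42-9512594698141357029")
print(" ".join(command))

# To actually drain the job (requires an authenticated gcloud CLI):
# subprocess.run(command, check=True)
```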

Go back to the Markdown README for the next commands.

4. Automation with Cloud Functions: creating the template

In [22]:
import logging
import json
import time
import traceback

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options import pipeline_options
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery
from apache_beam.io import WriteToText

from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.runners import DataflowRunner

import google.auth

#from utils.utils import publish_to_topic
from IPython.display import display, HTML

import datetime

In [23]:
# The project will be used for creating a subscription to the Pub/Sub topic and for the Dataflow pipeline
project = google.auth.default()[1]
project

'verdant-cargo-321713'

In [24]:
options = pipeline_options.PipelineOptions(
    streaming=True,
    project=project
)

In [25]:
logging.getLogger().setLevel(logging.INFO)

In [26]:
from setuptools import setup, find_packages

In [27]:
%%writefile setup.py

from setuptools import setup, find_packages

setup(
    name="dataflow_pipeline_dependencies",
    version="0.1",
    install_requires=[
        'pandas==0.25.2',
    ],
    packages=find_packages()
)

Overwriting setup.py


In [28]:
project

'verdant-cargo-321713'

In [29]:
bucket = "gs://{project}_bigquery_to_pubsub_temp".format(project=project.replace("-", "_"))
pipeline_parameters = [
    '--streaming',  # boolean flag: takes no value
    '--project', project,
    '--staging_location', "%s/staging" % bucket,
    '--temp_location', "%s/temp" % bucket,
    "--setup_file", './setup.py', 
    "--region", "us-central1",
    "--template_location", "gs://{project}_bigquery_to_pubsub_temp/templates/CUSTOM_TEMPLATE_DATAFLOW_STREAMING".format(project=project.replace("-", "_"))
]

In [30]:
class KeepDoFn(beam.DoFn):
    """Keep only the fields written to BigQuery and add a compact date string."""
    def process(self, element):
        # Imported here so that pandas is resolved on the Dataflow workers
        import pandas as pd

        return [{
            'timestamp': element['block_timestamp_truncated'],
            'request': str(element['oracle_request_id']),
            'symbol': element['symbol'],
            'rate': element['rate'],
            'date_str': pd.to_datetime(element['block_timestamp'], format='%Y-%m-%dT%H:%M:%S.%fZ').strftime("%Y%m%d%H%M%S")
        }]

project = google.auth.default()[1]
table = "{}:temp_crypto_batch.streaming_timestamp".format(project)
schema = "timestamp:TIMESTAMP, request:STRING, symbol:STRING, rate:FLOAT, date_str:STRING"
topic = "projects/{}/topics/bigquery-to-pubsub-test0".format(project)
bucket = "gs://{project}_bigquery_to_pubsub_temp".format(project=project.replace("-", "_"))
streaming = True

options = PipelineOptions(
        streaming=True,
        project=project,
        region="us-central1",
        staging_location="%s/staging" % bucket,
        temp_location="%s/temp" % bucket, 
        setup_file="./setup.py", 
        template_location="gs://{project}_bigquery_to_pubsub_temp/templates/CUSTOM_TEMPLATE_DATAFLOW_STREAMING".format(project=project.replace("-", "_"))
)

#p = beam.Pipeline("DataFlowRunner", argv=pipeline_parameters)
p = beam.Pipeline(DataflowRunner(), options=options)

pubsub = (p | "Read Topic" >> ReadFromPubSub(topic=topic)
                | "To Dict" >> beam.Map(json.loads)
                | "Format" >> beam.ParDo(KeepDoFn())
                | "Write To BigQuery" >> WriteToBigQuery(table=table, schema=schema,
                                  create_disposition=BigQueryDisposition.CREATE_NEVER,
                                  write_disposition=BigQueryDisposition.WRITE_APPEND))
    
p.run()

INFO:apache_beam.runners.portability.stager:Executing command: ['/root/apache-beam-2.25.0/bin/python', 'setup.py', 'sdist', '--dist-dir', '/tmp/tmpx3_9d6lb']
INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/root/apache-beam-2.25.0/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpx3_9d6lb', 'apache-beam==2.25.0', '--no-deps', '--no-binary', ':all:']
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/root/apache-beam-2.25.0/bin/python', '-m', 'pip', 'download', '--dest', '/tmp/tmpx3_9d6lb', 'apache-beam==2.25.0', '--no-deps', '--only-binary', ':all:', '--python-version', '37', '--implementation', 'cp', '--abi', 'cp37m', '--platform', 'manylinux1_x86_64']
INFO:a

<DataflowPipelineResult None at 0x7fc464665c50>
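
Because `template_location` is set, this run stages a template in Cloud Storage rather than starting a job, which is consistent with the `None` job in the result above. A Cloud Function could then start a job from that template through the Dataflow REST API (`projects.locations.templates.launch`). The sketch below is an assumption about how that call might look, not the original function: `build_launch_request` and the job name are hypothetical, and the API call itself is left commented out because it needs `google-api-python-client` and credentials.

```python
def build_launch_request(project, job_name, region="us-central1"):
    """Return keyword arguments for templates().launch() (hypothetical helper)."""
    bucket = "gs://{}_bigquery_to_pubsub_temp".format(project.replace("-", "_"))
    return {
        "projectId": project,
        "location": region,
        # Template path written by the cell above
        "gcsPath": bucket + "/templates/CUSTOM_TEMPLATE_DATAFLOW_STREAMING",
        "body": {
            "jobName": job_name,
            "environment": {"tempLocation": bucket + "/temp"},
        },
    }


request = build_launch_request("verdant-cargo-321713", "crypto-streaming-from-template")
print(request["gcsPath"])

# Inside a Cloud Function, the launch might then look like:
# from googleapiclient.discovery import build
# dataflow = build("dataflow", "v1b3")
# dataflow.projects().locations().templates().launch(**request).execute()
```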