In [2]:
import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText

import google.auth

print('complete')

complete


Prerequisites for running this demo:
1) A billing-enabled GCP project
2) The required APIs enabled and the necessary IAM role(s) granted for the services used (e.g. Dataflow, BigQuery)
3) A GCS bucket for temporary and schema files
4) A BigQuery dataset (tables can / will be created if they don't already exist)
5) A Pub/Sub topic for streaming data

# Dataflow Batch - GCS Text Files to BQ

In [3]:
%%time
# function(s)

def parse_json(line):
    """Decode a single JSON document and return the resulting Python object.

    Args:
        line: JSON-encoded text (one record) accepted by ``json.loads``.

    Returns:
        Whatever ``json.loads`` produces for ``line``.
    """
    # Imported at call time rather than module level, as in the original cell
    # (presumably so the function is self-contained on remote workers).
    from json import loads

    return loads(line)

def run_batch(argv=None):
    """Build and run the batch pipeline: JSON text files in GCS -> BigQuery.

    Every input line is parsed with ``parse_json`` and appended to the
    destination BigQuery table, which is created if it does not exist yet.

    Args:
        argv: Optional list of command-line style arguments. When ``None``,
            argparse reads ``sys.argv[1:]``. Arguments that are not declared
            below are forwarded untouched to Beam's ``PipelineOptions``.

    Recognised arguments (all optional, each with a default): ``--runner``,
    ``--job_name``, ``--project_id``, ``--region``, ``--dataset_name``,
    ``--table_name``, ``--input_data``, ``--staging_location`` and
    ``--temp_location``.
    """
    from datetime import datetime

    # mm_dd_YYYY_HH_MM_SS_ffffff timestamp; appended to the default table name
    # so each run writes to its own BigQuery table unless --table_name is given.
    dt_string = datetime.now().strftime("%m_%d_%Y_%H_%M_%S_%f")

    # BigQuery schema in the compact "name:TYPE,..." string form.
    SCHEMA = 'eventId:STRING,deviceId:STRING,eventTime:DATETIME,city:STRING,temp:FLOAT,flowrate:FLOAT'

    parser = argparse.ArgumentParser()
    parser.add_argument('--runner', required=False, default='DataflowRunner', help='Dataflow Runner - DataflowRunner or DirectRunner (local)')
    parser.add_argument('--job_name', required=False, default='mgwaterdemobatch', help='Dataflow Job Name')
    parser.add_argument('--project_id', required=False, default='mg-ce-demos', help='GCP Project ID')
    parser.add_argument('--region', required=False, default='us-central1', help='GCP region for execution')
    parser.add_argument('--dataset_name', required=False, default='smart_water_demo', help='Output BigQuery Dataset')
    parser.add_argument('--table_name', required=False, default='smart_water_demo_data_batch_' + dt_string, help='Output BigQuery Table')
    parser.add_argument('--input_data', required=False, default='gs://mg-ce-demos-bucket/water_data_stream_demo/output/*.json', help='input data (for batch only)')
    # Previously hard-coded in the option list below; the defaults keep the
    # original locations while allowing other buckets to be used.
    parser.add_argument('--staging_location', required=False, default='gs://mg-ce-demos-bucket/water_data_stream_demo/temp', help='GCS path for staging local files')
    parser.add_argument('--temp_location', required=False, default='gs://mg-ce-demos-bucket/water_data_stream_demo/tmp', help='GCS path for temporary files')

    # Anything argparse does not recognise is handed on to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_args.extend([
        '--runner=' + str(known_args.runner),  # DataflowRunner (managed service) or DirectRunner (local)
        '--project=' + str(known_args.project_id),  # project ID is required to run on the Dataflow service
        '--staging_location=' + str(known_args.staging_location),  # GCS path for staging local files
        '--temp_location=' + str(known_args.temp_location),  # GCS path for temporary files
        '--job_name=' + str(known_args.job_name),  # project-unique job name
        '--region=' + str(known_args.region),  # region used by DataflowRunner
    ])

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:

        # Read the JSON files in GCS into a PCollection of text lines.
        events = (p | beam.io.ReadFromText(known_args.input_data))

        # Transform events: one JSON line -> one Python record.
        transformed = (events | beam.Map(parse_json))

        # Persist to BigQuery, creating the table on first use and appending rows.
        transformed | 'Write' >> beam.io.WriteToBigQuery(
            table=known_args.table_name,
            dataset=known_args.dataset_name,
            project=known_args.project_id,
            schema=SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )

CPU times: user 5 µs, sys: 0 ns, total: 5 µs
Wall time: 7.87 µs


In [4]:
%%time
# Execute: set the root logger to INFO so INFO-level log records are emitted,
# then build and run the batch pipeline with its default arguments.
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run_batch()

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:apache_beam.io.gcp.gcsio:Starting the size estimation of the input
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:apache_beam.io.gcp.gcsio:Finished listing 28 files in 0.28203511238098145 seconds.


  is_streaming_pipeline = p.options.view_as(StandardOptions).streaming
  temp_location = p.options.view_as(GoogleCloudOptions).temp_location
INFO:apache_beam.runners.portability.stager:Downloading source distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/Users/mikegoodman/Documents/developer/venv/bin/python', '-m', 'pip', 'download', '--dest', '/var/folders/q8/1gf829m141157nbz7kc4_19h00tb0q/T/tmpht3fz2_u', 'apache-beam==2.35.0', '--no-deps', '--no-binary', ':all:']
You should consider upgrading via the '/Users/mikegoodman/Documents/developer/venv/bin/python -m pip install --upgrade pip' command.
INFO:apache_beam.runners.portability.stager:Staging SDK sources from PyPI: dataflow_python_sdk.tar
INFO:apache_beam.runners.portability.stager:Downloading binary distribution of the SDK from PyPi
INFO:apache_beam.runners.portability.stager:Executing command: ['/Users/mikegoodman/Documents/developer/venv/bin/python', '-m', 'pip', 'download', '--de

Exception: Dataflow runner currently supports Python versions ['3.6', '3.7', '3.8'], got 3.9.6 (v3.9.6:db3ff76da1, Jun 28 2021, 11:49:53) 
[Clang 6.0 (clang-600.0.57)].
To ignore this requirement and start a job using an unsupported version of Python interpreter, pass --experiment use_unsupported_python_version pipeline option.

# Dataflow Stream - Pub/Sub Topic to BQ

In [None]:
%%time
# function(s)

def parse_json(line):
    """Turn one JSON-encoded message into the Python object it describes.

    This repeats the helper defined in the batch section so that the
    streaming cell can be run on its own.

    Args:
        line: JSON-encoded text accepted by ``json.loads``.

    Returns:
        The value returned by ``json.loads(line)``.
    """
    import json as _json  # local import, as in the original cell

    return _json.loads(line)

def run_stream(argv=None):
    """Build and run the streaming pipeline: Pub/Sub topic -> BigQuery.

    Each Pub/Sub message payload is decoded as UTF-8, parsed with
    ``parse_json`` and appended to the destination BigQuery table, which is
    created if it does not exist yet.

    Args:
        argv: Optional list of command-line style arguments. When ``None``,
            argparse reads ``sys.argv[1:]``. Arguments that are not declared
            below are forwarded untouched to Beam's ``PipelineOptions``.

    Recognised arguments (all optional, each with a default): ``--runner``,
    ``--job_name``, ``--batch_size``, ``--input_topic``, ``--project_id``,
    ``--region``, ``--dataset_name``, ``--table_name``, ``--input_data``
    (accepted but unused here), ``--staging_location`` and ``--temp_location``.
    """
    from datetime import datetime

    # mm_dd_YYYY_HH_MM_SS_ffffff timestamp; appended to the default table name
    # so each run writes to its own BigQuery table unless --table_name is given.
    dt_string = datetime.now().strftime("%m_%d_%Y_%H_%M_%S_%f")

    # Simple BigQuery schema in the compact "name:TYPE,..." string form.
    SCHEMA = 'eventId:STRING,deviceId:STRING,eventTime:DATETIME,city:STRING,temp:FLOAT,flowrate:FLOAT'

    parser = argparse.ArgumentParser()
    parser.add_argument('--runner', required=False, default='DataflowRunner', help='Dataflow Runner - DataflowRunner or DirectRunner (local)')
    parser.add_argument('--job_name', required=False, default='mgwaterdemostream', help='Dataflow Job Name')
    # Parsed as an integer up front so a bad value is rejected by argparse
    # before the pipeline is constructed.
    parser.add_argument('--batch_size', required=False, type=int, default=100, help='Dataflow Batch Size')
    parser.add_argument('--input_topic', required=False, default='projects/mg-ce-demos/topics/smart-water', help='projects/<project_id>/topics/<topic_name>')
    parser.add_argument('--project_id', required=False, default='mg-ce-demos', help='GCP Project ID')
    parser.add_argument('--region', required=False, default='us-central1', help='GCP region for execution')
    parser.add_argument('--dataset_name', required=False, default='smart_water_demo', help='Output BigQuery Dataset')
    parser.add_argument('--table_name', required=False, default='smart_water_demo_data_stream_' + dt_string, help='Output BigQuery Table')
    # Not read by the streaming pipeline; still declared so invocations that
    # pass it keep parsing exactly as before.
    parser.add_argument('--input_data', required=False, default='gs://mg-ce-demos-bucket/water_data_stream_demo/output/*.json', help='input data (for batch only)')
    # Previously hard-coded in the option list below; the defaults keep the
    # original locations while allowing other buckets to be used.
    parser.add_argument('--staging_location', required=False, default='gs://mg-ce-demos-bucket/water_data_stream_demo/temp', help='GCS path for staging local files')
    parser.add_argument('--temp_location', required=False, default='gs://mg-ce-demos-bucket/water_data_stream_demo/tmp', help='GCS path for temporary files')

    # Anything argparse does not recognise is handed on to Beam.
    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_args.extend([
        '--runner=' + str(known_args.runner),  # DataflowRunner (managed service) or DirectRunner (local)
        '--project=' + str(known_args.project_id),  # project ID is required to run on the Dataflow service
        '--staging_location=' + str(known_args.staging_location),  # GCS path for staging local files
        '--temp_location=' + str(known_args.temp_location),  # GCS path for temporary files
        '--job_name=' + str(known_args.job_name),  # project-unique job name
        '--region=' + str(known_args.region),  # region used by DataflowRunner
    ])

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True  # streaming mode

    with beam.Pipeline(options=pipeline_options) as p:

        # Read the Pub/Sub topic into a PCollection of text messages.
        # ReadStringsFromPubSub is deprecated (since Beam 2.7.0) in favour of
        # ReadFromPubSub, which yields raw bytes, so decode them explicitly.
        events = (
            p
            | 'ReadPubSub' >> beam.io.ReadFromPubSub(topic=known_args.input_topic)
            | 'DecodeUtf8' >> beam.Map(lambda payload: payload.decode('utf-8'))
        )

        # Transform events: one JSON message -> one Python record.
        transformed = (events | 'ParseJson' >> beam.Map(parse_json))

        # Persist to BigQuery, creating the table on first use and appending rows.
        transformed | 'Write' >> beam.io.WriteToBigQuery(
            table=known_args.table_name,
            dataset=known_args.dataset_name,
            project=known_args.project_id,
            schema=SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            batch_size=known_args.batch_size,
        )

In [None]:
%%time
# Execute: set the root logger to INFO so INFO-level log records are emitted,
# then build and run the streaming pipeline with its default arguments.
if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run_stream()

# Extras

In [None]:
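# Alternative, more detailed way to define the BigQuery schema: build a TableSchema
# field by field instead of using the compact 'name:TYPE,...' string.
# The snippet is kept disabled inside a string literal for reference only.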
'''
    table_schema = bigquery.TableSchema()
    
    # Fields that use standard types.
    eventId_schema = bigquery.TableFieldSchema()
    eventId_schema.name = 'eventId'
    eventId_schema.type = 'string'
    eventId_schema.mode = 'nullable'
    table_schema.fields.append(eventId_schema)
    
    deviceId_schema = bigquery.TableFieldSchema()
    deviceId_schema.name = 'deviceId'
    deviceId_schema.type = 'string'
    deviceId_schema.mode = 'required'
    table_schema.fields.append(deviceId_schema)
    
    eventTime = bigquery.TableFieldSchema()
    eventTime.name = 'eventTime'
    eventTime.type = 'datetime'
    eventTime.mode = 'nullable'
    table_schema.fields.append(eventTime)
    
    city_schema = bigquery.TableFieldSchema()
    city_schema.name = 'city'
    city_schema.type = 'string'
    city_schema.mode = 'nullable'
    table_schema.fields.append(city_schema)
    
    temp_schema = bigquery.TableFieldSchema()
    temp_schema.name = 'temp'
    temp_schema.type = 'float'
    temp_schema.mode = 'nullable'
    table_schema.fields.append(temp_schema)
    
    flowrate_schema = bigquery.TableFieldSchema()
    flowrate_schema.name = 'flowrate'
    flowrate_schema.type = 'float'
    flowrate_schema.mode = 'nullable'
    table_schema.fields.append(flowrate_schema)
    '''