In [None]:
import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText

import google.auth

# Dataflow Batch - GCS Text Files to BQ

In [None]:
%%time
# function(s)

def parse_json(line):
    """Decode one JSON document and return the resulting Python object.

    Args:
        line: JSON text for a single record.

    Returns:
        Whatever ``json.loads`` produces for ``line``.
    """
    # json is imported here because the notebook does not import it at the top.
    import json

    return json.loads(line)

def run_batch(argv=None):
    """Build and run the batch pipeline: JSON text files in GCS -> BigQuery.

    Every input line is parsed with ``parse_json`` and the resulting record is
    appended to the target BigQuery table, which is created from ``SCHEMA``
    when it does not exist yet.

    Args:
        argv: Optional list of command-line style arguments handed to
            ``ArgumentParser.parse_known_args``. The options declared below
            are consumed here; everything else is forwarded to Beam's
            ``PipelineOptions`` untouched.
    """
    from datetime import datetime

    # One timestamp per invocation keeps the default job and table names unique.
    now = datetime.now()
    # month_day_year_hour_minute_second_microsecond, used for the table name.
    dt_string = now.strftime("%m_%d_%Y_%H_%M_%S_%f")
    # Same instant with hyphens: Dataflow job names may only contain
    # lowercase letters, digits and '-', so underscores are rejected there.
    job_suffix = now.strftime("%m-%d-%Y-%H-%M-%S-%f")

    SCHEMA = 'eventId:STRING,deviceId:STRING,eventTime:DATETIME,city:STRING,temp:FLOAT,flowrate:FLOAT'

    parser = argparse.ArgumentParser()
    parser.add_argument('--runner',           required=False, default='DataflowRunner',            help='Dataflow Runner - DataflowRunner or DirectRunner (local)')
    parser.add_argument('--job_name',         required=False, default='mgwaterdemobatch-' + job_suffix,             help='Dataflow Job Name')
    parser.add_argument('--project_id',       required=False, default='mg-ce-demos',             help='GCP Project ID')
    parser.add_argument('--region',           required=False, default='us-central1',             help='GCP region for execution')
    parser.add_argument('--dataset_name',     required=False, default='smart_water_demo',        help='Output BigQuery Dataset')
    parser.add_argument('--table_name',       required=False, default='smart_water_demo_data_' + dt_string,   help='Output BigQuery Table')
    parser.add_argument('--input_data',       required=False, default='gs://mg-ce-demos-bucket/water_data_stream_demo/output/*.json', help='GCS Bucket and file suffix for input data')

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_args.extend([
        # Runner: DataflowRunner (Dataflow service) or DirectRunner (local).
        '--runner=' + str(known_args.runner),
        # Project that owns the Dataflow job.
        '--project=' + str(known_args.project_id),
        # GCS path for staging local files.
        '--staging_location=gs://mg-ce-demos-bucket/water_data_stream_demo/temp',
        # GCS path for temporary files.
        '--temp_location=gs://mg-ce-demos-bucket/water_data_stream_demo/tmp',
        # The job name is passed through verbatim: the default is already
        # timestamped, so nothing is appended here (same as run_stream).
        '--job_name=' + str(known_args.job_name),
        # Region, relevant when running on DataflowRunner.
        '--region=' + str(known_args.region),
    ])

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    # Batch job: the streaming flag is intentionally left unset.

    with beam.Pipeline(options=pipeline_options) as p:

        # Read the JSON files in GCS into a PCollection of text lines.
        events = p | beam.io.ReadFromText(known_args.input_data)

        # Transform each line into a record.
        transformed = events | beam.Map(parse_json)

        # Persist to BigQuery.
        transformed | 'Write' >> beam.io.WriteToBigQuery(
            table=known_args.table_name,
            dataset=known_args.dataset_name,
            project=known_args.project_id,
            schema=SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )

In [None]:
%%time
#Execute
if __name__ == '__main__':
    # Raise the root logger to INFO so pipeline progress messages are shown.
    logging.getLogger().setLevel(level=logging.INFO)
    # Launch the batch pipeline with its default arguments.
    run_batch()

# Dataflow Stream - Pub/Sub Topic to BQ

In [None]:
%%time
# function(s)

def parse_json(line):
    """Turn a single JSON-encoded element into its parsed Python value.

    Args:
        line: JSON text for one record.

    Returns:
        The value returned by ``json.loads`` for ``line``.
    """
    # Local import: json is not imported in the notebook's import cell.
    from json import loads as decode_json

    parsed = decode_json(line)
    return parsed

def run_stream(argv=None):
    """Build and run the streaming pipeline: Pub/Sub topic -> BigQuery.

    Every Pub/Sub message payload is decoded as UTF-8, parsed with
    ``parse_json`` and appended to the target BigQuery table, which is created
    from ``SCHEMA`` when it does not exist yet.

    Args:
        argv: Optional list of command-line style arguments handed to
            ``ArgumentParser.parse_known_args``. The options declared below
            are consumed here; everything else is forwarded to Beam's
            ``PipelineOptions`` untouched.
    """
    from datetime import datetime

    # One timestamp per invocation keeps the default job and table names unique.
    now = datetime.now()
    # month_day_year_hour_minute_second_microsecond, used for the table name.
    dt_string = now.strftime("%m_%d_%Y_%H_%M_%S_%f")
    # Same instant with hyphens: Dataflow job names may only contain
    # lowercase letters, digits and '-', so underscores are rejected there.
    job_suffix = now.strftime("%m-%d-%Y-%H-%M-%S-%f")

    SCHEMA = 'eventId:STRING,deviceId:STRING,eventTime:DATETIME,city:STRING,temp:FLOAT,flowrate:FLOAT'  # Simple BQ Schema

    parser = argparse.ArgumentParser()
    parser.add_argument('--runner',           required=False, default='DataflowRunner',          help='Dataflow Runner - DataflowRunner or DirectRunner (local)')
    parser.add_argument('--job_name',         required=False, default='mgwaterdemostream-' + job_suffix,            help='Dataflow Job Name')
    # Parsed as an integer up front so a bad value fails at argument parsing.
    parser.add_argument('--batch_size',       required=False, type=int, default=100,             help='Dataflow Batch Size')
    parser.add_argument('--input_topic',      required=False, default='projects/mg-ce-demos/topics/smart-water',             help='Input PubSub Topic: projects/<project_id>/topics/<topic_name>')
    parser.add_argument('--project_id',       required=False, default='mg-ce-demos',             help='GCP Project ID')
    parser.add_argument('--region',           required=False, default='us-central1',             help='GCP region for execution')
    parser.add_argument('--dataset_name',     required=False, default='smart_water_demo',        help='Output BigQuery Dataset')
    parser.add_argument('--table_name',       required=False, default='smart_water_demo_data_stream_' + dt_string,   help='Output BigQuery Table')
    # Not read by the streaming pipeline; still accepted so existing command
    # lines that pass it keep working.
    parser.add_argument('--input_data',       required=False, default='gs://mg-ce-demos-bucket/water_data_stream_demo/output/*.json', help='GCS Bucket and file suffix for input data')

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_args.extend([
        # Runner: DataflowRunner (Dataflow service) or DirectRunner (local).
        '--runner=' + str(known_args.runner),
        # Project that owns the Dataflow job.
        '--project=' + str(known_args.project_id),
        # GCS path for staging local files.
        '--staging_location=gs://mg-ce-demos-bucket/water_data_stream_demo/temp',
        # GCS path for temporary files.
        '--temp_location=gs://mg-ce-demos-bucket/water_data_stream_demo/tmp',
        # Job name is passed through verbatim; the default is already timestamped.
        '--job_name=' + str(known_args.job_name),
        # Region, relevant when running on DataflowRunner.
        '--region=' + str(known_args.region),
    ])

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True  # streaming job

    with beam.Pipeline(options=pipeline_options) as p:

        # Read raw message payloads (bytes) from the Pub/Sub topic.
        # ReadFromPubSub replaces the deprecated ReadStringsFromPubSub.
        payloads = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)

        # Decode to text so downstream elements are strings, as before.
        events = payloads | 'DecodeUtf8' >> beam.Map(lambda payload: payload.decode('utf-8'))

        # Transform each message into a record.
        transformed = events | beam.Map(parse_json)

        # Persist to BigQuery.
        transformed | 'Write' >> beam.io.WriteToBigQuery(
            table=known_args.table_name,
            dataset=known_args.dataset_name,
            project=known_args.project_id,
            schema=SCHEMA,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            batch_size=known_args.batch_size,
        )

In [None]:
%%time
#Execute
if __name__ == '__main__':
    # INFO level on the root logger makes pipeline progress visible in the cell.
    logging.getLogger().setLevel(level=logging.INFO)
    # Launch the streaming pipeline with its default arguments.
    run_stream()

# Extras

In [None]:
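# Alternative way to define the BigQuery schema: build it field by field with
# TableSchema / TableFieldSchema objects, which also lets each field declare
# its mode (nullable / required). The snippet is kept inside a string literal
# so it is not executed; it refers to a `bigquery` name that the import cell
# shown in this notebook does not provide.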
'''
    table_schema = bigquery.TableSchema()
    
    # Fields that use standard types.
    eventId_schema = bigquery.TableFieldSchema()
    eventId_schema.name = 'eventId'
    eventId_schema.type = 'string'
    eventId_schema.mode = 'nullable'
    table_schema.fields.append(eventId_schema)
    
    deviceId_schema = bigquery.TableFieldSchema()
    deviceId_schema.name = 'deviceId'
    deviceId_schema.type = 'string'
    deviceId_schema.mode = 'required'
    table_schema.fields.append(deviceId_schema)
    
    eventTime = bigquery.TableFieldSchema()
    eventTime.name = 'eventTime'
    eventTime.type = 'datetime'
    eventTime.mode = 'nullable'
    table_schema.fields.append(eventTime)
    
    city_schema = bigquery.TableFieldSchema()
    city_schema.name = 'city'
    city_schema.type = 'string'
    city_schema.mode = 'nullable'
    table_schema.fields.append(city_schema)
    
    temp_schema = bigquery.TableFieldSchema()
    temp_schema.name = 'temp'
    temp_schema.type = 'float'
    temp_schema.mode = 'nullable'
    table_schema.fields.append(temp_schema)
    
    flowrate_schema = bigquery.TableFieldSchema()
    flowrate_schema.name = 'flowrate'
    flowrate_schema.type = 'float'
    flowrate_schema.mode = 'nullable'
    table_schema.fields.append(flowrate_schema)
    '''

In [None]:
from datetime import datetime

# Capture the current local date and time once.
now = datetime.now()
# Render it as month_day_year_hour_minute_second_microsecond.
dt_string = "{:%m_%d_%Y_%H_%M_%S_%f}".format(now)
# Bare expression so the notebook displays the value as the cell output.
dt_string