# Dataflow

## Set up IAM and networking for Dataflow jobs

### Create a Cloud Storage bucket

In [None]:
# Show which account is authenticated and which project is active.
gcloud auth list
gcloud config list project

# Capture the active project ID and account e-mail for the commands below.
PROJECT=$(gcloud config list --format 'value(core.project)')
USER_EMAIL=$(gcloud config list account --format 'value(core.account)')
REGION=us-central1

# Create a bucket named after the project, with uniform bucket-level access (-b on).
gsutil mb -p "$PROJECT" -b on "gs://$PROJECT"

### Create a virtual environment

In [None]:
## Create and activate virtual environment
sudo apt-get install -y python3-venv
python3 -m venv df-env
source df-env/bin/activate

python3 -m pip install -q --upgrade pip setuptools wheel
# Quote the requirement. Unquoted, [gcp] is a shell glob character class: it is
# rewritten if a matching file exists, and zsh aborts with "no matches found".
python3 -m pip install 'apache-beam[gcp]'

# Enable the Dataflow API.
gcloud services enable dataflow.googleapis.com

### Launch a Dataflow job

In [None]:
# List the roles bound to this user. The filter must be double-quoted so the
# shell expands $USER_EMAIL; inside single quotes it is passed literally.
gcloud projects get-iam-policy "$PROJECT" \
  --format='table(bindings.role)' \
  --flatten='bindings[].members' \
  --filter="bindings.members:$USER_EMAIL"

# Grant the Dataflow Admin role to this user.
gcloud projects add-iam-policy-binding "$PROJECT" \
  --member="user:$USER_EMAIL" \
  --role=roles/dataflow.admin

# Run the sample wordcount pipeline on Dataflow.
python3 -m apache_beam.examples.wordcount \
  --input=gs://dataflow-samples/shakespeare/kinglear.txt \
  --output="gs://$PROJECT/results/outputs" \
  --runner=DataflowRunner \
  --project="$PROJECT" \
  --temp_location="gs://$PROJECT/tmp/" \
  --region="$REGION"

### Launch with private IPs only

In [None]:
# Allow this user to modify network settings in the project.
gcloud projects add-iam-policy-binding "$PROJECT" \
  --member="user:$USER_EMAIL" \
  --role=roles/compute.networkAdmin

# Enable Private Google Access on the default subnet so workers without
# external IPs can still reach Google APIs.
gcloud compute networks subnets update default \
  --region="$REGION" \
  --enable-private-ip-google-access

# Re-run wordcount with workers that have internal IPs only, on the default network.
python3 -m apache_beam.examples.wordcount \
  --input=gs://dataflow-samples/shakespeare/kinglear.txt \
  --output="gs://$PROJECT/results/outputs" \
  --runner=DataflowRunner \
  --project="$PROJECT" \
  --temp_location="gs://$PROJECT/tmp/" \
  --region="$REGION" \
  --no_use_public_ips \
  --network=default

## Extract-Transform-Load

### Pipeline

In [None]:
import argparse
import time
import logging
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners import DataflowRunner, DirectRunner

# ### main

def run():
    """Load newline-delimited JSON events from Cloud Storage into BigQuery.

    Reads ``gs://<project>/events.json``, parses every line with ``json.loads``
    and writes the resulting dicts to ``<project>:logs.logs``. The table is
    created if needed, and its contents are replaced on every run
    (WRITE_TRUNCATE). The pipeline is started with ``p.run()``; this function
    does not call ``wait_until_finish()``.

    Command-line flags (all required): --project, --region, --stagingLocation,
    --tempLocation, --runner.
    """
    # Configure logging before any pipeline work so INFO messages are emitted.
    logging.getLogger().setLevel(logging.INFO)

    # Command line arguments
    parser = argparse.ArgumentParser(description='Load from Json into BigQuery')
    parser.add_argument('--project', required=True, help='Specify Google Cloud project')
    parser.add_argument('--region', required=True, help='Specify Google Cloud region')
    parser.add_argument('--stagingLocation', required=True, help='Specify Cloud Storage bucket for staging')
    parser.add_argument('--tempLocation', required=True, help='Specify Cloud Storage bucket for temp')
    parser.add_argument('--runner', required=True, help='Specify Apache Beam Runner')

    opts = parser.parse_args()

    # Setting up the Beam pipeline options. Views share the same underlying
    # option store, so one GoogleCloudOptions view can be reused.
    options = PipelineOptions()
    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = opts.project
    gcp_options.region = opts.region
    gcp_options.staging_location = opts.stagingLocation
    gcp_options.temp_location = opts.tempLocation
    # The nanosecond timestamp makes the job name unique per submission.
    gcp_options.job_name = '{0}{1}'.format('my-pipeline-', time.time_ns())
    options.view_as(StandardOptions).runner = opts.runner

    # Static input and output (named so they do not shadow the builtin `input`).
    input_path = 'gs://{0}/events.json'.format(opts.project)
    output_table = '{0}:logs.logs'.format(opts.project)

    # Table schema for BigQuery, built from (column name, BigQuery type) pairs.
    columns = [
        ('ip', 'STRING'),
        ('user_id', 'STRING'),
        ('lat', 'FLOAT'),
        ('lng', 'FLOAT'),
        ('timestamp', 'STRING'),
        ('http_request', 'STRING'),
        ('http_response', 'INTEGER'),
        ('num_bytes', 'INTEGER'),
        ('user_agent', 'STRING'),
    ]
    table_schema = {"fields": [{"name": name, "type": bq_type} for name, bq_type in columns]}

    # Create the pipeline: read lines -> parse JSON -> write to BigQuery.
    p = beam.Pipeline(options=options)

    (p
        | 'ReadFromGCS' >> beam.io.ReadFromText(input_path)
        | 'ParseJson' >> beam.Map(json.loads)
        | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            output_table,
            schema=table_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
            )
    )

    logging.info("Building pipeline ...")

    p.run()

if __name__ == '__main__':
    run()

In [None]:
# Resolve the active project; the bucket of the same name holds staging/temp files.
export PROJECT_ID=$(gcloud config get-value project)

# Launch my_pipeline.py on Dataflow.
python3 my_pipeline.py \
  --project="${PROJECT_ID}" \
  --region=us-central1 \
  --stagingLocation="gs://${PROJECT_ID}/staging/" \
  --tempLocation="gs://${PROJECT_ID}/temp/" \
  --runner=DataflowRunner

In [None]:
export PROJECT_ID=$(gcloud config get-value project)

# Dump the logs.logs schema as JSON and wrap it in a top-level
# {"BigQuery Schema": ...} object so the file is a single JSON document.
bq show --schema --format=prettyjson logs.logs \
  | sed -e '1s/^/{"BigQuery Schema":/' -e '$s/$/}/' > schema.json
cat schema.json

# Upload the schema to the root of the project bucket.
gsutil cp schema.json "gs://${PROJECT_ID}/"

## Sources and Sinks

### Text IO & File IO

In [None]:
# Illustrative snippets: pipeline, file_name, transformed_data, known_args,
# JsonCoder, AvroSink, CsvSink, etc. are placeholders defined elsewhere.

# Text IO reading: ReadAllFromText reads every file named in an input PCollection
pcoll = (pipeline
    | 'Create' >> Create([file_name])
    | 'ReadAll' >> ReadAllFromText()
)

# Text IO reading from a single file or glob
pcoll = pipeline | 'Read' >> ReadFromText(file_name)

# File IO reading with filenames
with beam.Pipeline() as p:
    readable_files = (p
        | fileio.MatchFiles('hdfs://path/to/*.txt')  # Match file pattern
        | fileio.ReadMatches()
        | beam.Reshuffle()
    )
    file_and_contents = (readable_files
        | beam.Map(lambda x: (x.metadata.path, x.read_utf8()))  # Access file metadata
    )

# File IO processing files as they arrive (Pub/Sub requires a streaming pipeline)
with beam.Pipeline() as p:
    readable_files = (p
        | beam.io.ReadFromPubSub(...)
        # Parse each message into a filename (assumes the payload is a UTF-8 path)
        | 'ParseFilename' >> beam.Map(lambda message: message.decode('utf-8'))
    )
    files_and_contents = (readable_files
        | ReadAllFromText()  # Use the parsed filename to read the file
    )

# Text IO writing
transformed_data | "write" >> WriteToText(known_args.output, coder=JsonCoder())

# Text IO writing with dynamic destinations
pcoll | beam.io.fileio.WriteToFiles(
    path='/path',
    # Dynamic destination: choose a destination key per record
    destination=lambda record: 'avro' if record['type'] == 'A' else 'csv',
    # Dynamic sink: choose the sink for each destination key
    sink=lambda dest: AvroSink() if dest == 'avro' else CsvSink(),
    file_naming=beam.io.fileio.destination_prefix_naming(),
)

### BigQuery IO with BigQuery Storage API

In [None]:
# BigQuery IO reading with query: each result row arrives as a dict keyed by column name
pcoll = (p
    | 'QueryTableStdSQL' >> beam.io.ReadFromBigQuery(
        query='SELECT max_temperature '\
            'FROM `project.dataset.table`',
        use_standard_sql=True
    ) # Source using query
    | beam.Map(lambda elem: elem['max_temperature']) # Map results to the single column
)

# BigQuery IO writing with dynamic destinations
def table_fn(element, fictional_characters):
    """Choose the destination table for one quote.

    ``fictional_characters`` is the side input passed through
    ``table_side_inputs``; an element found in it goes to the fictional table,
    and everything else goes to the real-quotes table.
    """
    is_fictional = element in fictional_characters
    return 'dataset.fictional_quotes' if is_fictional else 'dataset.real_quotes'
    
quotes | "WriteWithDynamicDestination" >> beam.io.WriteToBigQuery(
    table_fn,  # Callable that picks the destination table for each element
    schema=table_schema,  # Schema applied to the destination tables
    # Must be a tuple of side inputs: `(x)` is just `x`, so the trailing comma matters.
    table_side_inputs=(fictional_characters_view,)
)

## Pub/Sub IO

In [None]:
# PubSub IO reading
class GroupWindowsIntoBatches(beam.PTransform):
    """Assign incoming elements to fixed, non-overlapping windows.

    Args:
        window_size: Window length in seconds.
    """

    def __init__(self, window_size):
        super().__init__()
        self.window_size = window_size

    def expand(self, pcoll):
        return (pcoll
            | beam.WindowInto(beam.window.FixedWindows(self.window_size))
        )


# pipeline, input_topic and window_size are placeholders; reading Pub/Sub
# requires a streaming pipeline.
messages = (pipeline
    | "Read PubSub Message" >> beam.io.ReadFromPubSub(topic=input_topic)
    | "Window into" >> GroupWindowsIntoBatches(window_size)
)

## Kafka IO

In [None]:
# Kafka IO reading
from apache_beam.io.kafka import ReadFromKafka

# pipeline, bootstrap_servers and topic are placeholders.
records = (pipeline
    | ReadFromKafka(
        consumer_config={'bootstrap.servers': bootstrap_servers},
        topics=[topic]  # ReadFromKafka takes a list of topic names via `topics`
    )
)

## Avro IO

In [None]:
# Avro IO reading multiple files: the glob matches every path starting with /avrofiles
with beam.Pipeline() as p:
    avro_glob = '/avrofiles*'
    records = p | "Read" >> beam.io.ReadFromAvro(avro_glob)

## Splittable DoFn

In [None]:
# Splittable DoFn custom source
import os

from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker


class FileToWordsRestrictionProvider(beam.transforms.core.RestrictionProvider):
    """Describe the work for one file as the byte range [0, file size)."""

    def initial_restriction(self, file_name):
        # Initial restriction: the whole file.
        return OffsetRange(0, os.stat(file_name).st_size)

    def create_tracker(self, restriction):
        # Tracks which subset of the restriction has been claimed/completed.
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, file_name, restriction):
        # Required by RestrictionProvider: report the size of the remaining work.
        return restriction.size()


class FileToWordsFn(beam.DoFn):
    """Emit the whitespace-separated words of a file, claiming byte offsets as it reads."""

    def process(
            self,
            file_name,
            tracker=beam.DoFn.RestrictionParam(FileToWordsRestrictionProvider())):
        # Binary mode so tell() is a real byte offset matching the OffsetRange.
        with open(file_name, 'rb') as file_handle:
            file_handle.seek(tracker.current_restriction().start)
            # Claim each line's start offset before reading it; stop when a claim
            # fails (the offset lies outside this split's restriction).
            # NOTE: a split that starts mid-line emits a partial first word; a
            # production source would first skip to the next line boundary.
            while tracker.try_claim(file_handle.tell()):
                line = file_handle.readline()
                yield from line.decode('utf-8').split()

## Beam Notebooks

Import the `interactive_runner` and `interactive_beam` modules in the notebook.

In [None]:
import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

In [None]:
# Interactive Beam records data read from unbounded sources (such as the Pub/Sub
# read below) so that ib.show/ib.collect can display it; these options cap that recording.

# Set the recording duration to 10 min
ib.options.recording_duration = '10m'

# Set the recording size limit to 1 GB (value is in bytes)
ib.options.recording_size_limit = 1e9

In [None]:
# Assumes `p` is a streaming pipeline built with InteractiveRunner and `topic` is a
# Pub/Sub topic path — both defined outside this cell.
words = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

# Group the stream into 10-second fixed windows.
windowed_words = (words | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

# Named windowed_word_counts to match the ib.show/ib.collect calls that consume it.
windowed_word_counts = (windowed_words | "count" >> beam.combiners.Count.PerElement())

In [None]:
# Materialize the resulting PCollection as a table in the cell output
ib.show(windowed_word_counts, include_window_info=True)

# Load the output into a pandas DataFrame; keep a reference so it can be inspected
windowed_word_counts_df = ib.collect(windowed_word_counts, include_window_info=True)

# Visualize the data in the notebook
ib.show(windowed_word_counts, include_window_info=True, visualize_data=True)

In [None]:
# Import the production Dataflow runner and the options module used below
from apache_beam.options import pipeline_options
from apache_beam.runners import DataflowRunner

# Set up Apache Beam pipeline options.
# NOTE(review): as in the command-line launches, Dataflow needs project, region and
# temp_location; set them on these options (e.g. via GoogleCloudOptions) first.
options = pipeline_options.PipelineOptions()

# Run the pipeline `p` built above on Dataflow
runner = DataflowRunner()
runner.run_pipeline(p, options=options)

In [None]:
# Merge store and online sales, count sales per key and keep the top 10.
# `p` is an existing pipeline; each `lambda s: ...` is a placeholder parser that
# must emit (key, value) pairs for Count.PerKey.
# Every transform gets an explicit label: labels must be unique per pipeline, and
# the two ReadFromText/Map pairs would otherwise collide.
storeSales = (p
    | "ReadStoreSales" >> beam.io.ReadFromText("purchases-store")
    | "ParseStoreSales" >> beam.Map(lambda s: ...))

onlineSales = (p
    | "ReadOnlineSales" >> beam.io.ReadFromText("purchase-online")
    | "ParseOnlineSales" >> beam.Map(lambda s: ...))

topSales = ((storeSales, onlineSales)
    | "MergeSales" >> beam.Flatten()
    | "CountPerKey" >> beam.combiners.Count.PerKey()
    | "Top10" >> beam.combiners.Top.Of(10, key=lambda x: x[1]))

# Top.Of emits a single list of the 10 largest (key, count) pairs; turn it into
# one BigQuery row per pair. The table spec and schema are placeholders.
(topSales
    | "ToRows" >> beam.FlatMap(lambda top: [{"key": k, "sales": c} for k, c in top])
    | "WriteTopSales" >> beam.io.WriteToBigQuery(
        "dataset.top_sales",
        schema="key:STRING,sales:INTEGER"))

## Troubleshooting & Debug

### Adding exception handlers

In [None]:
class FilterMessagesFn(beam.DoFn):
    """Route byte messages to a 'good' or 'bad' tagged output (dead-letter pattern).

    Messages whose decoded text contains ``'bad'``, or that cannot be decoded,
    go to ``BAD_MESSAGE_TAG``; all others go to ``GOOD_MESSAGE_TAG``. The
    original bytes are emitted in both cases.
    """
    BAD_MESSAGE_TAG = 'bad_message'
    GOOD_MESSAGE_TAG = 'good_message'

    def process(self, element, window=beam.DoFn.WindowParam):
        # Only decoding can fail; keep the try narrow so yields stay outside it.
        try:
            data = element.decode()
        # handle any exceptions in the processing
        except Exception as exp:
            # getLogger is a function: call it to obtain the root logger.
            logging.getLogger().warning(exp)
            yield beam.pvalue.TaggedOutput(self.BAD_MESSAGE_TAG, element)
            return

        # tag the element accordingly
        if 'bad' in data:
            yield beam.pvalue.TaggedOutput(self.BAD_MESSAGE_TAG, element)
        else:
            yield beam.pvalue.TaggedOutput(self.GOOD_MESSAGE_TAG, element)

In [None]:
# my_pipeline.py
import argparse
import logging
import argparse, logging, os
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

class ReadGBK(beam.DoFn):
    """Flatten a grouped (key, values) pair back into its values, logging each one."""

    def process(self, e):
        _key, grouped_values = e
        for value in grouped_values:
            logging.info(f"the element is {value}")
            yield value
            
def run(argv=None):
    """Read Bitcoin transactions from BigQuery, group them by version, log and write them.

    The query returns up to 1,000,000 rows with ``version = 1`` but only 1,000
    with ``version = 2``. Keying by ``version`` ("Add Hotkey") therefore creates
    one heavily skewed key for the GroupByKey step.

    Args:
        argv: Command-line arguments (defaults to ``sys.argv[1:]``). ``--output``
            is required; all other arguments are forwarded to ``PipelineOptions``.
    """
    parser = argparse.ArgumentParser()
    # Required: WriteToText needs a path, so fail fast with a clear argparse error.
    parser.add_argument(
        '--output', dest='output', required=True,
        help='Output file to write results to.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    read_query = """(
                 SELECT
                   version,
                   block_hash,
                   block_number
                 FROM
                   `bigquery-public-data.crypto_bitcoin.transactions`
                 WHERE
                   version = 1
                 LIMIT
                   1000000 )
               UNION ALL (
                 SELECT
                   version,
                   block_hash,
                   block_number
                 FROM
                   `bigquery-public-data.crypto_bitcoin.transactions`
                 WHERE
                   version = 2
                 LIMIT
                   1000 ) ;"""

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
    | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
        query=read_query, use_standard_sql=True)
    | "Add Hotkey" >> beam.Map(lambda elem: (elem["version"], elem))
    | "Groupby" >> beam.GroupByKey()
    | 'Print' >> beam.ParDo(ReadGBK())
    | 'Sink' >> WriteToText(known_args.output))
    p.run()

if __name__ == '__main__':
    # setLevel() returns None; configure the root logger without binding the result.
    logging.getLogger().setLevel(logging.INFO)
    run()

In [None]:
# Create a storage bucket
# NOTE(review): an earlier cell creates gs://$PROJECT for the same project; if that
# bucket already exists, this `mb` fails with an "already exists" error.
export PROJECT_ID=$(gcloud config get-value project)
gsutil mb -l US gs://$PROJECT_ID

# Launch the pipeline
# NOTE(review): my_pipeline.py forwards unrecognized flags to PipelineOptions, whose
# standard spelling is --temp_location; confirm --tempLocation is honored here.
python3 my_pipeline.py \
  --project=${PROJECT_ID} \
  --region=us-central1 \
  --output=gs://$PROJECT_ID/results/prefix \
  --tempLocation=gs://$PROJECT_ID/temp/ \
  --max_num_workers=5 \
  --runner=DataflowRunner

## Performance

### Graph Optimization

In [None]:
# Reshuffle after ParDo: inserts a redistribution point so the preceding step is not
# fused with what follows (pcoll is a placeholder PCollection)
_ = pcoll | beam.Reshuffle()

# Side input: cross_join receives the whole side_input PCollection as an iterable
# through its `rights` keyword argument
_ = pcoll | beam.FlatMap(cross_join, rights=beam.pvalue.AsIter(side_input))

### Disaster Recovery

In [None]:
# Make a snapshot of a subscription
gcloud pubsub snapshots create my-snapshot \
--subscription=my-sub

# Stop and drain Dataflow job
gcloud dataflow jobs drain [job-id]

# Seek subscription to the snapshot
gcloud pubsub subscriptions seek my-sub --snapshot=my-snapshot

# Resubmit the pipeline
gcloud dataflow jobs run my-job-name \
--gcs_location=my_gcs_bucket

## CI/CD Testing

This section introduces the frameworks and features that streamline the CI/CD workflow for Dataflow pipelines.

### Unit Testing

Perform unit tests for DoFns and PTransforms.
`TestPipeline` is a special class included in the Beam SDK specifically for testing transforms and pipeline logic. Use the `assert_that` function to attach a check to an output PCollection, and `equal_to` as the matcher that verifies the PCollection contains exactly the expected elements (in any order).

In [None]:
# Python PAssert: assert_that checks a PCollection; equal_to compares its contents
# with the expected elements, ignoring order.
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to

# Python TestPipeline: it must be instantiated, and the pipeline (with its
# assertions) runs when the `with` block exits.
# fake_input_1, fake_input_2 and EXPECTED_OUTPUTS are placeholders.
with TestPipeline() as p:
    INPUTS = [fake_input_1, fake_input_2]
    test_output = (p
        | beam.Create(INPUTS)
        # | ...  transforms to be tested go here
    )
    # Check whether a PCollection contains some elements in any order.
    assert_that(test_output, equal_to(EXPECTED_OUTPUTS))

### In-flight Actions

Update a running pipeline in place with `--update`.

In [None]:
# Replace a running job in place (--update). --job_name must match the running
# job, and --transform_name_mapping is a JSON object of old -> new transform names.
# <prior-job-name> and the transform names are placeholders.
python3 -m apache_beam.examples.wordcount \
  --project="$PROJECT" \
  --staging_location="gs://$BUCKET/tmp/" \
  --input=gs://dataflow-samples/shakespeare/kinglear.txt \
  --output="gs://$BUCKET/results/outputs" \
  --runner=DataflowRunner \
  --update \
  --job_name="<prior-job-name>" \
  --transform_name_mapping='{"oldTransform1":"newTransform1"}' \
  --region="$REGION"

### Grant the `dataflow.worker` role to the Compute Engine default service account

In [None]:
PROJECT_ID=$(gcloud config get-value project)
# `describe` returns exactly one project, unlike `list --filter`, which can match several.
export PROJECT_NUMBER=$(gcloud projects describe "$PROJECT_ID" --format="value(projectNumber)")
export serviceAccount="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"

# Grant the Compute Engine default service account the Dataflow worker role.
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
  --member="serviceAccount:${serviceAccount}" \
  --role="roles/dataflow.worker"

# Create the bucket only if it does not already exist, then upload the test report.
export PROJECT_ID=$(gcloud config get-value project)
gsutil ls -b "gs://$PROJECT_ID" >/dev/null 2>&1 || gsutil mb -l US "gs://$PROJECT_ID"
gsutil cp testing.out "gs://$PROJECT_ID/8a_Batch_Testing_Pipeline/"

### Performing unit tests for DoFns and PTransforms for a batch pipeline

- Create a `TestPipeline`.
- Create some test input data and use the `Create` transform to create a `PCollection` of your input data.
- Apply your transform to the input `PCollection` and save the resulting `PCollection`.
- Use the `assert_that` method from the `testing.util` module and its other methods to verify that the output `PCollection` contains the elements that you expect.

In [None]:
# weather_statistics_pipeline.py
import json
import typing
import logging
import apache_beam as beam

class WeatherRecord(typing.NamedTuple):
    """One weather observation; field order matches the CSV columns parsed by ConvertCsvToWeatherRecord."""
    loc_id: str
    lat: float
    lng: float
    date: str  # kept as the raw CSV text (e.g. '2/2/2021')
    # Temperatures: presumably Celsius in the raw CSV, since ConvertTempUnits applies *1.8+32 — confirm.
    low_temp: float
    high_temp: float
    precip: float

# Encode WeatherRecord elements with Beam's schema-aware RowCoder.
beam.coders.registry.register_coder(WeatherRecord, beam.coders.RowCoder)

class ConvertCsvToWeatherRecord(beam.DoFn):
    """Parse one CSV line into a WeatherRecord, casting the numeric columns to float."""

    def process(self, line):
        column_names = 'loc_id,lat,lng,date,low_temp,high_temp,precip'.split(',')
        parsed = dict(zip(column_names, line.split(',')))
        parsed.update({name: float(parsed[name])
                       for name in ('lat', 'lng', 'low_temp', 'high_temp', 'precip')})
        yield WeatherRecord(**parsed)

class ConvertTempUnits(beam.DoFn):
    """Convert low_temp and high_temp with F = C * 1.8 + 32, leaving other fields as-is."""

    def process(self, row):
        fields = row._asdict()
        fields.update({name: fields[name] * 1.8 + 32.0 for name in ('low_temp', 'high_temp')})
        yield WeatherRecord(**fields)

class ConvertToJson(beam.DoFn):
    """Serialize a named-tuple-like row (anything with ``_asdict``) to a JSON string."""

    def process(self, row):
        yield json.dumps(row._asdict())

class ComputeStatistics(beam.PTransform):
    """Per loc_id: minimum low_temp, maximum high_temp and summed precip, emitted as JSON lines."""

    def expand(self, pcoll):
        per_location = (beam.GroupBy('loc_id')
                        .aggregate_field('low_temp', min, 'record_low')
                        .aggregate_field('high_temp', max, 'record_high')
                        .aggregate_field('precip', sum, 'total_precip'))
        return (pcoll
                | 'ComputeStatistics' >> per_location
                | 'ToJson' >> beam.ParDo(ConvertToJson()))

class WeatherStats(beam.PTransform):
    """End-to-end transform: CSV lines -> WeatherRecord -> Fahrenheit -> per-location JSON stats."""

    def expand(self, pcoll):
        records = pcoll | "ParseCSV" >> beam.ParDo(ConvertCsvToWeatherRecord())
        fahrenheit = records | "ConvertToF" >> beam.ParDo(ConvertTempUnits())
        return fahrenheit | "ComputeStats" >> ComputeStatistics()

def run():
    """Compute weather statistics from ./weather_data.csv into ./weather_stats*.json files."""
    pipeline = beam.Pipeline()

    csv_lines = pipeline | 'ReadCSV' >> beam.io.ReadFromText('./weather_data.csv')
    stats = csv_lines | 'ComputeStatistics' >> WeatherStats()
    stats | 'WriteJson' >> beam.io.WriteToText('./weather_stats', '.json')

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    pipeline.run()

if __name__ == '__main__':
    run()

In [None]:
# weather_statistics_pipeline_test.py
import logging
import json
import unittest
import sys

from weather_statistics_pipeline import *
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that, equal_to

def main(out = sys.stderr, verbosity = 2):
    """Discover every TestCase in this module and run it, writing the report to ``out``."""
    this_module = sys.modules[__name__]
    suite = unittest.TestLoader().loadTestsFromModule(this_module)
    runner = unittest.TextTestRunner(out, verbosity = verbosity)
    runner.run(suite)


class ConvertToWeatherRecordTest(unittest.TestCase):
    """ConvertCsvToWeatherRecord should turn a CSV line into a typed WeatherRecord."""

    def test_convert_to_csv(self):
        csv_lines = ['x,0.0,0.0,2/2/2021,1.0,2.0,0.1']
        expected_records = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1)]

        with TestPipeline() as p:
            parsed = (p
                      | beam.Create(csv_lines)
                      | beam.ParDo(ConvertCsvToWeatherRecord()))
            assert_that(parsed, equal_to(expected_records))

class ConvertTempUnitsTest(unittest.TestCase):
    """ConvertTempUnits should convert low/high temperatures to Fahrenheit only."""

    def test_convert_temp_units(self):
        celsius_records = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 1.0, 2.0, 0.1),
                           WeatherRecord('y', 0.0, 0.0, '2/2/2021', -3.0, -1.0, 0.3)]
        fahrenheit_records = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1),
                              WeatherRecord('y', 0.0, 0.0, '2/2/2021', 26.6, 30.2, 0.3)]

        with TestPipeline() as p:
            converted = (p
                         | beam.Create(celsius_records)
                         | beam.ParDo(ConvertTempUnits()))
            assert_that(converted, equal_to(fahrenheit_records))

class ComputeStatsTest(unittest.TestCase):
    """ComputeStatistics should aggregate per-location records into JSON summaries."""

    def test_compute_statistics(self):
        weather_records = [WeatherRecord('x', 0.0, 0.0, '2/2/2021', 33.8, 35.6, 0.1),
                           WeatherRecord('x', 0.0, 0.0, '2/3/2021', 41.6, 65.3, 0.2),
                           WeatherRecord('x', 0.0, 0.0, '2/4/2021', 45.3, 52.6, 0.2),
                           WeatherRecord('y', 0.0, 0.0, '2/2/2021', 12.8, 23.6, 0.1),
                           WeatherRecord('y', 0.0, 0.0, '2/3/2021', 26.6, 30.2, 0.3)]
        expected_json = [json.dumps({'loc_id': 'x', 'record_low': 33.8, 'record_high': 65.3, 'total_precip': 0.5 }),
                         json.dumps({'loc_id': 'y', 'record_low': 12.8, 'record_high': 30.2, 'total_precip': 0.4 })]

        with TestPipeline() as p:
            stats = (p
                     | beam.Create(weather_records)
                     | ComputeStatistics())
            assert_that(stats, equal_to(expected_json))

class WeatherStatsTransformTest(unittest.TestCase):
    """WeatherStats should go from raw CSV lines to per-location JSON statistics."""

    def test_weather_stats_transform(self):
        csv_lines = ["x,31.4,-39.2,2/2/21,4.0,7.5,0.1",
                     "x,31.4,-39.2,2/2/21,3.5,6.0,0.3",
                     "y,33.4,-49.2,2/2/21,12.5,17.5,0.5"]
        expected_json = [json.dumps({'loc_id': 'x', 'record_low': 38.3, 'record_high': 45.5, 'total_precip': 0.4 }),
                         json.dumps({'loc_id': 'y', 'record_low': 54.5, 'record_high': 63.5, 'total_precip': 0.5 })]

        with TestPipeline() as p:
            stats = (p
                     | beam.Create(csv_lines)
                     | WeatherStats())
            assert_that(stats, equal_to(expected_json))
            
if __name__ == '__main__':
    # Write the verbose unittest report to testing.out.
    with open('testing.out', 'w') as report:
        main(report)

### Perform unit testing for a streaming pipeline

- Create a `TestPipeline`.
- Use the `TestStream` class to generate streaming data. This includes generating a series of events, advancing the watermark, and advancing the processing time.
- Use the `assert_that` method from the `testing.util` module and its other methods to verify that the output `PCollection` contains the elements that you expect.

In [None]:
# taxi_streaming_pipeline.py
import json
import typing
import logging
import apache_beam as beam
from apache_beam.transforms.trigger import AccumulationMode, AfterCount, AfterWatermark
from apache_beam.transforms.combiners import CountCombineFn
import argparse

class TaxiRide(typing.NamedTuple):
    """One taxi-ride event; JsonToTaxiRide builds it from a JSON object with exactly these keys."""
    ride_id: str
    point_idx: int
    latitude: float
    longitude: float
    timestamp: str  # kept as the raw string from the message
    meter_reading: float
    meter_increment: float
    ride_status: str  # TaxiCountTransform keeps only 'pickup' events
    passenger_count: int

# Encode TaxiRide elements with Beam's schema-aware RowCoder.
beam.coders.registry.register_coder(TaxiRide, beam.coders.RowCoder)

class JsonToTaxiRide(beam.DoFn):
    """Decode one JSON message into a TaxiRide (its keys must match TaxiRide's fields)."""

    def process(self, line):
        yield TaxiRide(**json.loads(line))

class ConvertCountToDict(beam.DoFn):
    """Pair a per-window count with its window's start time, formatted as a UTC string."""

    def process(self, element, window=beam.DoFn.WindowParam):
        started_at = window.start.to_utc_datetime()
        yield {"taxi_rides": element, "timestamp": started_at.strftime("%Y-%m-%dT%H:%M:%S")}


class TaxiCountTransform(beam.PTransform):
    """Count pickup events per one-minute fixed window.

    Windows fire when the watermark passes the window end, then again for each
    late element that arrives within 60 s of allowed lateness. Panes accumulate,
    so a late firing reports the updated total.
    """

    def expand(self, pcoll):
        minute_windows = beam.WindowInto(
            beam.window.FixedWindows(60),
            trigger=AfterWatermark(late=AfterCount(1)),
            allowed_lateness=60,
            accumulation_mode=AccumulationMode.ACCUMULATING)

        pickups = (pcoll
                   | "ParseJson" >> beam.ParDo(JsonToTaxiRide())
                   | "FilterForPickups" >> beam.Filter(lambda ride: ride.ride_status == 'pickup'))

        return (pickups
                | "WindowByMinute" >> minute_windows
                | "CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults())

def run(argv=None):
    """Count taxi pickups per minute from the public Pub/Sub feed and append them to BigQuery.

    Args:
        argv: Command-line arguments (defaults to ``sys.argv[1:]``). ``--table_name``
            is required; any other arguments (runner, project, region, ...) are
            forwarded to Beam as pipeline options.
    """
    from apache_beam.options.pipeline_options import PipelineOptions

    parser = argparse.ArgumentParser(description='Load from Json from Pub/Sub into BigQuery')

    parser.add_argument('--table_name', required=True, help='Output BQ table')

    opts, pipeline_args = parser.parse_known_args(argv)

    # argparse returns a Namespace: read attributes by name (opts['...'] raises TypeError).
    table_name = opts.table_name

    table_schema = {
        "fields": [
            {
                "name": "taxi_rides",
                "type": "INTEGER"
            },
            {
                "name": "timestamp",
                "type": "STRING"
            },

        ]
    }

    # ReadFromPubSub is an unbounded source, so the pipeline must run in streaming mode.
    options = PipelineOptions(pipeline_args, streaming=True)
    p = beam.Pipeline(options=options)

    (p | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/taxirides-realtime")
       | "TaxiPickupCount" >> TaxiCountTransform()
       | "ConvertToDict" >> beam.ParDo(ConvertCountToDict())
       | 'WriteAggToBQ' >> beam.io.WriteToBigQuery(
                table_name,
                schema=table_schema,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
                )
    )

    logging.getLogger().setLevel(logging.INFO)
    # Building the graph alone does nothing; the pipeline must be started explicitly.
    p.run()

if __name__ == '__main__':

    run()

In [None]:
# taxi_streaming_pipeline_test.py
import logging
import json
import unittest
import sys

import apache_beam as beam

from taxi_streaming_pipeline import *
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.testing.util import assert_that, equal_to_per_window
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms.window import TimestampedValue, IntervalWindow
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

def main(out = sys.stderr, verbosity = 2):
    """Discover every TestCase in this module and run it, writing the report to ``out``."""
    this_module = sys.modules[__name__]
    suite = unittest.TestLoader().loadTestsFromModule(this_module)
    runner = unittest.TextTestRunner(out, verbosity = verbosity)
    runner.run(suite)


class TaxiWindowingTest(unittest.TestCase):
    """TaxiCountTransform should count only pickups, per one-minute fixed window."""

    def test_windowing_behavior(self):

        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True

        with TestPipeline(options=options) as p:

            base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
                         "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
                         "\"ride_status\":\"pickup\",\"passenger_count\":1}"

            # Same ride, but en route: FilterForPickups must drop it.
            base_json_enroute = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
                         "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
                         "\"ride_status\":\"enroute\",\"passenger_count\":1}"

            test_stream = TestStream().advance_watermark_to(0).add_elements([
                TimestampedValue(base_json_pickup, 0),
                TimestampedValue(base_json_pickup, 0),
                TimestampedValue(base_json_enroute, 0),
                TimestampedValue(base_json_pickup, 60)
            ]).advance_watermark_to(60).advance_processing_time(60).add_elements([
                TimestampedValue(base_json_pickup, 120)
            ]).advance_watermark_to_infinity()

            taxi_counts = (p | test_stream
                             | TaxiCountTransform()
                          )

            # Window [0, 60) holds two pickups (the en-route event is filtered out);
            # the events at t=60 and t=120 each land in the next window.
            EXPECTED_WINDOW_COUNTS = {IntervalWindow(0,60): [2],
                                      IntervalWindow(60,120): [1],
                                      IntervalWindow(120,180): [1]}

            assert_that(taxi_counts, equal_to_per_window(EXPECTED_WINDOW_COUNTS),
                        reify_windows=True)

class TaxiLateDataTest(unittest.TestCase):
    """Late pickups within allowed lateness update the window count; later ones are dropped."""

    def test_late_data_behavior(self):

        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = True

        base_json_pickup = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0,\"longitude\":0.0," \
                    "\"timestamp\":\"00:00:00\",\"meter_reading\":1.0,\"meter_increment\":0.1," \
                    "\"ride_status\":\"pickup\",\"passenger_count\":1}"

        # Two on-time pickups, then one late pickup while the watermark is at 60
        # (inside the 60 s allowed lateness), then one after the watermark reaches 300.
        test_stream = (TestStream()
                       .advance_watermark_to(0)
                       .add_elements([TimestampedValue(base_json_pickup, 0),
                                      TimestampedValue(base_json_pickup, 0)])
                       .advance_watermark_to(60)
                       .advance_processing_time(60)
                       .add_elements([TimestampedValue(base_json_pickup, 0)])
                       .advance_watermark_to(300)
                       .advance_processing_time(240)
                       .add_elements([TimestampedValue(base_json_pickup, 0)]))

        # Accumulating panes: on-time result 2, then the late firing 3.
        EXPECTED_RESULTS = {IntervalWindow(0,60): [2,3]}

        with TestPipeline(options=options) as p:
            taxi_counts_late = p | test_stream | TaxiCountTransform()
            assert_that(taxi_counts_late, equal_to_per_window(EXPECTED_RESULTS),
                        reify_windows=True)

if __name__ == '__main__':
    # Write the verbose unittest report to testing.out.
    with open('testing.out', 'w') as report:
        main(report)

### The CI/CD pipeline

![CI CD pipeline](./img/CI_CD_pipeline.png)

In [None]:
# Initializing Cloud Composer environment
# All $COMPOSER_* variables are expected to be set before this cell runs.
# NOTE(review): --python-version, --disk-size and --node-count are accepted only for
# some Composer environment versions; confirm against the gcloud release in use.
gcloud composer environments create $COMPOSER_ENV_NAME \
--location $COMPOSER_REGION \
--zone $COMPOSER_ZONE_ID \
--machine-type n1-standard-1 \
--node-count 3 \
--disk-size 20 \
--python-version 3

# Cloud Composer environment variable
# Cloud Storage prefix where this environment reads its DAG files.
export COMPOSER_DAG_BUCKET=$(gcloud composer environments \
    describe $COMPOSER_ENV_NAME \
    --location $COMPOSER_REGION \
    --format="get(config.dagGcsPrefix)")

# Service account
# Service account that the environment's nodes run as.
export COMPOSER_SERVICE_ACCOUNT=$(gcloud composer environments \
    describe $COMPOSER_ENV_NAME \
    --location $COMPOSER_REGION \
    --format="get(config.nodeConfig.serviceAccount)")

In [None]:
# Cloud Source Repositories: create the repo and seed it with the sample source code.
gcloud source repos create "$SOURCE_CODE_REPO"
cp -r ~/ci-cd-for-data-processing-workflow/source-code ~/"$SOURCE_CODE_REPO"
cd ~/"$SOURCE_CODE_REPO"

# Authenticate git against source.developers.google.com through gcloud.
git config --global credential.'https://source.developers.google.com'.helper gcloud.sh
# Quote the substitution so the account value is always passed as a single argument.
GIT_ACCOUNT="$(gcloud config list --format 'value(core.account)')"
git config --global user.email "$GIT_ACCOUNT"
git config --global user.name "$GIT_ACCOUNT"

git init
git remote add google \
    "https://source.developers.google.com/p/$GCP_PROJECT_ID/r/$SOURCE_CODE_REPO"
git add .
git commit -m 'initial commit'
# Push the current branch to remote `master`, whatever the local default branch
# is named (init.defaultBranch may be `main`).
git push google HEAD:master

In [None]:
# Cloud Build pipeline
# Submit build_deploy_test.yaml. The --substitutions list (one KEY=value per
# continued line) supplies the repo name, test buckets and Composer settings
# that the build config refers to.
cd ~/ci-cd-for-data-processing-workflow/source-code/build-pipeline
gcloud builds submit --config=build_deploy_test.yaml --substitutions=\
REPO_NAME=$SOURCE_CODE_REPO,\
_DATAFLOW_JAR_BUCKET=$DATAFLOW_JAR_BUCKET_TEST,\
_COMPOSER_INPUT_BUCKET=$INPUT_BUCKET_TEST,\
_COMPOSER_REF_BUCKET=$REF_BUCKET_TEST,\
_COMPOSER_DAG_BUCKET=$COMPOSER_DAG_BUCKET,\
_COMPOSER_ENV_NAME=$COMPOSER_ENV_NAME,\
_COMPOSER_REGION=$COMPOSER_REGION,\
_COMPOSER_DAG_NAME_TEST=$COMPOSER_DAG_NAME_TEST

# Get the URL to Cloud Composer web interface
# (config.airflowUri is the Airflow UI address of the environment).
gcloud composer environments describe $COMPOSER_ENV_NAME \
--location $COMPOSER_REGION \
--format="get(config.airflowUri)"

In [None]:
# Read the Airflow variable `dataflow_jar_file_test` (the JAR name recorded by
# the test build) back from Composer. stderr is merged into stdout (2>&1) and
# the combined output is filtered to the line(s) naming a .jar file.
# The dot is escaped: an unescaped '.' matches any character, so '.jar' would
# also match "_jar" inside "dataflow_jar_file_test" and could pick up the
# wrong line.
export DATAFLOW_JAR_FILE_LATEST=$(gcloud composer environments run $COMPOSER_ENV_NAME \
--location $COMPOSER_REGION variables -- \
--get dataflow_jar_file_test 2>&1 | grep -i '\.jar')

# Cloud Build: promote the tested JAR and DAG to production (deploy_prod.yaml).
cd ~/ci-cd-for-data-processing-workflow/source-code/build-pipeline
# As with the test build, the pairs continue on column-0 lines so that
# backslash-newline joins them into a single --substitutions argument.
gcloud builds submit \
    --config=deploy_prod.yaml \
    --substitutions=\
REPO_NAME=${SOURCE_CODE_REPO},\
_DATAFLOW_JAR_BUCKET_TEST=${DATAFLOW_JAR_BUCKET_TEST},\
_DATAFLOW_JAR_FILE_LATEST=${DATAFLOW_JAR_FILE_LATEST},\
_DATAFLOW_JAR_BUCKET_PROD=${DATAFLOW_JAR_BUCKET_PROD},\
_COMPOSER_INPUT_BUCKET=${INPUT_BUCKET_PROD},\
_COMPOSER_ENV_NAME=${COMPOSER_ENV_NAME},\
_COMPOSER_REGION=${COMPOSER_REGION},\
_COMPOSER_DAG_BUCKET=${COMPOSER_DAG_BUCKET},\
_COMPOSER_DAG_NAME_PROD=${COMPOSER_DAG_NAME_PROD}

# Print the Airflow web UI URL again to inspect the production DAG.
gcloud composer environments describe \
    --location ${COMPOSER_REGION} \
    --format="get(config.airflowUri)" \
    ${COMPOSER_ENV_NAME}

## Flex Templates

### Create a custom Dataflow Flex Template container image

In [None]:
# my_pipeline.py
import argparse
import time
import logging
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners import DataflowRunner, DirectRunner

# ### functions and classes

def parse_json(element):
    """Decode a single JSON-encoded text line into the corresponding Python object.

    Args:
        element: one line read by ``ReadFromText``.

    Returns:
        Whatever ``json.loads`` produces for that line.
    """
    decoded = json.loads(element)
    return decoded

def drop_fields(element):
    """Return a copy of an event record without its ``user_agent`` field.

    The BigQuery schema written by this pipeline has no ``user_agent`` column,
    so the field is removed before the write.

    Two deliberate differences from a plain ``element.pop('user_agent')``:

    * Records that lack ``user_agent`` pass through unchanged instead of
      raising ``KeyError`` and failing the whole bundle.
    * The input dict is not mutated. Beam requires user code not to modify
      its input elements, so a new dict is built instead.

    Args:
        element: mapping produced by ``parse_json``.

    Returns:
        A new dict with every key of ``element`` except ``user_agent``.
    """
    return {key: value for key, value in element.items() if key != 'user_agent'}

# ### main

def run():
    """Parse command-line flags, then build and launch the branching pipeline.

    The pipeline reads text lines from ``--inputPath`` and fans out into two
    branches:

    * every raw line is written unchanged as text to ``--outputPath``;
    * each line is parsed as JSON, stripped of ``user_agent``, kept only if
      ``num_bytes < 120``, and written to the BigQuery table ``--tableName``
      (created if needed, truncated on write).

    Flags not declared here are forwarded to Beam's ``PipelineOptions``.
    ``run()`` is invoked on the pipeline without waiting for it to finish.
    """
    parser = argparse.ArgumentParser(description='Load from Json into BigQuery')
    required_flags = [
        ('--project', 'Specify Google Cloud project'),
        ('--region', 'Specify Google Cloud region'),
        ('--runner', 'Specify Apache Beam Runner'),
        ('--inputPath', 'Path to events.json'),
        ('--outputPath', 'Path to coldline storage bucket'),
        ('--tableName', 'BigQuery table name'),
    ]
    for flag, help_text in required_flags:
        parser.add_argument(flag, required=True, help=help_text)
    known_args, beam_args = parser.parse_known_args()

    # Unrecognised flags become Beam options; the declared ones override the
    # project/region/job-name/runner settings explicitly.
    options = PipelineOptions(beam_args)
    gcp_options = options.view_as(GoogleCloudOptions)
    gcp_options.project = known_args.project
    gcp_options.region = known_args.region
    # A nanosecond timestamp suffix gives every launch a distinct job name.
    gcp_options.job_name = f'my-pipeline-{time.time_ns()}'
    options.view_as(StandardOptions).runner = known_args.runner

    def _column(name, bq_type, nullable=False):
        # One BigQuery field spec; "mode" is only emitted for nullable columns.
        column = {"name": name, "type": bq_type}
        if nullable:
            column["mode"] = "NULLABLE"
        return column

    table_schema = {
        "fields": [
            _column("ip", "STRING"),
            _column("user_id", "STRING"),
            _column("lat", "FLOAT", nullable=True),
            _column("lng", "FLOAT", nullable=True),
            _column("timestamp", "STRING"),
            _column("http_request", "STRING"),
            _column("http_response", "INTEGER"),
            _column("num_bytes", "INTEGER"),
        ]
    }

    pipeline = beam.Pipeline(options=options)

    # Branch point: both outputs below consume this same PCollection.
    raw_lines = pipeline | 'ReadFromGCS' >> beam.io.ReadFromText(known_args.inputPath)

    # Branch 1: archive the raw lines as text.
    raw_lines | 'WriteRawToGCS' >> beam.io.WriteToText(known_args.outputPath)

    # Branch 2: parse, trim, filter and load into BigQuery.
    parsed = raw_lines | 'ParseJson' >> beam.Map(parse_json)
    trimmed = parsed | 'DropFields' >> beam.Map(drop_fields)
    small_rows = trimmed | 'FilterFn' >> beam.Filter(lambda row: row['num_bytes'] < 120)
    small_rows | 'WriteToBQ' >> beam.io.WriteToBigQuery(
        known_args.tableName,
        schema=table_schema,
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    )

    logging.getLogger().setLevel(logging.INFO)
    logging.info("Building pipeline ...")

    pipeline.run()

if __name__ == '__main__':
    # Build and launch the pipeline only when this file is executed as a script.
    run()

In [None]:
# Dockerfile
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

# libffi headers; presumably required to build native Python dependencies
# pulled in by apache-beam[gcp] — confirm if the base image changes.
RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/*

# Install the (slow) Python dependencies BEFORE copying the pipeline source so
# this layer stays cached when only my_pipeline.py changes. The requirement is
# quoted so the shell never treats "[gcp]" as a glob pattern.
RUN python3 -m pip install --no-cache-dir 'apache-beam[gcp]==2.25.0'

COPY my_pipeline.py .
# Python file the Flex Template launcher runs.
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/my_pipeline.py"

First, enable Kaniko cache use by default. Kaniko caches container build artifacts, so enabling it speeds up subsequent builds. Optionally, you can also run `pip3 freeze` to record the packages and versions used in your environment.

Then use Cloud Build to build the container image.

In [None]:
# Let Cloud Build use Kaniko so container layers are cached between builds.
gcloud config set builds/use_kaniko True

# PROJECT_ID is otherwise only set in a later cell; define it here (keeping an
# existing value) so this cell also works on a fresh shell.
export PROJECT_ID=${PROJECT_ID:-$(gcloud config get-value project)}

# Build the image from the Dockerfile in the current directory and push it.
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/dataflow/my_pipeline:latest"
gcloud builds submit --tag "$TEMPLATE_IMAGE" .

### Create and stage the flex template

Create a template spec file in a Cloud Storage bucket containing all of the information needed to run the job, such as the SDK information and metadata.

In [None]:
# metadata.json
{
  "name": "My Branching Pipeline",
  "description": "A branching pipeline that writes raw to GCS Coldline, and filtered data to BQ",
  "parameters": [
    {
      "name": "inputPath",
      "label": "Input file path.",
      "helpText": "Path to events.json file.",
      "regexes": [
        ".*\\.json"
      ]
    },
    {
      "name": "outputPath",
      "label": "Output file location",
      "helpText": "GCS Coldline Bucket location for raw data",
      "regexes": [
        "gs:\\/\\/[a-zA-Z0-9\\-\\_\\/]+"
      ]
    },
    {
      "name": "tableName",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

In [None]:
# PROJECT_ID is otherwise only defined in a later cell; set it here (keeping an
# existing value) so the template path never becomes "gs:///templates/...".
export PROJECT_ID=${PROJECT_ID:-$(gcloud config get-value project)}
export TEMPLATE_PATH="gs://${PROJECT_ID}/templates/mytemplate.json"

# Write the template spec (image + SDK language + parameter metadata) to GCS.
gcloud dataflow flex-template build "$TEMPLATE_PATH" \
--image "$TEMPLATE_IMAGE" \
--sdk-language "PYTHON" \
--metadata-file "metadata.json"

### Execute the flex template

One of the benefits of Dataflow templates is that they can be executed from a wide variety of contexts beyond a development environment.

In [None]:
# gcloud
export PROJECT_ID=$(gcloud config get-value project)
export REGION='us-central1'
# %S (not $S, which expands an unset shell variable) appends the seconds, so
# launches within the same minute still get distinct job names.
export JOB_NAME=mytemplate-$(date +%Y%m%d-%H%M%S)
export TEMPLATE_LOC=gs://${PROJECT_ID}/templates/mytemplate.json
export INPUT_PATH=gs://${PROJECT_ID}/events.json
export OUTPUT_PATH=gs://${PROJECT_ID}-coldline/template_output/
export BQ_TABLE=${PROJECT_ID}:logs.logs_filtered
gcloud dataflow flex-template run "${JOB_NAME}" \
--region="$REGION" \
--template-file-gcs-location "${TEMPLATE_LOC}" \
--parameters "inputPath=${INPUT_PATH},outputPath=${OUTPUT_PATH},tableName=${BQ_TABLE}"

In [None]:
# gcloud: launch the template under a timestamped job name
gcloud dataflow flex-template run "job-name-$(date +%Y%m%d-%H%M%S)" \
    --region="$REGION" \
    --template-file-gcs-location "$TEMPLATE_PATH" \
    --parameters inputSubscription="$SUBSCRIPTION" \
    --parameters outputTable="$PROJECT:$DATASET.$TABLE"

# REST API: POST a flexTemplates:launch request directly.
# The JSON body is single-quoted, and nothing expands inside single quotes. Each
# shell value is therefore spliced in by closing the quote ('), inserting a
# double-quoted expansion ("$VAR" or "$(cmd)") and reopening the quote. Without
# this, the original backtick `date` was sent literally as part of the job name.
curl -X POST "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/${REGION}/flexTemplates:launch" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-d '{
  "launch_parameter": {
    "jobName": "job-name-'"$(date +%Y%m%d-%H%M%S)"'",
    "parameters": {
      "inputSubscription": "'"$SUBSCRIPTION"'",
      "outputTable": "'"$PROJECT:$DATASET.$TABLE"'"
    },
    "containerSpecGcsPath": "'"$TEMPLATE_PATH"'"
  }
}'

# Cloud Scheduler: call the same flexTemplates:launch endpoint every 30 minutes.
# The message body is fixed when the scheduler job is created; shell values are
# spliced in by closing the single quotes around a double-quoted expansion.
# NOTE(review): jobName is constant, so a launch may be rejected while an
# earlier job with the same name is still active — confirm this is intended.
gcloud scheduler jobs create http scheduler-job \
--schedule="*/30 * * * *" \
--uri="https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/${REGION}/flexTemplates:launch" \
--http-method=POST \
--headers Content-Type=application/json \
--oauth-service-account-email=email@project.iam.gserviceaccount.com \
--message-body='{
    "launch_parameter": {
      "jobName": "job-name",
      "parameters": {
        "inputSubscription": "'"$SUBSCRIPTION"'",
        "outputTable": "'"$PROJECT:$DATASET.$TABLE"'"
      },
      "containerSpecGcsPath": "'"$TEMPLATE_PATH"'"
    }
}'