Skip to content

Commit

Permalink
Merge pull request #5071 from charlesccychen/wordstream-snippet
Browse files Browse the repository at this point in the history
[BEAM-4037] Add streaming wordcount snippets and test
  • Loading branch information
aaltay committed Apr 10, 2018
2 parents 20a07d2 + 399ef70 commit 1422ff7
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 0 deletions.
60 changes: 60 additions & 0 deletions sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
string. The tags can contain only letters, digits and _.
"""

import argparse

import six

import apache_beam as beam
Expand Down Expand Up @@ -628,6 +630,64 @@ def format_result(word_count):
p.visit(SnippetUtils.RenameFiles(renames))


def examples_wordcount_streaming(argv):
import apache_beam as beam
from apache_beam import window
from apache_beam.io import ReadFromPubSub
from apache_beam.io import WriteStringsToPubSub
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions

# Parse out arguments.
parser = argparse.ArgumentParser()
parser.add_argument(
'--output_topic', required=True,
help=('Output PubSub topic of the form '
'"projects/<PROJECT>/topic/<TOPIC>".'))
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
'--input_topic',
help=('Input PubSub topic of the form '
'"projects/<PROJECT>/topics/<TOPIC>".'))
group.add_argument(
'--input_subscription',
help=('Input PubSub subscription of the form '
'"projects/<PROJECT>/subscriptions/<SUBSCRIPTION>."'))
known_args, pipeline_args = parser.parse_known_args(argv)

pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(StandardOptions).streaming = True

with TestPipeline(options=pipeline_options) as p:
# [START example_wordcount_streaming_read]
# Read from Pub/Sub into a PCollection.
if known_args.input_subscription:
lines = p | beam.io.ReadFromPubSub(
subscription=known_args.input_subscription)
else:
lines = p | beam.io.ReadFromPubSub(topic=known_args.input_topic)
# [END example_wordcount_streaming_read]

output = (
lines
| 'DecodeUnicode' >> beam.FlatMap(
lambda encoded: encoded.decode('utf-8'))
| 'ExtractWords' >> beam.FlatMap(
lambda x: __import__('re').findall(r'[A-Za-z\']+', x))
| 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
| beam.WindowInto(window.FixedWindows(15, 0))
| 'Group' >> beam.GroupByKey()
| 'Sum' >> beam.Map(lambda word_ones: (word_ones[0], sum(word_ones[1])))
| 'Format' >> beam.Map(
lambda word_and_count: '%s: %d' % word_and_count))

# [START example_wordcount_streaming_write]
# Write to Pub/Sub
output | beam.io.WriteStringsToPubSub(known_args.output_topic)
# [END example_wordcount_streaming_write]


def examples_ptransforms_templated(renames):
# [START examples_ptransforms_templated]
import apache_beam as beam
Expand Down
66 changes: 66 additions & 0 deletions sdks/python/apache_beam/examples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
Expand Down Expand Up @@ -25,6 +26,8 @@
import unittest
import uuid

import mock

import apache_beam as beam
from apache_beam import coders
from apache_beam import pvalue
Expand All @@ -36,8 +39,10 @@
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.window import TimestampedValue
from apache_beam.utils.windowed_value import WindowedValue

# Protect against environments where apitools library is not available.
Expand All @@ -56,6 +61,14 @@
datastore_pb2 = None
# pylint: enable=wrong-import-order, wrong-import-position

# Protect against environments where the PubSub library is not available.
# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud import pubsub
except ImportError:
pubsub = None
# pylint: enable=wrong-import-order, wrong-import-position


class ParDoTest(unittest.TestCase):
"""Tests for model/par-do."""
Expand Down Expand Up @@ -691,6 +704,59 @@ def test_examples_wordcount_debugging(self):
self.get_output(result_path),
['Flourish: 3', 'stomach: 1'])

@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
@mock.patch('apache_beam.io.ReadFromPubSub')
@mock.patch('apache_beam.io.WriteStringsToPubSub')
def test_examples_wordcount_streaming(self, *unused_mocks):
def FakeReadFromPubSub(topic=None, subscription=None, values=None):
expected_topic = topic
expected_subscription = subscription

def _inner(topic=None, subscription=None):
assert topic == expected_topic
assert subscription == expected_subscription
return TestStream().add_elements(values)
return _inner

class AssertTransform(beam.PTransform):
def __init__(self, matcher):
self.matcher = matcher

def expand(self, pcoll):
assert_that(pcoll, self.matcher)

def FakeWriteStringsToPubSub(topic=None, values=None):
expected_topic = topic

def _inner(topic=None, subscription=None):
assert topic == expected_topic
return AssertTransform(equal_to(values))
return _inner

# Test basic execution.
input_topic = 'projects/fake-beam-test-project/topic/intopic'
input_values = [TimestampedValue('a a b', 1),
TimestampedValue(u'🤷 ¯\\_(ツ)_/¯ b b '.encode('utf-8'), 12),
TimestampedValue('a b c c c', 20)]
output_topic = 'projects/fake-beam-test-project/topic/outtopic'
output_values = ['a: 1', 'a: 2', 'b: 1', 'b: 3', 'c: 3']
beam.io.ReadFromPubSub = (
FakeReadFromPubSub(topic=input_topic, values=input_values))
beam.io.WriteStringsToPubSub = (
FakeWriteStringsToPubSub(topic=output_topic, values=output_values))
snippets.examples_wordcount_streaming([
'--input_topic', 'projects/fake-beam-test-project/topic/intopic',
'--output_topic', 'projects/fake-beam-test-project/topic/outtopic'])

# Test with custom subscription.
input_sub = 'projects/fake-beam-test-project/subscriptions/insub'
beam.io.ReadFromPubSub = FakeReadFromPubSub(subscription=input_sub,
values=input_values)
snippets.examples_wordcount_streaming([
'--input_subscription',
'projects/fake-beam-test-project/subscriptions/insub',
'--output_topic', 'projects/fake-beam-test-project/topic/outtopic'])

def test_model_composite_transform_example(self):
contents = ['aa bb cc', 'bb cc', 'cc']
result_path = self.create_temp_file()
Expand Down

0 comments on commit 1422ff7

Please sign in to comment.