Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,6 @@ def _add_argparse_args(cls, parser):
action='store_true',
help='Whether to enable streaming mode.')

# TODO(BEAM-1265): Remove this error, once at least one runner supports
# streaming pipelines.
def validate(self, validator):
  """Reports an error whenever streaming mode has been requested.

  Args:
    validator: Not consulted by this check.

  Returns:
    A list of error message strings. It holds a single message when the
    ``streaming`` attribute of the StandardOptions view is truthy and is
    empty otherwise.
  """
  streaming_requested = self.view_as(StandardOptions).streaming
  return (['Streaming pipelines are not supported.']
          if streaming_requested else [])


class TypeOptions(PipelineOptions):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,11 @@ def test_validate_dataflow_job_file(self):

def test_streaming(self):
  """Checks that passing --streaming yields no validation errors."""
  pipeline_options = PipelineOptions(['--streaming'])
  runner = MockRunners.OtherRunner()
  validator = PipelineOptionsValidator(pipeline_options, runner)
  errors = validator.validate()

  # Streaming is no longer rejected during validation, so the validator
  # must come back with an empty error list.
  self.assertEqual(len(errors), 0)

def test_test_matcher(self):
def get_validator(matcher):
Expand Down
38 changes: 38 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,44 @@ def run_CombineValues(self, transform_node):
PropertyNames.OUTPUT_NAME: PropertyNames.OUT})
step.add_property(PropertyNames.OUTPUT_INFO, outputs)

def apply_Read(self, transform, pcoll):
  """Adds a necessary transform to convert native PubSubSource representations
  to the expected type based upon the user specified coder within the
  PubSubSource.

  A Read from any other kind of source is forwarded to apply_PTransform
  unchanged. For a PubSubSource, the read is applied and then followed by a
  Map step that decodes every element, first with BytesCoder and then with
  the coder configured on the source.

  Args:
    transform: The Read transform being applied. Its ``source`` attribute
      decides which of the two paths is taken.
    pcoll: The input the read is applied to. On the PubSubSource path its
      ``element_type`` attribute is overwritten before the read is applied.

  Returns:
    The result of apply_PTransform for non-PubSubSource reads; otherwise the
    output of the appended decoding Map step, with ``element_type`` set from
    the source's coder.
  """

  # Import here to avoid adding the dependency for local running scenarios.
  # pylint: disable=wrong-import-order, wrong-import-position
  from apache_beam import Map
  from apache_beam.io.gcp.pubsub import PubSubSource

  # Only add the additional portion if this is a PubSubSource
  if not isinstance(transform.source, PubSubSource):
    return self.apply_PTransform(transform, pcoll)

  class PubSubMessagePayloadTransformer(object):
    """Converts from a payload of bytes encoded in the nested context to
    the supplied coder type.
    """
    def __init__(self, value_coder):
      self._input_coder = coders.BytesCoder()
      self._value_coder = value_coder

    def transform_value(self, encoded_value):
      # Two-stage decode: unwrap the byte string first, then hand those
      # bytes to the user-specified coder.
      payload = self._input_coder.decode(encoded_value)
      return self._value_coder.decode(payload)

  pcoll.element_type = coders.WindowedValueCoder(
      coders.BytesCoder(),
      coders.coders.GlobalWindowCoder())

  pcoll = self.apply_PTransform(transform, pcoll) | 'parse fn' >> Map(
      PubSubMessagePayloadTransformer(transform.source.coder).transform_value)
  # The Map output carries values decoded with the source's coder, so the
  # element type is re-declared using that coder.
  pcoll.element_type = coders.WindowedValueCoder(
      transform.source.coder,
      coders.coders.GlobalWindowCoder())
  return pcoll

def run_Read(self, transform_node):
transform = transform_node.transform
step = self._add_step(
Expand Down
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import apache_beam.transforms as ptransform

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.pipeline import Pipeline, AppliedPTransform
from apache_beam.pvalue import PCollection
from apache_beam.runners import create_runner
Expand Down Expand Up @@ -240,6 +241,16 @@ def _test_flatten_input_visitor(self, input_type, output_type, num_inputs):
for _ in range(num_inputs):
self.assertEqual(inputs[0].element_type, output_type)

def test_pubsub_source_override(self):
  """Smoke-checks a streaming pipeline that reads from a PubSubSource.

  No assertions are made: the test passes as long as building the pipeline
  and running it through the runner's parent-class ``run`` does not raise.
  The resulting job is printed for manual inspection.
  """
  remote_runner = DataflowRunner()
  options = StandardOptions(self.default_properties + ["--streaming"])
  p = Pipeline(remote_runner,
               options=options)

  (p | beam.io.Read(beam.io.PubSubSource("test_topic")))
  remote_runner.job = apiclient.Job(p._options)
  # Dispatch to the run() of the class after DataflowRunner in the MRO,
  # bypassing DataflowRunner's own run().
  super(DataflowRunner, remote_runner).run(p)
  # Parenthesized so the statement parses on both Python 2 and Python 3;
  # with a single argument the printed output is the same.
  print(remote_runner.job)

if __name__ == '__main__':
unittest.main()