Skip to content

Commit

Permalink
Merge pull request #6930: [BEAM-5462] get rid of <pipeline>.options d…
Browse files Browse the repository at this point in the history
…eprecation warnings in tests
  • Loading branch information
chamikaramj committed Dec 4, 2018
2 parents a3a8a32 + 74ed7ac commit 95d0ac5
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 124 deletions.
106 changes: 63 additions & 43 deletions sdks/python/apache_beam/io/gcp/pubsub_test.py
Expand Up @@ -35,6 +35,7 @@
from apache_beam.io.gcp.pubsub import WriteToPubSub
from apache_beam.io.gcp.pubsub import _PubSubSink
from apache_beam.io.gcp.pubsub import _PubSubSource
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.runners.direct import transform_evaluator
from apache_beam.runners.direct.direct_runner import _DirectReadFromPubSub
Expand Down Expand Up @@ -109,8 +110,9 @@ def test_repr(self):
class TestReadFromPubSubOverride(unittest.TestCase):

def test_expand_with_topic(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub('projects/fakeprj/topics/a_topic',
None, 'a_label', with_attributes=False,
Expand All @@ -119,7 +121,7 @@ def test_expand_with_topic(self):
self.assertEqual(bytes, pcoll.element_type)

# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
overrides = _get_transform_overrides(options)
p.replace_all(overrides)

# Note that the direct output of ReadFromPubSub will be replaced
Expand All @@ -132,8 +134,9 @@ def test_expand_with_topic(self):
self.assertEqual('a_label', source.id_label)

def test_expand_with_subscription(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub(
None, 'projects/fakeprj/subscriptions/a_subscription',
Expand All @@ -142,7 +145,7 @@ def test_expand_with_subscription(self):
self.assertEqual(bytes, pcoll.element_type)

# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
overrides = _get_transform_overrides(options)
p.replace_all(overrides)

# Note that the direct output of ReadFromPubSub will be replaced
Expand All @@ -167,8 +170,9 @@ def test_expand_with_both_topic_and_subscription(self):
with_attributes=False, timestamp_attribute=None)

def test_expand_with_other_options(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub('projects/fakeprj/topics/a_topic',
None, 'a_label', with_attributes=True,
Expand All @@ -177,7 +181,7 @@ def test_expand_with_other_options(self):
self.assertEqual(PubsubMessage, pcoll.element_type)

# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
overrides = _get_transform_overrides(options)
p.replace_all(overrides)

# Note that the direct output of ReadFromPubSub will be replaced
Expand All @@ -193,15 +197,16 @@ def test_expand_with_other_options(self):
@unittest.skipIf(pubsub is None, 'GCP dependencies are not installed')
class TestWriteStringsToPubSubOverride(unittest.TestCase):
def test_expand_deprecated(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub('projects/fakeprj/topics/baz')
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic')
| beam.Map(lambda x: x))

# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
overrides = _get_transform_overrides(options)
p.replace_all(overrides)

# Note that the direct output of ReadFromPubSub will be replaced
Expand All @@ -212,16 +217,17 @@ def test_expand_deprecated(self):
self.assertEqual('a_topic', write_transform.dofn.short_topic_name)

def test_expand(self):
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub('projects/fakeprj/topics/baz')
| WriteToPubSub('projects/fakeprj/topics/a_topic',
with_attributes=True)
| beam.Map(lambda x: x))

# Apply the necessary PTransformOverrides.
overrides = _get_transform_overrides(p.options)
overrides = _get_transform_overrides(options)
p.replace_all(overrides)

# Note that the direct output of ReadFromPubSub will be replaced
Expand Down Expand Up @@ -342,8 +348,9 @@ def test_read_messages_success(self, mock_pubsub):
[window.GlobalWindow()])]
mock_pubsub.return_value.pull.return_value = pull_response

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub('projects/fakeprj/topics/a_topic',
None, None, with_attributes=True))
Expand All @@ -362,8 +369,9 @@ def test_read_strings_success(self, mock_pubsub):
expected_elements = [data]
mock_pubsub.return_value.pull.return_value = pull_response

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadStringsFromPubSub('projects/fakeprj/topics/a_topic',
None, None))
Expand All @@ -380,8 +388,9 @@ def test_read_data_success(self, mock_pubsub):
expected_elements = [data_encoded]
mock_pubsub.return_value.pull.return_value = pull_response

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub('projects/fakeprj/topics/a_topic', None, None))
assert_that(pcoll, equal_to(expected_elements))
Expand All @@ -407,8 +416,9 @@ def test_read_messages_timestamp_attribute_milli_success(self, mock_pubsub):
]
mock_pubsub.return_value.pull.return_value = pull_response

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic', None, None,
Expand Down Expand Up @@ -436,8 +446,9 @@ def test_read_messages_timestamp_attribute_rfc3339_success(self, mock_pubsub):
]
mock_pubsub.return_value.pull.return_value = pull_response

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic', None, None,
Expand Down Expand Up @@ -466,8 +477,9 @@ def test_read_messages_timestamp_attribute_missing(self, mock_pubsub):
]
mock_pubsub.return_value.pull.return_value = pull_response

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
pcoll = (p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic', None, None,
Expand All @@ -489,8 +501,9 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):
])
mock_pubsub.return_value.pull.return_value = pull_response

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (p
| ReadFromPubSub(
'projects/fakeprj/topics/a_topic', None, None,
Expand All @@ -501,8 +514,9 @@ def test_read_messages_timestamp_attribute_fail_parse(self, mock_pubsub):

def test_read_message_id_label_unsupported(self, unused_mock_pubsub):
# id_label is unsupported in DirectRunner.
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (p | ReadFromPubSub('projects/fakeprj/topics/a_topic', None, 'a_label'))
with self.assertRaisesRegexp(NotImplementedError,
r'id_label is not supported'):
Expand All @@ -517,8 +531,9 @@ def test_write_messages_success(self, mock_pubsub):
data = 'data'
payloads = [data]

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (p
| Create(payloads)
| WriteToPubSub('projects/fakeprj/topics/a_topic',
Expand All @@ -531,8 +546,9 @@ def test_write_messages_deprecated(self, mock_pubsub):
data = 'data'
payloads = [data]

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (p
| Create(payloads)
| WriteStringsToPubSub('projects/fakeprj/topics/a_topic'))
Expand All @@ -545,8 +561,9 @@ def test_write_messages_with_attributes_success(self, mock_pubsub):
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (p
| Create(payloads)
| WriteToPubSub('projects/fakeprj/topics/a_topic',
Expand All @@ -560,8 +577,9 @@ def test_write_messages_with_attributes_error(self, mock_pubsub):
# Sending raw data when WriteToPubSub expects a PubsubMessage object.
payloads = [data]

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (p
| Create(payloads)
| WriteToPubSub('projects/fakeprj/topics/a_topic',
Expand All @@ -575,17 +593,19 @@ def test_write_messages_unsupported_features(self, mock_pubsub):
attributes = {'key': 'value'}
payloads = [PubsubMessage(data, attributes)]

p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (p
| Create(payloads)
| WriteToPubSub('projects/fakeprj/topics/a_topic',
id_label='a_label'))
with self.assertRaisesRegexp(NotImplementedError,
r'id_label is not supported'):
p.run()
p = TestPipeline()
p.options.view_as(StandardOptions).streaming = True
options = PipelineOptions([])
options.view_as(StandardOptions).streaming = True
p = TestPipeline(options=options)
_ = (p
| Create(payloads)
| WriteToPubSub('projects/fakeprj/topics/a_topic',
Expand Down
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/pipeline.py
Expand Up @@ -415,7 +415,7 @@ def run(self, test_runner_api=True):
pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
finally:
shutil.rmtree(tmpdir)
return self.runner.run_pipeline(self)
return self.runner.run_pipeline(self, self._options)

def __enter__(self):
return self
Expand Down Expand Up @@ -512,7 +512,7 @@ def apply(self, transform, pvalueish=None, label=None):
if type_options.pipeline_type_check:
transform.type_check_inputs(pvalueish)

pvalueish_result = self.runner.apply(transform, pvalueish)
pvalueish_result = self.runner.apply(transform, pvalueish, self._options)

if type_options is not None and type_options.pipeline_type_check:
transform.type_check_outputs(pvalueish_result)
Expand Down

0 comments on commit 95d0ac5

Please sign in to comment.