From db7e5c65523b8f3a44449de4750eec1f0090d647 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Wed, 3 Apr 2019 14:30:10 -0700 Subject: [PATCH] Remove restrictions for Python 3.7 in Dataflow runner. --- .../runners/dataflow/internal/apiclient.py | 15 +---- .../dataflow/internal/apiclient_test.py | 58 ++++++++++--------- 2 files changed, 35 insertions(+), 38 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index b03a8bb97db8..e0a1a5689f1e 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -31,7 +31,6 @@ import sys import tempfile import time -import warnings from datetime import datetime from builtins import object @@ -954,15 +953,7 @@ def get_response_encoding(): def _verify_interpreter_version_is_supported(pipeline_options): - if sys.version_info[0] == 2: - return - - if sys.version_info[0:2] in [(3, 5), (3, 6)]: - if sys.version_info[0:3] < (3, 5, 3): - warnings.warn( - 'You are using an early release for Python 3.5. It is recommended ' - 'to use Python 3.5.3 or higher with Dataflow ' - 'runner.') + if sys.version_info[0:2] in [(2, 7), (3, 5), (3, 6), (3, 7)]: return debug_options = pipeline_options.view_as(DebugOptions) @@ -972,8 +963,8 @@ def _verify_interpreter_version_is_supported(pipeline_options): raise Exception( 'Dataflow runner currently supports Python versions ' - '2.7, 3.5 and 3.6. To ignore this requirement and start a job using a ' - 'different version of Python 3 interpreter, pass ' + '2.7, 3.5, 3.6, and 3.7. To ignore this requirement and start a job ' + 'using a different version of Python 3 interpreter, pass ' '--experiment ignore_py3_minor_version pipeline option.') diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 9e110356b7df..77eba7c52355 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -286,26 +286,23 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self): pipeline_options, '2.0.0', #any environment version FAKE_PIPELINE_URL) - if sys.version_info[0:2] == (3, 5): + if sys.version_info[0] == 2: self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python3-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) - elif sys.version_info[0:2] == (3, 6): - self.assertEqual( - env.proto.workerPools[0].workerHarnessContainerImage, - (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python36-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) - elif sys.version_info[0:2] == (3, 7): + '/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) + elif sys.version_info[0:2] == (3, 5): self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python37-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) + '/python3-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) else: self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION)) + '/python%d%d-fnapi:%s' % (sys.version_info[0], + sys.version_info[1], + names.BEAM_FNAPI_CONTAINER_VERSION))) # batch, legacy pipeline. pipeline_options = PipelineOptions( @@ -314,26 +311,23 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self): pipeline_options, '2.0.0', #any environment version FAKE_PIPELINE_URL) - if sys.version_info[0:2] == (3, 5): - self.assertEqual( - env.proto.workerPools[0].workerHarnessContainerImage, - (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python3:' + names.BEAM_CONTAINER_VERSION)) - elif sys.version_info[0:2] == (3, 6): + if sys.version_info[0] == 2: self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python36:' + names.BEAM_CONTAINER_VERSION)) - elif sys.version_info[0:2] == (3, 7): + '/python:' + names.BEAM_CONTAINER_VERSION)) + elif sys.version_info[0:2] == (3, 5): self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python37:' + names.BEAM_CONTAINER_VERSION)) + '/python3:' + names.BEAM_CONTAINER_VERSION)) else: self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python:' + names.BEAM_CONTAINER_VERSION)) + '/python%d%d:%s' % (sys.version_info[0], + sys.version_info[1], + names.BEAM_CONTAINER_VERSION))) @mock.patch('apache_beam.runners.dataflow.internal.apiclient.' 'beam_version.__version__', '2.2.0') @@ -345,7 +339,12 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self): pipeline_options, '2.0.0', #any environment version FAKE_PIPELINE_URL) - if sys.version_info[0] == 3: + if sys.version_info[0] == 2: + self.assertEqual( + env.proto.workerPools[0].workerHarnessContainerImage, + (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + + '/python-fnapi:2.2.0')) + elif sys.version_info[0:2] == (3, 5): self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + @@ -354,7 +353,8 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self): self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python-fnapi:2.2.0')) + '/python%d%d-fnapi:2.2.0' % (sys.version_info[0], + sys.version_info[1]))) # batch, legacy pipeline. pipeline_options = PipelineOptions( @@ -363,7 +363,12 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self): pipeline_options, '2.0.0', #any environment version FAKE_PIPELINE_URL) - if sys.version_info[0] == 3: + if sys.version_info[0] == 2: + self.assertEqual( + env.proto.workerPools[0].workerHarnessContainerImage, + (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + + '/python:2.2.0')) + elif sys.version_info[0:2] == (3, 5): self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + @@ -372,7 +377,8 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self): self.assertEqual( env.proto.workerPools[0].workerHarnessContainerImage, (names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + - '/python:2.2.0')) + '/python%d%d:2.2.0' % (sys.version_info[0], + sys.version_info[1]))) def test_worker_harness_override_takes_precedence_over_sdk_defaults(self): # streaming, fnapi pipeline. @@ -456,7 +462,7 @@ def test_experiment_use_multiple_sdk_containers(self): @mock.patch( 'apache_beam.runners.dataflow.internal.apiclient.sys.version_info', - (2, 333)) + (3, 5)) def test_get_python_sdk_name(self): pipeline_options = PipelineOptions( ['--project', 'test_project', '--job_name', 'test_job_name', @@ -465,7 +471,7 @@ def test_get_python_sdk_name(self): '--experiments', 'use_multiple_sdk_containers']) environment = apiclient.Environment( [], pipeline_options, 1, FAKE_PIPELINE_URL) - self.assertEqual('Apache Beam Python 2.333 SDK', + self.assertEqual('Apache Beam Python 3.5 SDK', environment._get_python_sdk_name()) @mock.patch(