Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import sys
import tempfile
import time
import warnings
from datetime import datetime

from builtins import object
Expand Down Expand Up @@ -954,15 +953,7 @@ def get_response_encoding():


def _verify_interpreter_version_is_supported(pipeline_options):
if sys.version_info[0] == 2:
return

if sys.version_info[0:2] in [(3, 5), (3, 6)]:
if sys.version_info[0:3] < (3, 5, 3):
warnings.warn(
'You are using an early release for Python 3.5. It is recommended '
'to use Python 3.5.3 or higher with Dataflow '
'runner.')
if sys.version_info[0:2] in [(2, 7), (3, 5), (3, 6), (3, 7)]:
return

debug_options = pipeline_options.view_as(DebugOptions)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need the ignore_py3_minor_version experiment?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be helpful to force the SDK to start a Dataflow job on, say, Python 3.8 when it becomes available. I don't have a strong opinion on whether to keep it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's keep it for now, in case you want to get this PR into the release branch.

For Python 3.8, we will probably need to add explicit support by changing this list anyway.

Expand All @@ -972,8 +963,8 @@ def _verify_interpreter_version_is_supported(pipeline_options):

raise Exception(
'Dataflow runner currently supports Python versions '
'2.7, 3.5 and 3.6. To ignore this requirement and start a job using a '
'different version of Python 3 interpreter, pass '
'2.7, 3.5, 3.6, and 3.7. To ignore this requirement and start a job '
'using a different version of Python 3 interpreter, pass '
'--experiment ignore_py3_minor_version pipeline option.')


Expand Down
58 changes: 32 additions & 26 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,26 +286,23 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self):
pipeline_options,
'2.0.0', #any environment version
FAKE_PIPELINE_URL)
if sys.version_info[0:2] == (3, 5):
if sys.version_info[0] == 2:
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python3-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
elif sys.version_info[0:2] == (3, 6):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python36-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
elif sys.version_info[0:2] == (3, 7):
'/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
elif sys.version_info[0:2] == (3, 5):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python37-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
'/python3-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
else:
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python-fnapi:' + names.BEAM_FNAPI_CONTAINER_VERSION))
'/python%d%d-fnapi:%s' % (sys.version_info[0],
sys.version_info[1],
names.BEAM_FNAPI_CONTAINER_VERSION)))

# batch, legacy pipeline.
pipeline_options = PipelineOptions(
Expand All @@ -314,26 +311,23 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self):
pipeline_options,
'2.0.0', #any environment version
FAKE_PIPELINE_URL)
if sys.version_info[0:2] == (3, 5):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python3:' + names.BEAM_CONTAINER_VERSION))
elif sys.version_info[0:2] == (3, 6):
if sys.version_info[0] == 2:
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python36:' + names.BEAM_CONTAINER_VERSION))
elif sys.version_info[0:2] == (3, 7):
'/python:' + names.BEAM_CONTAINER_VERSION))
elif sys.version_info[0:2] == (3, 5):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python37:' + names.BEAM_CONTAINER_VERSION))
'/python3:' + names.BEAM_CONTAINER_VERSION))
else:
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python:' + names.BEAM_CONTAINER_VERSION))
'/python%d%d:%s' % (sys.version_info[0],
sys.version_info[1],
names.BEAM_CONTAINER_VERSION)))

@mock.patch('apache_beam.runners.dataflow.internal.apiclient.'
'beam_version.__version__', '2.2.0')
Expand All @@ -345,7 +339,12 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self):
pipeline_options,
'2.0.0', #any environment version
FAKE_PIPELINE_URL)
if sys.version_info[0] == 3:
if sys.version_info[0] == 2:
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python-fnapi:2.2.0'))
elif sys.version_info[0:2] == (3, 5):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
Expand All @@ -354,7 +353,8 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python-fnapi:2.2.0'))
'/python%d%d-fnapi:2.2.0' % (sys.version_info[0],
sys.version_info[1])))

# batch, legacy pipeline.
pipeline_options = PipelineOptions(
Expand All @@ -363,7 +363,12 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self):
pipeline_options,
'2.0.0', #any environment version
FAKE_PIPELINE_URL)
if sys.version_info[0] == 3:
if sys.version_info[0] == 2:
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python:2.2.0'))
elif sys.version_info[0:2] == (3, 5):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
Expand All @@ -372,7 +377,8 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self):
self.assertEqual(
env.proto.workerPools[0].workerHarnessContainerImage,
(names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY +
'/python:2.2.0'))
'/python%d%d:2.2.0' % (sys.version_info[0],
sys.version_info[1])))

def test_worker_harness_override_takes_precedence_over_sdk_defaults(self):
# streaming, fnapi pipeline.
Expand Down Expand Up @@ -456,7 +462,7 @@ def test_experiment_use_multiple_sdk_containers(self):

@mock.patch(
'apache_beam.runners.dataflow.internal.apiclient.sys.version_info',
(2, 333))
(3, 5))
def test_get_python_sdk_name(self):
pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
Expand All @@ -465,7 +471,7 @@ def test_get_python_sdk_name(self):
'--experiments', 'use_multiple_sdk_containers'])
environment = apiclient.Environment(
[], pipeline_options, 1, FAKE_PIPELINE_URL)
self.assertEqual('Apache Beam Python 2.333 SDK',
self.assertEqual('Apache Beam Python 3.5 SDK',
environment._get_python_sdk_name())

@mock.patch(
Expand Down