4 changes: 2 additions & 2 deletions sdks/python/apache_beam/examples/snippets/snippets.py
@@ -238,7 +238,7 @@ def _add_argparse_args(cls, parser):
   google_cloud_options.project = 'my-project-id'
   google_cloud_options.job_name = 'myjob'
   google_cloud_options.staging_location = 'gs://my-bucket/binaries'
-  google_cloud_options.temp_location = 'gs://my-bucket/temp'
+  google_cloud_options.gcp_temp_location = 'gs://my-bucket/temp'
   options.view_as(StandardOptions).runner = 'DataflowRunner'

   # Create the Pipeline with the specified options.
@@ -423,7 +423,7 @@ def examples_wordcount_minimal(renames):
   google_cloud_options.project = 'my-project-id'
   google_cloud_options.job_name = 'myjob'
   google_cloud_options.staging_location = 'gs://your-bucket-name-here/staging'
-  google_cloud_options.temp_location = 'gs://your-bucket-name-here/temp'
+  google_cloud_options.gcp_temp_location = 'gs://your-bucket-name-here/temp'
   options.view_as(StandardOptions).runner = 'DataflowRunner'
   # [END examples_wordcount_minimal_options]

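Taken together, the snippet changes mean Dataflow examples now set the GCS temp path through gcp_temp_location rather than temp_location. A minimal, self-contained sketch of the updated option setup (project, job name, and bucket values are placeholders, assuming the Beam 2.17+ options API):

from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

options = PipelineOptions()

# Configure the Google Cloud specific options.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'  # placeholder
google_cloud_options.job_name = 'myjob'         # placeholder
google_cloud_options.staging_location = 'gs://my-bucket/binaries'
# The GCS-specific temp path now lives on GoogleCloudOptions as
# gcp_temp_location; the runner-agnostic --temp_location stays on
# StandardOptions.
google_cloud_options.gcp_temp_location = 'gs://my-bucket/temp'

options.view_as(StandardOptions).runner = 'DataflowRunner'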
4 changes: 2 additions & 2 deletions sdks/python/apache_beam/io/fileio.py
@@ -110,7 +110,7 @@
 from apache_beam.io import filesystem
 from apache_beam.io import filesystems
 from apache_beam.io.filesystem import BeamIOError
-from apache_beam.options.pipeline_options import GoogleCloudOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.options.value_provider import ValueProvider
 from apache_beam.transforms.window import GlobalWindow
@@ -490,7 +490,7 @@ def expand(self, pcoll):

     if not self._temp_directory:
       temp_location = (
-          p.options.view_as(GoogleCloudOptions).temp_location
+          p.options.view_as(StandardOptions).temp_location
           or self.path.get())
       dir_uid = str(uuid.uuid4())
       self._temp_directory = StaticValueProvider(
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -768,7 +768,7 @@ def _load_data(self, partitions_using_temp_tables,
   def expand(self, pcoll):
     p = pcoll.pipeline

-    temp_location = p.options.view_as(GoogleCloudOptions).temp_location
+    temp_location = p.options.view_as(GoogleCloudOptions).gcp_temp_location

     empty_pc = p | "ImpulseEmptyPC" >> beam.Create([])
     singleton_pc = p | "ImpulseSingleElementPC" >> beam.Create([None])
61 changes: 54 additions & 7 deletions sdks/python/apache_beam/options/pipeline_options.py
@@ -41,6 +41,7 @@
 from apache_beam.options.value_provider import ValueProvider
 from apache_beam.transforms.display import HasDisplayData
 from apache_beam.utils import processes
+from apache_beam.utils.annotations import deprecated

 __all__ = [
     'PipelineOptions',
@@ -387,11 +388,16 @@ def _add_argparse_args(cls, parser):
         '--runner',
         help=('Pipeline runner used to execute the workflow. Valid values are '
               'DirectRunner, DataflowRunner.'))
-    # Whether to enable streaming mode.
     parser.add_argument('--streaming',
                         default=False,
                         action='store_true',
                         help='Whether to enable streaming mode.')
+    parser.add_argument('--temp_location',
+                        default=None,
+                        help='Location where to store temporary files. Can be '
+                             'a local folder or the URL of an object store '
+                             'bucket. This location must be accessible by all '
+                             'worker processes.')


 class TypeOptions(PipelineOptions):
@@ -474,13 +480,13 @@ def _add_argparse_args(cls, parser):
                         default=None,
                         help='Name of the Cloud Dataflow job.')
     # Remote execution must check that this option is not None.
+    # If staging_location is not set, it defaults to gcp_temp_location.
     parser.add_argument('--staging_location',
                         default=None,
                         help='GCS path for staging code packages needed by '
                              'workers.')
     # Remote execution must check that this option is not None.
-    # If staging_location is not set, it defaults to temp_location.
-    parser.add_argument('--temp_location',
+    # If gcp_temp_location is not set, it defaults to temp_location.
+    parser.add_argument('--gcp_temp_location',
                         default=None,
                         help='GCS path for saving temporary workflow jobs.')
     # The Google Compute Engine region for creating Dataflow jobs. See

@@ -571,13 +577,54 @@ def _get_default_gcp_region(self):
Expand Down Expand Up @@ -571,13 +577,54 @@ def _get_default_gcp_region(self):
'https://cloud.google.com/compute/docs/regions-zones')
return 'us-central1'

def __getattr__(self, name):
[Review comment, Contributor]
Suggestion: consider using @property here.

[Reply, ostrokach (author), Nov 12, 2019]
That was my first thought as well, but I couldn't figure out how to get it to work.

The problem is that attribute access on the GoogleCloudOptions class ends up being performed by PipelineOptions.__getattr__ and PipelineOptions.__setattr__, and those only check whether the argument is defined in the (child) class's _add_argparse_args(cls, parser) method, not whether the (child) class actually has that attribute.

Since we can't have both StandardOptions._add_argparse_args and GoogleCloudOptions._add_argparse_args define the temp_location argument, I ended up patching the GoogleCloudOptions.__getattr__ and GoogleCloudOptions.__setattr__ methods to add an additional check for the temp_location attribute, falling back to the parent's __getattr__ / __setattr__ otherwise. (A toy sketch of this lookup pattern appears after this file's diff.)

+    if name in ["temp_location"]:
+      return self._get_temp_location()
+    else:
+      return super(GoogleCloudOptions, self).__getattr__(name)
+
+  def __setattr__(self, name, value):
+    if name in ["temp_location"]:
+      self._set_temp_location(value)
+    else:
+      super(GoogleCloudOptions, self).__setattr__(name, value)
+
+  @deprecated(
+      since='2.17.0',
+      custom_message=(
+          'GoogleCloudOptions.temp_location is deprecated since %since%. '
+          'Please use StandardOptions.temp_location or '
+          'GoogleCloudOptions.gcp_temp_location, as appropriate.'))
+  def _get_temp_location(self):
+    if self.gcp_temp_location is not None:
+      return self.gcp_temp_location
+    else:
+      return self.view_as(StandardOptions).temp_location
+
+  @deprecated(
+      since='2.17.0',
+      custom_message=(
+          'GoogleCloudOptions.temp_location is deprecated since %since%. '
+          'Please use StandardOptions.temp_location or '
+          'GoogleCloudOptions.gcp_temp_location, as appropriate.'))
+  def _set_temp_location(self, temp_location):
+    self.gcp_temp_location = temp_location

   def validate(self, validator):
     errors = []
     if validator.is_service_runner():
       errors.extend(validator.validate_cloud_options(self))
-      errors.extend(validator.validate_gcs_path(self, 'temp_location'))
-      if getattr(self, 'staging_location',
-                 None) or getattr(self, 'temp_location', None) is None:
+      if getattr(self.view_as(GoogleCloudOptions), "gcp_temp_location", None):
+        errors.extend(
+            validator.validate_gcs_path(
+                self.view_as(GoogleCloudOptions), "gcp_temp_location"))
+      else:
+        errors.extend(
+            validator.validate_gcs_path(
+                self.view_as(StandardOptions), "temp_location"))
+      if (getattr(self, 'staging_location', None) or
+          (not getattr(self, 'gcp_temp_location', None) and
+           not getattr(self.view_as(StandardOptions), 'temp_location', None))):
         errors.extend(validator.validate_gcs_path(self, 'staging_location'))

     if self.view_as(DebugOptions).dataflow_job_file:
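The review thread above mentions a toy sketch; here is one. It is a simplified model, not Beam's actual PipelineOptions code, and the names OptionsBase and GcpOptions are hypothetical. It illustrates why a plain @property on a view class is not enough: when the base class overrides __setattr__, every assignment goes through that override before the property's setter (a data descriptor) can run, so only option names registered with the parser are writable.

class OptionsBase(object):
  """Toy stand-in for PipelineOptions (hypothetical, simplified): attribute
  reads and writes are routed through a registry of parser-defined names."""

  _registered = {'runner', 'streaming'}  # options the parser knows about

  def __init__(self):
    # Bypass our own __setattr__ while creating the backing store.
    object.__setattr__(self, '_all_options', {})

  def __getattr__(self, name):
    # Only called when normal attribute lookup fails.
    if name in self._registered:
      return self._all_options.get(name)
    raise AttributeError(name)

  def __setattr__(self, name, value):
    # Called for *every* assignment, even when a subclass defines a
    # @property for `name`; the property setter is never consulted.
    if name in self._registered:
      self._all_options[name] = value
    else:
      raise AttributeError('%s is not a registered option' % name)


class GcpOptions(OptionsBase):

  @property
  def temp_location(self):  # the @property the reviewer suggested
    return self._all_options.get('gcp_temp_location')

  @temp_location.setter
  def temp_location(self, value):
    self._all_options['gcp_temp_location'] = value


opts = GcpOptions()
print(opts.temp_location)  # reads work: the property getter runs
try:
  opts.temp_location = 'gs://bucket/tmp'
except AttributeError as exc:
  # writes fail: OptionsBase.__setattr__ ran instead of the setter
  print(exc)

This is why the change overrides __getattr__ and __setattr__ on GoogleCloudOptions itself rather than defining a property.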
48 changes: 48 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
@@ -22,20 +22,24 @@
 from __future__ import absolute_import

 import logging
+import sys
 import unittest
+import warnings

 import hamcrest as hc

 from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import ProfilingOptions
 from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 from apache_beam.options.value_provider import RuntimeValueProvider
 from apache_beam.options.value_provider import StaticValueProvider
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
+from apache_beam.utils.annotations import BeamDeprecationWarning


 class PipelineOptionsTest(unittest.TestCase):
@@ -500,6 +504,50 @@ def test_transform_name_mapping(self):
     self.assertEqual(mapping['from'], 'to')


+class GoogleCloudOptionsTest(unittest.TestCase):
+
+  def setUp(self):
+    # The __warningregistry__'s need to be in a pristine state for tests
+    # to work properly.
+    for v in sys.modules.values():
+      if getattr(v, '__warningregistry__', None):
+        v.__warningregistry__ = {}
+
+  def test_temp_location(self):
+    options = PipelineOptions(temp_location="abc")
+
+    with warnings.catch_warnings():
+      warnings.simplefilter("ignore", BeamDeprecationWarning)
+
+      self.assertEqual(
+          options.view_as(StandardOptions).temp_location, "abc")
+      self.assertEqual(
+          options.view_as(GoogleCloudOptions).temp_location, "abc")
+      self.assertEqual(
+          options.view_as(GoogleCloudOptions).gcp_temp_location, None)
+
+      options.view_as(GoogleCloudOptions).temp_location = "gs://asdf"
+      self.assertEqual(
+          options.view_as(StandardOptions).temp_location, "abc")
+      self.assertEqual(
+          options.view_as(GoogleCloudOptions).temp_location, "gs://asdf")
+      self.assertEqual(
+          options.view_as(GoogleCloudOptions).gcp_temp_location, "gs://asdf")
+
+  def test_temp_location_deprecation_warning(self):
+    options = PipelineOptions()
+    with warnings.catch_warnings(record=True) as w:
+      warnings.simplefilter("always", BeamDeprecationWarning)
+      _ = options.view_as(GoogleCloudOptions).temp_location
+      self.assertEqual(len(w), 1)
+      self.assertIsInstance(w[0].message, BeamDeprecationWarning)
+    with warnings.catch_warnings(record=True) as w:
+      warnings.simplefilter("always", BeamDeprecationWarning)
+      options.view_as(GoogleCloudOptions).temp_location = "gs://asdf"
+      self.assertEqual(len(w), 1)
+      self.assertIsInstance(w[0].message, BeamDeprecationWarning)


 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
39 changes: 24 additions & 15 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -145,10 +145,11 @@ def __init__(self, packages, options, environment_version, pipeline_url):
     self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
     self.proto.dataset = '{}/cloud_dataflow'.format(
         GoogleCloudOptions.BIGQUERY_API_SERVICE)
+    temp_location = (
+        self.google_cloud_options.gcp_temp_location or
+        self.standard_options.temp_location)
     self.proto.tempStoragePrefix = (
-        self.google_cloud_options.temp_location.replace(
-            'gs:/',
-            GoogleCloudOptions.STORAGE_API_SERVICE))
+        temp_location.replace('gs:/', GoogleCloudOptions.STORAGE_API_SERVICE))
     if self.worker_options.worker_region:
       self.proto.workerRegion = self.worker_options.worker_region
     if self.worker_options.worker_zone:
@@ -377,20 +378,30 @@ def __init__(self, options, proto_pipeline):
     self.google_cloud_options.job_name = self.default_job_name(
         self.google_cloud_options.job_name)

-    required_google_cloud_options = ['project', 'job_name', 'temp_location']
+    if not self.google_cloud_options.gcp_temp_location:
+      _LOGGER.info('Defaulting to temp_location as gcp_temp_location: %s',
+                   self.options.view_as(StandardOptions).temp_location)
+      (self.google_cloud_options.gcp_temp_location
+      ) = self.options.view_as(StandardOptions).temp_location
+
+    if not self.google_cloud_options.staging_location:
+      _LOGGER.info(
+          'Defaulting to the gcp_temp_location as staging_location: %s',
+          self.google_cloud_options.gcp_temp_location)
+      (self.google_cloud_options.staging_location
+      ) = self.google_cloud_options.gcp_temp_location
+
+    required_google_cloud_options = [
+        'project', 'job_name', 'gcp_temp_location', 'staging_location'
+    ]
     missing = [
         option for option in required_google_cloud_options
-        if not getattr(self.google_cloud_options, option)]
+        if not getattr(self.google_cloud_options, option, None)
+    ]
     if missing:
       raise ValueError(
           'Missing required configuration parameters: %s' % missing)

-    if not self.google_cloud_options.staging_location:
-      _LOGGER.info('Defaulting to the temp_location as staging_location: %s',
-                   self.google_cloud_options.temp_location)
-      (self.google_cloud_options
-       .staging_location) = self.google_cloud_options.temp_location
-
     # Make the staging and temp locations job name and time specific. This is
     # needed to avoid clashes between job submissions using the same staging
     # area or team members using same job names. This method is not entirely
@@ -402,8 +413,8 @@ def __init__(self, options, proto_pipeline):
     path_suffix = '%s.%f' % (self.google_cloud_options.job_name, time.time())
     self.google_cloud_options.staging_location = FileSystems.join(
         self.google_cloud_options.staging_location, path_suffix)
-    self.google_cloud_options.temp_location = FileSystems.join(
-        self.google_cloud_options.temp_location, path_suffix)
+    self.google_cloud_options.gcp_temp_location = FileSystems.join(
+        self.google_cloud_options.gcp_temp_location, path_suffix)

     self.proto = dataflow.Job(name=self.google_cloud_options.job_name)
     if self.options.view_as(StandardOptions).streaming:
@@ -489,8 +500,6 @@ def _stage_resources(self, options):
     google_cloud_options = options.view_as(GoogleCloudOptions)
     if google_cloud_options.staging_location is None:
       raise RuntimeError('The --staging_location option must be specified.')
-    if google_cloud_options.temp_location is None:
-      raise RuntimeError('The --temp_location option must be specified.')

     resource_stager = _LegacyDataflowStager(self)
     _, resources = resource_stager.stage_job_resources(
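To summarize the defaulting behavior the apiclient diff introduces, here is a condensed paraphrase of the resolution order applied when a Dataflow job is created. This is not the verbatim Beam code; the function name resolve_locations is illustrative only.

def resolve_locations(google_cloud_options, standard_options):
  """Condensed paraphrase of the defaulting logic in apiclient (illustrative)."""
  # 1. gcp_temp_location falls back to the runner-agnostic temp_location.
  if not google_cloud_options.gcp_temp_location:
    google_cloud_options.gcp_temp_location = standard_options.temp_location
  # 2. staging_location falls back to gcp_temp_location.
  if not google_cloud_options.staging_location:
    google_cloud_options.staging_location = (
        google_cloud_options.gcp_temp_location)
  # 3. After defaulting, these must all be set or job creation fails.
  missing = [
      option for option in
      ('project', 'job_name', 'gcp_temp_location', 'staging_location')
      if not getattr(google_cloud_options, option, None)
  ]
  if missing:
    raise ValueError('Missing required configuration parameters: %s' % missing)

Because of this chain, a pipeline that sets only --temp_location still runs on Dataflow: gcp_temp_location inherits it, and staging_location inherits gcp_temp_location.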