Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,11 +451,12 @@ def create_job_description(self, job):

@retry.with_exponential_backoff(num_retries=3, initial_delay_secs=3)
def get_job_metrics(self, job_id):
request = dataflow.DataflowProjectsJobsGetMetricsRequest()
request = dataflow.DataflowProjectsLocationsJobsGetMetricsRequest()
request.jobId = job_id
request.location = self.google_cloud_options.region
request.projectId = self.google_cloud_options.project
try:
response = self._client.projects_jobs.GetMetrics(request)
response = self._client.projects_locations_jobs.GetMetrics(request)
except exceptions.BadStatusCodeError as e:
logging.error('HTTP status %d. Unable to query metrics',
e.response.status)
Expand All @@ -464,12 +465,13 @@ def get_job_metrics(self, job_id):

def submit_job_description(self, job):
"""Creates and executes a job request."""
request = dataflow.DataflowProjectsJobsCreateRequest()
request = dataflow.DataflowProjectsLocationsJobsCreateRequest()
request.projectId = self.google_cloud_options.project
request.location = self.google_cloud_options.region
request.job = job.proto

try:
response = self._client.projects_jobs.Create(request)
response = self._client.projects_locations_jobs.Create(request)
except exceptions.BadStatusCodeError as e:
logging.error('HTTP status %d trying to create job'
' at dataflow service endpoint %s',
Expand Down Expand Up @@ -509,9 +511,10 @@ def modify_job_state(self, job_id, new_state):
# Other states could only be set by the service.
return False

request = dataflow.DataflowProjectsJobsUpdateRequest()
request = dataflow.DataflowProjectsLocationsJobsUpdateRequest()
request.jobId = job_id
request.projectId = self.google_cloud_options.project
request.location = self.google_cloud_options.region
request.job = dataflow.Job(requestedState=new_state)

self._client.projects_jobs.Update(request)
Expand Down Expand Up @@ -539,10 +542,11 @@ def get_job(self, job_id):
(e.g. '2015-03-10T00:01:53.074Z')
currentStateTime: UTC time for the current state of the job.
"""
request = dataflow.DataflowProjectsJobsGetRequest()
request = dataflow.DataflowProjectsLocationsJobsGetRequest()
request.jobId = job_id
request.projectId = self.google_cloud_options.project
response = self._client.projects_jobs.Get(request)
request.location = self.google_cloud_options.region
response = self._client.projects_locations_jobs.Get(request)
return response

@retry.with_exponential_backoff() # Using retry defaults from utils/retry.py
Expand Down Expand Up @@ -588,8 +592,9 @@ def list_messages(
JOB_MESSAGE_WARNING, JOB_MESSAGE_ERROR.
messageText: A message string.
"""
request = dataflow.DataflowProjectsJobsMessagesListRequest(
jobId=job_id, projectId=self.google_cloud_options.project)
request = dataflow.DataflowProjectsLocationsJobsMessagesListRequest(
jobId=job_id, location=self.google_cloud_options.region,
projectId=self.google_cloud_options.project)
if page_token is not None:
request.pageToken = page_token
if start_time is not None:
Expand All @@ -599,34 +604,34 @@ def list_messages(
if minimum_importance is not None:
if minimum_importance == 'JOB_MESSAGE_DEBUG':
request.minimumImportance = (
dataflow.DataflowProjectsJobsMessagesListRequest
dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_DEBUG)
elif minimum_importance == 'JOB_MESSAGE_DETAILED':
request.minimumImportance = (
dataflow.DataflowProjectsJobsMessagesListRequest
dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_DETAILED)
elif minimum_importance == 'JOB_MESSAGE_BASIC':
request.minimumImportance = (
dataflow.DataflowProjectsJobsMessagesListRequest
dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_BASIC)
elif minimum_importance == 'JOB_MESSAGE_WARNING':
request.minimumImportance = (
dataflow.DataflowProjectsJobsMessagesListRequest
dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_WARNING)
elif minimum_importance == 'JOB_MESSAGE_ERROR':
request.minimumImportance = (
dataflow.DataflowProjectsJobsMessagesListRequest
dataflow.DataflowProjectsLocationsJobsMessagesListRequest
.MinimumImportanceValueValuesEnum
.JOB_MESSAGE_ERROR)
else:
raise RuntimeError(
'Unexpected value for minimum_importance argument: %r',
minimum_importance)
response = self._client.projects_jobs_messages.List(request)
response = self._client.projects_locations_jobs_messages.List(request)
return response.jobMessages, response.nextPageToken


Expand Down
11 changes: 10 additions & 1 deletion sdks/python/apache_beam/utils/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,15 @@ def _add_argparse_args(cls, parser):
parser.add_argument('--temp_location',
default=None,
help='GCS path for saving temporary workflow jobs.')
# The Cloud Dataflow service does not yet honor this setting. However, once
# service support is added then users of this SDK will be able to control
# the region. Default is up to the Dataflow service. See
# https://cloud.google.com/compute/docs/regions-zones/regions-zones for a
list of valid options.
parser.add_argument('--region',
default='us-central1',
help='The Google Compute Engine region for creating '
'Dataflow job.')
parser.add_argument('--service_account_name',
default=None,
help='Name of the service account for Google APIs.')
Expand Down Expand Up @@ -336,7 +345,7 @@ def _add_argparse_args(cls, parser):
choices=['NONE', 'THROUGHPUT_BASED'],
default=None, # Meaning unset, distinct from 'NONE' meaning don't scale
help=
('If and how to auotscale the workerpool.'))
('If and how to autoscale the workerpool.'))
parser.add_argument(
'--worker_machine_type',
dest='machine_type',
Expand Down