Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
AURORA-1923 Aurora client should not automatically retry non-idempote…
Browse files Browse the repository at this point in the history
…nt operations

Aurora client has a built-in mechanism to automatically retry thrift API operations if the connection with the scheduler times out, experiences a transport exception, or encounters a transient exception on the scheduler side.

Retrying thrift calls due to scheduler connection timeouts and transient exceptions (see AURORA-187) is safe. However, as Aurora has no concept of idempotency, its client can retry non-idempotent operations upon encountering transport exceptions, which can lead to nondeterministic situations.

For example, if client requests go through a proxy to reach the scheduler, the client might consider a non-idempotent request failed and automatically retry it even though the original request has been received and processed by the scheduler.

This patch changes Aurora client invocation semantics from "at least once" to "at most once" for non-idempotent operations.

Reviewed at https://reviews.apache.org/r/58850/
  • Loading branch information
nurolahzade authored and Mehrdad Nurolahzade committed May 2, 2017
1 parent 6a896df commit f1e2537
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 33 deletions.
39 changes: 26 additions & 13 deletions src/main/python/apache/aurora/client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ def deschedule_cron(self, jobkey):
return self._scheduler_proxy.descheduleCronJob(jobkey.to_thrift())

def populate_job_config(self, config):
return self._scheduler_proxy.populateJobConfig(config.job())
# read-only calls are retriable.
return self._scheduler_proxy.populateJobConfig(config.job(), retry=True)

def start_cronjob(self, job_key):
self._assert_valid_job_key(job_key)
Expand All @@ -98,7 +99,8 @@ def start_cronjob(self, job_key):

def get_jobs(self, role):
log.info("Retrieving jobs for role %s" % role)
return self._scheduler_proxy.getJobs(role)
# read-only calls are retriable.
return self._scheduler_proxy.getJobs(role, retry=True)

def add_instances(self, job_key, instance_id, count):
key = InstanceKey(jobKey=job_key.to_thrift(), instanceId=instance_id)
Expand All @@ -113,7 +115,6 @@ def kill_job(self, job_key, instances=None, message=None):
if instances is not None:
log.info("Instances to be killed: %s" % instances)
instances = frozenset([int(s) for s in instances])

return self._scheduler_proxy.killTasks(job_key.to_thrift(), instances, message)

def check_status(self, job_key):
Expand All @@ -130,14 +131,16 @@ def build_query(cls, role, job, env=None, instances=None, statuses=LIVE_STATES):

def query(self, query):
try:
return self._scheduler_proxy.getTasksStatus(query)
# read-only calls are retriable.
return self._scheduler_proxy.getTasksStatus(query, retry=True)
except SchedulerProxy.ThriftInternalError as e:
raise self.ThriftInternalError(e.args[0])

def query_no_configs(self, query):
"""Returns all matching tasks without TaskConfig.executorConfig set."""
try:
return self._scheduler_proxy.getTasksWithoutConfigs(query)
# read-only calls are retriable.
return self._scheduler_proxy.getTasksWithoutConfigs(query, retry=True)
except SchedulerProxy.ThriftInternalError as e:
raise self.ThriftInternalError(e.args[0])

Expand Down Expand Up @@ -167,7 +170,9 @@ def start_job_update(self, config, message, instances=None, metadata=None):
"""
request = self._job_update_request(config, instances, metadata)
log.info("Starting update for: %s" % config.name())
return self._scheduler_proxy.startJobUpdate(request, message)
# retrying starting a job update is safe, client and scheduler reconcile state if the
# job update is in progress (AURORA-1711).
return self._scheduler_proxy.startJobUpdate(request, message, retry=True)

def pause_job_update(self, update_key, message):
"""Requests Scheduler to pause active job update.
Expand Down Expand Up @@ -225,7 +230,8 @@ def get_job_update_diff(self, config, instances=None):
"""
request = self._job_update_request(config, instances)
log.debug("Requesting job update diff details for: %s" % config.name())
return self._scheduler_proxy.getJobUpdateDiff(request)
# read-only calls are retriable.
return self._scheduler_proxy.getJobUpdateDiff(request, retry=True)

def query_job_updates(
self,
Expand All @@ -246,13 +252,15 @@ def query_job_updates(
Returns response object with all matching job update summaries.
"""
# TODO(wfarner): Consider accepting JobUpdateQuery in this function instead of kwargs.
# read-only calls are retriable.
return self._scheduler_proxy.getJobUpdateSummaries(
JobUpdateQuery(
role=role,
jobKey=job_key.to_thrift() if job_key else None,
user=user,
updateStatuses=update_statuses,
key=update_key))
key=update_key),
retry=True)

def get_job_update_details(self, key):
"""Gets JobUpdateDetails for the specified job update ID.
Expand All @@ -267,7 +275,8 @@ def get_job_update_details(self, key):
% (key, JobUpdateKey.__name__, key.__class__.__name__))

query = JobUpdateQuery(key=key)
return self._scheduler_proxy.getJobUpdateDetails(key, query)
# read-only calls are retriable.
return self._scheduler_proxy.getJobUpdateDetails(key, query, retry=True)

def restart(self, job_key, instances, restart_settings):
"""Perform a rolling restart of the job.
Expand All @@ -291,15 +300,17 @@ def drain_hosts(self, hosts):

def maintenance_status(self, hosts):
log.info("Maintenance status for: %s" % hosts.hostNames)
return self._scheduler_proxy.maintenanceStatus(hosts)
# read-only calls are retriable.
return self._scheduler_proxy.maintenanceStatus(hosts, retry=True)

def end_maintenance(self, hosts):
log.info("Ending maintenance for: %s" % hosts.hostNames)
return self._scheduler_proxy.endMaintenance(hosts)

def get_quota(self, role):
log.info("Getting quota for: %s" % role)
return self._scheduler_proxy.getQuota(role)
# read-only calls are retriable.
return self._scheduler_proxy.getQuota(role, retry=True)

def set_quota(self, role, cpu, ram, disk):
log.info("Setting quota for user:%s cpu:%f ram:%d disk: %d"
Expand All @@ -313,7 +324,8 @@ def set_quota(self, role, cpu, ram, disk):

def get_tier_configs(self):
log.debug("Getting tier configurations")
return self._scheduler_proxy.getTierConfigs()
# read-only calls are retriable.
return self._scheduler_proxy.getTierConfigs(retry=True)

def force_task_state(self, task_id, status):
log.info("Requesting that task %s transition to state %s" % (task_id, status))
Expand All @@ -329,7 +341,8 @@ def stage_recovery(self, backup_id):
return self._scheduler_proxy.stageRecovery(backup_id)

def query_recovery(self, query):
return self._scheduler_proxy.queryRecovery(query)
# read-only calls are retriable.
return self._scheduler_proxy.queryRecovery(query, retry=True)

def delete_recovery_tasks(self, query):
return self._scheduler_proxy.deleteRecoveryTasks(query)
Expand Down
23 changes: 20 additions & 3 deletions src/main/python/apache/aurora/client/api/scheduler_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ class TransientError(Error): pass
class AuthError(Error): pass
class APIVersionError(Error): pass
class ThriftInternalError(Error): pass
class NotRetriableError(Error): pass

def __init__(self, cluster, verbose=False, **kwargs):
self.cluster = cluster
Expand Down Expand Up @@ -302,12 +303,12 @@ def __getattr__(self, method_name):
return method

@functools.wraps(method)
def method_wrapper(*args):
def method_wrapper(*args, **kwargs):
retry = kwargs.get('retry', False)
with self._lock:
start = time.time()
while not self._terminating.is_set() and (
time.time() - start) < self.RPC_MAXIMUM_WAIT.as_(Time.SECONDS):

try:
method = getattr(self.client(), method_name)
if not callable(method):
Expand All @@ -321,7 +322,23 @@ def method_wrapper(*args):
except TRequestsTransport.AuthError as e:
log.error(self.scheduler_client().get_failed_auth_message())
raise self.AuthError(e)
except (TTransport.TTransportException, self.TimeoutError, self.TransientError) as e:
except TTransport.TTransportException as e:
# Client does not know if the request has been received and processed by
# the scheduler, therefore the call is retried if it is idempotent.
if not self._terminating.is_set():
if retry:
log.warning('Transport error communicating with scheduler: %s, retrying...' % e)
self.invalidate()
self._terminating.wait(self.RPC_RETRY_INTERVAL.as_(Time.SECONDS))
else:
raise self.NotRetriableError('Transport error communicating with scheduler during '
'non-idempotent operation: %s, not retrying' % e)
except (self.TimeoutError, self.TransientError) as e:
# If it is TimeoutError then the connection with scheduler could not
# be established, therefore the call did not go through.
# If it is TransientError then the scheduler could not process the call
# because its storage is not in READY state.
# In both cases, the call can be safely retried.
if not self._terminating.is_set():
log.warning('Connection error with scheduler: %s, reconnecting...' % e)
self.invalidate()
Expand Down
7 changes: 7 additions & 0 deletions src/main/python/apache/aurora/client/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from twitter.common.lang import AbstractClass, Compatibility

from apache.aurora.client.api import AuroraClientAPI
from apache.aurora.client.api.scheduler_client import SchedulerProxy

from .command_hooks import GlobalCommandHookRegistry
from .options import CommandOption
Expand Down Expand Up @@ -319,6 +320,12 @@ def _execute(self, args):
except Context.CommandError as c:
context.print_err(c.msg)
return c.code
except SchedulerProxy.NotRetriableError as e:
context.print_err(e.message)
return EXIT_NETWORK_ERROR
except SchedulerProxy.TimeoutError as e:
context.print_err(e.message)
return EXIT_TIMEOUT
except AuroraClientAPI.Error as e:
# TODO(wfarner): Generalize this error type in the contract of noun and verb implementations.
context.print_err("Fatal error running command: %s" % e.message)
Expand Down
14 changes: 9 additions & 5 deletions src/test/python/apache/aurora/admin/test_maintenance.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ def test_end_maintenance_hosts(self):
host_activate([self.TEST_CLUSTER])

mock_scheduler_proxy.endMaintenance.assert_called_with(Hosts(set(self.HOSTNAMES)))
mock_scheduler_proxy.maintenanceStatus.assert_called_with(Hosts(set(self.HOSTNAMES)))
mock_scheduler_proxy.maintenanceStatus.assert_called_with(
Hosts(set(self.HOSTNAMES)),
retry=True)

def test_perform_maintenance_hosts(self):
mock_options = self.make_mock_options()
mock_options.post_drain_script = 'callback'
mock_options.grouping = 'by_host'

def host_status_results(hostnames):
def host_status_results(hostnames, retry=True):
if isinstance(hostnames, Hosts):
return self.create_drained_status_result(hostnames)
return self.create_maintenance_status_result()
Expand Down Expand Up @@ -150,7 +152,7 @@ def test_perform_maintenance_hosts_unknown_hosts_skipped(self):
mock_options.post_drain_script = None
mock_options.grouping = 'by_host'

def host_status_results(hostnames):
def host_status_results(hostnames, retry=True):
if isinstance(hostnames, Hosts):
return self.create_drained_status_result(hostnames)
return self.create_maintenance_status_result()
Expand Down Expand Up @@ -247,7 +249,7 @@ def test_perform_maintenance_hosts_no_prod_tasks(self):
mock_options.post_drain_script = None
mock_options.grouping = 'by_host'

def host_status_results(hostnames):
def host_status_results(hostnames, retry=True):
if isinstance(hostnames, Hosts):
return self.create_drained_status_result(hostnames)
return self.create_maintenance_status_result()
Expand Down Expand Up @@ -321,4 +323,6 @@ def test_host_maintenance_status(self):
patch('twitter.common.app.get_options', return_value=mock_options)):
host_status([self.TEST_CLUSTER])

mock_scheduler_proxy.maintenanceStatus.assert_called_with(Hosts(set(self.HOSTNAMES)))
mock_scheduler_proxy.maintenanceStatus.assert_called_with(
Hosts(set(self.HOSTNAMES)),
retry=True)
36 changes: 36 additions & 0 deletions src/test/python/apache/aurora/api_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,41 @@ class SchedulerProxyApiSpec(SchedulerThriftApiSpec, SchedulerProxy):
A concrete definition of the API provided by SchedulerProxy.
"""

def getTasksStatus(self, query, retry=True):
pass

def getTasksWithoutConfigs(self, query, retry=True):
pass

def getJobs(self, ownerRole, retry=True):
pass

def getQuota(self, ownerRole, retry=True):
pass

def populateJobConfig(self, description, retry=True):
pass

def getJobUpdateSummaries(self, jobUpdateQuery, retry=True):
pass

def getJobUpdateDetails(self, key, query, retry=True):
pass

def getJobUpdateDiff(self, request, retry=True):
pass

def getTierConfigs(self, retry=True):
pass

def queryRecovery(self, query, retry=True):
pass

def maintenanceStatus(self, hosts, retry=True):
pass

def startJobUpdate(self, request, message, retry=True):
pass

def url(self):
pass
17 changes: 10 additions & 7 deletions src/test/python/apache/aurora/client/api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from apache.aurora.config import AuroraConfig
from apache.aurora.config.schema.base import UpdateConfig

from ...api_util import SchedulerThriftApiSpec
from ...api_util import SchedulerProxyApiSpec

from gen.apache.aurora.api.ttypes import (
InstanceKey,
Expand Down Expand Up @@ -75,7 +75,7 @@ def create_error_response(cls):
@classmethod
def mock_api(cls):
api = AuroraClientAPI(Cluster(name="foo"), 'test-client')
mock_proxy = create_autospec(spec=SchedulerThriftApiSpec, spec_set=True, instance=True)
mock_proxy = create_autospec(spec=SchedulerProxyApiSpec, spec_set=True, instance=True)
api._scheduler_proxy = mock_proxy
return api, mock_proxy

Expand Down Expand Up @@ -131,7 +131,8 @@ def test_start_job_update(self):
api.start_job_update(self.mock_job_config(), instances=None, message='hello')
mock_proxy.startJobUpdate.assert_called_once_with(
self.create_update_request(task_config),
'hello')
'hello',
retry=True)

def test_start_job_update_fails_parse_update_config(self):
"""Test start_job_update fails to parse invalid UpdateConfig."""
Expand All @@ -150,7 +151,9 @@ def test_get_job_update_diff(self):
mock_proxy.getJobUpdateDiff.return_value = self.create_simple_success_response()

api.get_job_update_diff(self.mock_job_config(), instances=None)
mock_proxy.getJobUpdateDiff.assert_called_once_with(self.create_update_request(task_config))
mock_proxy.getJobUpdateDiff.assert_called_once_with(
self.create_update_request(task_config),
retry=True)

def test_pause_job_update(self):
"""Test successful job update pause."""
Expand All @@ -176,22 +179,22 @@ def test_query_job_updates(self):
jobKey=job_key.to_thrift(),
updateStatuses={JobUpdateStatus.ROLLING_FORWARD})
api.query_job_updates(job_key=job_key, update_statuses=query.updateStatuses)
mock_proxy.getJobUpdateSummaries.assert_called_once_with(query)
mock_proxy.getJobUpdateSummaries.assert_called_once_with(query, retry=True)

def test_query_job_updates_no_filter(self):
"""Test querying job updates with no filter args."""
api, mock_proxy = self.mock_api()
query = JobUpdateQuery()
api.query_job_updates()
mock_proxy.getJobUpdateSummaries.assert_called_once_with(query)
mock_proxy.getJobUpdateSummaries.assert_called_once_with(query, retry=True)

def test_get_job_update_details(self):
"""Test getting job update details."""
api, mock_proxy = self.mock_api()
key = JobUpdateKey(job=JobKey(role="role", environment="env", name="name"), id="id")
api.get_job_update_details(key)
query = JobUpdateQuery(key=key)
mock_proxy.getJobUpdateDetails.assert_called_once_with(key, query)
mock_proxy.getJobUpdateDetails.assert_called_once_with(key, query, retry=True)

def test_set_quota(self):
"""Test setting quota."""
Expand Down
Loading

0 comments on commit f1e2537

Please sign in to comment.