Skip to content

Commit

Permalink
Merge pull request #910 from untergeek/feature/836
Browse files Browse the repository at this point in the history
Add wait_for_it utility to satisfy #836
  • Loading branch information
untergeek committed Mar 27, 2017
2 parents b428fd4 + 1fe5db3 commit 1b147a8
Show file tree
Hide file tree
Showing 21 changed files with 727 additions and 92 deletions.
3 changes: 2 additions & 1 deletion curator/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
__version__ = '5.0.0.a1'
__version__ = '5.0.0.a2'

118 changes: 81 additions & 37 deletions curator/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def do_action(self):

class Allocation(object):
def __init__(self, ilo, key=None, value=None, allocation_type='require',
wait_for_completion=False, timeout=30,
wait_for_completion=False, wait_interval=3, max_wait=-1,
):
"""
:arg ilo: A :class:`curator.indexlist.IndexList` object
Expand All @@ -166,7 +166,9 @@ def __init__(self, ilo, key=None, value=None, allocation_type='require',
:arg wait_for_completion: Wait (or not) for the operation
to complete before returning. (default: `False`)
:type wait_for_completion: bool
:arg timeout: Number of seconds to `wait_for_completion`
:arg wait_interval: How long in seconds to wait between checks for
completion.
:arg max_wait: Maximum number of seconds to `wait_for_completion`
.. note::
See:
Expand Down Expand Up @@ -195,10 +197,13 @@ def __init__(self, ilo, key=None, value=None, allocation_type='require',
#: Instance variable.
#: Internal reference to `wait_for_completion`
self.wfc = wait_for_completion
#: Instance variable
#: How many seconds to wait between checks for completion.
self.wait_interval = wait_interval
#: Instance variable.
#: How long in seconds to `wait_for_completion` before returning with an
#: exception
self.timeout = '{0}s'.format(timeout)
#: exception. A value of -1 means wait forever.
self.max_wait = max_wait

def do_dry_run(self):
"""
Expand Down Expand Up @@ -230,17 +235,10 @@ def do_action(self):
'Waiting for shards to complete relocation for indices:'
' {0}'.format(to_csv(l))
)
version = get_version(self.client)
if version >= (5,1,0):
self.client.cluster.health(index=to_csv(l),
level='indices', wait_for_no_relocating_shards=True,
timeout=self.timeout,
)
else:
self.client.cluster.health(index=to_csv(l),
level='indices', wait_for_relocating_shards=0,
timeout=self.timeout,
)
wait_for_it(
self.client, 'allocation',
wait_interval=self.wait_interval, max_wait=self.max_wait
)
except Exception as e:
report_failure(e)

Expand Down Expand Up @@ -305,7 +303,7 @@ def do_action(self):
class ClusterRouting(object):
def __init__(
self, client, routing_type=None, setting=None, value=None,
wait_for_completion=False, timeout=30,
wait_for_completion=False, wait_interval=9, max_wait=-1
):
"""
For now, the cluster routing settings are hardcoded to be ``transient``
Expand All @@ -323,7 +321,9 @@ def __init__(
:arg wait_for_completion: Wait (or not) for the operation
to complete before returning. (default: `False`)
:type wait_for_completion: bool
:arg timeout: Number of seconds to `wait_for_completion`
:arg wait_interval: How long in seconds to wait between checks for
completion.
:arg max_wait: Maximum number of seconds to `wait_for_completion`
"""
verify_client_object(client)
#: Instance variable.
Expand All @@ -333,10 +333,13 @@ def __init__(
#: Instance variable.
#: Internal reference to `wait_for_completion`
self.wfc = wait_for_completion
#: Instance variable
#: How many seconds to wait between checks for completion.
self.wait_interval = wait_interval
#: Instance variable.
#: How long in seconds to `wait_for_completion` before returning with an
#: exception
self.timeout = '{0}s'.format(timeout)
#: exception. A value of -1 means wait forever.
self.max_wait = max_wait

if setting != 'enable':
raise ValueError(
Expand Down Expand Up @@ -382,9 +385,9 @@ def do_action(self):
logger.debug(
'Waiting for shards to complete routing and/or rebalancing'
)
self.client.cluster.health(
level='indices', wait_for_relocating_shards=0,
timeout=self.timeout,
wait_for_it(
self.client, 'cluster_routing',
wait_interval=self.wait_interval, max_wait=self.max_wait
)
except Exception as e:
report_failure(e)
Expand Down Expand Up @@ -624,13 +627,17 @@ def do_action(self):
report_failure(e)

class Replicas(object):
def __init__(self, ilo, count=None, wait_for_completion=False, timeout=30):
def __init__(self, ilo, count=None, wait_for_completion=False,
wait_interval=9, max_wait=-1):
"""
:arg ilo: A :class:`curator.indexlist.IndexList` object
:arg count: The count of replicas per shard
:arg wait_for_completion: Wait (or not) for the operation
to complete before returning. (default: `False`)
:type wait_for_completion: bool
:arg wait_interval: How long in seconds to wait between checks for
completion.
:arg max_wait: Maximum number of seconds to `wait_for_completion`
"""
verify_index_list(ilo)
# It's okay for count to be zero
Expand All @@ -650,10 +657,13 @@ def __init__(self, ilo, count=None, wait_for_completion=False, timeout=30):
#: Instance variable.
#: Internal reference to `wait_for_completion`
self.wfc = wait_for_completion
#: Instance variable
#: How many seconds to wait between checks for completion.
self.wait_interval = wait_interval
#: Instance variable.
#: How long in seconds to `wait_for_completion` before returning with an
#: exception
self.timeout = '{0}s'.format(timeout)
#: exception. A value of -1 means wait forever.
self.max_wait = max_wait
self.loggit = logging.getLogger('curator.actions.replicas')

def do_dry_run(self):
Expand Down Expand Up @@ -686,9 +696,9 @@ def do_action(self):
'Waiting for shards to complete replication for '
'indices: {0}'.format(to_csv(l))
)
self.client.cluster.health(
index=to_csv(l), wait_for_status='green',
timeout=self.timeout,
wait_for_it(
self.client, 'replicas',
wait_interval=self.wait_interval, max_wait=self.max_wait
)
except Exception as e:
report_failure(e)
Expand Down Expand Up @@ -848,15 +858,18 @@ def do_action(self):
class Snapshot(object):
def __init__(self, ilo, repository=None, name=None,
ignore_unavailable=False, include_global_state=True,
partial=False, wait_for_completion=True,
skip_repo_fs_check=False):
partial=False, wait_for_completion=True, wait_interval=9,
max_wait=-1, skip_repo_fs_check=False):
"""
:arg ilo: A :class:`curator.indexlist.IndexList` object
:arg repository: The Elasticsearch snapshot repository to use
:arg name: What to name the snapshot.
:arg wait_for_completion: Wait (or not) for the operation
to complete before returning. (default: `True`)
:type wait_for_completion: bool
:arg wait_interval: How long in seconds to wait between checks for
completion.
:arg max_wait: Maximum number of seconds to `wait_for_completion`
:arg ignore_unavailable: Ignore unavailable shards/indices.
(default: `False`)
:type ignore_unavailable: bool
Expand Down Expand Up @@ -898,6 +911,13 @@ def __init__(self, ilo, repository=None, name=None,
#: Instance variable.
#: Internally accessible copy of `wait_for_completion`
self.wait_for_completion = wait_for_completion
#: Instance variable
#: How many seconds to wait between checks for completion.
self.wait_interval = wait_interval
#: Instance variable.
#: How long in seconds to `wait_for_completion` before returning with an
#: exception. A value of -1 means wait forever.
self.max_wait = max_wait
#: Instance variable.
#: Internally accessible copy of `skip_repo_fs_check`
self.skip_repo_fs_check = skip_repo_fs_check
Expand Down Expand Up @@ -966,15 +986,22 @@ def do_action(self):
self.loggit.info('Creating snapshot "{0}" from indices: '
'{1}'.format(self.name, self.index_list.indices)
)
# Always set wait_for_completion to False. Let 'wait_for_it' do its
# thing if wait_for_completion is set to True. Report the task_id
# either way.
self.client.snapshot.create(
repository=self.repository, snapshot=self.name, body=self.body,
wait_for_completion=self.wait_for_completion
wait_for_completion=False
)
if self.wait_for_completion:
self.report_state()
wait_for_it(
self.client, 'snapshot', snapshot=self.name,
repository=self.repository,
wait_interval=self.wait_interval, max_wait=self.max_wait
)
else:
self.loggit.warn(
'"wait_for_completion" set to {0}. '
'"wait_for_completion" set to {0}.'
'Remember to check for successful completion '
'manually.'.format(self.wait_for_completion)
)
Expand All @@ -985,8 +1012,8 @@ class Restore(object):
def __init__(self, slo, name=None, indices=None, include_aliases=False,
ignore_unavailable=False, include_global_state=True,
partial=False, rename_pattern=None, rename_replacement=None,
extra_settings={}, wait_for_completion=True,
skip_repo_fs_check=False):
extra_settings={}, wait_for_completion=True, wait_interval=9,
max_wait=-1, skip_repo_fs_check=False):
"""
:arg slo: A :class:`curator.snapshotlist.SnapshotList` object
:arg name: Name of the snapshot to restore. If no name is provided, it
Expand Down Expand Up @@ -1020,7 +1047,11 @@ def __init__(self, slo, name=None, indices=None, include_aliases=False,
:type extra_settings: dict, representing the settings.
:arg wait_for_completion: Wait (or not) for the operation
to complete before returning. (default: `True`)
:arg wait_interval: How long in seconds to wait between checks for
completion.
:arg max_wait: Maximum number of seconds to `wait_for_completion`
:type wait_for_completion: bool
:arg skip_repo_fs_check: Do not validate write access to repository on
all cluster nodes before proceeding. (default: `False`). Useful for
shared filesystems where intermittent timeouts can affect
Expand Down Expand Up @@ -1060,6 +1091,13 @@ def __init__(self, slo, name=None, indices=None, include_aliases=False,
else:
self.indices = slo.snapshot_info[self.name]['indices']
self.wfc = wait_for_completion
#: Instance variable
#: How many seconds to wait between checks for completion.
self.wait_interval = wait_interval
#: Instance variable.
#: How long in seconds to `wait_for_completion` before returning with an
#: exception. A value of -1 means wait forever.
self.max_wait = max_wait
#: Instance variable version of ``rename_pattern``
self.rename_pattern = rename_pattern if rename_replacement is not None \
else ''
Expand Down Expand Up @@ -1185,12 +1223,18 @@ def do_action(self):
self.loggit.info('Restoring indices "{0}" from snapshot: '
'{1}'.format(self.indices, self.name)
)
# Always set wait_for_completion to False. Let 'wait_for_it' do its
# thing if wait_for_completion is set to True. Report the task_id
# either way.
self.client.snapshot.restore(
repository=self.repository, snapshot=self.name, body=self.body,
wait_for_completion=self.wfc
wait_for_completion=False
)
if self.wfc:
self.report_state()
wait_for_it(
self.client, 'restore', index_list=self.expected_output,
wait_interval=self.wait_interval, max_wait=self.max_wait
)
else:
self.loggit.warn(
'"wait_for_completion" set to {0}. '
Expand Down
8 changes: 2 additions & 6 deletions curator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,7 @@ def process_action(client, config, **kwargs):
if action == 'delete_indices':
mykwargs['master_timeout'] = (
kwargs['master_timeout'] if 'master_timeout' in kwargs else 30)
if action == 'allocation' or action == 'replicas':
# Setting the operation timeout to the client timeout
mykwargs['timeout'] = (
kwargs['timeout'] if 'timeout' in kwargs else 30)


### Update the defaults with whatever came with opts, minus any Nones
mykwargs.update(prune_nones(opts))
logger.debug('Action kwargs: {0}'.format(mykwargs))
Expand Down Expand Up @@ -162,7 +158,7 @@ def cli(config, dry_run, action_file):
kwargs['master_timeout'] = (
client_args['timeout'] if client_args['timeout'] <= 300 else 300)
kwargs['dry_run'] = dry_run
kwargs['timeout'] = client_args['timeout']
# kwargs['timeout'] = client_args['timeout']

# Create a client object for each action...
client = get_client(**client_args)
Expand Down
2 changes: 1 addition & 1 deletion curator/singletons.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def config_override(ctx, config_dict):
'--key', type=str, required=True, help='Node identification tag'
)
@click.option(
'--value', type=str, required=True, help='Value associated with --key'
'--value', type=str, default=None, help='Value associated with --key'
)
@click.option(
'--allocation_type', type=str,
Expand Down

0 comments on commit 1b147a8

Please sign in to comment.