Skip to content

Commit

Permalink
Merge pull request #793 from untergeek/feature/446
Browse files Browse the repository at this point in the history
Add cluster routing functionality
  • Loading branch information
untergeek committed Oct 25, 2016
2 parents 971e3a0 + e8e6647 commit 8fd6863
Show file tree
Hide file tree
Showing 14 changed files with 426 additions and 12 deletions.
87 changes: 87 additions & 0 deletions curator/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,93 @@ def do_action(self):
except Exception as e:
report_failure(e)

class ClusterRouting(object):
def __init__(
self, client, routing_type=None, setting=None, value=None,
wait_for_completion=False, timeout=30,
):
"""
For now, the cluster routing settings are hardcoded to be ``transient``
:arg client: An :class:`elasticsearch.Elasticsearch` client object
:arg routing_type: Type of routing to apply. Either `allocation` or
`rebalance`
:arg setting: Currently, the only acceptable value for `setting` is
``enable``. This is here in case that changes.
:arg value: Used only if `setting` is `enable`. Semi-dependent on
`routing_type`. Acceptable values for `allocation` and `rebalance`
are ``all``, ``primaries``, and ``none`` (string, not `NoneType`).
If `routing_type` is `allocation`, this can also be
``new_primaries``, and if `rebalance`, it can be ``replicas``.
:arg wait_for_completion: Wait (or not) for the operation
to complete before returning. (default: `False`)
:type wait_for_completion: bool
:arg timeout: Number of seconds to `wait_for_completion`
"""
verify_client_object(client)
#: Instance variable.
#: An :class:`elasticsearch.Elasticsearch` client object
self.client = client
self.loggit = logging.getLogger('curator.actions.cluster_routing')
#: Instance variable.
#: Internal reference to `wait_for_completion`
self.wfc = wait_for_completion
#: Instance variable.
#: How long in seconds to `wait_for_completion` before returning with an
#: exception
self.timeout = '{0}s'.format(timeout)

if setting != 'enable':
raise ValueError(
'Invalid value for "setting": {0}.'.format(setting)
)
if routing_type == 'allocation':
if value not in ['all', 'primaries', 'new_primaries', 'none']:
raise ValueError(
'Invalid "value": {0} with "routing_type":'
'{1}.'.format(value, routing_type)
)
elif routing_type == 'rebalance':
if value not in ['all', 'primaries', 'replicas', 'none']:
raise ValueError(
'Invalid "value": {0} with "routing_type":'
'{1}.'.format(value, routing_type)
)
else:
raise ValueError(
'Invalid value for "routing_type": {0}.'.format(routing_type)
)
bkey = 'cluster.routing.{0}.{1}'.format(routing_type,setting)
self.body = { 'transient' : { bkey : value } }

def do_dry_run(self):
"""
Log what the output would be, but take no action.
"""
logger.info('DRY-RUN MODE. No changes will be made.')
self.loggit.info(
'DRY-RUN: Update cluster routing settings with arguments: '
'{0}'.format(self.body)
)

def do_action(self):
"""
Change cluster routing settings with the settings in `body`.
"""
self.loggit.info('Updating cluster settings: {0}'.format(self.body))
try:
self.client.cluster.put_settings(body=self.body)
if self.wfc:
logger.debug(
'Waiting for shards to complete routing and/or rebalancing'
)
self.client.cluster.health(
level='indices', wait_for_relocating_shards=0,
timeout=self.timeout,
)
except Exception as e:
report_failure(e)

class CreateIndex(object):
def __init__(self, client, name, extra_settings={}):
"""
Expand Down
5 changes: 4 additions & 1 deletion curator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
'alias' : Alias,
'allocation' : Allocation,
'close' : Close,
'cluster_routing' : ClusterRouting,
'create_index' : CreateIndex,
'delete_indices' : DeleteIndices,
'delete_snapshots' : DeleteSnapshots,
Expand Down Expand Up @@ -76,7 +77,7 @@ def process_action(client, config, **kwargs):
removes = IndexList(client)
removes.iterate_filters(config['remove'])
action_obj.remove(removes)
elif action == 'create_index':
elif action in [ 'cluster_routing', 'create_index' ]:
action_obj = action_class(client, **mykwargs)
elif action == 'delete_snapshots' or action == 'restore':
logger.debug('Running "{0}"'.format(action))
Expand Down Expand Up @@ -122,7 +123,9 @@ def cli(config, dry_run, action_file):
### Start working on the actions here ###
#########################################
action_config = get_yaml(action_file)
logger.debug('PRE-YAY')
action_dict = validate_actions(action_config)
logger.debug('YAY')
actions = action_dict['actions']
logger.debug('Full list of actions: {0}'.format(actions))
action_keys = sorted(list(actions.keys()))
Expand Down
6 changes: 5 additions & 1 deletion curator/defaults/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def date_regex():
}

# Actions

def cluster_actions():
return [ 'cluster_routing' ]

def index_actions():
return [
'alias',
Expand All @@ -52,7 +56,7 @@ def snapshot_actions():
return [ 'delete_snapshots', 'restore' ]

def all_actions():
return sorted(index_actions() + snapshot_actions())
return sorted(cluster_actions() + index_actions() + snapshot_actions())

def index_filtertypes():
return [
Expand Down
6 changes: 3 additions & 3 deletions curator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ def get_client(**kwargs):
``/_cluster/state/metadata`` endpoint. So long as this endpoint does not
function in AWS ES, the client will not be able to use
:class:`curator.indexlist.IndexList`, which is the backbone of Curator 4
Return an :class:`elasticsearch.Elasticsearch` client object using the
provided parameters. Any of the keyword arguments the
:class:`elasticsearch.Elasticsearch` client object can receive are valid,
Expand Down Expand Up @@ -1124,8 +1124,8 @@ def validate_actions(data):
)
# Add/Remove here
clean_config[action_id].update(add_remove)
elif current_action == 'create_index':
# create_index should not have a filters
elif current_action in [ 'cluster_routing', 'create_index' ]:
# neither cluster_routing nor create_index should have filters
pass
else: # Filters key only appears in non-alias actions
valid_filters = SchemaCheck(
Expand Down
5 changes: 3 additions & 2 deletions curator/validators/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ def structure(data, location):
retval.update(
{ Optional('options', default=settings.default_options()): dict } )
action = data['action']
if action == 'create_index':
# The create_index action should not have a 'filters' block
if action in [ 'cluster_routing', 'create_index' ]:
# Neither the cluster_routing nor create_index actions should have a
# 'filters' block
pass
elif action == 'alias':
# The alias action should not have a filters block, but should have
Expand Down
23 changes: 22 additions & 1 deletion curator/validators/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,19 @@ def retry_interval():
)
}

def routing_type():
return { Required('routing_type'): Any('allocation', 'rebalance') }

def cluster_routing_setting():
return { Required('setting'): Any('enable') }

def cluster_routing_value():
return {
Required('value'): Any(
'all', 'primaries', 'none', 'new_primaries', 'replicas'
)
}

def skip_repo_fs_check():
return { Optional('skip_repo_fs_check', default=False): Boolean() }

Expand All @@ -107,8 +120,10 @@ def timeout_override(action):
def value():
return { Required('value'): str }



def wait_for_completion(action):
if action in ['allocation', 'replicas']:
if action in ['allocation', 'cluster_routing', 'replicas']:
return { Optional('wait_for_completion', default=False): Boolean() }
elif action in ['restore', 'snapshot']:
return { Optional('wait_for_completion', default=True): Boolean() }
Expand All @@ -127,6 +142,12 @@ def action_specific(action):
wait_for_completion(action),
],
'close' : [ delete_aliases() ],
'cluster_routing' : [
routing_type(),
cluster_routing_setting(),
cluster_routing_value(),
wait_for_completion(action),
],
'create_index' : [
name(action),
extra_settings(),
Expand Down
8 changes: 7 additions & 1 deletion docs/Changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ Changelog
4.2.0 (? ? ?)
-------------

**New Features**

* Shard routing allocation enable/disable. This will allow you to disable
shard allocation routing before performing one or more actions, and then
re-enable after it is complete. Requested in #446 (untergeek)

**General**

* Update testing to the most recent versions.
* Lock elasticsearch-py module version at >= 2.4.0 and <= 3.0.0. There are
API changes in the 5.0 release that cause tests to fail.

**Bug Fixes**

* Guarantee that binary packages are built from the latest Python + libraries.
Expand Down
6 changes: 6 additions & 0 deletions docs/actionclasses.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Action Classes
* `Alias`_
* `Allocation`_
* `Close`_
* `ClusterRouting`_
* `DeleteIndices`_
* `DeleteSnapshots`_
* `ForceMerge`_
Expand All @@ -33,6 +34,11 @@ Close
.. autoclass:: curator.actions.Close
:members:

ClusterRouting
--------------
.. autoclass:: curator.actions.ClusterRouting
:members:

DeleteIndices
-------------
.. autoclass:: curator.actions.DeleteIndices
Expand Down
57 changes: 57 additions & 0 deletions docs/asciidoc/actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ once created, can only be deleted.
* <<alias,Alias>>
* <<allocation,Allocation>>
* <<close,Close>>
* <<cluster_routing,Cluster Routing>>
* <<create_index,Create Index>>
* <<delete_indices,Delete Indices>>
* <<delete_snapshots,Delete Snapshots>>
Expand Down Expand Up @@ -189,6 +190,62 @@ Optional settings
TIP: See an example of this action in an <<actionfile,actionfile>>
<<ex_close,here>>.

[[cluster_routing]]
== Cluster Routing

[source,text]
-------------
action: cluster_routing
description: "Apply routing rules to the entire cluster"
options:
routing_type:
value:
setting: enabled
wait_for_completion: False
timeout_override:
continue_if_exception: False
disable_action: False
-------------

NOTE: Empty values and commented lines will result in the default value, if any,
being selected. If a setting is set, but not used by a given action, it
will be ignored.

This action changes the shard routing allocation for the selected indices.

See https://www.elastic.co/guide/en/elasticsearch/reference/current/shards-allocation.html
for more information.

You can optionally set `wait_for_completion` to `True`
to have Curator wait for the shard routing to complete before continuing. If
this option is chosen, it is advisable to use the
<<option_timeout_override,timeout_override>> option in order to avoid client
timeouts.

[float]
Required settings
~~~~~~~~~~~~~~~~~

* <<option_routing_type,routing_type>> (required)
* <<option_value,value>> (required)
* <<option_setting,setting>> Currently must be set to `enabled`. This setting
is a placeholder for potential future expansion.

[float]
Optional settings
~~~~~~~~~~~~~~~~~

* <<option_wfc,wait_for_completion>> (has a default value which can optionally
be changed)
* <<option_timeout_override,timeout_override>> (can override the default
<<timeout,timeout>>)
* <<option_continue,continue_if_exception>> (has a default value which can
optionally be changed)
* <<option_disable,disable_action>> (has a default value which can optionally
be changed)

TIP: See an example of this action in an <<actionfile,actionfile>>
<<ex_cluster_routing,here>>.

[[create_index]]
== Create Index
Expand Down
49 changes: 49 additions & 0 deletions docs/asciidoc/examples.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ configuration files.
* <<ex_alias,alias>>
* <<ex_allocation,allocation>>
* <<ex_close,close>>
* <<ex_cluster_routing,cluster_routing>>
* <<ex_create_index,create_index>>
* <<ex_delete_indices,delete_indices>>
* <<ex_delete_snapshots,delete_snapshots>>
Expand Down Expand Up @@ -157,6 +158,54 @@ actions:
exclude:
-------------


[[ex_cluster_routing]]
== cluster_routing

[source,text]
-------------
---
# Remember, leave a key empty if there is no value. None will be a string,
# not a Python "NoneType"
#
# Also remember that all examples have 'disable_action' set to True. If you
# want to use this action as a template, be sure to set this to False after
# copying it.
#
# This action example has a blank spot at action ID 2. This is to show that
# Curator can disable allocation before one or more actions, and then re-enable
# it afterward.
actions:
1:
action: cluster_routing
description: >-
Disable shard routing for the entire cluster.
options:
routing_type: allocation
value: none
setting: enable
wait_for_completion: False
timeout_override:
continue_if_exception: False
disable_action: True
2:
action: (any other action details go here)
...
3:
action: cluster_routing
description: >-
Re-enable shard routing for the entire cluster.
options:
routing_type: allocation
value: all
setting: enable
wait_for_completion: False
timeout_override:
continue_if_exception: False
disable_action: True
-------------


[[ex_create_index]]
== create_index

Expand Down

0 comments on commit 8fd6863

Please sign in to comment.