Skip to content

Commit

Permalink
Merge pull request #1034 from untergeek/feature/926
Browse files Browse the repository at this point in the history
Add the shrink action
  • Loading branch information
untergeek committed Aug 23, 2017
2 parents afd8525 + 02bcf70 commit cf85134
Show file tree
Hide file tree
Showing 12 changed files with 970 additions and 20 deletions.
348 changes: 348 additions & 0 deletions curator/actions.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions curator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
'restore' : Restore,
'rollover' : Rollover,
'snapshot' : Snapshot,
'shrink' : Shrink,
}

def process_action(client, config, **kwargs):
Expand Down
43 changes: 39 additions & 4 deletions curator/defaults/option_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ def delay():
)
}

def delete_after():
return { Optional('delete_after', default=True): Any(bool, All(Any(str, unicode), Boolean())) }

def delete_aliases():
return { Optional('delete_aliases', default=False): Any(bool, All(Any(str, unicode), Boolean())) }

Expand Down Expand Up @@ -70,7 +73,7 @@ def max_wait(action):
value = -1
# if action in ['allocation', 'cluster_routing', 'replicas']:
# value = -1
# elif action in ['restore', 'snapshot', 'reindex']:
# elif action in ['restore', 'snapshot', 'reindex', 'shrink']:
# value = -1
return { Optional('max_wait', default=value): Any(-1, Coerce(int), None) }

Expand All @@ -93,9 +96,32 @@ def name(action):
def new_index():
return { Optional('new_index', default=None): Any(str, unicode) }

def node_filters():
return {
Optional('node_filters', default={}): {
Optional('permit_masters', default=False): Any(bool, All(Any(str, unicode), Boolean())),
Optional('exclude_nodes', default=[]): Any(list, None)
}
}

def number_of_replicas():
return { Optional('number_of_replicas', default=1): All(Coerce(int), Range(min=0, max=10)) }

def number_of_shards():
return { Optional('number_of_shards', default=1): All(Coerce(int), Range(min=1, max=99)) }

def partial():
return { Optional('partial', default=False): Any(bool, All(Any(str, unicode), Boolean())) }

def post_allocation():
return {
Optional('post_allocation', default={}): {
Required('allocation_type', default='require'): All(Any(str, unicode), Any('require', 'include', 'exclude')),
Required('key'): Any(str, unicode),
Required('value', default=None): Any(str, unicode, None)
}
}

def preserve_existing():
return { Optional('preserve_existing', default=False): Any(bool, All(Any(str, unicode), Boolean())) }

Expand Down Expand Up @@ -213,6 +239,15 @@ def cluster_routing_value():
)
}

def shrink_node():
return { Required('shrink_node'): Any(str, unicode) }

def shrink_prefix():
return { Optional('shrink_prefix', default=''): Any(str, unicode, None) }

def shrink_suffix():
return { Optional('shrink_suffix', default=''): Any(str, unicode, None) }

def skip_repo_fs_check():
return { Optional('skip_repo_fs_check', default=False): Any(bool, All(Any(str, unicode), Boolean())) }

Expand Down Expand Up @@ -243,11 +278,11 @@ def value():

def wait_for_active_shards(action):
value = 0
if action == 'reindex':
if action in ['reindex', 'shrink']:
value = 1
return {
Optional('wait_for_active_shards', default=value): Any(
Coerce(int), None)
Coerce(int), 'all', None)
}

def wait_for_completion(action):
Expand All @@ -262,7 +297,7 @@ def wait_interval(action):
maxval = 30
# if action in ['allocation', 'cluster_routing', 'replicas']:
value = 3
if action in ['restore', 'snapshot', 'reindex']:
if action in ['restore', 'snapshot', 'reindex', 'shrink']:
value = 9
return { Optional('wait_interval', default=value): Any(All(
Coerce(int), Range(min=minval, max=maxval)), None) }
Expand Down
1 change: 1 addition & 0 deletions curator/defaults/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def index_actions():
'reindex',
'replicas',
'rollover',
'shrink',
'snapshot',
]

Expand Down
61 changes: 60 additions & 1 deletion curator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -1565,6 +1565,10 @@ def wait_for_it(
'function':task_check,
'args':{'task_id':task_id},
},
'shrink':{
'function': health_check,
'args': {'status':'green'},
},
}
wait_actions = list(action_map.keys())

Expand Down Expand Up @@ -1631,4 +1635,59 @@ def wait_for_it(
raise ActionTimeout(
'Action "{0}" failed to complete in the max_wait period of '
'{1} seconds'.format(action, max_wait)
)
)

def node_roles(client, node_id):
"""
Return the list of roles assigned to the node identified by ``node_id``
:arg client: An :class:`elasticsearch.Elasticsearch` client object
:rtype: list
"""
return client.nodes.info()['nodes'][node_id]['roles']

def index_size(client, idx):
return client.indices.stats(index=idx)['indices'][idx]['total']['store']['size_in_bytes']

def single_data_path(client, node_id):
"""
In order for a shrink to work, it should be on a single filesystem, as
shards cannot span filesystems. Return `True` if the node has a single
filesystem, and `False` otherwise.
:arg client: An :class:`elasticsearch.Elasticsearch` client object
:rtype: bool
"""
return len(client.nodes.stats()['nodes'][node_id]['fs']['data']) == 1


def name_to_node_id(client, name):
"""
Return the node_id of the node identified by ``name``
:arg client: An :class:`elasticsearch.Elasticsearch` client object
:rtype: str
"""
stats = client.nodes.stats()
for node in stats['nodes']:
if stats['nodes'][node]['name'] == name:
logger.debug('Found node_id "{0}" for name "{1}".'.format(node, name))
return node
logger.error('No node_id found matching name: "{0}"'.format(name))
return None

def node_id_to_name(client, node_id):
"""
Return the name of the node identified by ``node_id``
:arg client: An :class:`elasticsearch.Elasticsearch` client object
:rtype: str
"""
stats = client.nodes.stats()
name = None
if node_id in stats['nodes']:
name = stats['nodes'][node_id]['name']
else:
logger.error('No node_id found matching: "{0}"'.format(node_id))
logger.debug('Name associated with node_id "{0}": {1}'.format(node_id, name))
return None
15 changes: 15 additions & 0 deletions curator/validators/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,21 @@ def action_specific(action):
option_defaults.max_wait(action),
option_defaults.skip_repo_fs_check(),
],
'shrink' : [
option_defaults.shrink_node(),
option_defaults.node_filters(),
option_defaults.number_of_shards(),
option_defaults.number_of_replicas(),
option_defaults.shrink_prefix(),
option_defaults.shrink_suffix(),
option_defaults.delete_after(),
option_defaults.post_allocation(),
option_defaults.wait_for_active_shards(action),
option_defaults.extra_settings(),
option_defaults.wait_for_completion(action),
option_defaults.wait_interval(action),
option_defaults.max_wait(action),
],
}
return options[action]

Expand Down
10 changes: 10 additions & 0 deletions docs/Changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,16 @@ Changelog
5.2.0 (?? ??? 2017)
-------------------

**New Features**

* Shrink action! Apologies to all who have patiently waited for this
feature. It's been a long time coming, but it is hopefully worth the
wait. There are a lot of checks and tests associated with this action,
as there are many conditions that have to be met in order for a shrink
to take place. Curator will try its best to ensure that all of these
conditions are met so you can comfortably rest assured that shrink will
work properly unattended. See the documentation for more information.

5.1.2 (08 August 2017)
----------------------

Expand Down
112 changes: 112 additions & 0 deletions docs/asciidoc/actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ once created, can only be deleted.
* <<replicas,Replicas>>
* <<restore,Restore>>
* <<rollover,Rollover>>
* <<shrink,Shrink>>
* <<snapshot,Snapshot>>
--

Expand Down Expand Up @@ -988,6 +989,117 @@ TIP: See an example of this action in an <<actionfile,actionfile>>



[[shrink]]
== Shrink

[source,yaml]
-------------
action: shrink
description: >-
Shrink selected indices on the node with the most available space.
Delete source index after successful shrink, then reroute the shrunk
index with the provided parameters.
options:
ignore_empty_list: True
shrink_node: DETERMINISTIC
node_filters:
permit_masters: False
exclude_nodes: ['not_this_node']
number_of_shards: 1
number_of_replicas: 1
shrink_prefix:
shrink_suffix: '-shrink'
delete_after: True
post_allocation:
allocation_type: include
key: node_tag
value: cold
wait_for_active_shards: 1
extra_settings:
settings:
index.codec: best_compression
wait_for_completion: True
wait_interval: 9
max_wait: -1
filters:
- filtertype: ...
-------------

NOTE: Empty values and commented lines will result in the default value, if any,
being selected. If a setting is set, but not used by a given action, it
will be ignored.

Shrinking an index is a good way to reduce the total shard count in your cluster.
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-shrink-index.html#_shrinking_an_index[Several conditions need to be met]
in order for index shrinking to take place:

* The index must be marked as read-only
* A (primary or replica) copy of every shard in the index must be relocated to the same node
* The cluster must have health `green`
* The target index must not exist
* The number of primary shards in the target index must be a factor of the number of primary shards in the source index.
* The source index must have more primary shards than the target index.
* The index must not contain more than 2,147,483,519 documents in total across all shards that will be shrunk into a single shard on the target index as this is the maximum number of docs that can fit into a single shard.
* The node handling the shrink process must have sufficient free disk space to accommodate a second copy of the existing index.

Curator will try to meet these conditions. If it is unable to meet them all, it will not perform a shrink operation.

This action will shrink indices to the target index, the name of which is the value of
<<option_shrink_prefix,shrink_prefix>> + the source index name + <<option_shrink_suffix,shrink_suffix>>.
The resulting index will have <<option_number_of_shards,number_of_shards>> primary shards, and
<<option_number_of_replicas,number_of_replicas>> replica shards.

The shrinking will take place on the node identified by <<option_shrink_node,shrink_node>>,
unless `DETERMINISTIC` is specified, in which case Curator will evaluate all of
the nodes to determine which one has the most free space. If multiple indices
are identified for shrinking by the filter block, and `DETERMINISTIC` is specified,
the node selection process will be repeated for each successive index, preventing
all of the space being consumed on a single node.

By default, Curator will delete the source index after a successful shrink. This
can be disabled by setting <<option_delete_after,delete_after>> to `False`. If the source index,
is not deleted after a successful shrink, Curator will remove the read-only setting and the
shard allocation routing applied to the source index to put it on the shrink node. Curator will
wait for the shards to stop rerouting before continuing.

The <<option_post_allocation,post_allocation>> option applies to the target index after
the shrink is complete. If set, this shard allocation routing will be applied (after a
successful shrink) and Curator will wait for all shards to stop rerouting before continuing.

The only <<option_extra_settings,extra_settings>> which are acceptable are `settings` and `aliases`.
Please note that in the example above, while `best_compression` is being applied to the new index,
it will not take effect until new writes are made to the index, such as when
<<forcemerge,force-merging>> the shard to a single segment.

The other options are usually okay to leave at the defaults, but feel free to change them
as needed.

=== Required settings

* <<option_shrink_node,shrink_node>>

=== Optional settings

* <<option_continue,continue_if_exception>>
* <<option_ignore_empty_list,ignore_empty_list>>
* <<option_delete_after,delete_after>>
* <<option_disable,disable_action>>
* <<option_extra_settings,extra_settings>>
* <<option_node_filters,node_filters>>
* <<option_number_of_shards,number_of_shards>>
* <<option_number_of_replicas,number_of_replicas>>
* <<option_post_allocation,post_allocation>>
* <<option_shrink_prefix,shrink_prefix>>
* <<option_shrink_suffix,shrink_suffix>>
* <<option_timeout_override,timeout_override>>
* <<option_wait_for_active_shards,wait_for_active_shards>>
* <<option_wfc,wait_for_completion>>
* <<option_max_wait,max_wait>>
* <<option_wait_interval,wait_interval>>

TIP: See an example of this action in an <<actionfile,actionfile>>
<<ex_shrink,here>>.

[[snapshot]]
== Snapshot

Expand Down

0 comments on commit cf85134

Please sign in to comment.