Skip to content

Commit

Permalink
Merge pull request #903 from untergeek/feature/898
Browse files Browse the repository at this point in the history
Add Rollover action
  • Loading branch information
untergeek committed Mar 10, 2017
2 parents 44e55af + e50e414 commit 1de247f
Show file tree
Hide file tree
Showing 14 changed files with 377 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ python:
env:
- ES_VERSION=5.0.2
- ES_VERSION=5.1.2
- ES_VERSION=5.2.1
- ES_VERSION=5.2.2

os: linux

Expand Down
90 changes: 90 additions & 0 deletions curator/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,96 @@ def do_action(self):
except Exception as e:
report_failure(e)

class Rollover(object):
def __init__(
self, client, name, conditions, extra_settings=None,
wait_for_active_shards=1
):
"""
:arg client: An :class:`elasticsearch.Elasticsearch` client object
:arg name: The name of the single-index-mapped alias to test for
rollover conditions.
:arg conditions: A dictionary of conditions to test
:arg extra_settings: Must be either `None`, or a dictionary of settings
to apply to the new index on rollover. This is used in place of
`settings` in the Rollover API, mostly because it's already existent
in other places here in Curator
:arg wait_for_active_shards: The number of shards expected to be active
before returning.
"""
verify_client_object(client)
self.loggit = logging.getLogger('curator.actions.rollover')
if not isinstance(conditions, dict):
raise ConfigurationError('"conditions" must be a dictionary')
else:
self.loggit.debug('"conditions" is {0}'.format(conditions))
if not isinstance(extra_settings, dict) and extra_settings is not None:
raise ConfigurationError(
'"extra_settings" must be a dictionary or None')
#: Instance variable.
#: The Elasticsearch Client object
self.client = client
#: Instance variable.
#: Internal reference to `conditions`
self.conditions = conditions
#: Instance variable.
#: Internal reference to `extra_settings`
self.settings = extra_settings
#: Instance variable.
#: Internal reference to `wait_for_active_shards`
self.wait_for_active_shards = wait_for_active_shards

# Verify that `conditions` and `settings` are good?
# Verify that `name` is an alias, and is only mapped to one index.
if rollable_alias(client, name):
self.name = name
else:
raise ValueError(
'Unable to perform index rollover with alias '
'"{0}". See previous logs for more details.'.format(name)
)

def body(self):
"""
Create a body from conditions and settings
"""
retval = {}
retval['conditions'] = self.conditions
if self.settings:
retval['settings'] = self.settings
return retval

def doit(self, dry_run=False):
"""
This exists solely to prevent having to have duplicate code in both
`do_dry_run` and `do_action`
"""
return self.client.indices.rollover(
alias=self.name,
body=self.body(),
dry_run=dry_run,
wait_for_active_shards=self.wait_for_active_shards,
)

def do_dry_run(self):
"""
Log what the output would be, but take no action.
"""
logger.info('DRY-RUN MODE. No changes will be made.')
result = self.doit(dry_run=True)
logger.info('DRY-RUN: rollover: {0} result: '
'{1}'.format(self.name, result))

def do_action(self):
"""
Rollover the index referenced by alias `name`
"""
self.loggit.info('Performing index rollover')
try:
self.doit()
except Exception as e:
report_failure(e)

class DeleteSnapshots(object):
def __init__(self, slo, retry_interval=120, retry_count=3):
"""
Expand Down
3 changes: 2 additions & 1 deletion curator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
'open' : Open,
'replicas' : Replicas,
'restore' : Restore,
'rollover' : Rollover,
'snapshot' : Snapshot,
}

Expand Down Expand Up @@ -78,7 +79,7 @@ def process_action(client, config, **kwargs):
removes.iterate_filters(config['remove'])
action_obj.remove(
removes, warn_if_no_indices= opts['warn_if_no_indices'])
elif action in [ 'cluster_routing', 'create_index' ]:
elif action in [ 'cluster_routing', 'create_index', 'rollover']:
action_obj = action_class(client, **mykwargs)
elif action == 'delete_snapshots' or action == 'restore':
logger.debug('Running "{0}"'.format(action))
Expand Down
1 change: 1 addition & 0 deletions curator/defaults/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ def index_actions():
'forcemerge',
'open',
'replicas',
'rollover',
'snapshot',
]

Expand Down
40 changes: 39 additions & 1 deletion curator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,44 @@ def test_client_options(config):
if 'client_key' in config and config['client_key']:
read_file(config['client_key'])

def rollable_alias(client, alias):
"""
Ensure that `alias` is an alias, and points to an index that can use the
_rollover API.
:arg client: An :class:`elasticsearch.Elasticsearch` client object
:arg alias: An Elasticsearch alias
"""
try:
response = client.indices.get_alias(name=alias)
except elasticsearch.NotFoundError as e:
logger.error('alias "{0}" not found.'.format(alias))
return False
# Response should be like:
# {'there_should_be_only_one': {u'aliases': {'value of "alias" here': {}}}}
# Where 'there_should_be_only_one' is a single index name that ends in a
# number, and 'value of "alias" here' reflects the value of the passed
# parameter.
if len(response) > 1:
logger.error(
'"alias" must only reference one index: {0}'.format(response))
# elif len(response) < 1:
# logger.error(
# '"alias" must reference at least one index: {0}'.format(response))
else:
index = list(response.keys())[0]
rollable = False
# In order for `rollable` to be True, the last 2 digits of the index
# must be digits, or a hyphen followed by a digit.
# NOTE: This is not a guarantee that the rest of the index name is
# necessarily correctly formatted.
if index[-2:][1].isdigit():
if index[-2:][0].isdigit():
rollable = True
elif index[-2:][0] == '-':
rollable = True
return rollable

def verify_client_object(test):
"""
Test if `test` is a proper :class:`elasticsearch.Elasticsearch` client
Expand Down Expand Up @@ -1141,7 +1179,7 @@ def validate_actions(data):
)
# Add/Remove here
clean_config[action_id].update(add_remove)
elif current_action in [ 'cluster_routing', 'create_index' ]:
elif current_action in [ 'cluster_routing', 'create_index', 'rollover' ]:
# neither cluster_routing nor create_index should have filters
pass
else: # Filters key only appears in non-alias actions
Expand Down
6 changes: 3 additions & 3 deletions curator/validators/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ def structure(data, location):
retval.update(
{ Optional('options', default=settings.default_options()): dict } )
action = data['action']
if action in [ 'cluster_routing', 'create_index' ]:
# Neither the cluster_routing nor create_index actions should have a
# 'filters' block
if action in [ 'cluster_routing', 'create_index', 'rollover']:
# The cluster_routing, create_index, and rollover actions should not
# have a 'filters' block
pass
elif action == 'alias':
# The alias action should not have a filters block, but should have
Expand Down
21 changes: 20 additions & 1 deletion curator/validators/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ def allocation_type():
return { Optional('allocation_type', default='require'): All(
Any(str, unicode), Any('require', 'include', 'exclude')) }

def conditions():
return {
Optional('conditions'): {
Optional('max_age'): Any(str, unicode),
Optional('max_docs'): Coerce(int)
}
}

def continue_if_exception():
return { Optional('continue_if_exception', default=False): Boolean() }

Expand Down Expand Up @@ -53,7 +61,7 @@ def max_num_segments():
}

def name(action):
if action in ['alias', 'create_index']:
if action in ['alias', 'create_index', 'rollover']:
return { Required('name'): Any(str, unicode) }
elif action == 'snapshot':
return {
Expand Down Expand Up @@ -122,6 +130,11 @@ def timeout_override(action):
def value():
return { Required('value'): Any(str, unicode) }

def wait_for_active_shards():
return {
Optional('wait_for_active_shards', default=0): Any(Coerce(int), None)
}

def wait_for_completion(action):
if action in ['allocation', 'cluster_routing', 'replicas']:
return { Optional('wait_for_completion', default=False): Boolean() }
Expand Down Expand Up @@ -171,6 +184,12 @@ def action_specific(action):
count(),
wait_for_completion(action),
],
'rollover' : [
name(action),
conditions(),
extra_settings(),
wait_for_active_shards(),
],
'restore' : [
repository(),
name(action),
Expand Down
2 changes: 2 additions & 0 deletions docs/Changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ Changelog

**New Features**

* Added support for the Rollover API. Requested in #898, and by countless
others.
* Added ``warn_if_no_indices`` option for ``alias`` action in response to
#883. Using this option will permit the ``alias`` add or remove to continue
with a logged warning, even if the filters result in a NoIndices condition.
Expand Down
79 changes: 79 additions & 0 deletions docs/asciidoc/actions.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ once created, can only be deleted.
* <<forcemerge,forceMerge>>
* <<replicas,Replicas>>
* <<restore,Restore>>
* <<rollover,Rollover>>
* <<snapshot,Snapshot>>
--

Expand Down Expand Up @@ -638,6 +639,84 @@ Optional settings
TIP: See an example of this action in an <<actionfile,actionfile>>
<<ex_restore,here>>.

[[rollover]]
== Rollover

[source,yaml]
-------------
action: rollover
description: >-
Rollover the index associated with index 'name', which should be in the
form of prefix-000001 (or similar), or prefix-YYYY.MM.DD-1.
options:
name: aliasname
conditions:
max_age: 1d
max_docs: 1000000
extra_settings:
index.number_of_shards: 3
index.number_of_replicas: 1
timeout_override:
continue_if_exception: False
disable_action: False
-------------

This action uses the
{ref}/indices-rollover-index.html[Elasticsearch Rollover API] to create a new
index with the optional <<option_extra_settings,extra_settings>>, if the
described conditions are met.

IMPORTANT: When choosing `conditions`, **either** one of
<<option_max_age,max_age>> or <<option_max_docs,max_docs>>, **or both** may be
used. If both are used, then both conditions must be matched for the rollover to
occur.

WARNING: If either or both of the <<option_max_age,max_age>> or
<<option_max_docs,max_docs>> options are present, they must each have a value.
Because there is no default value, neither of these options can be left empty,
or Curator will generate an error.

The <<option_extra_settings,extra_settings>> option allows the addition of extra
index settings (but not mappings). An example of how these settings can be used
might be:

[source,yaml]
-------------
extra_settings:
indices.number_of_shards: 1
indices.number_of_replicas: 0
-------------

[float]
Required settings
~~~~~~~~~~~~~~~~~
* <<option_name,name>> The alias name
* <<option_max_age,max_age>> The maximum age that is allowed before triggering
a rollover. _Must be nested under `conditions:`_ There is no default value. If
this condition is specified, it must have a value, or Curator will generate an
error.
* <<option_max_docs,max_docs>> The maximum number of documents allowed in an
index before triggering a rollover. _Must be nested under `conditions:`_
There is no default value. If this condition is specified, it must have a
value, or Curator will generate an error.

[float]
Optional settings
~~~~~~~~~~~~~~~~~
* <<option_extra_settings,extra_settings>> No default value. You can add any
acceptable index settings (not mappings) as nested YAML. See the
{ref}/indices-create-index.html[Elasticsearch Create Index API documentation]
for more information.
* <<option_timeout_override,timeout_override>> (can override the default
<<timeout,timeout>>)
* <<option_continue,continue_if_exception>> (has a default value which can
optionally be changed)
* <<option_disable,disable_action>> (has a default value which can optionally
be changed)

TIP: See an example of this action in an <<actionfile,actionfile>>
<<ex_rollover,here>>.

[[snapshot]]
== Snapshot

Expand Down
30 changes: 30 additions & 0 deletions docs/asciidoc/examples.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ configuration files.
* <<ex_open,open>>
* <<ex_replicas,replicas>>
* <<ex_restore,restore>>
* <<ex_rollover,rollover>>
* <<ex_snapshot,snapshot>>

--
Expand Down Expand Up @@ -478,6 +479,35 @@ actions:
exclude:
-------------

[[ex_rollover]]
== rollover

[source,yaml]
-------------
---
# Remember, leave a key empty if there is no value. None will be a string,
# not a Python "NoneType". However, max_age and max_docs must have a value, if
# they are used as conditions.
#
# Also remember that all examples have 'disable_action' set to True. If you
# want to use this action as a template, be sure to set this to False after
# copying it.
actions:
1:
action: rollover
description: >-
Rollover the index associated with index 'name', which should be in the
form of prefix-000001 (or similar), or prefix-YYYY.MM.DD-1.
options:
name: aliasname
conditions:
max_age: 1d
max_docs: 1000000
timeout_override:
continue_if_exception: False
disable_action: True
-------------

[[ex_snapshot]]
== snapshot

Expand Down

0 comments on commit 1de247f

Please sign in to comment.