Skip to content

Commit

Permalink
Merge pull request #1019 from untergeek/fix/999
Browse files Browse the repository at this point in the history
Add tasks API check for snapshot activity
  • Loading branch information
untergeek committed Aug 2, 2017
2 parents cdd84b8 + 0c52e41 commit 017c093
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 8 deletions.
33 changes: 26 additions & 7 deletions curator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,24 @@ def snapshot_in_progress(client, repository=None, snapshot=None):
'More than 1 snapshot in progress: {0}'.format(inprogress)
)

def find_snapshot_tasks(client):
"""
Check if there is snapshot activity in the Tasks API.
Return `True` if activity is found, or `False`
:arg client: An :class:`elasticsearch.Elasticsearch` client object
:rtype: bool
"""
retval = False
tasklist = client.tasks.get()
for node in tasklist['nodes']:
for task in tasklist['nodes'][node]['tasks']:
activity = tasklist['nodes'][node]['tasks'][task]['action']
if 'snapshot' in activity:
logger.debug('Snapshot activity detected: {0}'.format(activity))
retval = True
return retval

def safe_to_snap(client, repository=None, retry_interval=120, retry_count=3):
"""
Ensure there are no snapshots in progress. Pause and retry accordingly
Expand All @@ -914,20 +932,21 @@ def safe_to_snap(client, repository=None, retry_interval=120, retry_count=3):
in_progress = snapshot_in_progress(
client, repository=repository
)
if in_progress:
logger.info(
'Snapshot already in progress: {0}'.format(in_progress))
ongoing_task = find_snapshot_tasks(client)
if in_progress or ongoing_task:
if in_progress:
logger.info(
'Snapshot already in progress: {0}'.format(in_progress))
elif ongoing_task:
logger.info('Snapshot activity detected in Tasks API')
logger.info(
'Pausing {0} seconds before retrying...'.format(
retry_interval)
)
'Pausing {0} seconds before retrying...'.format(retry_interval))
time.sleep(retry_interval)
logger.info('Retry {0} of {1}'.format(count, retry_count))
else:
return True
return False


def create_snapshot_body(indices, ignore_unavailable=False,
include_global_state=True, partial=False):
"""
Expand Down
6 changes: 6 additions & 0 deletions docs/Changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ Changelog

**Bug Fixes**

* Under rare circumstances, snapshot delete (or create) actions could fail,
even when there were no snapshots in state ``IN_PROGRESS``. This was
tracked down by JD557 as a collision with a previously deleted snapshot
that hadn't finished deleting. It could be seen in the tasks API. An
additional test for snapshot activity in the tasks API has been added to
cover this scenario. Reported in #999 (untergeek)
* The ``restore_check`` function did not work properly with wildcard index
patterns. This has been rectified, and an integration test added to
satisfy this. Reported in #989 (untergeek)
Expand Down
4 changes: 4 additions & 0 deletions test/unit/test_action_delete_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def test_do_dry_run(self):
client = Mock()
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.delete.return_value = None
slo = curator.SnapshotList(client, repository=testvars.repo_name)
do = curator.DeleteSnapshots(slo)
Expand All @@ -28,6 +29,7 @@ def test_do_action(self):
client = Mock()
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.delete.return_value = None
slo = curator.SnapshotList(client, repository=testvars.repo_name)
do = curator.DeleteSnapshots(slo)
Expand All @@ -37,6 +39,7 @@ def test_do_action_raises_exception(self):
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.delete.return_value = None
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.delete.side_effect = testvars.fake_fail
slo = curator.SnapshotList(client, repository=testvars.repo_name)
do = curator.DeleteSnapshots(slo)
Expand All @@ -45,6 +48,7 @@ def test_not_safe_to_snap_raises_exception(self):
client = Mock()
client.snapshot.get.return_value = testvars.inprogress
client.snapshot.get_repository.return_value = testvars.test_repo
client.tasks.get.return_value = testvars.no_snap_tasks
slo = curator.SnapshotList(client, repository=testvars.repo_name)
do = curator.DeleteSnapshots(slo, retry_interval=0, retry_count=1)
self.assertRaises(curator.FailedExecution, do.do_action)
9 changes: 9 additions & 0 deletions test/unit/test_action_snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def test_get_state_success(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = testvars.snapshots
client.tasks.get.return_value = testvars.no_snap_tasks
ilo = curator.IndexList(client)
so = curator.Snapshot(ilo, repository=testvars.repo_name,
name=testvars.snap_name)
Expand All @@ -69,6 +70,7 @@ def test_get_state_fail(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = {'snapshots':[]}
client.tasks.get.return_value = testvars.no_snap_tasks
ilo = curator.IndexList(client)
so = curator.Snapshot(ilo, repository=testvars.repo_name,
name=testvars.snap_name)
Expand All @@ -81,6 +83,7 @@ def test_report_state_success(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = testvars.snapshots
client.tasks.get.return_value = testvars.no_snap_tasks
ilo = curator.IndexList(client)
so = curator.Snapshot(ilo, repository=testvars.repo_name,
name=testvars.snap_name)
Expand All @@ -94,6 +97,7 @@ def test_report_state_other(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = testvars.highly_unlikely
client.tasks.get.return_value = testvars.no_snap_tasks
ilo = curator.IndexList(client)
so = curator.Snapshot(ilo, repository=testvars.repo_name,
name=testvars.snap_name)
Expand All @@ -107,6 +111,7 @@ def test_do_dry_run(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = testvars.snapshots
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.create.return_value = None
client.snapshot.status.return_value = testvars.nosnap_running
client.snapshot.verify_repository.return_value = testvars.verified_nodes
Expand All @@ -122,6 +127,7 @@ def test_do_action_success(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = testvars.snapshots
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.create.return_value = testvars.generic_task
client.tasks.get.return_value = testvars.completed_task
client.snapshot.status.return_value = testvars.nosnap_running
Expand All @@ -138,6 +144,7 @@ def test_do_action_raise_snap_in_progress(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = testvars.snapshots
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.create.return_value = None
client.snapshot.status.return_value = testvars.snap_running
client.snapshot.verify_repository.return_value = testvars.verified_nodes
Expand All @@ -153,6 +160,7 @@ def test_do_action_no_wait_for_completion(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = testvars.snapshots
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.create.return_value = testvars.generic_task
client.snapshot.status.return_value = testvars.nosnap_running
client.snapshot.verify_repository.return_value = testvars.verified_nodes
Expand All @@ -168,6 +176,7 @@ def test_do_action_raise_on_failure(self):
client.indices.stats.return_value = testvars.stats_one
client.snapshot.get_repository.return_value = testvars.test_repo
client.snapshot.get.return_value = testvars.snapshots
client.tasks.get.return_value = testvars.no_snap_tasks
client.snapshot.create.return_value = None
client.snapshot.create.side_effect = testvars.fake_fail
client.snapshot.status.return_value = testvars.nosnap_running
Expand Down
13 changes: 13 additions & 0 deletions test/unit/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,18 @@ def test_in_progress_fail(self):
client = Mock()
client.snapshot.get.return_value = testvars.inprogress
client.snapshot.get_repository.return_value = testvars.test_repo
client.tasks.get.return_value = testvars.no_snap_tasks
self.assertFalse(
curator.safe_to_snap(
client, repository=testvars.repo_name,
retry_interval=0, retry_count=1
)
)
def test_ongoing_tasks_fail(self):
client = Mock()
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
client.tasks.get.return_value = testvars.snap_task
self.assertFalse(
curator.safe_to_snap(
client, repository=testvars.repo_name,
Expand All @@ -563,6 +575,7 @@ def test_in_progress_pass(self):
client = Mock()
client.snapshot.get.return_value = testvars.snapshots
client.snapshot.get_repository.return_value = testvars.test_repo
client.tasks.get.return_value = testvars.no_snap_tasks
self.assertTrue(
curator.safe_to_snap(
client, repository=testvars.repo_name,
Expand Down

0 comments on commit 017c093

Please sign in to comment.