Skip to content

Commit

Permalink
Fix NPE in SnapshotsService on node shutdown
Browse files Browse the repository at this point in the history
Fixes #6506
  • Loading branch information
imotov committed Aug 19, 2014
1 parent c891f07 commit 8fc41fe
Showing 1 changed file with 35 additions and 33 deletions.
68 changes: 35 additions & 33 deletions src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
Expand Up @@ -760,42 +760,44 @@ private void processIndexShardSnapshots(SnapshotMetaData snapshotMetaData) {
Map<SnapshotId, Map<ShardId, IndexShardSnapshotStatus>> newSnapshots = newHashMap();
// Now go through all snapshots and update existing or create missing
final String localNodeId = clusterService.localNode().id();
for (SnapshotMetaData.Entry entry : snapshotMetaData.entries()) {
if (entry.state() == State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = newHashMap();
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
// Add all new shards to start processing on
if (localNodeId.equals(shard.getValue().nodeId())) {
if (shard.getValue().state() == State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.getKey()))) {
logger.trace("[{}] - Adding shard to the queue", shard.getKey());
startedShards.put(shard.getKey(), new IndexShardSnapshotStatus());
if (snapshotMetaData != null) {
for (SnapshotMetaData.Entry entry : snapshotMetaData.entries()) {
if (entry.state() == State.STARTED) {
Map<ShardId, IndexShardSnapshotStatus> startedShards = newHashMap();
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
// Add all new shards to start processing on
if (localNodeId.equals(shard.getValue().nodeId())) {
if (shard.getValue().state() == State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.getKey()))) {
logger.trace("[{}] - Adding shard to the queue", shard.getKey());
startedShards.put(shard.getKey(), new IndexShardSnapshotStatus());
}
}
}
}
if (!startedShards.isEmpty()) {
newSnapshots.put(entry.snapshotId(), startedShards);
if (snapshotShards != null) {
// We already saw this snapshot but we need to add more started shards
ImmutableMap.Builder<ShardId, IndexShardSnapshotStatus> shards = ImmutableMap.builder();
// Put all shards that were already running on this node
shards.putAll(snapshotShards.shards);
// Put all newly started shards
shards.putAll(startedShards);
survivors.put(entry.snapshotId(), new SnapshotShards(shards.build()));
} else {
// Brand new snapshot that we haven't seen before
survivors.put(entry.snapshotId(), new SnapshotShards(ImmutableMap.copyOf(startedShards)));
if (!startedShards.isEmpty()) {
newSnapshots.put(entry.snapshotId(), startedShards);
if (snapshotShards != null) {
// We already saw this snapshot but we need to add more started shards
ImmutableMap.Builder<ShardId, IndexShardSnapshotStatus> shards = ImmutableMap.builder();
// Put all shards that were already running on this node
shards.putAll(snapshotShards.shards);
// Put all newly started shards
shards.putAll(startedShards);
survivors.put(entry.snapshotId(), new SnapshotShards(shards.build()));
} else {
// Brand new snapshot that we haven't seen before
survivors.put(entry.snapshotId(), new SnapshotShards(ImmutableMap.copyOf(startedShards)));
}
}
}
} else if (entry.state() == State.ABORTED) {
// Abort all running shards for this snapshot
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
if (snapshotShards != null) {
for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.getKey());
if (snapshotStatus != null) {
snapshotStatus.abort();
} else if (entry.state() == State.ABORTED) {
// Abort all running shards for this snapshot
SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshotId());
if (snapshotShards != null) {
for (Map.Entry<ShardId, SnapshotMetaData.ShardSnapshotStatus> shard : entry.shards().entrySet()) {
IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.getKey());
if (snapshotStatus != null) {
snapshotStatus.abort();
}
}
}
}
Expand Down

0 comments on commit 8fc41fe

Please sign in to comment.