Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule pending delete if index store delete fails #9856

Merged
merged 1 commit into from Feb 25, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 21 additions & 8 deletions src/main/java/org/elasticsearch/env/NodeEnvironment.java
Expand Up @@ -230,19 +230,32 @@ public void deleteIndexDirectorySafe(Index index, long lockTimeoutMS, @IndexSett
assert indexSettings != ImmutableSettings.EMPTY;
final List<ShardLock> locks = lockAllForIndex(index, indexSettings, lockTimeoutMS);
try {
final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
IOUtils.rm(indexPaths);
if (hasCustomDataPath(indexSettings)) {
Path customLocation = resolveCustomLocation(indexSettings, index.name());
logger.trace("deleting custom index {} directory [{}]", index, customLocation);
IOUtils.rm(customLocation);
}
deleteIndexDirectoryUnderLock(index, indexSettings);
} finally {
IOUtils.closeWhileHandlingException(locks);
}
}

/**
* Deletes an indexes data directory recursively.
* Note: this method assumes that the shard lock is acquired
*
* @param index the index to delete
* @param indexSettings settings for the index being deleted
*/
public void deleteIndexDirectoryUnderLock(Index index, @IndexSettings Settings indexSettings) throws IOException {
// This is to ensure someone doesn't use ImmutableSettings.EMPTY
assert indexSettings != ImmutableSettings.EMPTY;
final Path[] indexPaths = indexPaths(index);
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
IOUtils.rm(indexPaths);
if (hasCustomDataPath(indexSettings)) {
Path customLocation = resolveCustomLocation(indexSettings, index.name());
logger.trace("deleting custom index {} directory [{}]", index, customLocation);
IOUtils.rm(customLocation);
}
}


/**
* Tries to lock all local shards for the given index. If any of the shard locks can't be acquired
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/elasticsearch/index/IndexService.java
Expand Up @@ -444,7 +444,7 @@ private void onShardClose(ShardLock lock, boolean ownsShard) {
indicesServices.deleteShardStore("delete index", lock, indexSettings);
}
} catch (IOException e) {
indicesServices.addPendingDelete(index(), lock.getShardId(), indexSettings);
indicesServices.addPendingDelete(lock.getShardId(), indexSettings);
logger.debug("{} failed to delete shard content - scheduled a retry", e, lock.getShardId().id());
}
}
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/org/elasticsearch/index/store/Store.java
Expand Up @@ -92,7 +92,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public static final String INDEX_STORE_STATS_REFRESH_INTERVAL = "index.store.stats_refresh_interval";

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final DirectoryService directoryService;
private final StoreDirectory directory;
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
Expand All @@ -114,7 +113,6 @@ public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectorySe
@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, DirectoryService directoryService, Distributor distributor, ShardLock shardLock, OnClose onClose) throws IOException {
super(shardId, indexSettings);
this.directoryService = directoryService;
this.directory = new StoreDirectory(directoryService.newFromDistributor(distributor), Loggers.getLogger("index.store.deletes", indexSettings, shardId));
this.shardLock = shardLock;
this.onClose = onClose;
Expand Down
123 changes: 87 additions & 36 deletions src/main/java/org/elasticsearch/indices/IndicesService.java
Expand Up @@ -28,14 +28,17 @@
import com.google.common.collect.Maps;

import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
import org.elasticsearch.action.admin.indices.stats.ShardStats;
import org.elasticsearch.bootstrap.Elasticsearch;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -485,6 +488,7 @@ public void deleteIndexStore(String reason, IndexMetaData metaData) throws IOExc
}

private void deleteIndexStore(String reason, Index index, Settings indexSettings) throws IOException {
boolean success = false;
try {
// we are trying to delete the index store here - not a big deal if the lock can't be obtained
// the store metadata gets wiped anyway even without the lock this is just best effort since
Expand All @@ -493,11 +497,15 @@ private void deleteIndexStore(String reason, Index index, Settings indexSettings
if (canDeleteIndexContents(index, indexSettings)) {
nodeEnv.deleteIndexDirectorySafe(index, 0, indexSettings);
}
success = true;
} catch (LockObtainFailedException ex) {
logger.debug("{} failed to delete index store - at least one shards is still locked", ex, index);
} catch (Exception ex) {
logger.warn("{} failed to delete index", ex, index);
} finally {
if (success == false) {
addPendingDelete(index, indexSettings);
}
// this is a pure protection to make sure this index doesn't get re-imported as a dangeling index.
// we should in the future rather write a tombstone rather than wiping the metadata.
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index));
Expand Down Expand Up @@ -603,32 +611,61 @@ private Settings buildIndexSettings(IndexMetaData metaData) {
}

/**
* Adds a pending delete for the given index.
* Adds a pending delete for the given index shard.
*/
public void addPendingDelete(Index index, ShardId shardId, Settings settings) {
public void addPendingDelete(ShardId shardId, @IndexSettings Settings settings) {
if (shardId == null) {
throw new ElasticsearchIllegalArgumentException("shardId must not be null");
}
if (settings == null) {
throw new ElasticsearchIllegalArgumentException("settings must not be null");
}
PendingDelete pendingDelete = new PendingDelete(shardId, settings, false);
addPendingDelete(shardId.index(), pendingDelete);
}

private void addPendingDelete(Index index, PendingDelete pendingDelete) {
synchronized (pendingDeletes) {
List<PendingDelete> list = pendingDeletes.get(index);
if (list == null) {
list = new ArrayList<>();
pendingDeletes.put(index, list);
}
list.add(new PendingDelete(shardId, settings));
list.add(pendingDelete);
}
}

private static final class PendingDelete {
/**
* Adds a pending delete for the given index shard.
*/
public void addPendingDelete(Index index, @IndexSettings Settings settings) {
PendingDelete pendingDelete = new PendingDelete(null, settings, true);
addPendingDelete(index, pendingDelete);
}

private static final class PendingDelete implements Comparable<PendingDelete> {
final ShardId shardId;
final Settings settings;
final boolean deleteIndex;

public PendingDelete(ShardId shardId, Settings settings) {
public PendingDelete(ShardId shardId, Settings settings, boolean deleteIndex) {
this.shardId = shardId;
this.settings = settings;
this.deleteIndex = deleteIndex;
assert deleteIndex || shardId != null;
}

@Override
public String toString() {
return shardId.toString();
}

@Override
public int compareTo(PendingDelete o) {
int left = deleteIndex ? -1 : shardId.id();
int right = o.deleteIndex ? -1 : o.shardId.id();
return Integer.compare(left, right);
}
}

/**
Expand All @@ -653,40 +690,54 @@ public void processPendingDeletes(Index index, @IndexSettings Settings indexSett
synchronized (pendingDeletes) {
remove = pendingDeletes.remove(index);
}
final long maxSleepTimeMs = 10 * 1000; // ensure we retry after 10 sec
long sleepTime = 10;
do {
if (remove == null || remove.isEmpty()) {
break;
}
Iterator<PendingDelete> iterator = remove.iterator();
while (iterator.hasNext()) {
PendingDelete delete = iterator.next();
ShardLock shardLock = locks.get(delete.shardId);
if (shardLock != null) {
try {
deleteShardStore("pending delete", shardLock, delete.settings);
iterator.remove();
} catch (IOException ex) {
logger.debug("{} retry pending delete", ex, shardLock.getShardId());
if (remove != null && remove.isEmpty() == false) {
CollectionUtil.timSort(remove); // make sure we delete indices first
final long maxSleepTimeMs = 10 * 1000; // ensure we retry after 10 sec
long sleepTime = 10;
do {
if (remove.isEmpty()) {
break;
}
Iterator<PendingDelete> iterator = remove.iterator();
while (iterator.hasNext()) {
PendingDelete delete = iterator.next();

if (delete.deleteIndex) {
logger.debug("{} deleting index store reason [{}]", index, "pending delete");
try {
nodeEnv.deleteIndexDirectoryUnderLock(index, indexSettings);
iterator.remove();
} catch (IOException ex) {
logger.debug("{} retry pending delete", ex, index);
}
} else {
ShardLock shardLock = locks.get(delete.shardId);
if (shardLock != null) {
try {
deleteShardStore("pending delete", shardLock, delete.settings);
iterator.remove();
} catch (IOException ex) {
logger.debug("{} retry pending delete", ex, shardLock.getShardId());
}
} else {
logger.warn("{} no shard lock for pending delete", delete.shardId);
iterator.remove();
}
}
} else {
logger.warn("{} no shard lock for pending delete", delete.shardId);
iterator.remove();
}
}
if (remove.isEmpty() == false) {
logger.warn("{} still pending deletes present for shards {} - retrying", index, remove.toString());
try {
Thread.sleep(sleepTime);
sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually
logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime);
} catch (InterruptedException e) {
Thread.interrupted();
return;
if (remove.isEmpty() == false) {
logger.warn("{} still pending deletes present for shards {} - retrying", index, remove.toString());
try {
Thread.sleep(sleepTime);
sleepTime = Math.min(maxSleepTimeMs, sleepTime * 2); // increase the sleep time gradually
logger.debug("{} schedule pending delete retry after {} ms", index, sleepTime);
} catch (InterruptedException e) {
Thread.interrupted();
return;
}
}
}
} while ((System.currentTimeMillis() - startTime) < timeout.millis());
} while ((System.currentTimeMillis() - startTime) < timeout.millis());
}
} finally {
IOUtils.close(shardLocks);
}
Expand Down
31 changes: 17 additions & 14 deletions src/test/java/org/elasticsearch/indices/IndicesServiceTest.java
Expand Up @@ -21,18 +21,12 @@
import org.apache.lucene.store.LockObtainFailedException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.gateway.GatewayMetaState;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ElasticsearchSingleNodeTest;
Expand Down Expand Up @@ -130,10 +124,10 @@ public void testDeleteIndexStore() throws Exception {
public void testPendingTasks() throws IOException {
IndicesService indicesService = getIndicesService();
IndexService test = createIndex("test");
NodeEnvironment nodeEnc = getInstanceFromNode(NodeEnvironment.class);
NodeEnvironment nodeEnv = getInstanceFromNode(NodeEnvironment.class);

assertTrue(test.hasShard(0));
Path[] paths = nodeEnc.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings());
Path[] paths = nodeEnv.shardDataPaths(new ShardId(test.index(), 0), test.getIndexSettings());
try {
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
fail("can't get lock");
Expand All @@ -143,23 +137,32 @@ public void testPendingTasks() throws IOException {
for (Path p : paths) {
assertTrue(Files.exists(p));
}
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings());
int numPending = 1;
if (randomBoolean()) {
indicesService.addPendingDelete(new ShardId(test.index(), 0), test.getIndexSettings());
} else {
if (randomBoolean()) {
numPending++;
indicesService.addPendingDelete(new ShardId(test.index(), 0), test.getIndexSettings());
}
indicesService.addPendingDelete(test.index(), test.getIndexSettings());
}
assertAcked(client().admin().indices().prepareClose("test"));
for (Path p : paths) {
assertTrue(Files.exists(p));
}
assertEquals(indicesService.numPendingDeletes(test.index()), 1);
assertEquals(indicesService.numPendingDeletes(test.index()), numPending);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
assertEquals(indicesService.numPendingDeletes(test.index()), 0);
for (Path p : paths) {
assertFalse(Files.exists(p));
}

if (randomBoolean()) {
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 0), test.getIndexSettings());
indicesService.addPendingDelete(test.index(), new ShardId(test.index(), 1), test.getIndexSettings());
indicesService.addPendingDelete(new Index("bogus"), new ShardId("bogus", 1), test.getIndexSettings());
indicesService.addPendingDelete(new ShardId(test.index(), 0), test.getIndexSettings());
indicesService.addPendingDelete(new ShardId(test.index(), 1), test.getIndexSettings());
indicesService.addPendingDelete(new ShardId("bogus", 1), test.getIndexSettings());
assertEquals(indicesService.numPendingDeletes(test.index()), 2);
// shard lock released... we can now delete
indicesService.processPendingDeletes(test.index(), test.getIndexSettings(), new TimeValue(0, TimeUnit.MILLISECONDS));
Expand Down