Skip to content

Commit

Permalink
Recover broken IndexMetaData as closed
Browse files Browse the repository at this point in the history
Today if something is wrong with the IndexMetaData we detect it very
late and most of the time if that happens we already allocated the index
and get endless loops and full log files on data-nodes. This change tries
to verify IndexService creattion during initial state recovery on the master
and if the recovery fails the index is imported as `closed` and won't be allocated
at all.

Closes #17187
  • Loading branch information
s1monw committed Mar 21, 2016
1 parent 7f16a1d commit 8127a06
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 33 deletions.
1 change: 0 additions & 1 deletion buildSrc/src/main/resources/checkstyle_suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]env[/\\]NodeEnvironment.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]AsyncShardFetch.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]DanglingIndicesState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]Gateway.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayAllocator.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayMetaState.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]GatewayService.java" checks="LineLength" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
Expand All @@ -37,6 +38,8 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
Expand All @@ -59,10 +62,16 @@ public class MetaDataIndexStateService extends AbstractComponent {
private final AllocationService allocationService;

private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
private final NodeServicesProvider nodeServiceProvider;
private final IndicesService indicesService;

@Inject
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService, MetaDataIndexUpgradeService metaDataIndexUpgradeService) {
public MetaDataIndexStateService(Settings settings, ClusterService clusterService, AllocationService allocationService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.nodeServiceProvider = nodeServicesProvider;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
Expand Down Expand Up @@ -162,6 +171,12 @@ public ClusterState execute(ClusterState currentState) {
// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
try {
indicesService.verifyIndexMetadata(nodeServiceProvider, indexMetaData);
} catch (Exception e) {
throw new ElasticsearchException("Failed to verify index " + indexMetaData.getIndex(), e);
}

mdBuilder.put(indexMetaData, true);
blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK);
}
Expand Down
29 changes: 25 additions & 4 deletions core/src/main/java/org/elasticsearch/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Supplier;

/**
Expand All @@ -53,10 +59,15 @@ public class Gateway extends AbstractComponent implements ClusterStateListener {
private final TransportNodesListGatewayMetaState listGatewayMetaState;

private final Supplier<Integer> minimumMasterNodesProvider;
private final IndicesService indicesService;
private final NodeServicesProvider nodeServicesProvider;

public Gateway(Settings settings, ClusterService clusterService, NodeEnvironment nodeEnv, GatewayMetaState metaState,
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery) {
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.nodeServicesProvider = nodeServicesProvider;
this.indicesService = indicesService;
this.clusterService = clusterService;
this.nodeEnv = nodeEnv;
this.metaState = metaState;
Expand All @@ -66,9 +77,9 @@ public Gateway(Settings settings, ClusterService clusterService, NodeEnvironment
}

public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
ObjectHashSet<String> nodesIds = new ObjectHashSet<>(clusterService.state().nodes().masterNodes().keys());
logger.trace("performing state recovery from {}", nodesIds);
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds.toArray(String.class), null).actionGet();
String[] nodesIds = clusterService.state().nodes().masterNodes().keys().toArray(String.class);
logger.trace("performing state recovery from {}", Arrays.toString(nodesIds));
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();


int requiredAllocation = Math.max(1, minimumMasterNodesProvider.get());
Expand Down Expand Up @@ -129,7 +140,17 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
if (electedIndexMetaData != null) {
if (indexMetaDataCount < requiredAllocation) {
logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation);
} // TODO if this logging statement is correct then we are missing an else here
try {
if (electedIndexMetaData.getState() == IndexMetaData.State.OPEN) {
// verify that we can actually create this index - if not we recover it as closed with lots of warn logs
indicesService.verifyIndexMetadata(nodeServicesProvider, electedIndexMetaData);
}
} catch (Exception e) {
logger.warn("recovering index {} failed - recovering as closed", e, electedIndexMetaData.getIndex());
electedIndexMetaData = IndexMetaData.builder(electedIndexMetaData).state(IndexMetaData.State.CLOSE).build();
}

metaDataBuilder.put(electedIndexMetaData, false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ public GatewayMetaState(Settings settings, NodeEnvironment nodeEnv, MetaStateSer
if (DiscoveryNode.masterNode(settings) || DiscoveryNode.dataNode(settings)) {
try {
ensureNoPre019State();
pre20Upgrade();
IndexFolderUpgrader.upgradeIndicesIfNeeded(settings, nodeEnv);
upgradeMetaData();
long startNS = System.nanoTime();
metaStateService.loadFullState();
logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - startNS)));
Expand Down Expand Up @@ -222,7 +222,7 @@ private void ensureNoPre019State() throws Exception {
* MetaDataIndexUpgradeService might also update obsolete settings if needed. When this happens we rewrite
* index metadata with new settings.
*/
private void pre20Upgrade() throws Exception {
private void upgradeMetaData() throws Exception {
MetaData metaData = loadMetaState();
List<IndexMetaData> updateIndexMetaData = new ArrayList<>();
for (IndexMetaData indexMetaData : metaData) {
Expand All @@ -235,7 +235,7 @@ private void pre20Upgrade() throws Exception {
// means the upgrade can continue. Now it's safe to overwrite index metadata with the new version.
for (IndexMetaData indexMetaData : updateIndexMetaData) {
// since we still haven't upgraded the index folders, we write index state in the old folder
metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getName()));
metaStateService.writeIndex("upgrade", indexMetaData, nodeEnv.resolveIndexFolder(indexMetaData.getIndex().getUUID()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.index.NodeServicesProvider;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -95,9 +97,11 @@ public class GatewayService extends AbstractLifecycleComponent<GatewayService> i
@Inject
public GatewayService(Settings settings, AllocationService allocationService, ClusterService clusterService,
ThreadPool threadPool, NodeEnvironment nodeEnvironment, GatewayMetaState metaState,
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery) {
TransportNodesListGatewayMetaState listGatewayMetaState, Discovery discovery,
NodeServicesProvider nodeServicesProvider, IndicesService indicesService) {
super(settings);
this.gateway = new Gateway(settings, clusterService, nodeEnvironment, metaState, listGatewayMetaState, discovery);
this.gateway = new Gateway(settings, clusterService, nodeEnvironment, metaState, listGatewayMetaState, discovery,
nodeServicesProvider, indicesService);
this.allocationService = allocationService;
this.clusterService = clusterService;
this.threadPool = threadPool;
Expand Down
79 changes: 59 additions & 20 deletions core/src/main/java/org/elasticsearch/indices/IndicesService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.indices;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.store.LockObtainFailedException;
import org.apache.lucene.util.CollectionUtil;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.CircuitBreaker;
Expand Down Expand Up @@ -66,6 +68,7 @@
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
Expand All @@ -74,6 +77,7 @@
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.IndexingStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStoreConfig;
Expand All @@ -88,9 +92,11 @@
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -324,44 +330,30 @@ public IndexService indexServiceSafe(Index index) {
* @throws IndexAlreadyExistsException if the index already exists.
*/
public synchronized IndexService createIndex(final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, List<IndexEventListener> builtInListeners) throws IOException {

if (!lifecycle.started()) {
throw new IllegalStateException("Can't create an index [" + indexMetaData.getIndex() + "], node is closed");
}
if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
}
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
if (hasIndex(index)) {
throw new IndexAlreadyExistsException(index);
}
logger.debug("creating Index [{}], shards [{}]/[{}{}]",
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
idxSettings.isShadowReplicaIndex() ? "s" : "");

final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
List<IndexEventListener> finalListeners = new ArrayList<>(builtInListeners);
final IndexEventListener onStoreClose = new IndexEventListener() {
@Override
public void onStoreClosed(ShardId shardId) {
indicesQueryCache.onClose(shardId);
}
};
indexModule.addIndexEventListener(onStoreClose);
indexModule.addIndexEventListener(oldShardsStats);
final IndexEventListener listener = indexModule.freeze();
listener.beforeIndexCreated(index, idxSettings.getSettings());
final IndexService indexService = indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingMemoryController);
finalListeners.add(onStoreClose);
finalListeners.add(oldShardsStats);
final IndexService indexService = createIndexService("create index", nodeServicesProvider, indexMetaData, indicesQueryCache, indicesFieldDataCache, finalListeners, indexingMemoryController);
boolean success = false;
try {
assert indexService.getIndexEventListener() == listener;
listener.afterIndexCreated(indexService);
indexService.getIndexEventListener().afterIndexCreated(indexService);
indices = newMapBuilder(indices).put(index.getUUID(), indexService).immutableMap();
success = true;
return indexService;
Expand All @@ -370,7 +362,54 @@ public void onStoreClosed(ShardId shardId) {
indexService.close("plugins_failed", true);
}
}
}

/**
* This creates a new IndexService without registering it
*/
private synchronized IndexService createIndexService(final String reason, final NodeServicesProvider nodeServicesProvider, IndexMetaData indexMetaData, IndicesQueryCache indicesQueryCache, IndicesFieldDataCache indicesFieldDataCache, List<IndexEventListener> builtInListeners, IndexingOperationListener... indexingOperationListeners) throws IOException {
final Index index = indexMetaData.getIndex();
final Predicate<String> indexNameMatcher = (indexExpression) -> indexNameExpressionResolver.matchesIndex(index.getName(), indexExpression, clusterService.state());
final IndexSettings idxSettings = new IndexSettings(indexMetaData, this.settings, indexNameMatcher, indexScopeSetting);
logger.debug("creating Index [{}], shards [{}]/[{}{}] - reason [{}]",
indexMetaData.getIndex(),
idxSettings.getNumberOfShards(),
idxSettings.getNumberOfReplicas(),
idxSettings.isShadowReplicaIndex() ? "s" : "", reason);

final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
final IndexEventListener listener = indexModule.freeze();
listener.beforeIndexCreated(index, idxSettings.getSettings());
return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingOperationListeners);
}

/**
* This method verifies that the given {@link IndexMetaData} holds sane values to create an {@link IndexService}. This method will throw an
* exception if the creation fails. The created {@link IndexService} will not be registered and will be closed immediately.
*/
public synchronized void verifyIndexMetadata(final NodeServicesProvider nodeServicesProvider, IndexMetaData metaData) throws IOException {
final List<Closeable> closeables = new ArrayList<>();
try {
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {});
closeables.add(indicesFieldDataCache);
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service = createIndexService("metadata verification", nodeServicesProvider,
metaData, indicesQueryCache, indicesFieldDataCache, Collections.emptyList());
for (ObjectCursor<MappingMetaData> typeMapping : metaData.getMappings().values()) {
// don't apply the default mapping, it has been applied when the mapping was created
service.mapperService().merge(typeMapping.value.type(), typeMapping.value.source(),
MapperService.MergeReason.MAPPING_RECOVERY, true);
}
closeables.add(() -> service.close("metadata verification", false));
} finally {
IOUtils.close(closeables);
}
}

/**
Expand Down
Loading

0 comments on commit 8127a06

Please sign in to comment.