Gateway: add logging around gateway shard allocation
This commit adds more logging around gateway shard allocation. Errors encountered while reaching out to nodes to list their local shards are logged at `WARN`. Shard info loading time is logged at `DEBUG`. We also log a `WARN` message if an exception forces a full checksum check while reading the store metadata.

Closes #9562
bleskes committed Feb 5, 2015
1 parent 0dc82d6 commit a2fe890
Showing 5 changed files with 80 additions and 63 deletions.
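For context, the sketch below is a minimal, self-contained illustration of the failure-logging pattern this commit introduces in the allocator: per-node listing failures are surfaced at WARN, while transient connection failures are skipped. It uses java.util.logging rather than Elasticsearch's internal logger, and NodeFailure/ConnectFailure are illustrative stand-ins for FailedNodeException/ConnectTransportException, not the real classes.

import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ListFailureLoggingSketch {

    private static final Logger LOGGER = Logger.getLogger(ListFailureLoggingSketch.class.getName());

    // Illustrative stand-in for FailedNodeException: carries the node id and the underlying cause.
    static class NodeFailure extends Exception {
        final String nodeId;
        NodeFailure(String nodeId, Throwable cause) {
            super(cause);
            this.nodeId = nodeId;
        }
    }

    // Illustrative stand-in for ConnectTransportException.
    static class ConnectFailure extends Exception {
        ConnectFailure(String message) {
            super(message);
        }
    }

    // Failures from unreachable nodes are expected (e.g. during restarts) and are skipped;
    // everything else is logged at WARN so operators see it without enabling DEBUG.
    static void logListActionFailures(String shard, String actionType, List<NodeFailure> failures) {
        for (NodeFailure failure : failures) {
            if (failure.getCause() instanceof ConnectFailure) {
                continue;
            }
            LOGGER.log(Level.WARNING,
                    String.format("%s: failed to list shard %s on node [%s]", shard, actionType, failure.nodeId),
                    failure);
        }
    }

    public static void main(String[] args) {
        logListActionFailures("[my_index][0]", "state", List.of(
                new NodeFailure("node-1", new ConnectFailure("connect timeout")),       // skipped
                new NodeFailure("node-2", new RuntimeException("broken state file")))); // logged at WARN
    }
}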
@@ -26,6 +26,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -50,7 +51,9 @@
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.transport.ConnectTransportException;

import java.util.*;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

/**
@@ -400,19 +403,8 @@ public boolean apply(DiscoveryNode node) {

String[] nodesIdsArray = nodeIds.toArray(String.class);
TransportNodesListGatewayStartedShards.NodesLocalGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodesIdsArray, listTimeout).actionGet();
if (logger.isDebugEnabled()) {
if (response.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list shards on nodes:");
for (int i = 0; i < response.failures().length; i++) {
Throwable cause = ExceptionsHelper.unwrapCause(response.failures()[i]);
if (cause instanceof ConnectTransportException) {
continue;
}
sb.append("\n -> ").append(response.failures()[i].getDetailedMessage());
}
logger.debug(sb.toString());
}
}
logListActionFailures(shard, "state", response.failures());


for (TransportNodesListGatewayStartedShards.NodeLocalGatewayStartedShards nodeShardState : response) {
// -1 version means it does not exist, which is what the API returns, and what we expect to
@@ -421,6 +413,17 @@ public boolean apply(DiscoveryNode node) {
return shardStates;
}

private void logListActionFailures(MutableShardRouting shard, String actionType, FailedNodeException[] failures) {
for (final FailedNodeException failure : failures) {
Throwable cause = ExceptionsHelper.unwrapCause(failure);
if (cause instanceof ConnectTransportException) {
continue;
}
// we log at WARN here; debug logs with full stack traces are emitted if debug logging is turned on for TransportNodesListGatewayStartedShards
logger.warn("{}: failed to list shard {} on node [{}]", failure, shard.shardId(), actionType, failure.nodeId());
}
}

private Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> buildShardStores(DiscoveryNodes nodes, MutableShardRouting shard) {
Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaData> shardStores = cachedStores.get(shard.shardId());
ObjectOpenHashSet<String> nodesIds;
@@ -449,19 +452,7 @@ private Map<DiscoveryNode, TransportNodesListShardStoreMetaData.StoreFilesMetaDa
if (!nodesIds.isEmpty()) {
String[] nodesIdsArray = nodesIds.toArray(String.class);
TransportNodesListShardStoreMetaData.NodesStoreFilesMetaData nodesStoreFilesMetaData = listShardStoreMetaData.list(shard.shardId(), false, nodesIdsArray, listTimeout).actionGet();
if (logger.isTraceEnabled()) {
if (nodesStoreFilesMetaData.failures().length > 0) {
StringBuilder sb = new StringBuilder(shard + ": failures when trying to list stores on nodes:");
for (int i = 0; i < nodesStoreFilesMetaData.failures().length; i++) {
Throwable cause = ExceptionsHelper.unwrapCause(nodesStoreFilesMetaData.failures()[i]);
if (cause instanceof ConnectTransportException) {
continue;
}
sb.append("\n -> ").append(nodesStoreFilesMetaData.failures()[i].getDetailedMessage());
}
logger.trace(sb.toString());
}
}
logListActionFailures(shard, "stores", nodesStoreFilesMetaData.failures());

for (TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData nodeStoreFilesMetaData : nodesStoreFilesMetaData) {
if (nodeStoreFilesMetaData.storeFilesMetaData() != null) {
@@ -35,4 +35,9 @@ public ShardStateInfo(long version, Boolean primary) {
this.version = version;
this.primary = primary;
}

@Override
public String toString() {
return "version [" + version + "], primary [" + primary + "]";
}
}
@@ -117,10 +117,13 @@ protected NodesLocalGatewayStartedShards newResponse(Request request, AtomicRefe
@Override
protected NodeLocalGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticsearchException {
try {
logger.trace("loading shard state info for {}", request.shardId);
ShardStateInfo shardStateInfo = shardsState.loadShardInfo(request.shardId);
if (shardStateInfo != null) {
logger.debug("{} shard state info found: [{}]", request.shardId, shardStateInfo);
return new NodeLocalGatewayStartedShards(clusterService.localNode(), shardStateInfo.version);
}
logger.trace("no shard info found for {}", request.shardId);
return new NodeLocalGatewayStartedShards(clusterService.localNode(), -1);
} catch (Exception e) {
throw new ElasticsearchException("failed to load started shards", e);
5 changes: 4 additions & 1 deletion src/main/java/org/elasticsearch/index/store/Store.java
@@ -623,13 +623,16 @@ ImmutableMap<String, StoreFileMetaData> buildMetadata(IndexCommit commit, Direct
} else {
builder.put(segmentsFile, new StoreFileMetaData(segmentsFile, directory.fileLength(segmentsFile), legacyChecksum, null));
}
} catch (CorruptIndexException ex) {
} catch (CorruptIndexException | IndexNotFoundException ex) {
// we either know the index is corrupted or it's just not there
throw ex;
} catch (Throwable ex) {
try {
// Lucene checks the checksum after it tries to lookup the codec etc.
// in that case we might get only IAE or similar exceptions while we are really corrupt...
// TODO we should check the checksum in lucene if we hit an exception
logger.warn("failed to build store metadata. checking segment info integrity (with commit [{}])",
ex, commit == null ? "no" : "yes");
Lucene.checkSegmentInfoIntegrity(directory);
} catch (CorruptIndexException cex) {
cex.addSuppressed(ex);
@@ -50,7 +50,10 @@

import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;

/**
@@ -142,44 +145,56 @@ protected NodeStoreFilesMetaData nodeOperation(NodeRequest request) throws Elast
}

private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException {
IndexService indexService = indicesService.indexService(shardId.index().name());
if (indexService != null) {
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId.id());
if (indexShard != null) {
final Store store = indexShard.store();
store.incRef();
try {
return new StoreFilesMetaData(true, shardId, store.getMetadataOrEmpty().asMap());
} finally {
store.decRef();
logger.trace("listing store meta data for {}", shardId);
long startTime = System.currentTimeMillis();
boolean exists = false;
try {
IndexService indexService = indicesService.indexService(shardId.index().name());
if (indexService != null) {
InternalIndexShard indexShard = (InternalIndexShard) indexService.shard(shardId.id());
if (indexShard != null) {
final Store store = indexShard.store();
store.incRef();
try {
exists = true;
return new StoreFilesMetaData(true, shardId, store.getMetadataOrEmpty().asMap());
} finally {
store.decRef();
}
}
}
}
// try and see if we can list unallocated
IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name());
if (metaData == null) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
}
String storeType = metaData.settings().get("index.store.type", "fs");
if (!storeType.contains("fs")) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
}
File[] shardLocations = nodeEnv.shardLocations(shardId);
File[] shardIndexLocations = new File[shardLocations.length];
for (int i = 0; i < shardLocations.length; i++) {
shardIndexLocations[i] = new File(shardLocations[i], "index");
}
boolean exists = false;
for (File shardIndexLocation : shardIndexLocations) {
if (shardIndexLocation.exists()) {
exists = true;
break;
// try and see if we can list unallocated
IndexMetaData metaData = clusterService.state().metaData().index(shardId.index().name());
if (metaData == null) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
}
String storeType = metaData.settings().get("index.store.type", "fs");
if (!storeType.contains("fs")) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
}
File[] shardLocations = nodeEnv.shardLocations(shardId);
File[] shardIndexLocations = new File[shardLocations.length];
for (int i = 0; i < shardLocations.length; i++) {
shardIndexLocations[i] = new File(shardLocations[i], "index");
}
for (File shardIndexLocation : shardIndexLocations) {
if (shardIndexLocation.exists()) {
exists = true;
break;
}
}
if (!exists) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
}
return new StoreFilesMetaData(false, shardId, Store.readMetadataSnapshot(shardIndexLocations, logger).asMap());
} finally {
TimeValue took = new TimeValue(System.currentTimeMillis() - startTime);
if (exists) {
logger.debug("loaded store meta data for {} (took [{}])", shardId, took);
} else {
logger.trace("loaded store meta data for {} (took [{}])", shardId, took);
}
}
if (!exists) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
}
return new StoreFilesMetaData(false, shardId, Store.readMetadataSnapshot(shardIndexLocations, logger).asMap());
}

@Override
