Skip to content

Commit

Permalink
Fix TransportNodesListShardStoreMetaData for custom data paths
Browse files Browse the repository at this point in the history
Cleans up the testReusePeerRecovery test as well

The actual fix is in TransportNodesListShardStoreMetaData.java, which
needs to use `nodeEnv.shardDataPaths` instead of `nodeEnv.shardPaths`.

Due to the difficulty in tracking this down, I've added a lot of
additional logging. This also fixes a logging issue in GatewayAllocator.
  • Loading branch information
dakrone committed Dec 30, 2014
1 parent 904f20a commit 31652a8
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 19 deletions.
Expand Up @@ -330,6 +330,8 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
sizeMatched += storeFileMetaData.length();
}
}
logger.trace("{}: node [{}] has [{}/{}] bytes of re-usable data",
shard, discoNode.name(), new ByteSizeValue(sizeMatched), sizeMatched);
if (sizeMatched > lastSizeMatched) {
lastSizeMatched = sizeMatched;
lastDiscoNodeMatched = discoNode;
Expand All @@ -345,7 +347,7 @@ public boolean allocateUnassigned(RoutingAllocation allocation) {
// we only check on THROTTLE since we already checked on NO before
Decision decision = allocation.deciders().canAllocate(shard, lastNodeMatched, allocation);
if (decision.type() == Decision.Type.THROTTLE) {
if (logger.isTraceEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}]: throttling allocation [{}] to [{}] in order to reuse its unallocated persistent store with total_size [{}]", shard.index(), shard.id(), shard, lastDiscoNodeMatched, new ByteSizeValue(lastSizeMatched));
}
// we are throttling this, but we have enough to allocate to this node, ignore it for now
Expand Down Expand Up @@ -411,6 +413,8 @@ public boolean apply(DiscoveryNode node) {

for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) {
// -1 version means it does not exist, which is what the API returns, and what we expect too
logger.trace("[{}] on node [{}] has version [{}] of shard",
shard, nodeShardState.getNode(), nodeShardState.version());
shardStates.put(nodeShardState.getNode(), nodeShardState.version());
}
return shardStates;
Expand Down
Expand Up @@ -244,7 +244,9 @@ public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> form
maxVersion = Math.max(maxVersion, version);
final boolean legacy = MetaDataStateFormat.STATE_FILE_EXTENSION.equals(matcher.group(2)) == false;
maxVersionIsLegacy &= legacy; // on purpose, see NOTE below
files.add(new PathAndVersion(stateFile, version, legacy));
PathAndVersion pav = new PathAndVersion(stateFile, version, legacy);
logger.trace("found state file: {}", pav);
files.add(pav);
}
}
}
Expand Down Expand Up @@ -275,6 +277,7 @@ public static <T> T loadLatestState(ESLogger logger, MetaDataStateFormat<T> form
}
} else {
state = format.read(stateFile, version);
logger.trace("state version [{}] read from [{}]", version, stateFile.getFileName());
}
return state;
} catch (Throwable e) {
Expand Down Expand Up @@ -324,6 +327,10 @@ private PathAndVersion(Path file, long version, boolean legacy) {
this.version = version;
this.legacy = legacy;
}

public String toString() {
return "[version:" + version + ", legacy:" + legacy + ", file:" + file.toAbsolutePath() + "]";
}
}

/**
Expand Down
Expand Up @@ -165,7 +165,7 @@ private StoreFilesMetaData listStoreMetaData(ShardId shardId) throws IOException
if (!storeType.contains("fs")) {
return new StoreFilesMetaData(false, shardId, ImmutableMap.<String, StoreFileMetaData>of());
}
Path[] shardLocations = nodeEnv.shardPaths(shardId);
Path[] shardLocations = nodeEnv.shardDataPaths(shardId, metaData.settings());
Path[] shardIndexLocations = new Path[shardLocations.length];
for (int i = 0; i < shardLocations.length; i++) {
shardIndexLocations[i] = shardLocations[i].resolve("index");
Expand Down
Expand Up @@ -354,7 +354,8 @@ public void testReusePeerRecovery() throws Exception {
// prevent any rebalance actions during the peer recovery
// if we run into a relocation the reuse count will be 0 and this fails the test. We are testing here if
// we reuse the files on disk after full restarts for replicas.
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()).put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)));
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings())));
ensureGreen();
logger.info("--> indexing docs");
for (int i = 0; i < 1000; i++) {
client().prepareIndex("test", "type").setSource("field", "value").execute().actionGet();
Expand All @@ -368,44 +369,48 @@ public void testReusePeerRecovery() throws Exception {
logger.info("Running Cluster Health");
ensureGreen();
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).setMaxNumSegments(100).get(); // just wait for merges
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).execute().actionGet();
client().admin().indices().prepareFlush().setWaitIfOngoing(true).setForce(true).get();

logger.info("--> shutting down the nodes");
logger.info("--> disabling allocation while the cluster is shut down");

// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(settingsBuilder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE))
.get();
logger.info("--> full cluster restart");
internalCluster().fullRestart();

logger.info("Running Cluster Health");
logger.info("--> waiting for cluster to return to green after first shutdown");
ensureGreen();
logger.info("--> shutting down the nodes");

logger.info("--> disabling allocation while the cluster is shut down second time");
// Disable allocations while we are closing nodes
client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(settingsBuilder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, EnableAllocationDecider.Allocation.NONE))
.get();
logger.info("--> full cluster restart");
internalCluster().fullRestart();


logger.info("Running Cluster Health");
logger.info("--> waiting for cluster to return to green after second shutdown");
ensureGreen();

RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get();

for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
RecoveryState recoveryState = response.recoveryState();
if (!recoveryState.getPrimary()) {
logger.info("--> shard {}, recovered {}, reuse {}", response.getShardId(), recoveryState.getIndex().recoveredTotalSize(), recoveryState.getIndex().reusedByteCount());
assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(0l));
assertThat(recoveryState.getIndex().reusedByteCount(), greaterThan(0l));
assertThat(recoveryState.getIndex().reusedByteCount(), equalTo(recoveryState.getIndex().totalByteCount()));
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(0));
assertThat(recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
assertThat(recoveryState.getIndex().reusedFileCount(), greaterThan(0));
assertThat(recoveryState.getIndex().reusedByteCount(), greaterThan(recoveryState.getIndex().numberOfRecoveredBytes()));
logger.info("--> replica shard {} recovered from {} to {}, recovered {}, reuse {}",
response.getShardId(), recoveryState.getSourceNode().name(), recoveryState.getTargetNode().name(),
recoveryState.getIndex().recoveredTotalSize(), recoveryState.getIndex().reusedByteCount());
assertThat("no bytes should be recovered", recoveryState.getIndex().recoveredByteCount(), equalTo(0l));
assertThat("data should have been reused", recoveryState.getIndex().reusedByteCount(), greaterThan(0l));
assertThat("all bytes should be reused", recoveryState.getIndex().reusedByteCount(), equalTo(recoveryState.getIndex().totalByteCount()));
assertThat("no files should be recovered", recoveryState.getIndex().recoveredFileCount(), equalTo(0));
assertThat("all files should be reused", recoveryState.getIndex().reusedFileCount(), equalTo(recoveryState.getIndex().totalFileCount()));
assertThat("> 0 files should be reused", recoveryState.getIndex().reusedFileCount(), greaterThan(0));
assertThat("all bytes should be reused bytes",
recoveryState.getIndex().reusedByteCount(), greaterThan(recoveryState.getIndex().numberOfRecoveredBytes()));
} else {
assertThat(recoveryState.getIndex().recoveredByteCount(), equalTo(recoveryState.getIndex().reusedByteCount()));
assertThat(recoveryState.getIndex().recoveredFileCount(), equalTo(recoveryState.getIndex().reusedFileCount()));
Expand Down

0 comments on commit 31652a8

Please sign in to comment.