
Commit

Internal: don't try to send a mapping refresh if there is no master
After processing mapping updates from the master, we compare the resulting binary representation of the mappings to the one the cluster state holds. If they differ, we send a refresh mapping request to the master, asking it to reparse the mappings and serialize them again. This mechanism is used to update the mappings after a format change caused by a version upgrade.

The very same process can also be triggered when an old master leaves the cluster, prompting a local cluster state update. If that update contains the old mapping format, the local node will again signal the need to refresh, but this time there is no master to accept the request. Instead of failing (which we now do because of elastic#10283), we should just skip the notification and wait for the next elected master to publish a new mapping (triggering another refresh if needed).
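In essence the fix is a two-step check: compare the two serialized mapping representations byte for byte, and only contact the master if one is actually known. The following is a minimal, self-contained Java sketch of that flow, for illustration only; MappingSource, MasterChannel, and maybeRequestRefresh are hypothetical stand-ins, not the Elasticsearch classes this commit touches.

import java.util.Arrays;

public final class MappingRefreshSketch {

    /** Hypothetical stand-in for a compressed, serialized mapping. */
    record MappingSource(byte[] bytes) {}

    /** Hypothetical stand-in for the transport channel to the master node. */
    interface MasterChannel {
        /** Returns null when no master is currently known to this node. */
        String masterNodeId();
        void sendRefreshMapping(String index, String type);
    }

    /**
     * After locally re-parsing a mapping received from the master, compare the
     * re-serialized bytes with the bytes the cluster state carries. A mismatch
     * usually means the serialization format changed (e.g. after a version
     * upgrade), so we ask the master to reparse and republish -- unless there
     * is no master, in which case we skip and wait for the next elected one.
     */
    static void maybeRequestRefresh(String index, String type,
                                    MappingSource fromClusterState,
                                    MappingSource locallyReserialized,
                                    MasterChannel master) {
        if (Arrays.equals(fromClusterState.bytes(), locallyReserialized.bytes())) {
            return; // representations agree, nothing to refresh
        }
        if (master.masterNodeId() == null) {
            // No master to accept the request (e.g. the old master just left).
            // Skip instead of failing; the next elected master will publish a
            // new mapping, triggering another refresh if still needed.
            return;
        }
        master.sendRefreshMapping(index, type);
    }
}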

Closes elastic#10311
bleskes committed Mar 30, 2015
1 parent 31d158d commit 415c183
Showing 2 changed files with 81 additions and 66 deletions.
@@ -26,6 +26,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -55,11 +56,16 @@ public NodeMappingRefreshAction(Settings settings, TransportService transportSer
}

public void nodeMappingRefresh(final ClusterState state, final NodeMappingRefreshRequest request) throws ElasticsearchException {
DiscoveryNodes nodes = state.nodes();
final DiscoveryNodes nodes = state.nodes();
if (nodes.masterNode() == null) {
logger.warn("can't send mapping refresh for [{}][{}], no master known.", request.index(), Strings.arrayToCommaDelimitedString(request.types()));
return;
}

if (nodes.localNodeMaster()) {
innerMappingRefresh(request);
} else {
transportService.sendRequest(state.nodes().masterNode(),
transportService.sendRequest(nodes.masterNode(),
ACTION_NAME, request, EmptyTransportResponseHandler.INSTANCE_SAME);
}
}
@@ -19,13 +19,25 @@
package org.elasticsearch.gateway.local;

import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.action.admin.indices.recovery.ShardRecoveryResponse;
import org.elasticsearch.action.count.CountResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.junit.Test;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.*;

@ElasticsearchIntegrationTest.ClusterScope(numDataNodes = 0, scope = ElasticsearchIntegrationTest.Scope.TEST, numClientNodes = 0, transportClientRatio = 0.0)
public class RecoveryBackwardsCompatibilityTests extends ElasticsearchBackwardsCompatIntegrationTest {

@@ -49,71 +61,68 @@ protected int maxExternalNodes() {

@Test
@LuceneTestCase.Slow
@LuceneTestCase.AwaitsFix(bugUrl = "fails due to https://github.com/elastic/elasticsearch/pull/10283, Boaz looking into it")
public void testReusePeerRecovery() throws Exception {
// BL: also commenting out because CI doesn't honor AwaitsFix when running bwc tests.
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)));
logger.info("--> indexing docs");
int numDocs = scaledRandomIntBetween(100, 1000);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
}
indexRandom(true, builders);
ensureGreen();

logger.info("--> bump number of replicas from 0 to 1");
client().admin().indices().prepareFlush().execute().actionGet();
client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1").build()).get();
ensureGreen();

assertAllShardsOnNodes("test", backwardsCluster().backwardsNodePattern());

logger.info("--> upgrade cluster");
logClusterState();
CountResponse countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);

client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none")).execute().actionGet();
backwardsCluster().upgradeAllNodes();
client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all")).execute().actionGet();
ensureGreen();

countResponse = client().prepareCount().get();
assertHitCount(countResponse, numDocs);

// assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(indexSettings()).put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)));
// logger.info("--> indexing docs");
// int numDocs = scaledRandomIntBetween(100, 1000);
// IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
// for (int i = 0; i < builders.length; i++) {
// builders[i] = client().prepareIndex("test", "type").setSource("field", "value");
// }
// indexRandom(true, builders);
// ensureGreen();
//
// logger.info("--> bump number of replicas from 0 to 1");
// client().admin().indices().prepareFlush().execute().actionGet();
// client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1").build()).get();
// ensureGreen();
//
// assertAllShardsOnNodes("test", backwardsCluster().backwardsNodePattern());
//
// logger.info("--> upgrade cluster");
// logClusterState();
// CountResponse countResponse = client().prepareCount().get();
// assertHitCount(countResponse, numDocs);
//
// client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "none")).execute().actionGet();
// backwardsCluster().upgradeAllNodes();
// client().admin().cluster().prepareUpdateSettings().setTransientSettings(ImmutableSettings.settingsBuilder().put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE, "all")).execute().actionGet();
// ensureGreen();
//
// countResponse = client().prepareCount().get();
// assertHitCount(countResponse, numDocs);
//
// RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setDetailed(true).get();
// for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
// RecoveryState recoveryState = response.recoveryState();
// if (!recoveryState.getPrimary()) {
// RecoveryState.Index index = recoveryState.getIndex();
// if (compatibilityVersion().onOrAfter(Version.V_1_2_0)) {
// assertThat(index.toString(), index.recoveredBytes(), equalTo(0l));
// assertThat(index.toString(), index.reusedBytes(), greaterThan(0l));
// assertThat(index.toString(), index.reusedBytes(), equalTo(index.totalBytes()));
// assertThat(index.toString(), index.recoveredFileCount(), equalTo(0));
// assertThat(index.toString(), index.reusedFileCount(), equalTo(index.totalFileCount()));
// assertThat(index.toString(), index.reusedFileCount(), greaterThan(0));
// assertThat(index.toString(), index.recoveredBytesPercent(), equalTo(100.f));
// assertThat(index.toString(), index.recoveredFilesPercent(), equalTo(100.f));
// assertThat(index.toString(), index.reusedBytes(), greaterThan(index.recoveredBytes()));
// } else {
// /* We added checksums in 1.3, but they were already available in 1.2 since it uses Lucene 4.8.
// * Yet in this test we upgrade the entire cluster, and therefore the 1.3 nodes try to read the checksums
// * from the files even if they haven't been written with ES 1.3. Because of that we don't have to recover
// * the segment files if we are on 1.2 or above... */
// assertThat(index.toString(), index.recoveredBytes(), greaterThan(0l));
// assertThat(index.toString(), index.recoveredFileCount(), greaterThan(0));
// assertThat(index.toString(), index.reusedBytes(), greaterThan(0l));
// assertThat(index.toString(), index.recoveredBytesPercent(), greaterThan(0.0f));
// assertThat(index.toString(), index.recoveredBytesPercent(), equalTo(100.f));
// assertThat(index.toString(), index.recoveredFilesPercent(), equalTo(100.f));
// assertThat(index.toString(), index.reusedBytes(), greaterThan(index.recoveredBytes()));
// assertThat(index.toString(), index.recoveredBytes(), lessThan(index.totalBytes()));
// }
// // TODO upgrade via optimize?
// }
// }
RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").setDetailed(true).get();
for (ShardRecoveryResponse response : recoveryResponse.shardResponses().get("test")) {
RecoveryState recoveryState = response.recoveryState();
if (!recoveryState.getPrimary()) {
RecoveryState.Index index = recoveryState.getIndex();
if (compatibilityVersion().onOrAfter(Version.V_1_2_0)) {
assertThat(index.toString(), index.recoveredBytes(), equalTo(0l));
assertThat(index.toString(), index.reusedBytes(), greaterThan(0l));
assertThat(index.toString(), index.reusedBytes(), equalTo(index.totalBytes()));
assertThat(index.toString(), index.recoveredFileCount(), equalTo(0));
assertThat(index.toString(), index.reusedFileCount(), equalTo(index.totalFileCount()));
assertThat(index.toString(), index.reusedFileCount(), greaterThan(0));
assertThat(index.toString(), index.recoveredBytesPercent(), equalTo(100.f));
assertThat(index.toString(), index.recoveredFilesPercent(), equalTo(100.f));
assertThat(index.toString(), index.reusedBytes(), greaterThan(index.recoveredBytes()));
} else {
/* We added checksums in 1.3, but they were already available in 1.2 since it uses Lucene 4.8.
 * Yet in this test we upgrade the entire cluster, and therefore the 1.3 nodes try to read the checksums
 * from the files even if they haven't been written with ES 1.3. Because of that we don't have to recover
 * the segment files if we are on 1.2 or above... */
assertThat(index.toString(), index.recoveredBytes(), greaterThan(0l));
assertThat(index.toString(), index.recoveredFileCount(), greaterThan(0));
assertThat(index.toString(), index.reusedBytes(), greaterThan(0l));
assertThat(index.toString(), index.recoveredBytesPercent(), greaterThan(0.0f));
assertThat(index.toString(), index.recoveredBytesPercent(), equalTo(100.f));
assertThat(index.toString(), index.recoveredFilesPercent(), equalTo(100.f));
assertThat(index.toString(), index.reusedBytes(), greaterThan(index.recoveredBytes()));
assertThat(index.toString(), index.recoveredBytes(), lessThan(index.totalBytes()));
}
// TODO upgrade via optimize?
}
}
}
}
