first cut all tests pass
Relates to elastic#10585
s1monw committed Apr 21, 2015
1 parent 63db34f commit 5fd56da
Showing 3 changed files with 275 additions and 8 deletions.
@@ -18,9 +18,49 @@
*/
package org.elasticsearch.index.engine;

import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ReleasableLock;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.translog.Translog;

public class InternalEngineFactory implements EngineFactory {
    @Override
    public Engine newReadWriteEngine(EngineConfig config, boolean skipTranslogRecovery) {
        if (IndexMetaData.isOnSharedFilesystem(config.getIndexSettings())) {
            return new InternalEngine(config, skipTranslogRecovery) {
                @Override
                public void recover(RecoveryHandler recoveryHandler) throws EngineException {
                    store.incRef();
                    try {
                        try (ReleasableLock lock = writeLock.acquire()) {
                            // phase1 under lock
                            ensureOpen();
                            try {
                                recoveryHandler.phase1(null);
                            } catch (Throwable e) {
                                maybeFailEngine("recovery phase 1", e);
                                throw new RecoveryEngineException(shardId, 1, "Execution failed", wrapIfClosed(e));
                            }
                        }
                        try {
                            recoveryHandler.phase2(null);
                        } catch (Throwable e) {
                            maybeFailEngine("recovery phase 2", e);
                            throw new RecoveryEngineException(shardId, 2, "Execution failed", wrapIfClosed(e));
                        }
                        try {
                            recoveryHandler.phase3(null);
                        } catch (Throwable e) {
                            maybeFailEngine("recovery phase 3", e);
                            throw new RecoveryEngineException(shardId, 3, "Execution failed", wrapIfClosed(e));
                        }
                    } finally {
                        store.decRef();
                    }
                }
            };
        }
        return new InternalEngine(config, skipTranslogRecovery);
    }

@@ -30,6 +30,9 @@
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A recovery handler that skips phase 1 as well as sending the snapshot. During phase 3 the shard is marked
 * as relocated and closed to ensure that the engine is closed and the target can acquire the IW write lock.
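For context on the javadoc's point about the IW write lock: Lucene allows only one IndexWriter per directory, enforced through a write lock, so the relocation target cannot open its own writer until the source engine is closed. A minimal standalone Lucene sketch of that constraint (not part of this commit; the temp-directory path and the Lucene 5+ style APIs are illustrative assumptions):

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.LockObtainFailedException;

import java.nio.file.Files;

public class SingleWriterSketch {
    public static void main(String[] args) throws Exception {
        // one directory standing in for the shared-filesystem index data path
        Directory dir = FSDirectory.open(Files.createTempDirectory("shared-index"));
        IndexWriter source = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
        try {
            // a second writer on the same directory cannot obtain Lucene's write lock,
            // which is why the relocation source must close its engine before the
            // relocation target opens its own IndexWriter
            new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
        } catch (LockObtainFailedException e) {
            System.out.println("second writer rejected: " + e.getMessage());
        } finally {
            source.close();
        }
    }
}
```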
@@ -38,6 +41,7 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler {

    private final IndexShard shard;
    private final StartRecoveryRequest request;
    private final AtomicBoolean engineClosed = new AtomicBoolean(false);

    public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest request, RecoverySettings recoverySettings, TransportService transportService, ClusterService clusterService, IndicesService indicesService, MappingUpdatedAction mappingUpdatedAction, ESLogger logger) {
        super(shard, request, recoverySettings, transportService, clusterService, indicesService, mappingUpdatedAction, logger);
@@ -47,15 +51,18 @@ public SharedFSRecoverySourceHandler(IndexShard shard, StartRecoveryRequest requ

    @Override
    public void phase1(SnapshotIndexCommit snapshot) throws ElasticsearchException {
-       if (request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary()) {
-           // here we simply fail the primary shard since we can't move them (have 2 writers open at the same time)
-           // by failing the shard we play safe and just go through the entire reallocation procedure of the primary
-           // it would be ideal to make sure we flushed the translog here but that is not possible in the current design.
-           ElasticsearchIllegalStateException exception = new ElasticsearchIllegalStateException("Can't relocate primary - failing");
-           shard.failShard("primary_relocation", exception);
-           throw exception;
+       if (isPrimaryRelocation()) {
+           logger.debug("[phase1] closing engine on primary for shared filesystem recovery");
+           shard.engine().flush(true, true);
+           try {
+               shard.engine().close();
+               engineClosed.set(true);
+           } catch (IOException e) {
+               logger.warn("close engine failed", e);
+               shard.failShard("failed to close engine (phase1)", e);
+           }
        }
-       logger.trace("{} recovery [phase2] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
+       logger.trace("{} recovery [phase1] to {}: skipping phase 1 for shared filesystem", request.shardId(), request.targetNode());
    }


@@ -65,4 +72,41 @@ protected int sendSnapshot(Translog.Snapshot snapshot) throws ElasticsearchExcep
        return 0;
    }

    @Override
    public void phase2(Translog.Snapshot snapshot) throws ElasticsearchException {
        try {
            super.phase2(snapshot);
        } catch (Throwable t) {
            onRecoveryFailure(t);
            throw t;
        }
    }

    @Override
    public void phase3(Translog.Snapshot snapshot) throws ElasticsearchException {
        try {
            super.phase3(snapshot);
        } catch (Throwable t) {
            onRecoveryFailure(t);
            throw t;
        }
    }

    private boolean isPrimaryRelocation() {
        return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary();
    }

    private void onRecoveryFailure(Throwable t) {
        if (isPrimaryRelocation() && engineClosed.get()) {
            // If the relocation fails after the engine has already been closed, the primary
            // can't be used anymore, so fail the shard to reallocate a new IndexShard and,
            // with it, create a new IndexWriter (see the standalone sketch after this class)
            logger.info("recovery failed for primary shadow shard, failing shard");
            shard.failShard("primary relocation failed on shared filesystem", t);
        } else {
            logger.info("recovery failed on shared filesystem", t);
        }
    }

}
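The onRecoveryFailure branch above reflects the fact that a closed engine, and the IndexWriter inside it, cannot be reused; the only remedy is to fail the shard so that a fresh IndexShard (and writer) is allocated. A minimal standalone Lucene sketch of that behaviour (not part of this commit; directory and API choices are illustrative assumptions):

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Files;

public class ClosedWriterSketch {
    public static void main(String[] args) throws Exception {
        Directory dir = FSDirectory.open(Files.createTempDirectory("closed-writer"));
        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()));
        writer.close(); // corresponds to phase1 closing the engine on the relocation source
        try {
            writer.addDocument(new Document()); // a closed writer cannot be reused
        } catch (AlreadyClosedException e) {
            // the only way forward is a brand-new writer -- analogous to failing the shard
            // so that a new IndexShard (and engine) is allocated
            try (IndexWriter fresh = new IndexWriter(dir, new IndexWriterConfig(new StandardAnalyzer()))) {
                fresh.addDocument(new Document());
            }
        }
    }
}
```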
@@ -19,30 +19,42 @@

package org.elasticsearch.index;

import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.*;
import org.junit.Test;

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static com.google.common.collect.Lists.newArrayList;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -448,4 +460,175 @@ public void testShadowReplicasUsingFieldData() throws Exception {
        assertThat(hits[2].field("foo").getValue().toString(), equalTo("eggplant"));
        assertThat(hits[3].field("foo").getValue().toString(), equalTo("foo"));
    }


    @Test
    @Repeat(iterations = 100)
    public void testPrimaryRelocationWithConcurrentIndexing() throws Exception {
        Settings nodeSettings = ImmutableSettings.builder()
                .put("node.add_id_to_custom_path", false)
                .put("node.enable_custom_paths", true)
                .build();

        String node1 = internalCluster().startNode(nodeSettings);
        Path dataPath = createTempDir();
        final String IDX = "test";

        Settings idxSettings = ImmutableSettings.builder()
                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
                .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
                .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
                .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
                .build();

        prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
        ensureYellow(IDX);
        // node1 has the primary; start node2, which gets the replica
        String node2 = internalCluster().startNode(nodeSettings);
        ensureGreen(IDX);
        flushAndRefresh(IDX);
        String node3 = internalCluster().startNode(nodeSettings);
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch started = new CountDownLatch(1);

        final int numPhase1Docs = scaledRandomIntBetween(25, 200);
        final int numPhase2Docs = scaledRandomIntBetween(25, 200);
        final CountDownLatch phase1finished = new CountDownLatch(1);
        final CountDownLatch phase2finished = new CountDownLatch(1);

        Thread thread = new Thread() {
            @Override
            public void run() {
                started.countDown();
                while (counter.get() < (numPhase1Docs + numPhase2Docs)) {
                    final IndexResponse indexResponse = client().prepareIndex(IDX, "doc",
                            Integer.toString(counter.incrementAndGet())).setSource("foo", "bar").get();
                    assertEquals(indexResponse.getShardInfo().getFailed(), 0);
                    final int docCount = counter.get();
                    if (docCount == numPhase1Docs) {
                        phase1finished.countDown();
                    }
                }
                logger.info("--> stopping indexing thread");
                phase2finished.countDown();
            }
        };
        thread.start();
        started.await();
        phase1finished.await(); // wait for a certain number of documents to be indexed
        logger.info("--> excluding {} from allocation", node1);
        // now prevent the primary from being allocated on node1 so it relocates to node3
        Settings build = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", node1).build();
        client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
        // wait for more documents to be indexed post-recovery; this also waits for the
        // indexing thread to stop
        phase2finished.await();
        ensureGreen(IDX);
        thread.join();
        logger.info("--> performing query");
        flushAndRefresh();

        SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
        assertHitCount(resp, counter.get());
        assertHitCount(resp, numPhase1Docs + numPhase2Docs);
    }

    @Test
    @Repeat(iterations = 100)
    public void testPrimaryRelocationWhereRecoveryFails() throws Exception {
        Settings nodeSettings = ImmutableSettings.builder()
                .put("node.add_id_to_custom_path", false)
                .put("node.enable_custom_paths", true)
                .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName())
                .build();

        String node1 = internalCluster().startNode(nodeSettings);
        Path dataPath = createTempDir();
        final String IDX = "test";

        Settings idxSettings = ImmutableSettings.builder()
                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
                .put(IndexMetaData.SETTING_DATA_PATH, dataPath.toAbsolutePath().toString())
                .put(IndexMetaData.SETTING_SHADOW_REPLICAS, true)
                .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
                .build();

        prepareCreate(IDX).setSettings(idxSettings).addMapping("doc", "foo", "type=string").get();
        ensureYellow(IDX);
        // node1 has the primary; start node2, which gets the replica
        String node2 = internalCluster().startNode(nodeSettings);
        ensureGreen(IDX);
        flushAndRefresh(IDX);
        String node3 = internalCluster().startNode(nodeSettings);
        final AtomicInteger counter = new AtomicInteger(0);
        final CountDownLatch started = new CountDownLatch(1);

        final int numPhase1Docs = scaledRandomIntBetween(25, 200);
        final int numPhase2Docs = scaledRandomIntBetween(25, 200);
        final int numPhase3Docs = scaledRandomIntBetween(25, 200);
        final CountDownLatch phase1finished = new CountDownLatch(1);
        final CountDownLatch phase2finished = new CountDownLatch(1);
        final CountDownLatch phase3finished = new CountDownLatch(1);

        final AtomicBoolean keepFailing = new AtomicBoolean(true);

        MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, node1));
        mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, node3).localNode(),
                new MockTransportService.DelegateTransport(mockTransportService.original()) {

                    @Override
                    public void sendRequest(DiscoveryNode node, long requestId, String action,
                                            TransportRequest request, TransportRequestOptions options)
                            throws IOException, TransportException {
                        if (keepFailing.get() && action.equals(RecoveryTarget.Actions.TRANSLOG_OPS)) {
                            logger.info("--> failing translog ops");
                            throw new ElasticsearchException("failing on purpose");
                        }
                        super.sendRequest(node, requestId, action, request, options);
                    }
                });

        Thread thread = new Thread() {
            @Override
            public void run() {
                started.countDown();
                while (counter.get() < (numPhase1Docs + numPhase2Docs + numPhase3Docs)) {
                    final IndexResponse indexResponse = client().prepareIndex(IDX, "doc",
                            Integer.toString(counter.incrementAndGet())).setSource("foo", "bar").get();
                    assertEquals(indexResponse.getShardInfo().getFailed(), 0);
                    final int docCount = counter.get();
                    if (docCount == numPhase1Docs) {
                        phase1finished.countDown();
                    } else if (docCount == (numPhase1Docs + numPhase2Docs)) {
                        phase2finished.countDown();
                    }
                }
                logger.info("--> stopping indexing thread");
                phase3finished.countDown();
            }
        };
        thread.start();
        started.await();
        phase1finished.await(); // wait for a certain number of documents to be indexed
        logger.info("--> excluding {} from allocation", node1);
        // now prevent the primary from being allocated on node1 so it relocates to node3
        Settings build = ImmutableSettings.builder().put("index.routing.allocation.exclude._name", node1).build();
        client().admin().indices().prepareUpdateSettings(IDX).setSettings(build).execute().actionGet();
        // wait for more documents to be indexed post-recovery
        phase2finished.await();
        // stop failing
        keepFailing.set(false);
        // wait for more docs to be indexed
        phase3finished.await();
        ensureGreen(IDX);
        thread.join();
        logger.info("--> performing query");
        flushAndRefresh();

        SearchResponse resp = client().prepareSearch(IDX).setQuery(matchAllQuery()).get();
        assertHitCount(resp, counter.get());
    }
}
