Skip to content

Commit

Permalink
[UPDATE] Handle document missing exception on retry correctly.
Browse files Browse the repository at this point in the history
Throwables thrown on update retries are now caught and handled via
the provided callback. This commit also contains an integration test
demonstrating the bug and validating the fix.

Closes #6355
Closes #6724
  • Loading branch information
peschlowp authored and s1monw committed Jul 15, 2014
1 parent 3c54eb9 commit 9742d08
Show file tree
Hide file tree
Showing 5 changed files with 528 additions and 6 deletions.
43 changes: 43 additions & 0 deletions src/main/java/org/elasticsearch/action/ActionRunnable.java
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action;

/**
* Base class for {@link Runnable}s that need to call {@link ActionListener#onFailure(Throwable)} in case an uncaught
* exception or error is thrown while the actual action is run.
*/
public abstract class ActionRunnable<Response> implements Runnable {

protected final ActionListener<Response> listener;

public ActionRunnable(ActionListener<Response> listener) {
this.listener = listener;
}

public final void run() {
try {
doRun();
} catch (Throwable t) {
listener.onFailure(t);
}
}

protected abstract void doRun();
}
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
Expand Down Expand Up @@ -205,9 +206,9 @@ public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException || e instanceof DocumentAlreadyExistsException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
public void run() {
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
Expand Down Expand Up @@ -235,9 +236,9 @@ public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
public void run() {
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
Expand All @@ -263,9 +264,9 @@ public void onFailure(Throwable e) {
e = ExceptionsHelper.unwrapCause(e);
if (e instanceof VersionConflictEngineException) {
if (retryCount < request.retryOnConflict()) {
threadPool.executor(executor()).execute(new Runnable() {
threadPool.executor(executor()).execute(new ActionRunnable<UpdateResponse>(listener) {
@Override
public void run() {
protected void doRun() {
shardOperation(request, listener, retryCount + 1);
}
});
Expand Down
@@ -0,0 +1,221 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.update;

import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.engine.internal.InternalEngine;
import org.elasticsearch.index.indexing.ShardIndexingService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.merge.policy.MergePolicyProvider;
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.warmer.IndicesWarmer;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* An implementation of {@link Engine} only intended for use with {@link TransportUpdateActionTest}.
*/
public class InternalEngineWithControllableTimingForTesting extends InternalEngine implements Engine {

/*
* Not the best programming practice, but a simple way to make the instance accessible from test classes. The
* "cleaner" way - making the appropriate Guice injector of the respective index available to the test class is
* rather difficult and fragile, too. As long as tests requiring multiple instances of this class are not run in
* parallel, everything will be fine. Currently, there is just a single test suite that uses only a single instance
* anyway.
*/
public static InternalEngineWithControllableTimingForTesting currentTestInstance;

private AtomicBoolean nextGetThrowsException = new AtomicBoolean();

private Semaphore createOperationReceived = new Semaphore(0);
private Semaphore letCreateOperationBegin = new Semaphore(0);
private Semaphore createOperationFinished = new Semaphore(0);
private Semaphore letCreateOperationReturn = new Semaphore(0);

private Semaphore indexOperationReceived = new Semaphore(0);
private Semaphore letIndexOperationBegin = new Semaphore(0);
private Semaphore indexOperationFinished = new Semaphore(0);
private Semaphore letIndexOperationReturn = new Semaphore(0);

private Semaphore deleteOperationReceived = new Semaphore(0);
private Semaphore letDeleteOperationBegin = new Semaphore(0);
private Semaphore deleteOperationFinished = new Semaphore(0);
private Semaphore letDeleteOperationReturn = new Semaphore(0);

// safety timeout so that if something goes wrong the test does not block forever
private static final long SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS = 5;

@Inject
public InternalEngineWithControllableTimingForTesting(ShardId shardId, Settings indexSettings, ThreadPool threadPool,
IndexSettingsService indexSettingsService, ShardIndexingService indexingService, IndicesWarmer warmer, Store store,
SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider,
MergeSchedulerProvider mergeScheduler, AnalysisService analysisService, SimilarityService similarityService,
CodecService codecService) throws EngineException {
super(shardId, indexSettings, threadPool, indexSettingsService, indexingService, warmer, store, deletionPolicy, translog,
mergePolicyProvider, mergeScheduler, analysisService, similarityService, codecService);
// 'this' escapes from the constructor, but for the purpose of this test it is fine.
currentTestInstance = this;
}

@Override
public GetResult get(Get get) throws EngineException {
if (nextGetThrowsException.getAndSet(false)) {
Uid uid = Uid.createUid(get.uid().text());
long dummyVersion = 1000L;
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(), dummyVersion, get.version());
}
return super.get(get);
}

private void acquireWithTimeout(Semaphore semaphore) {
try {
boolean acquired = semaphore.tryAcquire(SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!acquired){
throw new RuntimeException("(Integration test:) Cannot acquire semaphore within the specified timeout of "
+ SEMAPHORE_ACQUIRE_TIMEOUT_SECONDS + " seconds");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

@Override
public void create(Create create) throws EngineException {
createOperationReceived.release();
acquireWithTimeout(letCreateOperationBegin);
try {
super.create(create);
} finally {
createOperationFinished.release();
acquireWithTimeout(letCreateOperationReturn);
}
}

@Override
public void index(Index index) throws EngineException {
indexOperationReceived.release();
acquireWithTimeout(letIndexOperationBegin);
try {
super.index(index);
} finally {
indexOperationFinished.release();
acquireWithTimeout(letIndexOperationReturn);
}
}

@Override
public void delete(Delete delete) throws EngineException {
deleteOperationReceived.release();
acquireWithTimeout(letDeleteOperationBegin);
try {
super.delete(delete);
} finally {
deleteOperationFinished.release();
acquireWithTimeout(letDeleteOperationReturn);
}
}

public void letNextGetThrowException() {
nextGetThrowsException.set(true);
}

public void waitUntilCreateOperationReceived() {
acquireWithTimeout(createOperationReceived);
}

public void letCreateOperationBegin() {
letCreateOperationBegin.release();
}

public void waitUntilCreateOperationFinished() {
acquireWithTimeout(createOperationFinished);
}

public void letCreateOperationReturn() {
letCreateOperationReturn.release();
}

public void waitUntilIndexOperationReceived() {
acquireWithTimeout(indexOperationReceived);
}

public void letIndexOperationBegin() {
letIndexOperationBegin.release();
}

public void waitUntilIndexOperationFinished() {
acquireWithTimeout(indexOperationFinished);
}

public void letIndexOperationReturn() {
letIndexOperationReturn.release();
}

public void waitUntilDeleteOperationReceived() {
acquireWithTimeout(deleteOperationReceived);
}

public void letDeleteOperationBegin() {
letDeleteOperationBegin.release();
}

public void waitUntilDeleteOperationFinished() {
acquireWithTimeout(deleteOperationFinished);
}

public void letDeleteOperationReturn() {
letDeleteOperationReturn.release();
}

public void resetSemaphores() {
nextGetThrowsException = new AtomicBoolean();

createOperationReceived = new Semaphore(0);
letCreateOperationBegin = new Semaphore(0);
createOperationFinished = new Semaphore(0);
letCreateOperationReturn = new Semaphore(0);

indexOperationReceived = new Semaphore(0);
letIndexOperationBegin = new Semaphore(0);
indexOperationFinished = new Semaphore(0);
letIndexOperationReturn = new Semaphore(0);

deleteOperationReceived = new Semaphore(0);
letDeleteOperationBegin = new Semaphore(0);
deleteOperationFinished = new Semaphore(0);
letDeleteOperationReturn = new Semaphore(0);
}
}
@@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action.update;

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.engine.Engine;

/**
* Provides an implementation of {@link Engine} only intended for use with {@link TransportUpdateActionTest}.
*/
public class InternalEngineWithControllableTimingForTestingModule extends AbstractModule {

@Override
protected void configure() {
bind(Engine.class).to(InternalEngineWithControllableTimingForTesting.class).asEagerSingleton();
}
}

0 comments on commit 9742d08

Please sign in to comment.