Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #256 from benjchristensen/timeout-race-condition

Fix Race Condition on Timeout
  • Loading branch information...
commit ade109a656b75ad5d27e1e5c1be38fd0796833b1 2 parents a3724bc + 1ef8171
Ben Christensen benjchristensen authored
149 hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCommand.java
View
@@ -15,37 +15,26 @@
*/
package com.netflix.hystrix;
-import java.lang.ref.Reference;
-import java.lang.ref.SoftReference;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeoutException;
+import java.lang.ref.*;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
-import rx.Notification;
-import rx.Observable;
+import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
-import rx.Scheduler;
-import rx.Subscriber;
-import rx.functions.Action0;
-import rx.functions.Action1;
-import rx.functions.Func1;
+import rx.functions.*;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
-import com.netflix.hystrix.exception.HystrixBadRequestException;
-import com.netflix.hystrix.exception.HystrixRuntimeException;
+import com.netflix.hystrix.exception.*;
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
-import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
-import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
-import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
+import com.netflix.hystrix.strategy.concurrency.*;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
-import com.netflix.hystrix.util.HystrixTimer;
+import com.netflix.hystrix.util.*;
import com.netflix.hystrix.util.HystrixTimer.TimerListener;
/**
@@ -55,7 +44,7 @@
*
* @param <R>
* the return type
- *
+ *
* @ThreadSafe
*/
public abstract class HystrixObservableCommand<R> extends HystrixExecutableBase<R> implements HystrixExecutable<R>, HystrixExecutableInfo<R> {
@@ -261,7 +250,7 @@ public Setter andCommandPropertiesDefaults(HystrixCommandProperties.Setter comma
return toObservable(Schedulers.immediate());
}
- protected ObservableCommand<R> toObservable(final Scheduler observeOn, boolean performAsyncTimeout) {
+ protected ObservableCommand<R> toObservable(final Scheduler observeOn, final boolean performAsyncTimeout) {
/* this is a stateful object so can only be used once */
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
@@ -303,7 +292,7 @@ public void call(Subscriber<? super R> observer) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));
- getRunObservableDecoratedForMetricsAndErrorHandling(observeOn)
+ getRunObservableDecoratedForMetricsAndErrorHandling(observeOn, performAsyncTimeout)
.doOnTerminate(new Action0() {
@Override
@@ -337,9 +326,6 @@ public void call() {
}
});
- // wrap for timeout support
- o = o.lift(new HystrixObservableTimeoutOperator<R>(_this, performAsyncTimeout));
-
// error handling at very end (this means fallback didn't exist or failed)
o = o.onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
@@ -404,9 +390,7 @@ public void call() {
*
* @return R
*/
- private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final Scheduler observeOn) {
- final HystrixObservableCommand<R> _cmd = this;
-
+ private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final Scheduler observeOn, final boolean performAsyncTimeout) {
final HystrixObservableCommand<R> _self = this;
// allow tracking how many concurrent threads are executing
metrics.incrementConcurrentExecutionCount();
@@ -468,29 +452,22 @@ public void call(Notification<? super R> n) {
setRequestContextIfNeeded(currentRequestContext);
}
- }).map(new Func1<R, R>() {
+ }).lift(new HystrixObservableTimeoutOperator<R>(_self, performAsyncTimeout)).map(new Func1<R, R>() {
- @Override
- public R call(R t1) {
- return executionHook.onRunSuccess(_cmd, t1);
- }
+ /**
+ * If we get here it means we did not time out; otherwise the timeout error skips this step and goes to onErrorResumeNext
+ */
- }).doOnCompleted(new Action0() {
- // this must come before onErrorResumeNext as we only want successful onCompletes processed here
@Override
- public void call() {
- if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
- // the command timed out in the wrapping thread so we will return immediately
- // and not increment any of the counters below or other such logic
- } else {
- long duration = System.currentTimeMillis() - invocationStartTime;
- metrics.addCommandExecutionTime(duration);
- // report success
- executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
- metrics.markSuccess(duration);
- circuitBreaker.markSuccess();
- eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) duration, executionResult.events);
- }
+ public R call(R t1) {
+ long duration = System.currentTimeMillis() - invocationStartTime;
+ metrics.addCommandExecutionTime(duration);
+ // report success
+ executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
+ metrics.markSuccess(duration);
+ circuitBreaker.markSuccess();
+ eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) duration, executionResult.events);
+ return executionHook.onRunSuccess(_self, t1);
}
}).onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
@@ -499,11 +476,27 @@ public void call() {
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
if (e instanceof RejectedExecutionException) {
- // mark on counter
+ /**
+ * Rejection handling
+ */
metrics.markThreadPoolRejection();
// use a fallback instead (or throw exception if not implemented)
return getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e);
+ } else if (t instanceof HystrixObservableTimeoutOperator.HystrixTimeoutException) {
+ /**
+ * Timeout handling
+ */
+ Observable<R> v = getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
+ /*
+ * we subscribeOn the computation scheduler as we don't want to use the Timer thread, nor can we use the
+ * THREAD isolation pool as it may be saturated and that's the reason we're in fallback. The fallback logic
+ * should not perform IO and thus we run on the computation event loops.
+ */
+ return v.subscribeOn(new HystrixContextScheduler(concurrencyStrategy, Schedulers.computation()));
} else if (t instanceof HystrixBadRequestException) {
+ /**
+ * BadRequest handling
+ */
try {
Exception decorated = executionHook.onRunError(_self, (Exception) t);
@@ -521,22 +514,16 @@ public void call() {
*/
return Observable.error(t);
} else {
+ /**
+ * All other error handling
+ */
try {
e = executionHook.onRunError(_self, e);
} catch (Exception hookException) {
logger.warn("Error calling ExecutionHook.endRunFailure", hookException);
}
- if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
- // http://jira/browse/API-4905 HystrixCommand: Error/Timeout Double-count if both occur
- // this means we have already timed out then we don't count this error stat and we just return
- // as this means the user-thread has already returned, we've already done fallback logic
- // and we've already counted the timeout stat
- logger.debug("Error executing HystrixCommand.run() [TimedOut]. Proceeding to fallback logic ...", e);
- return Observable.empty();
- } else {
- logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);
- }
+ logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);
// report failure
metrics.markFailure(System.currentTimeMillis() - invocationStartTime);
@@ -557,7 +544,7 @@ public void call(Notification<? super R> n) {
@Override
public R call(R t1) {
// allow transforming the results via the executionHook whether it came from success or fallback
- return executionHook.onComplete(_cmd, t1);
+ return executionHook.onComplete(_self, t1);
}
});
@@ -776,6 +763,12 @@ public HystrixObservableTimeoutOperator(final HystrixObservableCommand<R> origin
this.isNonBlocking = isNonBlocking;
}
+ public static class HystrixTimeoutException extends Exception {
+
+ private static final long serialVersionUID = 7460860948388895401L;
+
+ }
+
@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
@@ -790,17 +783,7 @@ public HystrixObservableTimeoutOperator(final HystrixObservableCommand<R> origin
@Override
public void run() {
- try {
- Observable<R> v = originalCommand.getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
- /*
- * we subscribeOn the computation scheduler as we don't want to use the Timer thread, nor can we use the
- * THREAD isolation pool as it may be saturated and that's the reason we're in fallback. The fallback logic
- * should not perform IO and thus we run on the computation event loops.
- */
- v.subscribeOn(new HystrixContextScheduler(originalCommand.concurrencyStrategy, Schedulers.computation())).unsafeSubscribe(child);
- } catch (HystrixRuntimeException re) {
- child.onError(re);
- }
+ child.onError(new HystrixTimeoutException());
}
});
@@ -818,11 +801,12 @@ public void tick() {
// we record execution time here because, on timeout, we return before the original request itself completes
originalCommand.recordTotalExecutionTime(originalCommand.invocationStartTime);
+ // shut down the original request
+ s.unsubscribe();
+
timeoutRunnable.run();
}
- // shut down the original request
- s.unsubscribe();
}
@Override
@@ -852,11 +836,15 @@ public int getIntervalTimeInMilliseconds() {
// set externally so execute/queue can see this
originalCommand.timeoutTimer.set(tl);
+ /**
+ * If this subscriber receives values it means the parent succeeded/completed
+ */
return new Subscriber<R>(s) {
@Override
public void onCompleted() {
- if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
+ if (isNotTimedOut()) {
+ // stop timer and pass notification through
tl.clear();
child.onCompleted();
}
@@ -864,7 +852,8 @@ public void onCompleted() {
@Override
public void onError(Throwable e) {
- if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
+ if (isNotTimedOut()) {
+ // stop timer and pass notification through
tl.clear();
child.onError(e);
}
@@ -872,13 +861,17 @@ public void onError(Throwable e) {
@Override
public void onNext(R v) {
- // TODO does this need to compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.NOT_EXECUTED)
- // to be thread-safe, and does that even work?
- if (originalCommand.isCommandTimedOut.get().equals(TimedOutStatus.NOT_EXECUTED)) {
+ if (isNotTimedOut()) {
child.onNext(v);
}
}
+ private boolean isNotTimedOut() {
+ // true if the status is already COMPLETED (set by an earlier onNext) or if we succeed in moving it from NOT_EXECUTED to COMPLETED
+ return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
+ originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
+ }
+
};
}
64 hystrix-core/src/test/java/com/netflix/hystrix/HystrixCommandTimeoutConcurrencyTesting.java
View
@@ -0,0 +1,64 @@
+package com.netflix.hystrix;
+import org.junit.Test;
+
+import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
+
+public class HystrixCommandTimeoutConcurrencyTesting {
+
+ @Test
+ public void testTimeoutRace() {
+ for (int i = 0; i < 2000; i++) {
+ String a = null;
+ String b = null;
+ try {
+ HystrixRequestContext.initializeContext();
+ a = new TestCommand().execute();
+ b = new TestCommand().execute();
+ if (a == null || b == null) {
+ System.err.println("Received NULL!");
+ throw new RuntimeException("Received NULL");
+ }
+
+ for (HystrixExecutableInfo<?> hi : HystrixRequestLog.getCurrentRequest().getAllExecutedCommands()) {
+ if (hi.isResponseTimedOut() && hi.getExecutionEvents().size() == 1) {
+ System.err.println("Missing fallback status!");
+ throw new RuntimeException("Missing fallback status on timeout.");
+ }
+ }
+
+ } catch (Exception e) {
+ System.err.println("Error: " + e.getMessage());
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ } finally {
+ System.out.println(a + " " + b + " ==> " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
+ HystrixRequestContext.getContextForCurrentThread().shutdown();
+ }
+ }
+
+ Hystrix.reset();
+ }
+
+ public static class TestCommand extends HystrixCommand<String> {
+
+ protected TestCommand() {
+ super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("testTimeoutConcurrency"))
+ .andCommandKey(HystrixCommandKey.Factory.asKey("testTimeoutConcurrencyCommand"))
+ .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
+ .withExecutionIsolationThreadTimeoutInMilliseconds(1)));
+ }
+
+ @Override
+ protected String run() throws Exception {
+ // throw new RuntimeException("test");
+ // Thread.sleep(5);
+ return "hello";
+ }
+
+ @Override
+ protected String getFallback() {
+ return "failed";
+ }
+
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.