Skip to content

Commit

Permalink
Fix timeout race conditions
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed May 2, 2014
1 parent a3724bc commit 58da8b3
Showing 1 changed file with 71 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,37 +15,26 @@
*/
package com.netflix.hystrix;

import java.lang.ref.Reference;
import java.lang.ref.SoftReference;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.lang.ref.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.*;

import rx.Notification;
import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.*;
import rx.schedulers.Schedulers;
import rx.subscriptions.CompositeSubscription;

import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.netflix.hystrix.exception.HystrixBadRequestException;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.exception.*;
import com.netflix.hystrix.exception.HystrixRuntimeException.FailureType;
import com.netflix.hystrix.strategy.concurrency.HystrixContextRunnable;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.concurrency.*;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.util.HystrixTimer;
import com.netflix.hystrix.util.*;
import com.netflix.hystrix.util.HystrixTimer.TimerListener;

/**
Expand All @@ -55,7 +44,7 @@
*
* @param <R>
* the return type
*
*
* @ThreadSafe
*/
public abstract class HystrixObservableCommand<R> extends HystrixExecutableBase<R> implements HystrixExecutable<R>, HystrixExecutableInfo<R> {
Expand Down Expand Up @@ -261,7 +250,7 @@ public Observable<R> toObservable() {
return toObservable(Schedulers.immediate());
}

protected ObservableCommand<R> toObservable(final Scheduler observeOn, boolean performAsyncTimeout) {
protected ObservableCommand<R> toObservable(final Scheduler observeOn, final boolean performAsyncTimeout) {
/* this is a stateful object so can only be used once */
if (!started.compareAndSet(false, true)) {
throw new IllegalStateException("This instance can only be executed once. Please instantiate a new instance.");
Expand Down Expand Up @@ -303,7 +292,7 @@ public void call(Subscriber<? super R> observer) {
// store the command that is being run
endCurrentThreadExecutingCommand.set(Hystrix.startCurrentThreadExecutingCommand(getCommandKey()));

getRunObservableDecoratedForMetricsAndErrorHandling(observeOn)
getRunObservableDecoratedForMetricsAndErrorHandling(observeOn, performAsyncTimeout)
.doOnTerminate(new Action0() {

@Override
Expand Down Expand Up @@ -337,9 +326,6 @@ public void call() {
}
});

// wrap for timeout support
o = o.lift(new HystrixObservableTimeoutOperator<R>(_this, performAsyncTimeout));

// error handling at very end (this means fallback didn't exist or failed)
o = o.onErrorResumeNext(new Func1<Throwable, Observable<R>>() {

Expand Down Expand Up @@ -404,9 +390,7 @@ public void call() {
*
* @return R
*/
private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final Scheduler observeOn) {
final HystrixObservableCommand<R> _cmd = this;

private Observable<R> getRunObservableDecoratedForMetricsAndErrorHandling(final Scheduler observeOn, final boolean performAsyncTimeout) {
final HystrixObservableCommand<R> _self = this;
// allow tracking how many concurrent threads are executing
metrics.incrementConcurrentExecutionCount();
Expand Down Expand Up @@ -468,29 +452,22 @@ public void call(Notification<? super R> n) {
setRequestContextIfNeeded(currentRequestContext);
}

}).map(new Func1<R, R>() {
}).lift(new HystrixObservableTimeoutOperator<R>(_self, performAsyncTimeout)).map(new Func1<R, R>() {

@Override
public R call(R t1) {
return executionHook.onRunSuccess(_cmd, t1);
}
/**
* If we get here it means we did not timeout, otherwise it will skip this and go to onErrorResumeNext
*/

}).doOnCompleted(new Action0() {
// this must come before onErrorResumeNext as we only want successful onCompletes processed here
@Override
public void call() {
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// the command timed out in the wrapping thread so we will return immediately
// and not increment any of the counters below or other such logic
} else {
long duration = System.currentTimeMillis() - invocationStartTime;
metrics.addCommandExecutionTime(duration);
// report success
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
metrics.markSuccess(duration);
circuitBreaker.markSuccess();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) duration, executionResult.events);
}
public R call(R t1) {
long duration = System.currentTimeMillis() - invocationStartTime;
metrics.addCommandExecutionTime(duration);
// report success
executionResult = executionResult.addEvents(HystrixEventType.SUCCESS);
metrics.markSuccess(duration);
circuitBreaker.markSuccess();
eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) duration, executionResult.events);
return executionHook.onRunSuccess(_self, t1);
}

}).onErrorResumeNext(new Func1<Throwable, Observable<R>>() {
Expand All @@ -499,11 +476,27 @@ public void call() {
public Observable<R> call(Throwable t) {
Exception e = getExceptionFromThrowable(t);
if (e instanceof RejectedExecutionException) {
// mark on counter
/**
* Rejection handling
*/
metrics.markThreadPoolRejection();
// use a fallback instead (or throw exception if not implemented)
return getFallbackOrThrowException(HystrixEventType.THREAD_POOL_REJECTED, FailureType.REJECTED_THREAD_EXECUTION, "could not be queued for execution", e);
} else if (t instanceof HystrixObservableTimeoutOperator.HystrixTimeoutException) {
/**
* Timeout handling
*/
Observable<R> v = getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
/*
* we subscribeOn the computation scheduler as we don't want to use the Timer thread, nor can we use the
* THREAD isolation pool as it may be saturated and that's the reason we're in fallback. The fallback logic
* should not perform IO and thus we run on the computation event loops.
*/
return v.subscribeOn(new HystrixContextScheduler(concurrencyStrategy, Schedulers.computation()));
} else if (t instanceof HystrixBadRequestException) {
/**
* BadRequest handling
*/
try {
Exception decorated = executionHook.onRunError(_self, (Exception) t);

Expand All @@ -521,22 +514,16 @@ public Observable<R> call(Throwable t) {
*/
return Observable.error(t);
} else {
/**
* All other error handling
*/
try {
e = executionHook.onRunError(_self, e);
} catch (Exception hookException) {
logger.warn("Error calling ExecutionHook.endRunFailure", hookException);
}

if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
// http://jira/browse/API-4905 HystrixCommand: Error/Timeout Double-count if both occur
// this means we have already timed out then we don't count this error stat and we just return
// as this means the user-thread has already returned, we've already done fallback logic
// and we've already counted the timeout stat
logger.debug("Error executing HystrixCommand.run() [TimedOut]. Proceeding to fallback logic ...", e);
return Observable.empty();
} else {
logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);
}
logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", e);

// report failure
metrics.markFailure(System.currentTimeMillis() - invocationStartTime);
Expand All @@ -557,7 +544,7 @@ public void call(Notification<? super R> n) {
@Override
public R call(R t1) {
// allow transforming the results via the executionHook whether it came from success or fallback
return executionHook.onComplete(_cmd, t1);
return executionHook.onComplete(_self, t1);
}

});
Expand Down Expand Up @@ -776,6 +763,12 @@ public HystrixObservableTimeoutOperator(final HystrixObservableCommand<R> origin
this.isNonBlocking = isNonBlocking;
}

public static class HystrixTimeoutException extends Exception {

private static final long serialVersionUID = 7460860948388895401L;

}

@Override
public Subscriber<? super R> call(final Subscriber<? super R> child) {
final CompositeSubscription s = new CompositeSubscription();
Expand All @@ -790,17 +783,7 @@ public Subscriber<? super R> call(final Subscriber<? super R> child) {

@Override
public void run() {
try {
Observable<R> v = originalCommand.getFallbackOrThrowException(HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
/*
* we subscribeOn the computation scheduler as we don't want to use the Timer thread, nor can we use the
* THREAD isolation pool as it may be saturated and that's the reason we're in fallback. The fallback logic
* should not perform IO and thus we run on the computation event loops.
*/
v.subscribeOn(new HystrixContextScheduler(originalCommand.concurrencyStrategy, Schedulers.computation())).unsafeSubscribe(child);
} catch (HystrixRuntimeException re) {
child.onError(re);
}
child.onError(new HystrixTimeoutException());
}
});

Expand All @@ -818,11 +801,12 @@ public void tick() {
// we record execution time because we are returning before
originalCommand.recordTotalExecutionTime(originalCommand.invocationStartTime);

// shut down the original request
s.unsubscribe();

timeoutRunnable.run();
}

// shut down the original request
s.unsubscribe();
}

@Override
Expand Down Expand Up @@ -852,33 +836,42 @@ public int getIntervalTimeInMilliseconds() {
// set externally so execute/queue can see this
originalCommand.timeoutTimer.set(tl);

/**
* If this subscriber receives values it means the parent succeeded/completed
*/
return new Subscriber<R>(s) {

@Override
public void onCompleted() {
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onCompleted();
}
}

@Override
public void onError(Throwable e) {
if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED)) {
if (isNotTimedOut()) {
// stop timer and pass notification through
tl.clear();
child.onError(e);
}
}

@Override
public void onNext(R v) {
// TODO does this need to compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.NOT_EXECUTED)
// to be thread-safe, and does that even work?
if (originalCommand.isCommandTimedOut.get().equals(TimedOutStatus.NOT_EXECUTED)) {
if (isNotTimedOut()) {
child.onNext(v);
}
}

private boolean isNotTimedOut() {
// if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
}

};
}

Expand Down

0 comments on commit 58da8b3

Please sign in to comment.