Skip to content

Commit

Permalink
Merge pull request #1396 from mattrjacobs/unsubscribe-before-subscribe
Browse files Browse the repository at this point in the history
If command is unsubscribed before any work starts, just return Observable.never()
  • Loading branch information
mattrjacobs committed Oct 19, 2016
2 parents 487ee3a + 5b1e4d6 commit 4e08f1a
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,9 @@ public void call() {
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
@Override
public Observable<R> call() {
if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
return Observable.never();
}
return applyHystrixSemantics(_cmd);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,10 @@ C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult execu
return getCommand(isolationStrategy, executionResult, FallbackResult.UNIMPLEMENTED);
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, int executionLatency) {
return getCommand(isolationStrategy, executionResult, executionLatency, FallbackResult.UNIMPLEMENTED);
}

C getCommand(ExecutionIsolationStrategy isolationStrategy, ExecutionResult executionResult, FallbackResult fallbackResult) {
return getCommand(isolationStrategy, executionResult, 0, fallbackResult);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.functions.Func2;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

Expand Down Expand Up @@ -3868,6 +3869,77 @@ public void onNext(Integer i) {
assertCommandExecutionEvents(cmd, HystrixEventType.SUCCESS);
}

@Test
public void testUnsubscribeBeforeSubscribe() throws Exception {
//this may happen in Observable chain, so Hystrix should make sure that command never executes/allocates in this situation
Observable<String> error = Observable.error(new RuntimeException("foo"));
HystrixCommand<Integer> cmd = getCommand(ExecutionIsolationStrategy.THREAD, AbstractTestHystrixCommand.ExecutionResult.SUCCESS, 100);
Observable<Integer> cmdResult = cmd.toObservable()
.doOnNext(new Action1<Integer>() {
@Override
public void call(Integer integer) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnNext : " + integer);
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable ex) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnError : " + ex);
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnCompleted");
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnSubscribe");
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " : OnUnsubscribe");
}
});

//the zip operator will subscribe to each observable. there is a race between the error of the first
//zipped observable terminating the zip and the subscription to the command's observable
Observable<String> zipped = Observable.zip(error, cmdResult, new Func2<String, Integer, String>() {
@Override
public String call(String s, Integer integer) {
return s + integer;
}
});

final CountDownLatch latch = new CountDownLatch(1);

zipped.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnCompleted");
latch.countDown();
}

@Override
public void onError(Throwable e) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnError : " + e);
latch.countDown();
}

@Override
public void onNext(String s) {
System.out.println(System.currentTimeMillis() + " : " + Thread.currentThread().getName() + " OnNext : " + s);
}
});

latch.await(1000, TimeUnit.MILLISECONDS);
System.out.println("ReqLog : " + HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
}

/**
*********************** THREAD-ISOLATED Execution Hook Tests **************************************
*/
Expand Down

0 comments on commit 4e08f1a

Please sign in to comment.