Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A couple approaches to fix HystrixObservableCollapser not producing values #1043

Merged
merged 2 commits into from Jan 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -180,40 +180,53 @@ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchR
final Set<K> seenKeys = new HashSet<K>();

// observe the responses and join with the requests by key
return batchResponse.doOnNext(new Action1<BatchReturnType>() {
@Override
public void call(BatchReturnType batchReturnType) {
try {
K responseKey = batchResponseKeySelector.call(batchReturnType);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
if (requestForResponse != null) {
requestForResponse.emitResponse(mapBatchTypeToResponseType.call(batchReturnType));
// now add this to seenKeys, so we can later check what was seen, and what was unseen
seenKeys.add(responseKey);
} else {
logger.warn("Batch Response contained a response key not in request batch : " + responseKey);
}
} catch (Throwable ex) {
logger.warn("Uncaught error during demultiplexing of BatchResponse", ex);
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
for (K key: requestsByKey.keySet()) {
CollapsedRequest<ResponseType, RequestArgumentType> collapsedReq = requestsByKey.get(key);
if (!seenKeys.contains(key)) {
return batchResponse
.doOnNext(new Action1<BatchReturnType>() {
@Override
public void call(BatchReturnType batchReturnType) {
try {
onMissingResponse(collapsedReq);
K responseKey = batchResponseKeySelector.call(batchReturnType);
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
if (requestForResponse != null) {
requestForResponse.emitResponse(mapBatchTypeToResponseType.call(batchReturnType));
// now add this to seenKeys, so we can later check what was seen, and what was unseen
seenKeys.add(responseKey);
} else {
logger.warn("Batch Response contained a response key not in request batch : " + responseKey);
}
} catch (Throwable ex) {
collapsedReq.setException(new RuntimeException("Error in HystrixObservableCollapser.onMissingResponse handler", ex));
logger.warn("Uncaught error during demultiplexing of BatchResponse", ex);
}
}
})
.doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable t) {
Exception ex = getExceptionFromThrowable(t);
for (K key: requestsByKey.keySet()) {
CollapsedRequest<ResponseType, RequestArgumentType> collapsedReq = requestsByKey.get(key);
collapsedReq.setException(ex);
}
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {

for (K key : requestsByKey.keySet()) {
CollapsedRequest<ResponseType, RequestArgumentType> collapsedReq = requestsByKey.get(key);
if (!seenKeys.contains(key)) {
try {
onMissingResponse(collapsedReq);
} catch (Throwable ex) {
collapsedReq.setException(new RuntimeException("Error in HystrixObservableCollapser.onMissingResponse handler", ex));
}
}
//then unconditionally issue an onCompleted. this ensures the downstream gets a terminal, regardless of how onMissingResponse was implemented
collapsedReq.setComplete();
}
}
//then unconditionally issue an onCompleted. this ensures the downstream gets a terminal, regardless of how onMissingResponse was implemented
collapsedReq.setComplete();
}
}
}).ignoreElements().cast(Void.class);
}).ignoreElements().cast(Void.class);
}

@Override
Expand All @@ -224,6 +237,18 @@ public HystrixCollapserKey getCollapserKey() {
};
}

protected Exception getExceptionFromThrowable(Throwable t) {
Exception e;
if (t instanceof Exception) {
e = (Exception) t;
} else {
// Hystrix 1.x uses Exception, not Throwable so to prevent a breaking change Throwable will be wrapped in Exception
e = new Exception("Throwable caught while executing.", t);
}
return e;
}


private HystrixCollapserProperties getProperties() {
return collapserFactory.getProperties();
}
Expand Down
Expand Up @@ -17,6 +17,8 @@

import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subjects.PublishSubject;

import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
Expand Down
Expand Up @@ -87,7 +87,7 @@ public Observable<ResponseType> submitRequest(RequestArgumentType arg) {
while (true) {
RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> b = batch.get();
if (b == null) {
throw new IllegalStateException("Submitting requests after collapser is shutdown");
return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
}
Observable<ResponseType> f = b.offer(arg);
// it will always get an Observable unless we hit the max batch size
Expand Down Expand Up @@ -150,9 +150,9 @@ public Void call() throws Exception {
createNewBatchAndExecutePreviousIfNeeded(currentBatch);
}
} catch (Throwable t) {
logger.error("Error occurred trying to executeRequestsFromQueue.", t);
logger.error("Error occurred trying to execute the batch.", t);
t.printStackTrace();
// ignore error so we don't kill the Timer mainLoop and prevent further items from being scheduled
// http://jira.netflix.com/browse/API-5042 HystrixCommand: Collapser TimerThread Vulnerable to Shutdown
}
return null;
}
Expand Down
Expand Up @@ -42,6 +42,7 @@
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableHolder;
import com.netflix.hystrix.util.HystrixTimer.TimerListener;
import rx.Observable;

import static org.junit.Assert.*;

Expand Down Expand Up @@ -817,22 +818,22 @@ public void testRequestCacheWithTimeout() {
public void testRequestWithCommandShortCircuited() throws Exception {
TestCollapserTimer timer = new TestCollapserTimer();
HystrixCollapser<List<String>, String, String> collapser1 = new TestRequestCollapserWithShortCircuitedCommand(timer, "1");
Future<String> response1 = collapser1.queue();
Future<String> response2 = new TestRequestCollapserWithShortCircuitedCommand(timer, "2").queue();
Observable<String> response1 = collapser1.observe();
Observable<String> response2 = new TestRequestCollapserWithShortCircuitedCommand(timer, "2").observe();
timer.incrementTime(10); // let time pass that equals the default delay/period

try {
response1.get();
response1.toBlocking().first();
fail("we should have received an exception");
} catch (ExecutionException e) {
// e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
// what we expect
}
try {
response2.get();
response2.toBlocking().first();
fail("we should have received an exception");
} catch (ExecutionException e) {
// e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
// what we expect
}

Expand Down