Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

New signature for HystrixObservableCollapser

Replacing mapResponseToRequests with functional style selectors and allowing streaming response from Observable.
  • Loading branch information...
commit d1204e1aaa5f05e98d96b7cc481dc116d43c8bbb 1 parent 7cf36e6
@benjchristensen benjchristensen authored
View
18 hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java
@@ -23,8 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.Scheduler;
+import rx.*;
+import rx.Observable.OnSubscribe;
+import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
@@ -142,8 +143,17 @@ protected HystrixCollapser(Setter setter) {
}
@Override
- public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
- self.mapResponseToRequests(batchResponse, requests);
+ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
+ return batchResponse.single().flatMap(new Func1<BatchReturnType, Observable<Void>>() {
+
+ @Override
+ public Observable<Void> call(BatchReturnType response) {
+ // this is a blocking call in HystrixCollapser
+ self.mapResponseToRequests(response, requests);
+ return Observable.empty();
+ }
+
+ });
}
@Override
View
113 hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java
@@ -15,26 +15,20 @@
*/
package com.netflix.hystrix;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import java.util.*;
+import java.util.concurrent.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
+import rx.*;
import rx.Observable;
-import rx.Scheduler;
+import rx.functions.*;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
-import com.netflix.hystrix.collapser.CollapserTimer;
-import com.netflix.hystrix.collapser.HystrixCollapserBridge;
-import com.netflix.hystrix.collapser.RealCollapserTimer;
-import com.netflix.hystrix.collapser.RequestCollapser;
-import com.netflix.hystrix.collapser.RequestCollapserFactory;
+import com.netflix.hystrix.collapser.*;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
@@ -59,7 +53,7 @@
* @param <RequestArgumentType>
* The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple.
*/
-public abstract class HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
+public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
static final Logger logger = LoggerFactory.getLogger(HystrixObservableCollapser.class);
@@ -120,7 +114,7 @@ protected HystrixObservableCollapser(Setter setter) {
this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, propertiesBuilder);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());
- final HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;
+ final HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> self = this;
/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
@@ -143,8 +137,40 @@ protected HystrixObservableCollapser(Setter setter) {
}
@Override
- public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
- self.mapResponseToRequests(batchResponse, requests);
+ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
+ Func1<RequestArgumentType, K> requestKeySelector = self.getRequestArgumentKeySelector();
+ final Func1<BatchReturnType, K> batchResponseKeySelector = self.getBatchReturnTypeKeySelector();
+ final Func1<BatchReturnType, ResponseType> mapBatchTypeToResponseType = self.getBatchReturnTypeToResponseTypeMapper();
+
+ // index the requests by key
+ final Map<K, CollapsedRequest<ResponseType, RequestArgumentType>> requestsByKey = new HashMap<K, CollapsedRequest<ResponseType, RequestArgumentType>>(requests.size());
+ for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requests) {
+ requestsByKey.put(requestKeySelector.call(cr.getArgument()), cr);
+ }
+
+ // observe the responses and join with the requests by key
+ return batchResponse.flatMap(new Func1<BatchReturnType, Observable<Void>>() {
+
+ @Override
+ public Observable<Void> call(BatchReturnType r) {
+ K responseKey = batchResponseKeySelector.call(r);
+ CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
+ requestForResponse.setResponse(mapBatchTypeToResponseType.call(r));
+ // now remove from map so we know what wasn't set at end
+ requestsByKey.remove(responseKey);
+ return Observable.empty();
+ }
+
+ }).doOnTerminate(new Action0() {
+
+ @Override
+ public void call() {
+ for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requestsByKey.values()) {
+ onMissingResponse(cr);
+ }
+ }
+
+ });
}
@Override
@@ -155,6 +181,8 @@ public HystrixCollapserKey getCollapserKey() {
};
}
+ protected abstract Func1<BatchReturnType, ResponseType> getBatchReturnTypeToResponseTypeMapper();
+
private HystrixCollapserProperties getProperties() {
return collapserFactory.getProperties();
}
@@ -211,7 +239,8 @@ public Scope getScope() {
*
* @param requests
* {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
- * @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest} objects
+ * @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest}
+ * objects
*/
protected abstract HystrixObservableCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
@@ -234,51 +263,11 @@ public Scope getScope() {
return Collections.singletonList(requests);
}
- /**
- * Executed after the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand} finishes processing (unless it fails) for mapping the {@code <BatchReturnType>} to
- * the list of {@code CollapsedRequest<ResponseType, RequestArgumentType>} objects.
- * <p>
- * IMPORTANT IMPLEMENTATION DETAIL => The expected contract (responsibilities) of this method implementation is:
- * <p>
- * <ul>
- * <li>ALL {@link CollapsedRequest} objects must have either a response or exception set on them even if the response is NULL
- * otherwise the user thread waiting on the response will think a response was never received and will either block indefinitely or timeout while waiting.</li>
- * <ul>
- * <li>Setting a response is done via {@link CollapsedRequest#setResponse(Object)}</li>
- * <li>Setting an exception is done via {@link CollapsedRequest#setException(Exception)}</li>
- * </ul>
- * </ul>
- * <p>
- * Common code when {@code <BatchReturnType>} is {@code List<ResponseType>} is:
- * <p>
- *
- * <pre>
- * int count = 0;
- * for ({@code CollapsedRequest<ResponseType, RequestArgumentType>} request : requests) {
- * &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
- * }
- * </pre>
- *
- * For example if the types were {@code <List<String>, String, String>}:
- * <p>
- *
- * <pre>
- * int count = 0;
- * for ({@code CollapsedRequest<String, String>} request : requests) {
- * &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
- * }
- * </pre>
- *
- * @param batchResponse
- * The {@code <BatchReturnType>} returned from the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand}.
- * <p>
- *
- * @param requests
- * {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
- * <p>
- * The {@link CollapsedRequest#setResponse(Object)} or {@link CollapsedRequest#setException(Exception)} must be called on each {@link CollapsedRequest} in the Collection.
- */
- protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
+ protected abstract Func1<BatchReturnType, K> getBatchReturnTypeKeySelector();
+
+ protected abstract Func1<RequestArgumentType, K> getRequestArgumentKeySelector();
+
+ protected abstract void onMissingResponse(CollapsedRequest<ResponseType, RequestArgumentType> r);
/**
* Used for asynchronous execution with a callback by subscribing to the {@link Observable}.
View
2  hystrix-core/src/main/java/com/netflix/hystrix/collapser/HystrixCollapserBridge.java
@@ -20,7 +20,7 @@
public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
- public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
+ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
public HystrixCollapserKey getCollapserKey();
View
135 hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestBatch.java
@@ -11,6 +11,7 @@
import rx.Observable;
import rx.Observer;
+import rx.functions.*;
import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
@@ -105,8 +106,55 @@ public void executeBatchIfNotAlreadyStarted() {
try {
// create a new command to handle this batch of requests
Observable<BatchReturnType> o = commandCollapser.createObservableCommand(shardRequests);
- // if more than one item is emitted we fail
- o.single().subscribe(new RequestBatch.BatchRequestObserver<ResponseType, RequestArgumentType, BatchReturnType>(commandCollapser, shardRequests));
+
+ commandCollapser.mapResponseToRequests(o, shardRequests).doOnError(new Action1<Throwable>() {
+
+ /**
+ * This handles failed completions
+ */
+ @Override
+ public void call(Throwable e) {
+ // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted
+ Exception ee = null;
+ if (e instanceof Exception) {
+ ee = (Exception) e;
+ } else {
+ ee = new RuntimeException("Throwable caught while executing batch and mapping responses.", e);
+ }
+ logger.error("Exception mapping responses to requests.", e);
+ // if a failure occurs we want to pass that exception to all of the Futures that we've returned
+ for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
+ try {
+ ((CollapsedRequestObservableFunction<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee);
+ } catch (IllegalStateException e2) {
+ // if we have partial responses set in mapResponseToRequests
+ // then we may get IllegalStateException as we loop over them
+ // so we'll log but continue to the rest
+ logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);
+ }
+ }
+ }
+
+ }).doOnCompleted(new Action0() {
+
+ /**
+ * This handles successful completions
+ */
+ @Override
+ public void call() {
+ // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly
+ IllegalStateException ie = new IllegalStateException("No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation.");
+ for (CollapsedRequest<ResponseType, RequestArgumentType> request : shardRequests) {
+ try {
+ ((CollapsedRequestObservableFunction<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ie);
+ } catch (IllegalStateException e2) {
+ logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2);
+ }
+ }
+ }
+
+ }).subscribe();
+
} catch (Exception e) {
logger.error("Exception while creating and queueing command with batch.", e);
// if a failure occurs we want to pass that exception to all of the Futures that we've returned
@@ -171,87 +219,4 @@ public void shutdown() {
}
}
- private static final class BatchRequestObserver<ResponseType, RequestArgumentType, BatchReturnType> implements Observer<BatchReturnType> {
- private final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests;
- private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;
-
- private BatchRequestObserver(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
- this.commandCollapser = commandCollapser;
- this.requests = requests;
- }
-
- private boolean receivedResponse = false;
- private BatchReturnType response;
-
- @Override
- public void onCompleted() {
- try {
- // use a boolean since null can be a real response
- if (!receivedResponse) {
- onError(new IllegalStateException("Did not receive batch response."));
- return;
- }
- commandCollapser.mapResponseToRequests(response, requests);
- } catch (Throwable e) {
- // handle Throwable in case anything is thrown so we don't block Observers waiting for onError/onCompleted
- Exception ee = null;
- if (e instanceof Exception) {
- ee = (Exception) e;
- } else {
- ee = new RuntimeException("Throwable caught while invoking 'mapResponseToRequests'", e);
- }
- logger.error("Exception mapping responses to requests.", e);
- // if a failure occurs we want to pass that exception to all of the Futures that we've returned
- for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
- try {
- ((CollapsedRequestObservableFunction<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee);
- } catch (IllegalStateException e2) {
- // if we have partial responses set in mapResponseToRequests
- // then we may get IllegalStateException as we loop over them
- // so we'll log but continue to the rest
- logger.error("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting Exception. Continuing ... ", e2);
- }
- }
- }
-
- // check that all requests had setResponse or setException invoked in case 'mapResponseToRequests' was implemented poorly
- IllegalStateException ie = new IllegalStateException("No response set by " + commandCollapser.getCollapserKey().name() + " 'mapResponseToRequests' implementation.");
- for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
- try {
- ((CollapsedRequestObservableFunction<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ie);
- } catch (IllegalStateException e2) {
- logger.debug("Partial success of 'mapResponseToRequests' resulted in IllegalStateException while setting 'No response set' Exception. Continuing ... ", e2);
- }
- }
- }
-
- @Override
- public void onError(Throwable t) {
- Exception e = null;
- if (t instanceof Exception) {
- e = (Exception) t;
- } else {
- // Hystrix 1.x uses Exception, not Throwable so to prevent a breaking change Throwable will be wrapped in Exception
- e = new Exception("Throwable caught while executing command with batch.", t);
- }
- logger.error("Exception while executing command with batch.", e);
- // if a failure occurs we want to pass that exception to all of the Futures that we've returned
- for (CollapsedRequest<ResponseType, RequestArgumentType> request : requests) {
- try {
- request.setException(e);
- } catch (IllegalStateException e2) {
- logger.debug("Failed trying to setException on CollapsedRequest", e2);
- }
- }
- }
-
- @Override
- public void onNext(BatchReturnType response) {
- receivedResponse = true;
- this.response = response;
- // we want to wait until onComplete to do the processing
- // so we don't release the callers before metrics/logs/etc are available
- }
- }
-
}
View
75 hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java
@@ -31,6 +31,7 @@
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
+import rx.functions.Func1;
import rx.schedulers.Schedulers;
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
@@ -74,11 +75,11 @@ public void testTwoRequests() throws Exception {
assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
}
- private static class TestRequestCollapser extends HystrixObservableCollapser<List<String>, String, String> {
+ private static class TestRequestCollapser extends HystrixObservableCollapser<String, String, String, String> {
private final AtomicInteger count;
private final String value;
- private ConcurrentLinkedQueue<HystrixObservableCommand<List<String>>> commandsExecuted;
+ private ConcurrentLinkedQueue<HystrixObservableCommand<String>> commandsExecuted;
public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value) {
this(timer, counter, String.valueOf(value));
@@ -88,7 +89,7 @@ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, Str
this(timer, counter, value, 10000, 10);
}
- public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, ConcurrentLinkedQueue<HystrixObservableCommand<List<String>>> executionLog) {
+ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(timer, counter, value, 10000, 10, executionLog);
}
@@ -104,11 +105,11 @@ public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger
this(scope, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
}
- public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<List<String>>> executionLog) {
+ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
this(Scope.REQUEST, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog);
}
- public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<List<String>>> executionLog) {
+ public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<String>> executionLog) {
// use a CollapserKey based on the CollapserTimer object reference so it's unique for each timer as we don't want caching
// of properties to occur and we're using the default HystrixProperty which typically does caching
super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds));
@@ -123,9 +124,9 @@ public String getRequestArgument() {
}
@Override
- public HystrixObservableCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, String>> requests) {
+ public HystrixObservableCommand<String> createCommand(final Collection<CollapsedRequest<String, String>> requests) {
/* return a mocked command */
- HystrixObservableCommand<List<String>> command = new TestCollapserCommand(requests);
+ HystrixObservableCommand<String> command = new TestCollapserCommand(requests);
if (commandsExecuted != null) {
commandsExecuted.add(command);
}
@@ -133,20 +134,47 @@ public String getRequestArgument() {
}
@Override
- public void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> requests) {
+ protected Func1<String, String> getBatchReturnTypeToResponseTypeMapper() {
// count how many times a batch is executed (this method is executed once per batch)
System.out.println("increment count: " + count.incrementAndGet());
- // for simplicity I'll assume it's a 1:1 mapping between lists ... in real implementations they often need to index to maps
- // to allow random access as the response size does not match the request size
- if (batchResponse.size() != requests.size()) {
- throw new RuntimeException("lists don't match in size => " + batchResponse.size() + " : " + requests.size());
- }
- int i = 0;
- for (CollapsedRequest<String, String> request : requests) {
- request.setResponse(batchResponse.get(i++));
- }
+ return new Func1<String, String>() {
+
+ @Override
+ public String call(String s) {
+ return s;
+ }
+
+ };
+ }
+
+ @Override
+ protected Func1<String, String> getBatchReturnTypeKeySelector() {
+ return new Func1<String, String>() {
+
+ @Override
+ public String call(String s) {
+ return s;
+ }
+
+ };
+ }
+ @Override
+ protected Func1<String, String> getRequestArgumentKeySelector() {
+ return new Func1<String, String>() {
+
+ @Override
+ public String call(String s) {
+ return s;
+ }
+
+ };
+ }
+
+ @Override
+ protected void onMissingResponse(CollapsedRequest<String, String> r) {
+ r.setException(new RuntimeException("missing value!"));
}
}
@@ -162,7 +190,7 @@ public String name() {
};
}
- private static class TestCollapserCommand extends TestHystrixCommand<List<String>> {
+ private static class TestCollapserCommand extends TestHystrixCommand<String> {
private final Collection<CollapsedRequest<String, String>> requests;
@@ -172,14 +200,13 @@ public String name() {
}
@Override
- protected Observable<List<String>> run() {
- return Observable.create(new OnSubscribe<List<String>>() {
+ protected Observable<String> run() {
+ return Observable.create(new OnSubscribe<String>() {
@Override
- public void call(Subscriber<? super List<String>> s) {
+ public void call(Subscriber<? super String> s) {
System.out.println(">>> TestCollapserCommand run() ... batch size: " + requests.size());
// simulate a batch request
- ArrayList<String> response = new ArrayList<String>();
for (CollapsedRequest<String, String> request : requests) {
if (request.getArgument() == null) {
throw new NullPointerException("Simulated Error");
@@ -191,9 +218,9 @@ public void call(Subscriber<? super List<String>> s) {
e.printStackTrace();
}
}
- response.add(request.getArgument());
+ s.onNext(request.getArgument());
}
- s.onNext(response);
+
s.onCompleted();
}

0 comments on commit d1204e1

Please sign in to comment.
Something went wrong with that request. Please try again.