Permalink
Browse files

Merge pull request #261 from benjchristensen/observable-collapser

New signature for HystrixObservableCollapser
  • Loading branch information...
2 parents 7cf36e6 + 6bdb273 commit 5212caa83da1e9ca6120abd452e940231d272ab8 @benjchristensen benjchristensen committed May 6, 2014
@@ -23,8 +23,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import rx.Observable;
-import rx.Scheduler;
+import rx.*;
+import rx.Observable.OnSubscribe;
+import rx.functions.Func1;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
@@ -142,8 +143,17 @@ protected HystrixCollapser(Setter setter) {
}
@Override
- public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
- self.mapResponseToRequests(batchResponse, requests);
+ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
+ return batchResponse.single().flatMap(new Func1<BatchReturnType, Observable<Void>>() {
+
+ @Override
+ public Observable<Void> call(BatchReturnType response) {
+ // this is a blocking call in HystrixCollapser
+ self.mapResponseToRequests(response, requests);
+ return Observable.empty();
+ }
+
+ });
}
@Override
@@ -15,26 +15,20 @@
*/
package com.netflix.hystrix;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Future;
+import java.util.*;
+import java.util.concurrent.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.slf4j.*;
+import rx.*;
import rx.Observable;
-import rx.Scheduler;
+import rx.functions.*;
import rx.schedulers.Schedulers;
import rx.subjects.ReplaySubject;
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
-import com.netflix.hystrix.collapser.CollapserTimer;
-import com.netflix.hystrix.collapser.HystrixCollapserBridge;
-import com.netflix.hystrix.collapser.RealCollapserTimer;
-import com.netflix.hystrix.collapser.RequestCollapser;
-import com.netflix.hystrix.collapser.RequestCollapserFactory;
+import com.netflix.hystrix.collapser.*;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
@@ -52,14 +46,16 @@
* It must be stateless or else it will be non-deterministic because most instances are discarded while some are retained and become the
* "collapsers" for all the ones that are discarded.
*
+ * @param <K>
+ * The key used to match BatchReturnType and RequestArgumentType
* @param <BatchReturnType>
* The type returned from the {@link HystrixCommand} that will be invoked on batch executions.
* @param <ResponseType>
* The type returned from this command.
* @param <RequestArgumentType>
* The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple.
*/
-public abstract class HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
+public abstract class HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
static final Logger logger = LoggerFactory.getLogger(HystrixObservableCollapser.class);
@@ -120,7 +116,7 @@ protected HystrixObservableCollapser(Setter setter) {
this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, propertiesBuilder);
this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());
- final HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;
+ final HystrixObservableCollapser<K, BatchReturnType, ResponseType, RequestArgumentType> self = this;
/**
* Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
@@ -143,8 +139,40 @@ protected HystrixObservableCollapser(Setter setter) {
}
@Override
- public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
- self.mapResponseToRequests(batchResponse, requests);
+ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
+ Func1<RequestArgumentType, K> requestKeySelector = self.getRequestArgumentKeySelector();
+ final Func1<BatchReturnType, K> batchResponseKeySelector = self.getBatchReturnTypeKeySelector();
+ final Func1<BatchReturnType, ResponseType> mapBatchTypeToResponseType = self.getBatchReturnTypeToResponseTypeMapper();
+
+ // index the requests by key
+ final Map<K, CollapsedRequest<ResponseType, RequestArgumentType>> requestsByKey = new HashMap<K, CollapsedRequest<ResponseType, RequestArgumentType>>(requests.size());
+ for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requests) {
+ requestsByKey.put(requestKeySelector.call(cr.getArgument()), cr);
+ }
+
+ // observe the responses and join with the requests by key
+ return batchResponse.flatMap(new Func1<BatchReturnType, Observable<Void>>() {
+
+ @Override
+ public Observable<Void> call(BatchReturnType r) {
+ K responseKey = batchResponseKeySelector.call(r);
+ CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
+ requestForResponse.setResponse(mapBatchTypeToResponseType.call(r));
+ // now remove from map so we know what wasn't set at end
+ requestsByKey.remove(responseKey);
+ return Observable.empty();
+ }
+
+ }).doOnTerminate(new Action0() {
+
+ @Override
+ public void call() {
+ for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requestsByKey.values()) {
+ onMissingResponse(cr);
+ }
+ }
+
+ });
}
@Override
@@ -211,7 +239,8 @@ public Scope getScope() {
*
* @param requests
* {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
- * @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest} objects
+ * @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest}
+ * objects
*/
protected abstract HystrixObservableCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
@@ -235,50 +264,41 @@ public Scope getScope() {
}
/**
- * Executed after the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand} finishes processing (unless it fails) for mapping the {@code <BatchReturnType>} to
- * the list of {@code CollapsedRequest<ResponseType, RequestArgumentType>} objects.
+ * Function that returns the key used for matching returned objects against request argument types.
* <p>
- * IMPORTANT IMPLEMENTATION DETAIL => The expected contract (responsibilities) of this method implementation is:
- * <p>
- * <ul>
- * <li>ALL {@link CollapsedRequest} objects must have either a response or exception set on them even if the response is NULL
- * otherwise the user thread waiting on the response will think a response was never received and will either block indefinitely or timeout while waiting.</li>
- * <ul>
- * <li>Setting a response is done via {@link CollapsedRequest#setResponse(Object)}</li>
- * <li>Setting an exception is done via {@link CollapsedRequest#setException(Exception)}</li>
- * </ul>
- * </ul>
- * <p>
- * Common code when {@code <BatchReturnType>} is {@code List<ResponseType>} is:
- * <p>
- *
- * <pre>
- * int count = 0;
- * for ({@code CollapsedRequest<ResponseType, RequestArgumentType>} request : requests) {
- * &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
- * }
- * </pre>
+ * The key returned from this function should match up with the key returned from {@link #getRequestArgumentKeySelector()};
*
- * For example if the types were {@code <List<String>, String, String>}:
+ * @return key selector function
+ */
+ protected abstract Func1<BatchReturnType, K> getBatchReturnTypeKeySelector();
+
+ /**
+ * Function that returns the key used for matching request arguments against returned objects.
* <p>
+ * The key returned from this function should match up with the key returned from {@link #getBatchReturnTypeKeySelector()};
*
- * <pre>
- * int count = 0;
- * for ({@code CollapsedRequest<String, String>} request : requests) {
- * &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
- * }
- * </pre>
+ * @return key selector function
+ */
+ protected abstract Func1<RequestArgumentType, K> getRequestArgumentKeySelector();
+
+ /**
+ * Invoked if a {@link CollapsedRequest} in the batch does not have a response set on it.
+ * <p>
+ * This allows setting an exception (via {@link CollapsedRequest#setException(Exception)}) or a fallback response (via {@link CollapsedRequest#setResponse(Object)}).
*
- * @param batchResponse
- * The {@code <BatchReturnType>} returned from the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand}.
- * <p>
+ * @param CollapsedRequest
+ * that needs a response or exception set on it.
+ */
+ protected abstract void onMissingResponse(CollapsedRequest<ResponseType, RequestArgumentType> r);
+
+ /**
+ * Function for mapping from BatchReturnType to ResponseType.
+ * <p>
+ * Often these two types are exactly the same so it's just a pass-thru.
*
- * @param requests
- * {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
- * <p>
- * The {@link CollapsedRequest#setResponse(Object)} or {@link CollapsedRequest#setException(Exception)} must be called on each {@link CollapsedRequest} in the Collection.
+ * @return function for mapping from BatchReturnType to ResponseType
*/
- protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
+ protected abstract Func1<BatchReturnType, ResponseType> getBatchReturnTypeToResponseTypeMapper();
/**
* Used for asynchronous execution with a callback by subscribing to the {@link Observable}.
@@ -20,7 +20,7 @@
public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
- public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
+ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
public HystrixCollapserKey getCollapserKey();
Oops, something went wrong.

0 comments on commit 5212caa

Please sign in to comment.