Skip to content
Browse files

HystrixObservableCollapser

Make the collapser support non-blocking HystrixObservableCommand
  • Loading branch information...
1 parent eea4a05 commit fb96410f8df4d59d59ad5006c687c00ba0459517 Ben Christensen committed Apr 22, 2014
View
4 hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java
@@ -78,7 +78,7 @@
* <li>GLOBAL: Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed. 1 queue for entire app.</li>
* </ul>
*/
- public static enum Scope {
+ public static enum Scope implements RequestCollapserFactory.Scope {
REQUEST, GLOBAL
}
@@ -185,7 +185,7 @@ public HystrixCollapserKey getCollapserKey() {
* @return {@link Scope} that collapsing should be performed within.
*/
public Scope getScope() {
- return collapserFactory.getScope();
+ return Scope.valueOf(collapserFactory.getScope().name());
}
/**
View
534 hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java
@@ -0,0 +1,534 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.hystrix;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import rx.Observable;
+import rx.Scheduler;
+import rx.schedulers.Schedulers;
+import rx.subjects.ReplaySubject;
+
+import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
+import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
+import com.netflix.hystrix.collapser.CollapserTimer;
+import com.netflix.hystrix.collapser.HystrixCollapserBridge;
+import com.netflix.hystrix.collapser.RealCollapserTimer;
+import com.netflix.hystrix.collapser.RequestCollapser;
+import com.netflix.hystrix.collapser.RequestCollapserFactory;
+import com.netflix.hystrix.exception.HystrixRuntimeException;
+import com.netflix.hystrix.strategy.HystrixPlugins;
+import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
+import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
+
+/**
+ * Collapse multiple requests into a single {@link HystrixCommand} execution based on a time window and optionally a max batch size.
+ * <p>
+ * This allows an object model to have multiple calls to the command that execute/queue many times in a short period (milliseconds) and have them all get batched into a single backend call.
+ * <p>
+ * Typically the time window is something like 10ms give or take.
+ * <p>
+ * NOTE: Do NOT retain any state within instances of this class.
+ * <p>
+ * It must be stateless or else it will be non-deterministic because most instances are discarded while some are retained and become the
+ * "collapsers" for all the ones that are discarded.
+ *
+ * @param <BatchReturnType>
+ * The type returned from the {@link HystrixCommand} that will be invoked on batch executions.
+ * @param <ResponseType>
+ * The type returned from this command.
+ * @param <RequestArgumentType>
+ * The type of the request argument. If multiple arguments are needed, wrap them in another object or a Tuple.
+ */
+public abstract class HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> implements HystrixExecutable<ResponseType> {
+
+ static final Logger logger = LoggerFactory.getLogger(HystrixObservableCollapser.class);
+
+ private final RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType> collapserFactory;
+ private final HystrixRequestCache requestCache;
+ private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> collapserInstanceWrapper;
+
+ /**
+ * The scope of request collapsing.
+ * <ul>
+ * <li>REQUEST: Requests within the scope of a {@link HystrixRequestContext} will be collapsed.
+ * <p>
+ * Typically this means that requests within a single user-request (ie. HTTP request) are collapsed. No interaction with other user requests. 1 queue per user request.
+ * </li>
+ * <li>GLOBAL: Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed. 1 queue for entire app.</li>
+ * </ul>
+ */
+ public static enum Scope implements RequestCollapserFactory.Scope {
+ REQUEST, GLOBAL
+ }
+
+ /**
+ * Collapser with default {@link HystrixCollapserKey} derived from the implementing class name and scoped to {@link Scope#REQUEST} and default configuration.
+ */
+ protected HystrixObservableCollapser() {
+ this(Setter.withCollapserKey(null).andScope(Scope.REQUEST));
+ }
+
+ /**
+ * Collapser scoped to {@link Scope#REQUEST} and default configuration.
+ *
+ * @param collapserKey
+ * {@link HystrixCollapserKey} that identifies this collapser and provides the key used for retrieving properties, request caches, publishing metrics etc.
+ */
+ protected HystrixObservableCollapser(HystrixCollapserKey collapserKey) {
+ this(Setter.withCollapserKey(collapserKey).andScope(Scope.REQUEST));
+ }
+
+ /**
+ * Construct a {@link HystrixObservableCollapser} with defined {@link Setter} that allows
+ * injecting property and strategy overrides and other optional arguments.
+ * <p>
+ * Null values will result in the default being used.
+ *
+ * @param setter
+ * Fluent interface for constructor arguments
+ */
+ protected HystrixObservableCollapser(Setter setter) {
+ this(setter.collapserKey, setter.scope, new RealCollapserTimer(), setter.propertiesSetter);
+ }
+
+ /* package for tests */HystrixObservableCollapser(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) {
+ if (collapserKey == null || collapserKey.name().trim().equals("")) {
+ String defaultKeyName = getDefaultNameFromClass(getClass());
+ collapserKey = HystrixCollapserKey.Factory.asKey(defaultKeyName);
+ }
+
+ this.collapserFactory = new RequestCollapserFactory<BatchReturnType, ResponseType, RequestArgumentType>(collapserKey, scope, timer, propertiesBuilder);
+ this.requestCache = HystrixRequestCache.getInstance(collapserKey, HystrixPlugins.getInstance().getConcurrencyStrategy());
+
+ final HystrixObservableCollapser<BatchReturnType, ResponseType, RequestArgumentType> self = this;
+
+ /**
+ * Used to pass public method invocation to the underlying implementation in a separate package while leaving the methods 'protected' in this class.
+ */
+ collapserInstanceWrapper = new HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType>() {
+
+ @Override
+ public Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
+ return self.shardRequests(requests);
+ }
+
+ @Override
+ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
+ HystrixObservableCommand<BatchReturnType> command = self.createCommand(requests);
+
+ // mark the number of requests being collapsed together
+ command.markAsCollapsedCommand(requests.size());
+
+ return command.toObservable();
+ }
+
+ @Override
+ public void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
+ self.mapResponseToRequests(batchResponse, requests);
+ }
+
+ @Override
+ public HystrixCollapserKey getCollapserKey() {
+ return self.getCollapserKey();
+ }
+
+ };
+ }
+
+ private HystrixCollapserProperties getProperties() {
+ return collapserFactory.getProperties();
+ }
+
+ /**
+ * Key of the {@link HystrixObservableCollapser} used for properties, metrics, caches, reporting etc.
+ *
+ * @return {@link HystrixCollapserKey} identifying this {@link HystrixObservableCollapser} instance
+ */
+ public HystrixCollapserKey getCollapserKey() {
+ return collapserFactory.getCollapserKey();
+ }
+
+ /**
+ * Scope of collapsing.
+ * <p>
+ * <ul>
+ * <li>REQUEST: Requests within the scope of a {@link HystrixRequestContext} will be collapsed.
+ * <p>
+ * Typically this means that requests within a single user-request (ie. HTTP request) are collapsed. No interaction with other user requests. 1 queue per user request.
+ * </li>
+ * <li>GLOBAL: Requests from any thread (ie. all HTTP requests) within the JVM will be collapsed. 1 queue for entire app.</li>
+ * </ul>
+ * <p>
+ * Default: {@link Scope#REQUEST} (defined via constructor)
+ *
+ * @return {@link Scope} that collapsing should be performed within.
+ */
+ public Scope getScope() {
+ return Scope.valueOf(collapserFactory.getScope().name());
+ }
+
+ /**
+ * The request arguments to be passed to the {@link HystrixCommand}.
+ * <p>
+ * Typically this means to take the argument(s) provided to the constructor and return it here.
+ * <p>
+ * If there are multiple arguments that need to be bundled, create a single object to contain them, or use a Tuple.
+ *
+ * @return RequestArgumentType
+ */
+ public abstract RequestArgumentType getRequestArgument();
+
+ /**
+ * Factory method to create a new {@link HystrixObservableCommand}{@code <BatchReturnType>} command object each time a batch needs to be executed.
+ * <p>
+ * Do not return the same instance each time. Return a new instance on each invocation.
+ * <p>
+ * Process the 'requests' argument into the arguments the command object needs to perform its work.
+ * <p>
+ * If a batch or requests needs to be split (sharded) into multiple commands, see {@link #shardRequests} <p>
+ * IMPLEMENTATION NOTE: Be fast (ie. <1ms) in this method otherwise it can block the Timer from executing subsequent batches. Do not do any processing beyond constructing the command and returning
+ * it.
+ *
+ * @param requests
+ * {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
+ * @return {@link HystrixObservableCommand}{@code <BatchReturnType>} which when executed will retrieve results for the batch of arguments as found in the Collection of {@link CollapsedRequest} objects
+ */
+ protected abstract HystrixObservableCommand<BatchReturnType> createCommand(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
+
+ /**
+ * Override to split (shard) a batch of requests into multiple batches that will each call <code>createCommand</code> separately.
+ * <p>
+ * The purpose of this is to allow collapsing to work for services that have sharded backends and batch executions that need to be shard-aware.
+ * <p>
+ * For example, a batch of 100 requests could be split into 4 different batches sharded on name (ie. a-g, h-n, o-t, u-z) that each result in a separate {@link HystrixCommand} being created and
+ * executed for them.
+ * <p>
+ * By default this method does nothing to the Collection and is a pass-thru.
+ *
+ * @param requests
+ * {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
+ * @return Collection of {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} objects sharded according to business rules.
+ * <p>The CollapsedRequest instances should not be modified or wrapped as the CollapsedRequest instance object contains state information needed to complete the execution.
+ */
+ protected Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shardRequests(Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
+ return Collections.singletonList(requests);
+ }
+
+ /**
+ * Executed after the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand} finishes processing (unless it fails) for mapping the {@code <BatchReturnType>} to
+ * the list of {@code CollapsedRequest<ResponseType, RequestArgumentType>} objects.
+ * <p>
+ * IMPORTANT IMPLEMENTATION DETAIL => The expected contract (responsibilities) of this method implementation is:
+ * <p>
+ * <ul>
+ * <li>ALL {@link CollapsedRequest} objects must have either a response or exception set on them even if the response is NULL
+ * otherwise the user thread waiting on the response will think a response was never received and will either block indefinitely or timeout while waiting.</li>
+ * <ul>
+ * <li>Setting a response is done via {@link CollapsedRequest#setResponse(Object)}</li>
+ * <li>Setting an exception is done via {@link CollapsedRequest#setException(Exception)}</li>
+ * </ul>
+ * </ul>
+ * <p>
+ * Common code when {@code <BatchReturnType>} is {@code List<ResponseType>} is:
+ * <p>
+ *
+ * <pre>
+ * int count = 0;
+ * for ({@code CollapsedRequest<ResponseType, RequestArgumentType>} request : requests) {
+ * &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
+ * }
+ * </pre>
+ *
+ * For example if the types were {@code <List<String>, String, String>}:
+ * <p>
+ *
+ * <pre>
+ * int count = 0;
+ * for ({@code CollapsedRequest<String, String>} request : requests) {
+ * &nbsp;&nbsp;&nbsp;&nbsp; request.setResponse(batchResponse.get(count++));
+ * }
+ * </pre>
+ *
+ * @param batchResponse
+ * The {@code <BatchReturnType>} returned from the {@link HystrixCommand}{@code <BatchReturnType>} command created by {@link #createCommand}.
+ * <p>
+ *
+ * @param requests
+ * {@code Collection<CollapsedRequest<ResponseType, RequestArgumentType>>} containing {@link CollapsedRequest} objects containing the arguments of each request collapsed in this batch.
+ * <p>
+ * The {@link CollapsedRequest#setResponse(Object)} or {@link CollapsedRequest#setException(Exception)} must be called on each {@link CollapsedRequest} in the Collection.
+ */
+ protected abstract void mapResponseToRequests(BatchReturnType batchResponse, Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests);
+
+ /**
+ * Used for asynchronous execution with a callback by subscribing to the {@link Observable}.
+ * <p>
+ * This eagerly starts execution the same as {@link #queue()} and {@link #execute()}.
+ * A lazy {@link Observable} can be obtained from {@link #toObservable()}.
+ * <p>
+ * <b>Callback Scheduling</b>
+ * <p>
+ * <ul>
+ * <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
+ * <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
+ * </ul>
+ * Use {@link #toObservable(rx.Scheduler)} to schedule the callback differently.
+ * <p>
+ * See https://github.com/Netflix/RxJava/wiki for more information.
+ *
+ * @return {@code Observable<R>} that executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through {@link #mapResponseToRequests}
+ * to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
+ */
+ public Observable<ResponseType> observe() {
+ // us a ReplaySubject to buffer the eagerly subscribed-to Observable
+ ReplaySubject<ResponseType> subject = ReplaySubject.create();
+ // eagerly kick off subscription
+ toObservable().subscribe(subject);
+ // return the subject that can be subscribed to later while the execution has already started
+ return subject;
+ }
+
+ /**
+ * A lazy {@link Observable} that will execute when subscribed to.
+ * <p>
+ * <b>Callback Scheduling</b>
+ * <p>
+ * <ul>
+ * <li>When using {@link ExecutionIsolationStrategy#THREAD} this defaults to using {@link Schedulers#threadPoolForComputation()} for callbacks.</li>
+ * <li>When using {@link ExecutionIsolationStrategy#SEMAPHORE} this defaults to using {@link Schedulers#immediate()} for callbacks.</li>
+ * </ul>
+ * <p>
+ * See https://github.com/Netflix/RxJava/wiki for more information.
+ *
+ * @return {@code Observable<R>} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through
+ * {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
+ */
+ public Observable<ResponseType> toObservable() {
+ // when we callback with the data we want to do the work
+ // on a separate thread than the one giving us the callback
+ return toObservable(Schedulers.computation());
+ }
+
+ /**
+ * A lazy {@link Observable} that will execute when subscribed to.
+ * <p>
+ * See https://github.com/Netflix/RxJava/wiki for more information.
+ *
+ * @param observeOn
+ * The {@link Scheduler} to execute callbacks on.
+ * @return {@code Observable<R>} that lazily executes and calls back with the result of of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through
+ * {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
+ */
+ public Observable<ResponseType> toObservable(Scheduler observeOn) {
+
+ /* try from cache first */
+ if (getProperties().requestCachingEnabled().get()) {
+ Observable<ResponseType> fromCache = requestCache.get(getCacheKey());
+ if (fromCache != null) {
+ /* mark that we received this response from cache */
+ // TODO Add collapser metrics so we can capture this information
+ // we can't add it to the command metrics because the command can change each time (dynamic key for example)
+ // and we don't have access to it when responding from cache
+ // collapserMetrics.markResponseFromCache();
+ return fromCache;
+ }
+ }
+
+ RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper);
+ Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());
+ if (getProperties().requestCachingEnabled().get()) {
+ /*
+ * A race can occur here with multiple threads queuing but only one will be cached.
+ * This means we can have some duplication of requests in a thread-race but we're okay
+ * with having some inefficiency in duplicate requests in the same batch
+ * and then subsequent requests will retrieve a previously cached Observable.
+ *
+ * If this is an issue we can make a lazy-future that gets set in the cache
+ * then only the winning 'put' will be invoked to actually call 'submitRequest'
+ */
+ Observable<ResponseType> o = response.cache();
+ Observable<ResponseType> fromCache = requestCache.putIfAbsent(getCacheKey(), o);
+ if (fromCache == null) {
+ response = o;
+ } else {
+ response = fromCache;
+ }
+ }
+ return response;
+ }
+
+ /**
+ * Used for synchronous execution.
+ * <p>
+ * If {@link Scope#REQUEST} is being used then synchronous execution will only result in collapsing if other threads are running within the same scope.
+ *
+ * @return ResponseType
+ * Result of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into
+ * {@code <ResponseType>}
+ * @throws HystrixRuntimeException
+ * if an error occurs and a fallback cannot be retrieved
+ */
+ public ResponseType execute() {
+ try {
+ return queue().get();
+ } catch (Throwable e) {
+ if (e instanceof HystrixRuntimeException) {
+ throw (HystrixRuntimeException) e;
+ }
+ // if we have an exception we know about we'll throw it directly without the threading wrapper exception
+ if (e.getCause() instanceof HystrixRuntimeException) {
+ throw (HystrixRuntimeException) e.getCause();
+ }
+ // we don't know what kind of exception this is so create a generic message and throw a new HystrixRuntimeException
+ String message = getClass().getSimpleName() + " HystrixCollapser failed while executing.";
+ logger.debug(message, e); // debug only since we're throwing the exception and someone higher will do something with it
+ //TODO should this be made a HystrixRuntimeException?
+ throw new RuntimeException(message, e);
+ }
+ }
+
+ /**
+ * Used for asynchronous execution.
+ * <p>
+ * This will queue up the command and return a Future to get the result once it completes.
+ *
+ * @return ResponseType
+ * Result of {@link HystrixCommand}{@code <BatchReturnType>} execution after passing through {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into
+ * {@code <ResponseType>}
+ * @throws HystrixRuntimeException
+ * within an <code>ExecutionException.getCause()</code> (thrown by {@link Future#get}) if an error occurs and a fallback cannot be retrieved
+ */
+ public Future<ResponseType> queue() {
+ final Observable<ResponseType> o = toObservable();
+ return o.toBlockingObservable().toFuture();
+ }
+
+ /**
+ * Key to be used for request caching.
+ * <p>
+ * By default this returns null which means "do not cache".
+ * <p>
+ * To enable caching override this method and return a string key uniquely representing the state of a command instance.
+ * <p>
+ * If multiple command instances in the same request scope match keys then only the first will be executed and all others returned from cache.
+ *
+ * @return String cacheKey or null if not to cache
+ */
+ protected String getCacheKey() {
+ return null;
+ }
+
+ /**
+ * Clears all state. If new requests come in instances will be recreated and metrics started from scratch.
+ */
+ /* package */static void reset() {
+ RequestCollapserFactory.reset();
+ }
+
+ private static String getDefaultNameFromClass(@SuppressWarnings("rawtypes") Class<? extends HystrixObservableCollapser> cls) {
+ String fromCache = defaultNameCache.get(cls);
+ if (fromCache != null) {
+ return fromCache;
+ }
+ // generate the default
+ // default HystrixCommandKey to use if the method is not overridden
+ String name = cls.getSimpleName();
+ if (name.equals("")) {
+ // we don't have a SimpleName (anonymous inner class) so use the full class name
+ name = cls.getName();
+ name = name.substring(name.lastIndexOf('.') + 1, name.length());
+ }
+ defaultNameCache.put(cls, name);
+ return name;
+ }
+
+ /**
+ * Fluent interface for arguments to the {@link HystrixObservableCollapser} constructor.
+ * <p>
+ * The required arguments are set via the 'with' factory method and optional arguments via the 'and' chained methods.
+ * <p>
+ * Example:
+ * <pre> {@code
+ * Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("CollapserName"))
+ .andScope(Scope.REQUEST);
+ * } </pre>
+ */
+ @NotThreadSafe
+ public static class Setter {
+ private final HystrixCollapserKey collapserKey;
+ private Scope scope = Scope.REQUEST; // default if nothing is set
+ private HystrixCollapserProperties.Setter propertiesSetter;
+
+ private Setter(HystrixCollapserKey collapserKey) {
+ this.collapserKey = collapserKey;
+ }
+
+ /**
+ * Setter factory method containing required values.
+ * <p>
+ * All optional arguments can be set via the chained methods.
+ *
+ * @param collapserKey
+ * {@link HystrixCollapserKey} that identifies this collapser and provides the key used for retrieving properties, request caches, publishing metrics etc.
+ * @return Setter for fluent interface via method chaining
+ */
+ public static Setter withCollapserKey(HystrixCollapserKey collapserKey) {
+ return new Setter(collapserKey);
+ }
+
+ /**
+ * {@link Scope} defining what scope the collapsing should occur within
+ *
+ * @param scope
+ *
+ * @return Setter for fluent interface via method chaining
+ */
+ public Setter andScope(Scope scope) {
+ this.scope = scope;
+ return this;
+ }
+
+ /**
+ * @param propertiesSetter
+ * {@link HystrixCollapserProperties.Setter} that allows instance specific property overrides (which can then be overridden by dynamic properties, see
+ * {@link HystrixPropertiesStrategy} for
+ * information on order of precedence).
+ * <p>
+ * Will use defaults if left NULL.
+ * @return Setter for fluent interface via method chaining
+ */
+ public Setter andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter propertiesSetter) {
+ this.propertiesSetter = propertiesSetter;
+ return this;
+ }
+
+ }
+
+ // this is a micro-optimization but saves about 1-2microseconds (on 2011 MacBook Pro)
+ // on the repetitive string processing that will occur on the same classes over and over again
+ @SuppressWarnings("rawtypes")
+ private static ConcurrentHashMap<Class<? extends HystrixObservableCollapser>, String> defaultNameCache = new ConcurrentHashMap<Class<? extends HystrixObservableCollapser>, String>();
+
+}
View
15 hystrix-core/src/main/java/com/netflix/hystrix/collapser/RequestCollapserFactory.java
@@ -36,6 +36,15 @@
private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Scope scope;
+ public static interface Scope {
+ String name();
+ }
+
+ // internally expected scopes, dealing with the not-so-fun inheritance issues of enum when shared between classes
+ private static enum Scopes implements Scope {
+ REQUEST, GLOBAL
+ }
+
public RequestCollapserFactory(HystrixCollapserKey collapserKey, Scope scope, CollapserTimer timer, HystrixCollapserProperties.Setter propertiesBuilder) {
/* strategy: ConcurrencyStrategy */
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
@@ -59,9 +68,9 @@ public HystrixCollapserProperties getProperties() {
}
public RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> getRequestCollapser(HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser) {
- if (Scope.REQUEST == getScope()) {
+ if (Scopes.REQUEST == Scopes.valueOf(getScope().name())) {
return getCollapserForUserRequest(commandCollapser);
- } else if (Scope.GLOBAL == getScope()) {
+ } else if (Scopes.GLOBAL == Scopes.valueOf(getScope().name())) {
return getCollapserForGlobalScope(commandCollapser);
} else {
logger.warn("Invalid Scope: " + getScope() + " Defaulting to REQUEST scope.");
@@ -210,7 +219,7 @@ public void shutdown(RequestCollapser<BatchReturnType, ResponseType, RequestArgu
@NotThreadSafe
public static class Setter {
private final HystrixCollapserKey collapserKey;
- private Scope scope = Scope.REQUEST; // default if nothing is set
+ private Scope scope = Scopes.REQUEST; // default if nothing is set
private HystrixCollapserProperties.Setter propertiesSetter;
private Setter(HystrixCollapserKey collapserKey) {
View
2 hystrix-core/src/test/java/com/netflix/hystrix/HystrixCollapserTest.java
@@ -1100,7 +1100,7 @@ protected Void run() throws Exception {
}
- private static class TestCollapserTimer implements CollapserTimer {
+ /* package */ static class TestCollapserTimer implements CollapserTimer {
private final ConcurrentLinkedQueue<ATask> tasks = new ConcurrentLinkedQueue<ATask>();
View
204 hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCollapserTest.java
@@ -0,0 +1,204 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.netflix.hystrix;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Observable.OnSubscribe;
+import rx.Subscriber;
+import rx.schedulers.Schedulers;
+
+import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
+import com.netflix.hystrix.HystrixCollapserTest.TestCollapserTimer;
+import com.netflix.hystrix.HystrixObservableCommandTest.TestHystrixCommand;
+import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
+
+public class HystrixObservableCollapserTest {
+ static AtomicInteger counter = new AtomicInteger();
+
+ @Before
+ public void init() {
+ counter.set(0);
+ // since we're going to modify properties of the same class between tests, wipe the cache each time
+ HystrixCollapser.reset();
+ /* we must call this to simulate a new request lifecycle running and clearing caches */
+ HystrixRequestContext.initializeContext();
+ }
+
+ @After
+ public void cleanup() {
+ // instead of storing the reference from initialize we'll just get the current state and shutdown
+ if (HystrixRequestContext.getContextForCurrentThread() != null) {
+ // it may be null if a test shuts the context down manually
+ HystrixRequestContext.getContextForCurrentThread().shutdown();
+ }
+ }
+
+ @Test
+ public void testTwoRequests() throws Exception {
+ TestCollapserTimer timer = new TestCollapserTimer();
+ Future<String> response1 = new TestRequestCollapser(timer, counter, 1).queue();
+ Future<String> response2 = new TestRequestCollapser(timer, counter, 2).queue();
+ timer.incrementTime(10); // let time pass that equals the default delay/period
+
+ assertEquals("1", response1.get());
+ assertEquals("2", response2.get());
+
+ assertEquals(1, counter.get());
+
+ assertEquals(1, HystrixRequestLog.getCurrentRequest().getAllExecutedCommands().size());
+ }
+
+ private static class TestRequestCollapser extends HystrixObservableCollapser<List<String>, String, String> {
+
+ private final AtomicInteger count;
+ private final String value;
+ private ConcurrentLinkedQueue<HystrixObservableCommand<List<String>>> commandsExecuted;
+
+ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value) {
+ this(timer, counter, String.valueOf(value));
+ }
+
+ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value) {
+ this(timer, counter, value, 10000, 10);
+ }
+
+ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, ConcurrentLinkedQueue<HystrixObservableCommand<List<String>>> executionLog) {
+ this(timer, counter, value, 10000, 10, executionLog);
+ }
+
+ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, int value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
+ this(timer, counter, String.valueOf(value), defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds);
+ }
+
+ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
+ this(timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
+ }
+
+ public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds) {
+ this(scope, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, null);
+ }
+
+ public TestRequestCollapser(TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<List<String>>> executionLog) {
+ this(Scope.REQUEST, timer, counter, value, defaultMaxRequestsInBatch, defaultTimerDelayInMilliseconds, executionLog);
+ }
+
+ public TestRequestCollapser(Scope scope, TestCollapserTimer timer, AtomicInteger counter, String value, int defaultMaxRequestsInBatch, int defaultTimerDelayInMilliseconds, ConcurrentLinkedQueue<HystrixObservableCommand<List<String>>> executionLog) {
+ // use a CollapserKey based on the CollapserTimer object reference so it's unique for each timer as we don't want caching
+ // of properties to occur and we're using the default HystrixProperty which typically does caching
+ super(collapserKeyFromString(timer), scope, timer, HystrixCollapserProperties.Setter().withMaxRequestsInBatch(defaultMaxRequestsInBatch).withTimerDelayInMilliseconds(defaultTimerDelayInMilliseconds));
+ this.count = counter;
+ this.value = value;
+ this.commandsExecuted = executionLog;
+ }
+
+ @Override
+ public String getRequestArgument() {
+ return value;
+ }
+
+ @Override
+ public HystrixObservableCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, String>> requests) {
+ /* return a mocked command */
+ HystrixObservableCommand<List<String>> command = new TestCollapserCommand(requests);
+ if (commandsExecuted != null) {
+ commandsExecuted.add(command);
+ }
+ return command;
+ }
+
+ @Override
+ public void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, String>> requests) {
+ // count how many times a batch is executed (this method is executed once per batch)
+ System.out.println("increment count: " + count.incrementAndGet());
+
+ // for simplicity I'll assume it's a 1:1 mapping between lists ... in real implementations they often need to index to maps
+ // to allow random access as the response size does not match the request size
+ if (batchResponse.size() != requests.size()) {
+ throw new RuntimeException("lists don't match in size => " + batchResponse.size() + " : " + requests.size());
+ }
+ int i = 0;
+ for (CollapsedRequest<String, String> request : requests) {
+ request.setResponse(batchResponse.get(i++));
+ }
+
+ }
+
+ }
+
+ private static HystrixCollapserKey collapserKeyFromString(final Object o) {
+ return new HystrixCollapserKey() {
+
+ @Override
+ public String name() {
+ return String.valueOf(o);
+ }
+
+ };
+ }
+
+ private static class TestCollapserCommand extends TestHystrixCommand<List<String>> {
+
+ private final Collection<CollapsedRequest<String, String>> requests;
+
+ TestCollapserCommand(Collection<CollapsedRequest<String, String>> requests) {
+ super(testPropsBuilder().setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withExecutionIsolationThreadTimeoutInMilliseconds(50)));
+ this.requests = requests;
+ }
+
+ @Override
+ protected Observable<List<String>> run() {
+ return Observable.create(new OnSubscribe<List<String>>() {
+
+ @Override
+ public void call(Subscriber<? super List<String>> s) {
+ System.out.println(">>> TestCollapserCommand run() ... batch size: " + requests.size());
+ // simulate a batch request
+ ArrayList<String> response = new ArrayList<String>();
+ for (CollapsedRequest<String, String> request : requests) {
+ if (request.getArgument() == null) {
+ throw new NullPointerException("Simulated Error");
+ }
+ if (request.getArgument() == "TIMEOUT") {
+ try {
+ Thread.sleep(200);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ response.add(request.getArgument());
+ }
+ s.onNext(response);
+ s.onCompleted();
+ }
+
+ }).subscribeOn(Schedulers.computation());
+ }
+
+ }
+}
View
2 hystrix-core/src/test/java/com/netflix/hystrix/HystrixObservableCommandTest.java
@@ -6119,7 +6119,7 @@ public void testTimeoutWithFallbackRequestContextWithThreadIsolatedAsynchronousO
/**
* Used by UnitTest command implementations to provide base defaults for constructor and a builder pattern for the arguments being passed in.
*/
- private static abstract class TestHystrixCommand<K> extends HystrixObservableCommand<K> {
+ /* package */ static abstract class TestHystrixCommand<K> extends HystrixObservableCommand<K> {
final TestCommandBuilder builder;

0 comments on commit fb96410

Please sign in to comment.
Something went wrong with that request. Please try again.