diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java index ed4c3eda0..744cb536c 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java @@ -34,6 +34,7 @@ import rx.Subscription; import rx.functions.Action0; import rx.functions.Action1; +import rx.functions.Func0; import rx.schedulers.Schedulers; import rx.subjects.ReplaySubject; @@ -379,42 +380,45 @@ public Observable toObservable() { */ public Observable toObservable(Scheduler observeOn) { - final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get(); - final String cacheKey = getCacheKey(); - - /* try from cache first */ - if (isRequestCacheEnabled) { - HystrixCachedObservable fromCache = requestCache.get(cacheKey); - if (fromCache != null) { - metrics.markResponseFromCache(); - return fromCache.toObservable(); - } - } - - final HystrixCollapser _self = this; - - RequestCollapser requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); - Observable response = requestCollapser.submitRequest(getRequestArgument()); - - if (isRequestCacheEnabled && cacheKey != null) { - /* - * A race can occur here with multiple threads queuing but only one will be cached. - * This means we can have some duplication of requests in a thread-race but we're okay - * with having some inefficiency in duplicate requests in the same batch - * and then subsequent requests will retrieve a previously cached Observable. - * - * If this is an issue we can make a lazy-future that gets set in the cache - * then only the winning 'put' will be invoked to actually call 'submitRequest' - */ - HystrixCachedObservable toCache = HystrixCachedObservable.from(response); - HystrixCachedObservable fromCache = requestCache.putIfAbsent(cacheKey, toCache); - if (fromCache == null) { - return toCache.toObservable(); - } else { - return fromCache.toObservable(); + return Observable.defer(new Func0>() { + @Override + public Observable call() { + final boolean isRequestCacheEnabled = getProperties().requestCacheEnabled().get(); + final String cacheKey = getCacheKey(); + + /* try from cache first */ + if (isRequestCacheEnabled) { + HystrixCachedObservable fromCache = requestCache.get(cacheKey); + if (fromCache != null) { + metrics.markResponseFromCache(); + return fromCache.toObservable(); + } + } + + RequestCollapser requestCollapser = collapserFactory.getRequestCollapser(collapserInstanceWrapper); + Observable response = requestCollapser.submitRequest(getRequestArgument()); + + if (isRequestCacheEnabled && cacheKey != null) { + /* + * A race can occur here with multiple threads queuing but only one will be cached. + * This means we can have some duplication of requests in a thread-race but we're okay + * with having some inefficiency in duplicate requests in the same batch + * and then subsequent requests will retrieve a previously cached Observable. + * + * If this is an issue we can make a lazy-future that gets set in the cache + * then only the winning 'put' will be invoked to actually call 'submitRequest' + */ + HystrixCachedObservable toCache = HystrixCachedObservable.from(response); + HystrixCachedObservable fromCache = requestCache.putIfAbsent(cacheKey, toCache); + if (fromCache == null) { + return toCache.toObservable(); + } else { + return fromCache.toObservable(); + } + } + return response; } - } - return response; + }); } /**