Prevent duplicate arguments from getting into a single collapser RequestBatch #1278

Merged
HystrixCollapser.java
@@ -379,7 +379,6 @@ public Observable<ResponseType> toObservable() {
* {@link #mapResponseToRequests} to transform the {@code <BatchReturnType>} into {@code <ResponseType>}
*/
public Observable<ResponseType> toObservable(Scheduler observeOn) {

return Observable.defer(new Func0<Observable<ResponseType>>() {
@Override
public Observable<ResponseType> call() {
@@ -399,20 +398,12 @@ public Observable<ResponseType> call() {
Observable<ResponseType> response = requestCollapser.submitRequest(getRequestArgument());

if (isRequestCacheEnabled && cacheKey != null) {
/*
* A race can occur here with multiple threads queuing but only one will be cached.
* This means we can have some duplication of requests in a thread-race but we're okay
* with having some inefficiency in duplicate requests in the same batch
* and then subsequent requests will retrieve a previously cached Observable.
*
* If this is an issue we can make a lazy-future that gets set in the cache
* then only the winning 'put' will be invoked to actually call 'submitRequest'
*/
HystrixCachedObservable<ResponseType> toCache = HystrixCachedObservable.from(response);
HystrixCachedObservable<ResponseType> fromCache = requestCache.putIfAbsent(cacheKey, toCache);
if (fromCache == null) {
return toCache.toObservable();
} else {
toCache.unsubscribe();
return fromCache.toObservable();
}
}
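The cache-population code above can race when several threads submit the same request at once: each racer builds a candidate cache entry, `putIfAbsent` elects a single winner, and every loser unsubscribes its own candidate and shares the winner's Observable. A minimal sketch of that idiom, assuming a ConcurrentMap-backed cache; `Entry` and `buildEntry` are illustrative stand-ins, not Hystrix types:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class CacheRaceSketch {
    interface Entry { void unsubscribe(); }

    private final ConcurrentMap<String, Entry> cache = new ConcurrentHashMap<String, Entry>();

    Entry getOrCreate(String key) {
        Entry candidate = buildEntry(key);              // every racer builds one
        Entry winner = cache.putIfAbsent(key, candidate);
        if (winner == null) {
            return candidate;                           // we won the race
        }
        candidate.unsubscribe();                        // discard the loser's work
        return winner;                                  // everyone shares the winner
    }

    private Entry buildEntry(String key) {
        return new Entry() {
            public void unsubscribe() { /* release any held resources */ }
        };
    }
}
```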
CollapsedRequestSubject.java
@@ -49,19 +49,19 @@
private final ReplaySubject<T> subject = ReplaySubject.create();
private final Observable<T> subjectWithAccounting;

private volatile boolean subscribedTo = false;
private volatile int outstandingSubscriptions = 0;

public CollapsedRequestSubject(final R arg, final RequestBatch<?, T, R> containingBatch) {
if (arg == RequestCollapser.NULL_SENTINEL) {
this.argument = null;
} else {
this.argument = arg;
}
this.subjectWithAccounting = subject
.doOnSubscribe(new Action0() {
@Override
public void call() {
outstandingSubscriptions++;
if (!subscribedTo) {
subscribedTo = true;
//containingBatch.add(arg, this);
}
}
})
.doOnUnsubscribe(new Action0() {
@@ -73,7 +73,6 @@ public void call() {
}
}
});
this.argument = arg;
}

public CollapsedRequestSubject(final R arg) {
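Replacing the `subscribedTo` flag with an `outstandingSubscriptions` counter lets the subject track how many downstream subscribers are still attached, so the unsubscribe path can act only when the last one leaves. A sketch of the accounting idiom in RxJava 1.x; `AtomicInteger` is used here for clarity where the patch itself relies on a `volatile int` mutated only from subscription callbacks:

```java
import java.util.concurrent.atomic.AtomicInteger;

import rx.Observable;
import rx.functions.Action0;
import rx.subjects.ReplaySubject;

public class AccountingSketch {
    private final AtomicInteger outstanding = new AtomicInteger();
    private final ReplaySubject<String> subject = ReplaySubject.create();

    public Observable<String> accounted() {
        return subject
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        outstanding.incrementAndGet();
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        if (outstanding.decrementAndGet() == 0) {
                            // last subscriber left: safe to drop this
                            // request from its containing batch
                        }
                    }
                });
    }
}
```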
RequestBatch.java
@@ -16,9 +16,9 @@
package com.netflix.hystrix.collapser;

import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.slf4j.Logger;
@@ -46,13 +46,14 @@ public class RequestBatch<BatchReturnType, ResponseType, RequestArgumentType> {
private final int maxBatchSize;
private final AtomicBoolean batchStarted = new AtomicBoolean();

private final ConcurrentLinkedQueue<CollapsedRequest<ResponseType, RequestArgumentType>> batchArgumentQueue =
new ConcurrentLinkedQueue<CollapsedRequest<ResponseType, RequestArgumentType>>();
private final AtomicInteger count = new AtomicInteger(0);
private final ConcurrentMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>> argumentMap =
new ConcurrentHashMap<RequestArgumentType, CollapsedRequest<ResponseType, RequestArgumentType>>();
private final HystrixCollapserProperties properties;

private ReentrantReadWriteLock batchLock = new ReentrantReadWriteLock();

public RequestBatch(HystrixCollapserProperties properties, HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser, int maxBatchSize) {
this.properties = properties;
this.commandCollapser = commandCollapser;
this.maxBatchSize = maxBatchSize;
}
@@ -76,14 +77,35 @@ public Observable<ResponseType> offer(RequestArgumentType arg) {
return null;
}

if (count.get() >= maxBatchSize) {
if (argumentMap.size() >= maxBatchSize) {
return null;
} else {
CollapsedRequestSubject<ResponseType, RequestArgumentType> collapsedRequest =
new CollapsedRequestSubject<ResponseType, RequestArgumentType>(arg, this);
batchArgumentQueue.add(collapsedRequest);
count.incrementAndGet();
return collapsedRequest.toObservable();
final CollapsedRequestSubject<ResponseType, RequestArgumentType> existing = (CollapsedRequestSubject<ResponseType, RequestArgumentType>) argumentMap.putIfAbsent(arg, collapsedRequest);
/**
* If the argument already exists in the batch, then there are 2 options:
* A) If request caching is ON (the default): only keep 1 argument in the batch and let all responses
* be hooked up to that argument
* B) If request caching is OFF: return an error to all duplicate argument requests
*
* This maintains the invariant that each batch has no duplicate arguments. This prevents the impossible
* logic (in a user-provided mapResponseToRequests for HystrixCollapser and the internals of HystrixObservableCollapser)
* of trying to figure out which argument of a set of duplicates should get attached to a response.
*
* See https://github.com/Netflix/Hystrix/pull/1176 for further discussion.
*/
if (existing != null) {
boolean requestCachingEnabled = properties.requestCacheEnabled().get();
if (requestCachingEnabled) {
return existing.toObservable();
} else {
return Observable.error(new IllegalArgumentException("Duplicate argument in collapser batch : [" + arg + "] This is not supported. Please turn request-caching on for HystrixCollapser:" + commandCollapser.getCollapserKey().name() + " or prevent duplicates from making it into the batch!"));
}
} else {
return collapsedRequest.toObservable();
}

}
} finally {
batchLock.readLock().unlock();
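The comment block above states the invariant this PR establishes: a batch never holds two entries for the same argument, because `putIfAbsent` makes the first submission the sole owner of the slot. A stripped-down model of that decision logic, with illustrative names rather than the Hystrix API; the real code returns `Observable.error(...)` where this sketch throws:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class DedupSketch<A> {
    static class PendingRequest<A> {
        final A arg;
        PendingRequest(A arg) { this.arg = arg; }
    }

    private final ConcurrentMap<A, PendingRequest<A>> batch =
            new ConcurrentHashMap<A, PendingRequest<A>>();
    private final boolean requestCachingEnabled;

    DedupSketch(boolean requestCachingEnabled) {
        this.requestCachingEnabled = requestCachingEnabled;
    }

    PendingRequest<A> offer(A arg) {
        PendingRequest<A> fresh = new PendingRequest<A>(arg);
        PendingRequest<A> existing = batch.putIfAbsent(arg, fresh);
        if (existing == null) {
            return fresh;               // first submission: owns the batch slot
        }
        if (requestCachingEnabled) {
            return existing;            // option A: duplicates share one entry
        }
        // option B: duplicates are rejected outright
        throw new IllegalArgumentException("Duplicate argument in batch: " + arg);
    }
}
```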
@@ -95,10 +117,8 @@ public Observable<ResponseType> offer(RequestArgumentType arg) {

/**
* Best-effort attempt to remove an argument from a batch. This may get invoked when a cancellation occurs somewhere downstream.
* This method finds the first occurrence of an argument in the batch, and removes that occurrence.
* This method finds the argument in the batch, and removes it.
*
* This is currently O(n). If an O(1) approach is needed, then we need to refactor internals to use a Map instead of Queue.
* My first pass at this is fairly naive, on the suspicion that unsubscription will be rare enough to not cause a perf problem.
* @param arg argument to remove from batch
*/
/* package-private */ void remove(RequestArgumentType arg) {
@@ -114,13 +134,7 @@ public Observable<ResponseType> offer(RequestArgumentType arg) {
return;
}

for (CollapsedRequest<ResponseType, RequestArgumentType> collapsedRequest: batchArgumentQueue) {
if (arg.equals(collapsedRequest.getArgument())) {
batchArgumentQueue.remove(collapsedRequest);
count.decrementAndGet();
return; //just remove a single instance
}
}
argumentMap.remove(arg);
} finally {
batchLock.readLock().unlock();
}
@@ -150,7 +164,7 @@ public void executeBatchIfNotAlreadyStarted() {

try {
// shard batches
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(batchArgumentQueue);
Collection<Collection<CollapsedRequest<ResponseType, RequestArgumentType>>> shards = commandCollapser.shardRequests(argumentMap.values());
// for each shard execute its requests
for (final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> shardRequests : shards) {
try {
@@ -173,7 +187,7 @@ public void call(Throwable e) {
}
logger.debug("Exception mapping responses to requests.", e);
// if a failure occurs we want to pass that exception to all of the Futures that we've returned
for (CollapsedRequest<ResponseType, RequestArgumentType> request : batchArgumentQueue) {
for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
try {
((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(ee);
} catch (IllegalStateException e2) {
@@ -221,7 +235,7 @@ public void call() {
} catch (Exception e) {
logger.error("Exception while sharding requests.", e);
// same error handling as we do around the shards, but this is a wider net in case the shardRequest method fails
for (CollapsedRequest<ResponseType, RequestArgumentType> request : batchArgumentQueue) {
for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
try {
request.setException(e);
} catch (IllegalStateException e2) {
@@ -241,16 +255,16 @@ public void shutdown() {
batchLock.writeLock().lock();
try {
// if we win the 'start' and once we have the lock we can now shut it down otherwise another thread will finish executing this batch
if (count.get() > 0) {
logger.warn("Requests still exist in queue but will not be executed due to RequestCollapser shutdown: " + count.get(), new IllegalStateException());
if (argumentMap.size() > 0) {
logger.warn("Requests still exist in queue but will not be executed due to RequestCollapser shutdown: " + argumentMap.size(), new IllegalStateException());
/*
* In the event that there is a concurrency bug or thread scheduling prevents the timer from ticking we need to handle this so the Future.get() calls do not block.
*
* I haven't been able to reproduce this use case on-demand but when stressing a machine saw this occur briefly right after the JVM paused (logs stopped scrolling).
*
* This safety-net just prevents the CollapsedRequestFutureImpl.get() from waiting on the CountDownLatch until its max timeout.
*/
for (CollapsedRequest<ResponseType, RequestArgumentType> request : batchArgumentQueue) {
for (CollapsedRequest<ResponseType, RequestArgumentType> request : argumentMap.values()) {
try {
((CollapsedRequestSubject<ResponseType, RequestArgumentType>) request).setExceptionIfResponseNotReceived(new IllegalStateException("Requests not executed before shutdown."));
} catch (Exception e) {
@@ -270,6 +284,6 @@ public void shutdown() {
}

public int getSize() {
return count.get();
return argumentMap.size();
}
}
RequestCollapser.java
@@ -41,6 +41,7 @@
*/
public class RequestCollapser<BatchReturnType, ResponseType, RequestArgumentType> {
static final Logger logger = LoggerFactory.getLogger(RequestCollapser.class);
static final Object NULL_SENTINEL = new Object();

private final HystrixCollapserBridge<BatchReturnType, ResponseType, RequestArgumentType> commandCollapser;
// batch can be null once shutdown
@@ -89,10 +90,15 @@ public Observable<ResponseType> submitRequest(final RequestArgumentType arg) {
return Observable.error(new IllegalStateException("Submitting requests after collapser is shutdown"));
}

Observable<ResponseType> f = b.offer(arg);
final Observable<ResponseType> response;
if (arg != null) {
response = b.offer(arg);
} else {
response = b.offer( (RequestArgumentType) NULL_SENTINEL);
}
// it will always get an Observable unless we hit the max batch size
if (f != null) {
return f;
if (response != null) {
return response;
} else {
// this batch can't accept requests so create a new one and set it if another thread doesn't beat us
createNewBatchAndExecutePreviousIfNeeded(b);
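`NULL_SENTINEL` exists because the batch is now keyed by a `ConcurrentHashMap`, which rejects null keys: a single shared placeholder object stands in for a null argument, and `CollapsedRequestSubject` translates it back to null, as shown in its constructor above. A small self-contained illustration of the pattern:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class NullSentinelSketch {
    // A single shared placeholder takes the place of null as a map key.
    static final Object NULL_SENTINEL = new Object();

    public static void main(String[] args) {
        ConcurrentMap<Object, String> batch = new ConcurrentHashMap<Object, String>();
        // batch.put(null, "request");        // would throw NullPointerException
        batch.put(NULL_SENTINEL, "request");  // sentinel is a safe stand-in

        Object key = NULL_SENTINEL;
        // consumers translate the sentinel back to a real null, as
        // CollapsedRequestSubject does for its argument field
        Object argument = (key == NULL_SENTINEL) ? null : key;
        System.out.println(argument);         // prints "null"
    }
}
```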
HystrixPropertiesFactory.java
@@ -43,6 +43,7 @@ public class HystrixPropertiesFactory {
public static void reset() {
commandProperties.clear();
threadPoolProperties.clear();
collapserProperties.clear();
}

// String is CommandKey.name() (we can't use CommandKey directly as we can't guarantee it implements hashcode/equals correctly)