Singularity performance improvements #1702

Merged
merged 25 commits on Feb 13, 2018
Commits
e4f37f5
Update tests to check status update performance as well
ssalinas Jan 26, 2018
5ee1602
request level locking wip
ssalinas Jan 31, 2018
812e1c5
Merge branch 'performance' into request_level_locks
ssalinas Jan 31, 2018
12a4459
Singularity performance improvements
ssalinas Feb 1, 2018
7c3f8a6
fix offer match loop for less iterations, fix status update call in t…
ssalinas Feb 2, 2018
91d11ab
fix key for slave usage
ssalinas Feb 2, 2018
3f36abd
missing dep
ssalinas Feb 2, 2018
f326aa8
add netty-common to dep management
ssalinas Feb 2, 2018
e80e254
fix master reconnect
ssalinas Feb 2, 2018
53a6b7c
test timeout back to 30s
ssalinas Feb 2, 2018
746ed49
remove sync in this use case
ssalinas Feb 2, 2018
65098e0
offer performance wip
ssalinas Feb 2, 2018
7d0e9c3
more efficiency improvements for offer processing
ssalinas Feb 2, 2018
3e58699
more offer performance improvements
ssalinas Feb 2, 2018
3af4530
fix config for offer scoring tests
ssalinas Feb 5, 2018
281927b
lock request during history purge as well
ssalinas Feb 5, 2018
73df8d1
fewer repeated calculations and streams
ssalinas Feb 7, 2018
f97ed21
fix one scoring bug
ssalinas Feb 7, 2018
feae0c3
fix other scoring bug
ssalinas Feb 7, 2018
6ffc598
split out tests on the offer cache
ssalinas Feb 7, 2018
d4275af
Merge branch 'master' into request_level_locks
ssalinas Feb 7, 2018
8cf72f8
avoid concurrent modification exceptions in leader cache
ssalinas Feb 7, 2018
b3acf6e
hashset isn't thread safe
ssalinas Feb 7, 2018
66c29d6
pr comments
ssalinas Feb 7, 2018
87494fa
Better logging in lock
ssalinas Feb 8, 2018
@@ -14,11 +14,13 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.commons.lang3.time.DurationFormatUtils;
5 changes: 5 additions & 0 deletions SingularityService/pom.xml
@@ -448,6 +448,11 @@
<artifactId>netty-codec</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>

</dependencies>

<build>
@@ -0,0 +1,230 @@
package com.hubspot.singularity.async;

import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.StampedLock;
import java.util.function.Supplier;

import com.google.common.base.Suppliers;

/**
* AsyncSemaphore guarantees that at most N executions
* of an underlying CompletableFuture-producing call are occurring
* at the same time.
*
* The general strategy is to try acquiring a permit for execution.
* If it succeeds, the semaphore just proceeds normally. Otherwise,
* it pushes the execution onto a queue.
*
* At the completion of any execution, the queue is checked for
* any pending executions. If any executions are found, they are
* executed in order.
*
* @param <T> the result type of the underlying futures
*/
public class AsyncSemaphore<T> {
private final StampedLock stampedLock = new StampedLock();
private final AtomicInteger concurrentRequests = new AtomicInteger();
private final Queue<DelayedExecution<T>> requestQueue;
private final com.google.common.base.Supplier<Integer> queueRejectionThreshold;
private final Supplier<Exception> timeoutExceptionSupplier;
private final PermitSource permitSource;

/**
* Create a builder for an AsyncSemaphore with the given concurrency limit.
*
* @param concurrentRequestLimit - A supplier saying how many concurrent requests are allowed
*/
public static AsyncSemaphoreBuilder newBuilder(Supplier<Integer> concurrentRequestLimit) {
return new AsyncSemaphoreBuilder(new PermitSource(concurrentRequestLimit));
}

/**
* Create a builder for an AsyncSemaphore with the given permit source.
*
* @param permitSource - A source for the permits used by the semaphore
*/
public static AsyncSemaphoreBuilder newBuilder(PermitSource permitSource) {
return new AsyncSemaphoreBuilder(permitSource);
}

AsyncSemaphore(PermitSource permitSource,
Queue<DelayedExecution<T>> requestQueue,
Supplier<Integer> queueRejectionThreshold,
Supplier<Exception> timeoutExceptionSupplier) {
this.permitSource = permitSource;
this.requestQueue = requestQueue;
this.queueRejectionThreshold = Suppliers.memoizeWithExpiration(queueRejectionThreshold::get, 1, TimeUnit.MINUTES);
this.timeoutExceptionSupplier = timeoutExceptionSupplier;
}

public CompletableFuture<T> call(Callable<CompletableFuture<T>> execution) {
return callWithQueueTimeout(execution, Optional.empty());
}

/**
* Try to execute the given callable if there are enough permits available.
* If not, add the execution to a queue (if there is room).
* If the queue attempts to start the execution after the timeout
* has passed, short-circuit the execution and complete the future
* exceptionally with the exception from the configured timeout exception supplier.
*
* @param execution - The execution of the item
* @param timeout - How long the execution may wait in the queue before it is short-circuited
* @param timeUnit - The unit of the timeout value
* @return A future that completes with the result of the execution, or exceptionally on failure, rejection, or queue timeout
*/
public CompletableFuture<T> callWithQueueTimeout(Callable<CompletableFuture<T>> execution, long timeout, TimeUnit timeUnit) {
return callWithQueueTimeout(execution, Optional.of(TimeUnit.MILLISECONDS.convert(timeout, timeUnit)));
}

private CompletableFuture<T> callWithQueueTimeout(Callable<CompletableFuture<T>> execution,
Optional<Long> timeoutInMillis) {
CompletableFuture<T> responseFuture;
if (timeoutInMillis.isPresent() && timeoutInMillis.get() <= 0) {
return CompletableFutures.exceptionalFuture(timeoutExceptionSupplier.get());
} else if (tryAcquirePermit()) {
responseFuture = executeCall(execution);
} else {
DelayedExecution<T> delayedExecution = new DelayedExecution<>(execution, timeoutExceptionSupplier, timeoutInMillis);
if (!tryEnqueueAttempt(delayedExecution)) {
return CompletableFutures.exceptionalFuture(
new RejectedExecutionException("Could not queue future for execution.")
);
}
responseFuture = delayedExecution.getResponseFuture();
}

return responseFuture.whenComplete((ignored1, ignored2) -> {
DelayedExecution<T> nextExecutionDue = requestQueue.poll();
if (nextExecutionDue == null) {
releasePermit();
} else {
// reuse the previous permit for the queued request
nextExecutionDue.execute();
}
});
}

private boolean tryAcquirePermit() {
boolean acquired = permitSource.tryAcquire();

if (acquired) {
concurrentRequests.incrementAndGet();
}

return acquired;
}

private int releasePermit() {
permitSource.release();
return concurrentRequests.decrementAndGet();
}

private static <T> CompletableFuture<T> executeCall(Callable<CompletableFuture<T>> execution) {
try {
return execution.call();
} catch (Throwable t) {
return CompletableFutures.exceptionalFuture(t);
}
}

/**
* Enqueue the attempt into our underlying queue. Since it's expensive to dynamically
* resize the queue, we have a separate rejection threshold which is ignored if it is
* less than 0, but otherwise is the practical cap on the size of the queue.
*/
private boolean tryEnqueueAttempt(DelayedExecution<T> delayedExecution) {
int rejectionThreshold = queueRejectionThreshold.get();
if (rejectionThreshold < 0) {
return requestQueue.offer(delayedExecution);
}
long stamp = stampedLock.readLock();
try {
while (requestQueue.size() < rejectionThreshold) {
long writeStamp = stampedLock.tryConvertToWriteLock(stamp);
if (writeStamp != 0L) {
stamp = writeStamp;
return requestQueue.offer(delayedExecution);
} else {
stampedLock.unlock(stamp);
stamp = stampedLock.writeLock();
}
}
return false;
} finally {
stampedLock.unlock(stamp);
}
}

private static class DelayedExecution<T> {
private static final AtomicIntegerFieldUpdater<DelayedExecution> EXECUTED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
DelayedExecution.class,
"executed"
);
private final Callable<CompletableFuture<T>> execution;
private final CompletableFuture<T> responseFuture;
private final Supplier<Exception> timeoutExceptionSupplier;
private final long deadlineEpochMillis;
@SuppressWarnings( "unused" ) // use the EXECUTED_UPDATER
private volatile int executed = 0;

private DelayedExecution(Callable<CompletableFuture<T>> execution,
Supplier<Exception> timeoutExceptionSupplier,
Optional<Long> timeoutMillis) {
this.execution = execution;
this.responseFuture = new CompletableFuture<>();
this.timeoutExceptionSupplier = timeoutExceptionSupplier;
this.deadlineEpochMillis = timeoutMillis.map(x -> System.currentTimeMillis() + x).orElse(0L);
}

private CompletableFuture<T> getResponseFuture() {
return responseFuture;
}

private void execute() {
if (!EXECUTED_UPDATER.compareAndSet(this, 0, 1)) {
return;
}
if (isExpired()) {
Exception ex = timeoutExceptionSupplier.get();
responseFuture.completeExceptionally(ex);
} else {
executeCall(execution).whenComplete((response, ex) -> {
if (ex == null) {
responseFuture.complete(response);
} else {
responseFuture.completeExceptionally(ex);
}
});
}
}

private boolean isExpired() {
return deadlineEpochMillis > 0 && System.currentTimeMillis() > deadlineEpochMillis;
}
}

int getQueueSize() {
long stamp = stampedLock.tryOptimisticRead();
int queueSize = requestQueue.size();
if (!stampedLock.validate(stamp)) {
stamp = stampedLock.readLock();
try {
queueSize = requestQueue.size();
} finally {
stampedLock.unlockRead(stamp);
}
}
return queueSize;
}

int getConcurrentRequests() {
return concurrentRequests.get();
}
}
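
The acquire-or-queue strategy described in the class comment can be exercised as in the short sketch below. This is illustrative only, not code from this PR: fetchAsync is a hypothetical method returning a CompletableFuture<String>, and construction goes through the AsyncSemaphoreBuilder shown in the next file.

// Illustrative sketch, not part of this PR. fetchAsync is a hypothetical
// method that returns a CompletableFuture<String>.
AsyncSemaphore<String> semaphore = AsyncSemaphore.newBuilder(() -> 5).build();

CompletableFuture<String> response =
    semaphore.callWithQueueTimeout(() -> fetchAsync("some-key"), 10, TimeUnit.SECONDS);

response.whenComplete((value, throwable) -> {
  // throwable is null on success. Otherwise it carries the failure from fetchAsync,
  // a RejectedExecutionException if a bounded queue had no room, or the configured
  // timeout exception if the call sat in the queue for longer than 10 seconds.
});
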
@@ -0,0 +1,55 @@
package com.hubspot.singularity.async;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class AsyncSemaphoreBuilder {
private final PermitSource permitSource;

private int queueSize = -1;
private Supplier<Integer> queueRejectionThreshold = () -> -1;
private Supplier<Exception> timeoutExceptionSupplier = TimeoutException::new;

AsyncSemaphoreBuilder(PermitSource permitSource) {
this.permitSource = permitSource;
}

/**
* Sets the maximum size of the queue. Note that this should be larger than any
* desired queueRejectionThreshold.
*/
public AsyncSemaphoreBuilder withQueueSize(int queueSize) {
this.queueSize = queueSize;
return this;
}

/**
* Sets a dynamic rejection threshold. If the supplied value is negative, the
* queue's own capacity is used to reject requests. Otherwise, this number is
* the effective cap on the number of queued executions.
*/
public AsyncSemaphoreBuilder withQueueRejectionThreshold(Supplier<Integer> queueRejectionThreshold) {
this.queueRejectionThreshold = queueRejectionThreshold;
return this;
}

/**
* Sets the supplier of the exception used to complete the future exceptionally when the
* {@code callWithQueueTimeout} method is called and the call is queued for longer than the timeout.
*/
public AsyncSemaphoreBuilder withTimeoutExceptionSupplier(Supplier<Exception> timeoutExceptionSupplier) {
this.timeoutExceptionSupplier = timeoutExceptionSupplier;
return this;
}

public <T> AsyncSemaphore<T> build() {
return new AsyncSemaphore<>(
permitSource,
queueSize == -1 ? new ConcurrentLinkedQueue<>() : new ArrayBlockingQueue<>(queueSize),
queueRejectionThreshold,
timeoutExceptionSupplier
);
}
}
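
The builder options compose as in the sketch below. This is illustrative only: the limits and configuration getters are assumptions, not values from this PR. Because AsyncSemaphore memoizes the rejection-threshold supplier for one minute, changes to a dynamic threshold take up to a minute to be observed.

// Illustrative sketch, not part of this PR. The config getters are hypothetical.
// The backing ArrayBlockingQueue is sized at 1000, which should stay above any
// rejection threshold the configuration can return.
AsyncSemaphore<Void> statusUpdateSemaphore = AsyncSemaphore
    .newBuilder(() -> configuration.getMaxConcurrentStatusUpdates())
    .withQueueSize(1000)
    .withQueueRejectionThreshold(() -> configuration.getStatusUpdateQueueCapacity())
    .withTimeoutExceptionSupplier(() -> new TimeoutException("Status update timed out in queue"))
    .build();
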