Skip to content

Commit

Permalink
feat: Add flow control support to publisher (#119)
Browse files Browse the repository at this point in the history
* feat: Add flow control support to publisher
  • Loading branch information
kamalaboulhosn committed Mar 25, 2020
1 parent e7c007b commit fdd9434
Show file tree
Hide file tree
Showing 3 changed files with 364 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.google.api.core.BetaApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.BackgroundResourceAggregation;
import com.google.api.gax.core.CredentialsProvider;
Expand Down Expand Up @@ -55,6 +57,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -108,6 +111,8 @@ public class Publisher {
private ScheduledFuture<?> currentAlarmFuture;
private final ApiFunction<PubsubMessage, PubsubMessage> messageTransform;

private MessageFlowController flowController = null;

/** The maximum number of messages in one request. Defined by the API. */
public static long getApiMaxRequestElementCount() {
return 1000L;
Expand All @@ -122,6 +127,16 @@ private Publisher(Builder builder) throws IOException {
topicName = builder.topicName;

this.batchingSettings = builder.batchingSettings;
FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings();
if (flowControl != null
&& flowControl.getLimitExceededBehavior() != FlowController.LimitExceededBehavior.Ignore) {
this.flowController =
new MessageFlowController(
flowControl.getMaxOutstandingElementCount(),
flowControl.getMaxOutstandingRequestBytes(),
flowControl.getLimitExceededBehavior());
}

this.enableMessageOrdering = builder.enableMessageOrdering;
this.messageTransform = builder.messageTransform;

Expand Down Expand Up @@ -221,6 +236,19 @@ public ApiFuture<String> publish(PubsubMessage message) {

final OutstandingPublish outstandingPublish =
new OutstandingPublish(messageTransform.apply(message));

if (flowController != null) {
try {
flowController.acquire(outstandingPublish.messageSize);
} catch (FlowController.FlowControlException e) {
if (!orderingKey.isEmpty()) {
sequentialExecutor.stopPublish(orderingKey);
}
outstandingPublish.publishResult.setException(e);
return outstandingPublish.publishResult;
}
}

List<OutstandingBatch> batchesToSend;
messagesBatchLock.lock();
try {
Expand Down Expand Up @@ -454,7 +482,7 @@ public ApiFuture<PublishResponse> call() {
ApiFutures.addCallback(future, futureCallback, directExecutor());
}

private static final class OutstandingBatch {
private final class OutstandingBatch {
final List<OutstandingPublish> outstandingPublishes;
final long creationTime;
int attempt;
Expand Down Expand Up @@ -484,14 +512,21 @@ private List<PubsubMessage> getMessages() {

private void onFailure(Throwable t) {
for (OutstandingPublish outstandingPublish : outstandingPublishes) {
if (flowController != null) {
flowController.release(outstandingPublish.messageSize);
}
outstandingPublish.publishResult.setException(t);
}
}

private void onSuccess(Iterable<String> results) {
Iterator<OutstandingPublish> messagesResultsIt = outstandingPublishes.iterator();
for (String messageId : results) {
messagesResultsIt.next().publishResult.set(messageId);
OutstandingPublish nextPublish = messagesResultsIt.next();
if (flowController != null) {
flowController.release(nextPublish.messageSize);
}
nextPublish.publishResult.set(messageId);
}
}
}
Expand Down Expand Up @@ -602,6 +637,10 @@ public static final class Builder {
.setDelayThreshold(DEFAULT_DELAY_THRESHOLD)
.setRequestByteThreshold(DEFAULT_REQUEST_BYTES_THRESHOLD)
.setElementCountThreshold(DEFAULT_ELEMENT_COUNT_THRESHOLD)
.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Ignore)
.build())
.build();
static final RetrySettings DEFAULT_RETRY_SETTINGS =
RetrySettings.newBuilder()
Expand Down Expand Up @@ -759,7 +798,135 @@ public Publisher build() throws IOException {
}
}

private static class MessagesBatch {
private static class MessageFlowController {
private final Lock lock;
private final Long messageLimit;
private final Long byteLimit;
private final FlowController.LimitExceededBehavior limitBehavior;

private Long outstandingMessages;
private Long outstandingBytes;
private LinkedList<CountDownLatch> awaitingMessageAcquires;
private LinkedList<CountDownLatch> awaitingBytesAcquires;

MessageFlowController(
Long messageLimit, Long byteLimit, FlowController.LimitExceededBehavior limitBehavior) {
this.messageLimit = messageLimit;
this.byteLimit = byteLimit;
this.limitBehavior = limitBehavior;
this.lock = new ReentrantLock();

this.outstandingMessages = 0L;
this.outstandingBytes = 0L;

this.awaitingMessageAcquires = new LinkedList<CountDownLatch>();
this.awaitingBytesAcquires = new LinkedList<CountDownLatch>();
}

void acquire(long messageSize) throws FlowController.FlowControlException {
lock.lock();
try {
if (outstandingMessages >= messageLimit
&& limitBehavior == FlowController.LimitExceededBehavior.ThrowException) {
throw new FlowController.MaxOutstandingElementCountReachedException(messageLimit);
}
if (outstandingBytes + messageSize >= byteLimit
&& limitBehavior == FlowController.LimitExceededBehavior.ThrowException) {
throw new FlowController.MaxOutstandingRequestBytesReachedException(byteLimit);
}

// We can acquire or we should wait until we can acquire.
// Start by acquiring a slot for a message.
CountDownLatch messageWaiter = null;
while (outstandingMessages >= messageLimit) {
if (messageWaiter == null) {
// This message gets added to the back of the line.
messageWaiter = new CountDownLatch(1);
awaitingMessageAcquires.addLast(messageWaiter);
} else {
// This message already in line stays at the head of the line.
messageWaiter = new CountDownLatch(1);
awaitingMessageAcquires.set(0, messageWaiter);
}
lock.unlock();
try {
messageWaiter.await();
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens");
}
lock.lock();
}
++outstandingMessages;
if (messageWaiter != null) {
awaitingMessageAcquires.removeFirst();
}

// There may be some surplus messages left; let the next message waiting for a token have
// one.
if (!awaitingMessageAcquires.isEmpty() && outstandingMessages < messageLimit) {
awaitingMessageAcquires.getFirst().countDown();
}

// Now acquire space for bytes.
CountDownLatch bytesWaiter = null;
Long bytesRemaining = messageSize;
while (outstandingBytes + bytesRemaining >= byteLimit) {
// Take what is available.
Long available = byteLimit - outstandingBytes;
bytesRemaining -= available;
outstandingBytes = byteLimit;
if (bytesWaiter == null) {
// This message gets added to the back of the line.
bytesWaiter = new CountDownLatch(1);
awaitingBytesAcquires.addLast(bytesWaiter);
} else {
// This message already in line stays at the head of the line.
bytesWaiter = new CountDownLatch(1);
awaitingBytesAcquires.set(0, bytesWaiter);
}
lock.unlock();
try {
bytesWaiter.await();
} catch (InterruptedException e) {
logger.log(Level.WARNING, "Interrupted while waiting to acquire flow control tokens");
}
lock.lock();
}

outstandingBytes += bytesRemaining;
if (bytesWaiter != null) {
awaitingBytesAcquires.removeFirst();
}
// There may be some surplus bytes left; let the next message waiting for bytes have some.
if (!awaitingBytesAcquires.isEmpty() && outstandingBytes < byteLimit) {
awaitingBytesAcquires.getFirst().countDown();
}
} finally {
lock.unlock();
}
}

private void notifyNextAcquires() {
if (!awaitingMessageAcquires.isEmpty()) {
CountDownLatch awaitingAcquire = awaitingMessageAcquires.getFirst();
awaitingAcquire.countDown();
}
if (!awaitingBytesAcquires.isEmpty()) {
CountDownLatch awaitingAcquire = awaitingBytesAcquires.getFirst();
awaitingAcquire.countDown();
}
}

void release(long messageSize) {
lock.lock();
--outstandingMessages;
outstandingBytes -= messageSize;
notifyNextAcquires();
lock.unlock();
}
}

private class MessagesBatch {
private List<OutstandingPublish> messages;
private int batchedBytes;
private String orderingKey;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,10 @@ void resumePublish(String key) {
keysWithErrors.remove(key);
}

void stopPublish(String key) {
keysWithErrors.add(key);
}

/** Cancels every task in the queue associated with {@code key}. */
private void cancelQueuedTasks(final String key, Throwable e) {
keysWithErrors.add(key);
Expand Down
Loading

0 comments on commit fdd9434

Please sign in to comment.