Skip to content

Commit

Permalink
feat: fix bug where blockingpullsubscriber doesn't refill flowcontrol…
Browse files Browse the repository at this point in the history
… quota. (#449)
  • Loading branch information
jiangmichaellll committed Jan 12, 2021
1 parent 2ab6e3e commit 109bd83
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

Expand Down Expand Up @@ -113,7 +112,13 @@ public synchronized Optional<SequencedMessage> messageIfAvailable() throws Check
if (messages.isEmpty()) {
return Optional.empty();
}
return Optional.of(Objects.requireNonNull(messages.pollFirst()));
SequencedMessage msg = messages.remove();
underlying.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(1)
.setAllowedBytes(msg.byteSize())
.build());
return Optional.of(msg);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,17 @@ public void onDataSuccess() throws Exception {

@Test
public void pullMessage() throws Exception {
int byteSize = 30;
SequencedMessage message =
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12), 30);
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(12), byteSize);
messageConsumer.accept(ImmutableList.of(message));
assertThat(Optional.of(message)).isEqualTo(subscriber.messageIfAvailable());
verify(underlying)
.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedBytes(byteSize)
.setAllowedMessages(1)
.build());
}

@Test
Expand Down

0 comments on commit 109bd83

Please sign in to comment.