Skip to content

Commit

Permalink
ARTEMIS-2706 Use FrameSize to decide when to flush large messages
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Apr 14, 2020
1 parent 5085fab commit d27d61f
Showing 1 changed file with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -836,9 +836,10 @@ private class LargeMessageDeliveryContext {
void resume() {
connection.runNow(this::deliver);
}
private static final int BUFFER_LENGTH = 1024;

void deliver() {

int frameSize = protonSession.session.getConnection().getTransport().getOutboundFrameSizeLimit();

// Let the Message decide how to present the message bytes
LargeBodyReader context = message.getLargeBodyReader();
try {
Expand All @@ -850,7 +851,7 @@ void deliver() {

// TODO: it would be nice to use pooled buffer here,
// however I would need a version of ReadableBuffer for Netty
ByteBuffer buf = ByteBuffer.allocate(BUFFER_LENGTH);
ByteBuffer buf = ByteBuffer.allocate(frameSize);

for (; position < bodySize; ) {
if (!connection.flowControl(this::resume)) {
Expand All @@ -860,11 +861,13 @@ void deliver() {
buf.clear();
int size = context.readInto(buf);

sender.send(buf.array(), 0, size);

connection.instantFlush();
sender.send(new ReadableBuffer.ByteBufferReader(buf));

position += size;

if (position < bodySize) {
connection.instantFlush();
}
}
} finally {
context.close();
Expand All @@ -882,7 +885,7 @@ void deliver() {
sender.advance();
}

connection.flush();
connection.instantFlush();

synchronized (creditsLock) {
pending.decrementAndGet();
Expand Down

0 comments on commit d27d61f

Please sign in to comment.