Skip to content

Commit

Permalink
GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max… (
Browse files Browse the repository at this point in the history
#5562)

* GEODE-8542: java.lang.IllegalStateException: tcp message exceeded max size of 16777215

Limit the size of message chunks to the maximum message size allowed
by org.apache.geode.internal.tcp.Connection.

* implementing the change that Bill requested
  • Loading branch information
bschuchardt committed Sep 29, 2020
1 parent d4b9ecd commit b439d33
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import it.unimi.dsi.fastutil.objects.ObjectIterator;

import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.DMStats;
import org.apache.geode.distributed.internal.DistributionMessage;
import org.apache.geode.internal.Assert;
Expand Down Expand Up @@ -129,7 +130,8 @@ public List<?> getSentConnections() {
this.stats = stats;
this.msg = msg;
this.cons = cons;
this.buffer = bufferPool.acquireDirectSenderBuffer(sendBufferSize);
int bufferSize = Math.min(sendBufferSize, Connection.MAX_MSG_SIZE);
this.buffer = bufferPool.acquireDirectSenderBuffer(bufferSize);
this.buffer.clear();
this.buffer.position(Connection.MSG_HEADER_BYTES);
this.msgId = MsgIdGenerator.NO_MSG_ID;
Expand Down Expand Up @@ -349,6 +351,11 @@ public void realFlush(boolean lastFlushForMessage) {
this.buffer.position(Connection.MSG_HEADER_BYTES);
}

@VisibleForTesting
protected ByteBuffer getBuffer() {
return buffer;
}

@Override
public void close() throws IOException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,31 @@ public void streamerListReleaseWithException() throws IOException {
verify(pool, times(2)).releaseSenderBuffer(isA(ByteBuffer.class));
}

@Test
public void streamerRespectsMaxMessageSize() {
InternalDistributedMember member1;
member1 = new InternalDistributedMember("localhost", 1234);

DistributionMessage message = new SerialAckedMessage();
message.setRecipients(Arrays.asList(member1));

when(connection1.getRemoteAddress()).thenReturn(member1);
when(connection1.getRemoteVersion()).thenReturn(KnownVersion.CURRENT);
// create a streamer for a Connection that has a buffer size that's larger than the
// biggest message we can actually send. This is picked up by the MsgStreamer to allocate
// a buffer
when(connection1.getSendBufferSize()).thenReturn(Connection.MAX_MSG_SIZE + 1);
List<Connection> connections = Arrays.asList(connection1);

final BaseMsgStreamer msgStreamer =
MsgStreamer.create(connections, message, false, stats, pool);
// the streamer ought to have limited the message buffer to MAX_MSG_SIZE
assertThat(((MsgStreamer) msgStreamer).getBuffer().capacity())
.isEqualTo(Connection.MAX_MSG_SIZE);
}



protected BaseMsgStreamer createMsgStreamer(boolean mixedDestinationVersions) {

InternalDistributedMember member1, member2;
Expand All @@ -92,9 +117,9 @@ protected BaseMsgStreamer createMsgStreamer(boolean mixedDestinationVersions) {
when(connection2.getRemoteAddress()).thenReturn(member2);
when(connection2.getSendBufferSize()).thenReturn(Connection.SMALL_BUFFER_SIZE);
if (mixedDestinationVersions) {
when(connection1.getRemoteVersion()).thenReturn(KnownVersion.GEODE_1_12_0);
when(connection2.getRemoteVersion()).thenReturn(KnownVersion.GEODE_1_12_0);
} else {
when(connection1.getRemoteVersion()).thenReturn(KnownVersion.CURRENT);
when(connection2.getRemoteVersion()).thenReturn(KnownVersion.CURRENT);
}
List<Connection> connections = Arrays.asList(connection1, connection2);

Expand Down

0 comments on commit b439d33

Please sign in to comment.