Skip to content

Commit

Permalink
This closes #337
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Jan 23, 2016
2 parents 66c1c21 + f5ec152 commit 4a33d2d
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,6 @@ public abstract class MessageImpl implements MessageInternal {

private boolean copied = true;

private boolean bufferUsed;

private UUID userID;

// Constructors --------------------------------------------------
Expand Down Expand Up @@ -157,8 +155,6 @@ protected MessageImpl(final MessageImpl other, TypedProperties properties) {
copied = other.copied;

if (other.buffer != null) {
other.bufferUsed = true;

// We need to copy the underlying buffer too, since the different messsages thereafter might have different
// properties set on them, making their encoding different
buffer = other.buffer.copy(0, other.buffer.writerIndex());
Expand Down Expand Up @@ -507,21 +503,7 @@ public void decode(final ActiveMQBuffer buff) {
@Override
public synchronized ActiveMQBuffer getEncodedBuffer() {
ActiveMQBuffer buff = encodeToBuffer();

if (bufferUsed) {
ActiveMQBuffer copied = buff.copy(0, buff.capacity());

copied.setIndex(0, endOfMessagePosition);

return copied;
}
else {
buffer.setIndex(0, endOfMessagePosition);

bufferUsed = true;

return buffer;
}
return buff.duplicate();
}

@Override
Expand Down Expand Up @@ -935,9 +917,12 @@ public String bodyToString() {
buffer2 = new byte[bodyBuffer.writerIndex() - bodyBuffer.readerIndex()];
bodyBuffer.readBytes(buffer2);
bodyBuffer.readerIndex(readerIndex2);
return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
}
else {
return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[writerIndex=" + buffer.writerIndex() + ",capacity=" + buffer.capacity() + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1);
}

return "ServerMessage@" + Integer.toHexString(System.identityHashCode(this)) + "[" + ",bodyStart=" + getEndOfBodyPosition() + " buffer=" + ByteUtil.bytesToHex(buffer1, 1) + ", bodyBuffer=" + ByteUtil.bytesToHex(buffer2, 1);
}

@Override
Expand All @@ -962,18 +947,8 @@ public TypedProperties getProperties() {
// many queues - the first caller in this case will actually encode it
private synchronized ActiveMQBuffer encodeToBuffer() {
if (!bufferValid) {
if (bufferUsed) {
// Cannot use same buffer - must copy

forceCopy();
}

int bodySize = getEndOfBodyPosition();

// Clebert: I've started sending this on encoding due to conversions between protocols
// and making sure we are not losing the buffer start position between protocols
this.endOfBodyPosition = bodySize;

// write it
buffer.setInt(BUFFER_HEADER_SPACE, bodySize);

Expand Down Expand Up @@ -1032,8 +1007,6 @@ private void forceCopy() {
if (bodyBuffer != null) {
bodyBuffer.setBuffer(buffer);
}

bufferUsed = false;
}

// Inner classes -------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,27 +56,31 @@ public int getDeliveryCount() {
public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = message.getEncodedBuffer();

ActiveMQBuffer bufferWrite = connection.createTransportBuffer(buffer.writerIndex());
bufferWrite.writeBytes(buffer, 0, bufferWrite.capacity());
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());

// Sanity check
if (buffer.writerIndex() != message.getEndOfMessagePosition()) {
if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
throw new IllegalStateException("Wrong encode position");
}

buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
bufferWrite.writeLong(consumerID);
bufferWrite.writeInt(deliveryCount);

size = buffer.writerIndex();
size = bufferWrite.writerIndex();

// Write standard headers

int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
buffer.setByte(DataConstants.SIZE_INT, getType());
buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
bufferWrite.setInt(0, len);
bufferWrite.setByte(DataConstants.SIZE_INT, getType());
bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);

// Position reader for reading by Netty
buffer.setIndex(0, size);
bufferWrite.setIndex(0, size);

return buffer;
return bufferWrite;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,28 +63,39 @@ public SendAcknowledgementHandler getHandler() {
public ActiveMQBuffer encode(final RemotingConnection connection) {
ActiveMQBuffer buffer = message.getEncodedBuffer();

ActiveMQBuffer bufferWrite;
if (connection == null) {
// this is for unit tests only
bufferWrite = buffer.copy(0, buffer.capacity());
}
else {
bufferWrite = connection.createTransportBuffer(buffer.writerIndex() + 1); // 1 for the requireResponse
}
bufferWrite.writeBytes(buffer, 0, buffer.writerIndex());
bufferWrite.setIndex(buffer.readerIndex(), buffer.writerIndex());

// Sanity check
if (buffer.writerIndex() != message.getEndOfMessagePosition()) {
if (bufferWrite.writerIndex() != message.getEndOfMessagePosition()) {
throw new IllegalStateException("Wrong encode position");
}

buffer.writeBoolean(requiresResponse);
bufferWrite.writeBoolean(requiresResponse);

size = buffer.writerIndex();
size = bufferWrite.writerIndex();

// Write standard headers

int len = size - DataConstants.SIZE_INT;
buffer.setInt(0, len);
buffer.setByte(DataConstants.SIZE_INT, getType());
buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
bufferWrite.setInt(0, len);
bufferWrite.setByte(DataConstants.SIZE_INT, getType());
bufferWrite.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);

// Position reader for reading by Netty
buffer.readerIndex(0);
bufferWrite.readerIndex(0);

message.resetCopied();

return buffer;
return bufferWrite;
}

@Override
Expand Down

0 comments on commit 4a33d2d

Please sign in to comment.