Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.InputStream;
import java.util.Map;

import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;

Expand All @@ -35,36 +36,52 @@ public interface ICoreMessage extends Message {
@Override
InputStream getBodyInputStream();

/** Returns a new Buffer slicing the current Body. */
/**
* Returns a new Buffer slicing the current Body.
*/
ActiveMQBuffer getReadOnlyBodyBuffer();

/** Return the type of the message */
/**
* Return the type of the message
*/
@Override
byte getType();

/** the type of the message */
/**
* the type of the message
*/
@Override
CoreMessage setType(byte type);

/**
* We are really interested if this is a LargeServerMessage.
*
* @return
*/
boolean isServerMessage();

/**
* The body used for this message.
*
* @return
*/
@Override
ActiveMQBuffer getBodyBuffer();

int getEndOfBodyPosition();


/** Used on large messages treatment */
/**
* Used on large messages treatment
*/
void copyHeadersAndProperties(Message msg);

void sendBuffer_1X(ByteBuf sendBuffer);

/**
* it will fix the body of incoming messages from 1.x and before versions
*/
void receiveBuffer_1X(ByteBuf buffer);

/**
* @return Returns the message in Map form, useful when encoding to JSON
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,16 @@ public CoreMessage setReplyTo(SimpleString address) {
public void receiveBuffer(ByteBuf buffer) {
this.buffer = buffer;
this.buffer.retain();
decode();
this.validBuffer = true;
decode(false);
}

/** This will fix the incoming body of 1.x messages */
@Override
public void receiveBuffer_1X(ByteBuf buffer) {
this.buffer = buffer;
this.buffer.retain();
decode(true);
validBuffer = false;
}

@Override
Expand All @@ -205,7 +213,6 @@ public SimpleString getGroupID() {
}

/**
*
* @param sendBuffer
* @param deliveryCount Some protocols (AMQP) will have this as part of the message. ignored on core
*/
Expand All @@ -215,6 +222,21 @@ public void sendBuffer(ByteBuf sendBuffer, int deliveryCount) {
sendBuffer.writeBytes(buffer, 0, buffer.writerIndex());
}

/**
* Recast the message as an 1.4 message
*/
@Override
public void sendBuffer_1X(ByteBuf sendBuffer) {
checkEncode();
ByteBuf tmpBuffer = buffer.duplicate();
sendBuffer.writeInt(endOfBodyPosition + DataConstants.SIZE_INT);
tmpBuffer.readerIndex(DataConstants.SIZE_INT);
tmpBuffer.readBytes(sendBuffer, endOfBodyPosition - BUFFER_HEADER_SPACE);
sendBuffer.writeInt(tmpBuffer.writerIndex() + DataConstants.SIZE_INT + BUFFER_HEADER_SPACE);
tmpBuffer.readBytes(sendBuffer, tmpBuffer.readableBytes());
sendBuffer.readerIndex(0);
}

private synchronized void checkEncode() {
if (!validBuffer) {
encode();
Expand Down Expand Up @@ -280,12 +302,10 @@ public int getEndOfBodyPosition() {
return endOfBodyPosition;
}


public TypedProperties getTypedProperties() {
return checkProperties();
}


@Override
public void messageChanged() {
validBuffer = false;
Expand Down Expand Up @@ -323,19 +343,18 @@ protected CoreMessage(CoreMessage other, TypedProperties copyProperties) {
public void copyHeadersAndProperties(final Message msg) {
messageID = msg.getMessageID();
address = msg.getAddressSimpleString();
userID = (UUID)msg.getUserID();
userID = (UUID) msg.getUserID();
type = msg.toCore().getType();
durable = msg.isDurable();
expiration = msg.getExpiration();
timestamp = msg.getTimestamp();
priority = msg.getPriority();

if (msg instanceof CoreMessage) {
properties = ((CoreMessage)msg).getTypedProperties();
properties = ((CoreMessage) msg).getTypedProperties();
}
}


@Override
public Message copy() {
checkEncode();
Expand Down Expand Up @@ -380,7 +399,7 @@ public UUID getUserID() {

@Override
public CoreMessage setUserID(Object uuid) {
this.userID = (UUID)uuid;
this.userID = (UUID) uuid;
return this;
}

Expand Down Expand Up @@ -418,7 +437,6 @@ public SimpleString getAddressSimpleString() {
return address;
}


@Override
public CoreMessage setExpiration(long expiration) {
this.expiration = expiration;
Expand Down Expand Up @@ -487,18 +505,22 @@ public CoreMessage setType(byte type) {
return this;
}

private void decode() {
private void decode(boolean beforeAddress) {
endOfBodyPosition = buffer.readInt();

buffer.skipBytes(endOfBodyPosition - BUFFER_HEADER_SPACE);

decodeHeadersAndProperties(buffer, true);
buffer.readerIndex(0);
validBuffer = true;

if (beforeAddress) {
endOfBodyPosition = endOfBodyPosition - DataConstants.SIZE_INT;
}

internalWritableBuffer();
}


public void decodeHeadersAndProperties(final ByteBuf buffer) {
decodeHeadersAndProperties(buffer, false);
}
Expand Down Expand Up @@ -529,7 +551,6 @@ private void decodeHeadersAndProperties(final ByteBuf buffer, boolean lazyProper
}
}


public synchronized CoreMessage encode() {

checkProperties();
Expand Down Expand Up @@ -654,7 +675,6 @@ public CoreMessage setDurable(boolean durable) {
return this;
}


@Override
public CoreMessage putBooleanProperty(final String key, final boolean value) {
messageChanged();
Expand Down Expand Up @@ -683,7 +703,6 @@ public Boolean getBooleanProperty(final String key) throws ActiveMQPropertyConve
return properties.getBooleanProperty(new SimpleString(key));
}


@Override
public CoreMessage putByteProperty(final SimpleString key, final byte value) {
messageChanged();
Expand All @@ -692,7 +711,6 @@ public CoreMessage putByteProperty(final SimpleString key, final byte value) {
return this;
}


@Override
public CoreMessage putByteProperty(final String key, final byte value) {
messageChanged();
Expand All @@ -702,7 +720,6 @@ public CoreMessage putByteProperty(final String key, final byte value) {
return this;
}


@Override
public Byte getByteProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties();
Expand Down Expand Up @@ -731,7 +748,6 @@ public CoreMessage putBytesProperty(final String key, final byte[] value) {
return this;
}


@Override
public byte[] getBytesProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
checkProperties();
Expand Down Expand Up @@ -775,7 +791,6 @@ public CoreMessage putShortProperty(final String key, final short value) {
return this;
}


@Override
public CoreMessage putIntProperty(final SimpleString key, final int value) {
messageChanged();
Expand Down Expand Up @@ -803,7 +818,6 @@ public Integer getIntProperty(final String key) throws ActiveMQPropertyConversio
return getIntProperty(SimpleString.toSimpleString(key));
}


@Override
public CoreMessage putLongProperty(final SimpleString key, final long value) {
messageChanged();
Expand Down Expand Up @@ -832,7 +846,6 @@ public Long getLongProperty(final String key) throws ActiveMQPropertyConversionE
return getLongProperty(SimpleString.toSimpleString(key));
}


@Override
public CoreMessage putFloatProperty(final SimpleString key, final float value) {
messageChanged();
Expand Down Expand Up @@ -865,7 +878,6 @@ public CoreMessage putDoubleProperty(final String key, final double value) {
return this;
}


@Override
public Double getDoubleProperty(final SimpleString key) throws ActiveMQPropertyConversionException {
messageChanged();
Expand Down Expand Up @@ -1071,7 +1083,7 @@ public void reloadPersistence(ActiveMQBuffer record) {
int size = record.readInt();
initBuffer(size);
buffer.setIndex(0, 0).writeBytes(record.byteBuf(), size);
decode();
decode(false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl;
import org.apache.activemq.artemis.core.client.impl.ClientMessageImpl;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketDecoder;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveClientLargeMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionReceiveMessage_1X;

import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_LARGE_MSG;
import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.SESS_RECEIVE_MSG;
Expand All @@ -33,31 +35,35 @@ public class ClientPacketDecoder extends PacketDecoder {
public static final ClientPacketDecoder INSTANCE = new ClientPacketDecoder();

@Override
public Packet decode(final ActiveMQBuffer in) {
public Packet decode(final ActiveMQBuffer in, CoreRemotingConnection connection) {
final byte packetType = in.readByte();

Packet packet = decode(packetType);
Packet packet = decode(packetType, connection);

packet.decode(in);

return packet;
}

@Override
public Packet decode(byte packetType) {
public Packet decode(byte packetType, CoreRemotingConnection connection) {
Packet packet;

switch (packetType) {
case SESS_RECEIVE_MSG: {
packet = new SessionReceiveMessage(new ClientMessageImpl());
if (connection.isVersionBeforeAddressChange()) {
packet = new SessionReceiveMessage_1X(new ClientMessageImpl());
} else {
packet = new SessionReceiveMessage(new ClientMessageImpl());
}
break;
}
case SESS_RECEIVE_LARGE_MSG: {
packet = new SessionReceiveClientLargeMessage(new ClientLargeMessageImpl());
break;
}
default: {
packet = super.decode(packetType);
packet = super.decode(packetType, connection);
}
}
return packet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core;

import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.security.ActiveMQPrincipal;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

Expand All @@ -28,13 +29,18 @@ public interface CoreRemotingConnection extends RemotingConnection {
* The client protocol used on the communication.
* This will determine if the client has support for certain packet types
*/
int getClientVersion();
int getChannelVersion();

default boolean isVersionBeforeAddressChange() {
int version = getChannelVersion();
return (version > 0 && version < PacketImpl.ADDRESSING_CHANGE_VERSION);
}

/**
* Sets the client protocol used on the communication. This will determine if the client has
* support for certain packet types
*/
void setClientVersion(int clientVersion);
void setChannelVersion(int clientVersion);

/**
* Returns the channel with the channel id specified.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.activemq.artemis.core.protocol.core;

import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

/**
* A Packet represents a packet of data transmitted over a connection.
Expand Down Expand Up @@ -71,7 +70,7 @@ default int expectedEncodeSize() {
* @param connection the connection
* @return the buffer to encode to
*/
ActiveMQBuffer encode(RemotingConnection connection);
ActiveMQBuffer encode(CoreRemotingConnection connection);

/**
* decodes the buffer into this packet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ public SessionContext createSessionContext(Version clientVersion,
}
}
while (retry);
sessionChannel.getConnection().setChannelVersion(response.getServerVersion());
return newSessionContext(name, confirmationWindowSize, sessionChannel, response);

}
Expand Down
Loading