Skip to content

Commit

Permalink
HORNETQ-448
Browse files Browse the repository at this point in the history
Large Message Compression Impl
  • Loading branch information
gaohoward committed Dec 5, 2010
1 parent 5806ee1 commit 167f003
Show file tree
Hide file tree
Showing 44 changed files with 2,627 additions and 119 deletions.
6 changes: 5 additions & 1 deletion src/config/common/schema/hornetq-jms.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@
</xsd:element>
<xsd:element name="min-large-message-size" type="xsd:long"
maxOccurs="1" minOccurs="0">
</xsd:element>
</xsd:element>
<xsd:element name="compress-large-messages" type="xsd:boolean"
maxOccurs="1" minOccurs="0">
</xsd:element>

<xsd:element name="client-id" type="xsd:string"
maxOccurs="1" minOccurs="0">
</xsd:element>
Expand Down
2 changes: 2 additions & 0 deletions src/main/org/hornetq/api/core/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ public interface Message
public static final SimpleString HDR_ORIG_MESSAGE_ID = new SimpleString("_HQ_ORIG_MESSAGE_ID");

public static final SimpleString HDR_GROUP_ID = new SimpleString("_HQ_GROUP_ID");

public static final SimpleString HDR_LARGE_COMPRESSED = new SimpleString("_HQ_LARGE_COMPRESSED");

public static final SimpleString HDR_SCHEDULED_DELIVERY_TIME = new SimpleString("_HQ_SCHED_DELIVERY");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,7 @@ ClientSession createSession(String username,

CoreRemotingConnection getConnection();

void setCompressLargeMessages(boolean compressLargeMessage);

boolean isCompressLargeMessages();
}
2 changes: 2 additions & 0 deletions src/main/org/hornetq/api/core/client/HornetQClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class HornetQClient
// Any message beyond this size is considered a large message (to be sent in chunks)

public static final int DEFAULT_MIN_LARGE_MESSAGE_SIZE = 100 * 1024;

public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;

public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;

Expand Down
15 changes: 14 additions & 1 deletion src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.SessionQueueQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionReceiveLargeMessage;
import org.hornetq.utils.DecompressedLargeMessageBuffer;
import org.hornetq.utils.Future;
import org.hornetq.utils.PriorityLinkedList;
import org.hornetq.utils.PriorityLinkedListImpl;
Expand Down Expand Up @@ -454,6 +455,11 @@ public Exception getLastException()
// ClientConsumerInternal implementation
// --------------------------------------------------------------

public ClientSessionInternal getSession()
{
return session;
}

public SessionQueueQueryResponseMessage getQueueInfo()
{
return queueInfo;
Expand Down Expand Up @@ -554,7 +560,14 @@ public synchronized void handleLargeMessage(final SessionReceiveLargeMessage pac

currentLargeMessageBuffer = new LargeMessageBufferImpl(this, packet.getLargeMessageSize(), 60, largeMessageCache);

currentChunkMessage.setBuffer(currentLargeMessageBuffer);
if (currentChunkMessage.isCompressed())
{
currentChunkMessage.setBuffer(new DecompressedLargeMessageBuffer(currentLargeMessageBuffer));
}
else
{
currentChunkMessage.setBuffer(currentLargeMessageBuffer);
}

currentChunkMessage.setFlowControlSize(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,6 @@ public interface ClientConsumerInternal extends ClientConsumer
void start();

SessionQueueQueryResponseMessage getQueueInfo();

ClientSessionInternal getSession();
}
10 changes: 7 additions & 3 deletions src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.utils.DataConstants;

/**
*
Expand Down Expand Up @@ -117,6 +117,11 @@ public boolean isLargeMessage()
{
return largeMessage;
}

public boolean isCompressed()
{
return properties.getBooleanProperty(Message.HDR_LARGE_COMPRESSED);
}

/**
* @param largeMessage the largeMessage to set
Expand All @@ -142,15 +147,14 @@ public String toString()
"]";
}

// FIXME - only used for large messages - move it!
/* (non-Javadoc)
* @see org.hornetq.api.core.client.ClientMessage#saveToOutputStream(java.io.OutputStream)
*/
public void saveToOutputStream(final OutputStream out) throws HornetQException
{
if (largeMessage)
{
((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
((LargeMessageBufferInternal)getWholeBuffer()).saveBuffer(out);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,6 @@ public interface ClientMessageInternal extends ClientMessage, MessageInternal
void discardLargeBody();

void setBuffer(HornetQBuffer buffer);

boolean isCompressed();
}
100 changes: 32 additions & 68 deletions src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendLargeMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionSendMessage;
import org.hornetq.utils.DeflaterReader;
import org.hornetq.utils.HornetQBufferInputStream;
import org.hornetq.utils.TokenBucketLimiter;
import org.hornetq.utils.UUIDGenerator;

Expand Down Expand Up @@ -150,7 +151,7 @@ public synchronized void close() throws HornetQException
{
return;
}

doCleanup();
}

Expand Down Expand Up @@ -190,7 +191,7 @@ public ClientProducerCredits getProducerCredits()
{
return credits;
}

// Protected ------------------------------------------------------------------------------------

// Package Private ------------------------------------------------------------------------------
Expand All @@ -203,7 +204,7 @@ private void doCleanup()
{
session.returnCredits(address);
}

session.removeProducer(this);

closed = true;
Expand All @@ -212,12 +213,13 @@ private void doCleanup()
private void doSend(final SimpleString address, final Message msg) throws HornetQException
{
MessageInternal msgI = (MessageInternal)msg;

ClientProducerCredits theCredits;

boolean isLarge;

if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() || msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
if (msgI.getBodyInputStream() != null || msgI.isLargeMessage() ||
msgI.getBodyBuffer().writerIndex() > minLargeMessageSize)
{
isLarge = true;
}
Expand All @@ -236,7 +238,7 @@ private void doSend(final SimpleString address, final Message msg) throws Hornet
{
msg.setAddress(address);
}

// Anonymous
theCredits = session.getCredits(address, true);
}
Expand All @@ -250,7 +252,7 @@ private void doSend(final SimpleString address, final Message msg) throws Hornet
{
msg.setAddress(this.address);
}

theCredits = credits;
}

Expand All @@ -270,8 +272,6 @@ private void doSend(final SimpleString address, final Message msg) throws Hornet

session.workDone();



if (isLarge)
{
largeMessageSend(sendBlocking, msgI, theCredits);
Expand Down Expand Up @@ -322,8 +322,16 @@ private void checkClosed() throws HornetQException
* @param msgI
* @throws HornetQException
*/
private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
private void largeMessageSend(final boolean sendBlocking,
final MessageInternal msgI,
final ClientProducerCredits credits) throws HornetQException
{

if (session.isCompressLargeMessages())
{
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
}

int headerSize = msgI.getHeadersAndPropertiesEncodeSize();

if (headerSize >= minLargeMessageSize)
Expand All @@ -341,7 +349,6 @@ private void largeMessageSend(final boolean sendBlocking, final MessageInternal
HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);

msgI.encodeHeadersAndProperties(headerBuffer);

SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());

channel.send(initialChunk);
Expand All @@ -358,7 +365,7 @@ private void largeMessageSend(final boolean sendBlocking, final MessageInternal

if (input != null)
{
largeMessageSendStreamed(sendBlocking, input, credits);
largeMessageSendStreamed(sendBlocking, msgI, input, credits);
}
else
{
Expand All @@ -375,72 +382,29 @@ private void largeMessageSendBuffered(final boolean sendBlocking,
final MessageInternal msgI,
final ClientProducerCredits credits) throws HornetQException
{
BodyEncoder context = msgI.getBodyEncoder();

final long bodySize = context.getLargeBodySize();

context.open();
try
{

for (int pos = 0; pos < bodySize;)
{
final boolean lastChunk;

final int chunkLength = Math.min((int)(bodySize - pos), minLargeMessageSize);

final HornetQBuffer bodyBuffer = HornetQBuffers.fixedBuffer(chunkLength);

context.encode(bodyBuffer, chunkLength);

pos += chunkLength;

lastChunk = pos >= bodySize;

final SessionSendContinuationMessage chunk = new SessionSendContinuationMessage(bodyBuffer.toByteBuffer()
.array(),
!lastChunk,
lastChunk && sendBlocking);

if (sendBlocking && lastChunk)
{
// When sending it blocking, only the last chunk will be blocking.
channel.sendBlocking(chunk);
}
else
{
channel.send(chunk);
}

try
{
credits.acquireCredits(chunk.getPacketSize());
}
catch (InterruptedException e)
{
}
}
}
finally
{
context.close();
}
msgI.getBodyBuffer().readerIndex(0);
largeMessageSendStreamed(sendBlocking, msgI, new HornetQBufferInputStream(msgI.getBodyBuffer()), credits);
}

/**
* TODO: This method could be eliminated and
* combined with {@link ClientProducerImpl#largeMessageSendBuffered(boolean, Message, ClientProducerCredits)}.
* All that's needed for this is ClientMessage returning the proper BodyEncoder for streamed
* @param sendBlocking
* @param input
* @throws HornetQException
*/
private void largeMessageSendStreamed(final boolean sendBlocking,
final InputStream input,
final MessageInternal msgI,
final InputStream inputStreamParameter,
final ClientProducerCredits credits) throws HornetQException
{
boolean lastPacket = false;

InputStream input = inputStreamParameter;

if (session.isCompressLargeMessages())
{
input = new DeflaterReader(inputStreamParameter);
}

while (!lastPacket)
{
byte[] buff = new byte[minLargeMessageSize];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import org.hornetq.api.core.*;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.HornetQClient;
import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.logging.Logger;
Expand Down Expand Up @@ -146,6 +147,8 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
public final Exception e = new Exception();

private final Object waitLock = new Object();

private boolean compressLargeMessages;

// Static
// ---------------------------------------------------------------------------------------
Expand Down Expand Up @@ -202,6 +205,8 @@ public ClientSessionFactoryImpl(final ServerLocatorInternal serverLocator,
closeExecutor = orderedExecutorFactory.getExecutor();

this.interceptors = interceptors;

compressLargeMessages = HornetQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;

}

Expand Down Expand Up @@ -768,6 +773,7 @@ private ClientSession createSessionInternal(final String username,
serverLocator.isBlockOnDurableSend(),
serverLocator.isCacheLargeMessagesClient(),
serverLocator.getMinLargeMessageSize(),
compressLargeMessages,
serverLocator.getInitialMessagePacketSize(),
serverLocator.getGroupID(),
connection,
Expand Down Expand Up @@ -1358,4 +1364,14 @@ public synchronized void cancel()
cancelled = true;
}
}

public void setCompressLargeMessages(boolean compressLargeMessage)
{
this.compressLargeMessages = compressLargeMessage;
}

public boolean isCompressLargeMessages()
{
return this.compressLargeMessages;
}
}
Loading

0 comments on commit 167f003

Please sign in to comment.