Skip to content

Commit

Permalink
ARTEMIS-4186 Ability to set compressionLevel for compressLargeMessages
Browse files Browse the repository at this point in the history
  • Loading branch information
AntonRoskvist authored and clebertsuconic committed Jun 14, 2023
1 parent 600799d commit 582a689
Show file tree
Hide file tree
Showing 27 changed files with 226 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class ServerLocatorConfig {
public int failoverAttempts = ActiveMQClient.DEFAULT_FAILOVER_ATTEMPTS;
public int initialMessagePacketSize = ActiveMQClient.DEFAULT_INITIAL_MESSAGE_PACKET_SIZE;
public boolean cacheLargeMessagesClient = ActiveMQClient.DEFAULT_CACHE_LARGE_MESSAGE_CLIENT;
public int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;
public boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;
public boolean useTopologyForLoadBalancing = ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;

Expand Down Expand Up @@ -84,5 +85,6 @@ public ServerLocatorConfig(final ServerLocatorConfig locator) {
failoverAttempts = locator.failoverAttempts;
initialMessagePacketSize = locator.initialMessagePacketSize;
useTopologyForLoadBalancing = locator.useTopologyForLoadBalancing;
compressionLevel = locator.compressionLevel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public final class ActiveMQClient {

public static final boolean DEFAULT_COMPRESS_LARGE_MESSAGES = false;

public static final int DEFAULT_COMPRESSION_LEVEL = -1;

public static final int DEFAULT_CONSUMER_WINDOW_SIZE = 1024 * 1024;

public static final int DEFAULT_CONSUMER_MAX_RATE = -1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,22 @@ ClientSessionFactory createSessionFactory(TransportConfiguration transportConfig
*/
ServerLocator setCompressLargeMessage(boolean compressLargeMessages);

/**
* What compression level is in use
*
* @return
*/
int getCompressionLevel();

/**
* Sets what compressionLevel to use when compressing messages
* Value must be -1 (default), or 0-9
*
* @param compressionLevel
* @return this ServerLocator
*/
ServerLocator setCompressionLevel(int compressionLevel);

// XXX No javadocs
ServerLocator addClusterTopologyListener(ClusterTopologyListener listener);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ private void largeMessageSendStreamed(final boolean sendBlocking,
if (session.isCompressLargeMessages()) {
msgI.putBooleanProperty(Message.HDR_LARGE_COMPRESSED, true);
deflaterReader = new DeflaterReader(inputStreamParameter, messageSize);
deflaterReader.setLevel(session.getCompressionLevel());
input = deflaterReader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ private ClientSession createSessionInternal(final String rawUsername,

SessionContext context = createSessionChannel(name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, clientID);

ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());
ClientSessionInternal session = new ClientSessionImpl(this, name, username, password, xa, autoCommitSends, autoCommitAcks, preAcknowledge, serverLocator.isBlockOnAcknowledge(), serverLocator.isAutoGroup(), ackBatchSize, serverLocator.getConsumerWindowSize(), serverLocator.getConsumerMaxRate(), serverLocator.getConfirmationWindowSize(), serverLocator.getProducerWindowSize(), serverLocator.getProducerMaxRate(), serverLocator.isBlockOnNonDurableSend(), serverLocator.isBlockOnDurableSend(), serverLocator.isCacheLargeMessagesClient(), serverLocator.getMinLargeMessageSize(), serverLocator.isCompressLargeMessage(), serverLocator.getCompressionLevel(), serverLocator.getInitialMessagePacketSize(), serverLocator.getGroupID(), context, orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor(), orderedExecutorFactory.getExecutor());

synchronized (sessions) {
if (closed || !clientProtocolManager.isAlive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi

private final boolean compressLargeMessages;

private final int compressionLevel;

private volatile int initialMessagePacketSize;

private final boolean cacheLargeMessageClient;
Expand Down Expand Up @@ -184,6 +186,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
final boolean cacheLargeMessageClient,
final int minLargeMessageSize,
final boolean compressLargeMessages,
final int compressionLevel,
final int initialMessagePacketSize,
final String groupID,
final SessionContext sessionContext,
Expand Down Expand Up @@ -237,6 +240,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi

this.compressLargeMessages = compressLargeMessages;

this.compressionLevel = compressionLevel;

this.initialMessagePacketSize = initialMessagePacketSize;

this.groupID = groupID;
Expand Down Expand Up @@ -1186,6 +1191,11 @@ public boolean isCompressLargeMessages() {
return compressLargeMessages;
}

@Override
public int getCompressionLevel() {
return compressionLevel;
}

/**
* @return the cacheLargeMessageClient
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public interface ClientSessionInternal extends ClientSession {

boolean isCompressLargeMessages();

int getCompressionLevel();

void expire(ClientConsumer consumer, Message message) throws ActiveMQException;

void addConsumer(ClientConsumerInternal consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1292,6 +1292,17 @@ public ServerLocatorImpl setCompressLargeMessage(boolean avoid) {
return this;
}

@Override
public int getCompressionLevel() {
return config.compressionLevel;
}

@Override
public ServerLocatorImpl setCompressionLevel(int compressionLevel) {
this.config.compressionLevel = compressionLevel;
return this;
}

private void checkWrite() {
synchronized (stateGuard) {
if (state != null && state != STATE.CLOSED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,8 @@ public long getTotalSize() {
return bytesRead.get();
}

public void setLevel(int level) {
deflater.setLevel(level);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,14 @@ public void setCompressLargeMessage(boolean avoidLargeMessages) {
serverLocator.setCompressLargeMessage(avoidLargeMessages);
}

public int getCompressionLevel() {
return serverLocator.getCompressionLevel();
}

public void setCompressionLevel(int compressionLevel) {
serverLocator.setCompressionLevel(compressionLevel);
}

@Override
public void close() {
ServerLocator locator0 = serverLocator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ void createConnectionFactory(String name,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
boolean compressLargeMessage,
int compressionLevel,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
Expand Down Expand Up @@ -282,6 +283,7 @@ void createConnectionFactory(String name,
boolean cacheLargeMessagesClient,
int minLargeMessageSize,
boolean compressLargeMessages,
int compressionLevel,
int consumerWindowSize,
int consumerMaxRate,
int confirmationWindowSize,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ public interface ConnectionFactoryConfiguration extends EncodingSupport {

ConnectionFactoryConfiguration setCompressLargeMessages(boolean avoidLargeMessages);

int getCompressionLevel();

ConnectionFactoryConfiguration setCompressionLevel(int compressionLevel);

int getConsumerWindowSize();

ConnectionFactoryConfiguration setConsumerWindowSize(int consumerWindowSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class ConnectionFactoryConfigurationImpl implements ConnectionFactoryConf

private boolean compressLargeMessage = ActiveMQClient.DEFAULT_COMPRESS_LARGE_MESSAGES;

private int compressionLevel = ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;

private int consumerWindowSize = ActiveMQClient.DEFAULT_CONSUMER_WINDOW_SIZE;

private int consumerMaxRate = ActiveMQClient.DEFAULT_CONSUMER_MAX_RATE;
Expand Down Expand Up @@ -281,6 +283,17 @@ public ConnectionFactoryConfiguration setMinLargeMessageSize(final int minLargeM
return this;
}

@Override
public int getCompressionLevel() {
return compressionLevel;
}

@Override
public ConnectionFactoryConfiguration setCompressionLevel(final int compressionLevel) {
this.compressionLevel = compressionLevel;
return this;
}

@Override
public int getConsumerWindowSize() {
return consumerWindowSize;
Expand Down Expand Up @@ -642,6 +655,9 @@ public void decode(final ActiveMQBuffer buffer) {
enableSharedClientID = buffer.readableBytes() > 0 ? BufferHelper.readNullableBoolean(buffer) : ActiveMQClient.DEFAULT_ENABLED_SHARED_CLIENT_ID;

useTopologyForLoadBalancing = buffer.readableBytes() > 0 ? BufferHelper.readNullableBoolean(buffer) : ActiveMQClient.DEFAULT_USE_TOPOLOGY_FOR_LOADBALANCING;

compressionLevel = buffer.readableBytes() > 0 ? BufferHelper.readNullableInteger(buffer) : ActiveMQClient.DEFAULT_COMPRESSION_LEVEL;

}

@Override
Expand Down Expand Up @@ -738,6 +754,8 @@ public void encode(final ActiveMQBuffer buffer) {
BufferHelper.writeNullableBoolean(buffer, enableSharedClientID);

BufferHelper.writeNullableBoolean(buffer, useTopologyForLoadBalancing);

BufferHelper.writeNullableInteger(buffer, compressionLevel);
}

@Override
Expand Down Expand Up @@ -858,7 +876,9 @@ public int getEncodeSize() {

BufferHelper.sizeOfNullableBoolean(enableSharedClientID) +

BufferHelper.sizeOfNullableBoolean(useTopologyForLoadBalancing);
BufferHelper.sizeOfNullableBoolean(useTopologyForLoadBalancing) +

BufferHelper.sizeOfNullableInteger(compressionLevel);

return size;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,7 @@ public synchronized void createConnectionFactory(final String name,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
final boolean compressLargeMessage,
final int compressionLevel,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
Expand Down Expand Up @@ -917,6 +918,7 @@ public synchronized void createConnectionFactory(final String name,
final boolean cacheLargeMessagesClient,
final int minLargeMessageSize,
final boolean compressLargeMessages,
final int compressionLevel,
final int consumerWindowSize,
final int consumerMaxRate,
final int confirmationWindowSize,
Expand All @@ -943,7 +945,7 @@ public synchronized void createConnectionFactory(final String name,
checkInitialised();
ActiveMQConnectionFactory cf = connectionFactories.get(name);
if (cf == null) {
ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name).setHA(ha).setBindings(registryBindings).setDiscoveryGroupName(discoveryGroupName).setFactoryType(cfType).setClientID(clientID).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setCacheLargeMessagesClient(cacheLargeMessagesClient).setMinLargeMessageSize(minLargeMessageSize).setCompressLargeMessages(compressLargeMessages).setConsumerWindowSize(consumerWindowSize).setConsumerMaxRate(consumerMaxRate).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setProducerMaxRate(producerMaxRate).setBlockOnAcknowledge(blockOnAcknowledge).setBlockOnDurableSend(blockOnDurableSend).setBlockOnNonDurableSend(blockOnNonDurableSend).setAutoGroup(autoGroup).setPreAcknowledge(preAcknowledge).setLoadBalancingPolicyClassName(loadBalancingPolicyClassName).setTransactionBatchSize(transactionBatchSize).setDupsOKBatchSize(dupsOKBatchSize).setUseGlobalPools(useGlobalPools).setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize).setThreadPoolMaxSize(threadPoolMaxSize).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setReconnectAttempts(reconnectAttempts).setFailoverOnInitialConnection(failoverOnInitialConnection);
ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl().setName(name).setHA(ha).setBindings(registryBindings).setDiscoveryGroupName(discoveryGroupName).setFactoryType(cfType).setClientID(clientID).setClientFailureCheckPeriod(clientFailureCheckPeriod).setConnectionTTL(connectionTTL).setCallTimeout(callTimeout).setCallFailoverTimeout(callFailoverTimeout).setCacheLargeMessagesClient(cacheLargeMessagesClient).setMinLargeMessageSize(minLargeMessageSize).setCompressLargeMessages(compressLargeMessages).setCompressionLevel(compressionLevel).setConsumerWindowSize(consumerWindowSize).setConsumerMaxRate(consumerMaxRate).setConfirmationWindowSize(confirmationWindowSize).setProducerWindowSize(producerWindowSize).setProducerMaxRate(producerMaxRate).setBlockOnAcknowledge(blockOnAcknowledge).setBlockOnDurableSend(blockOnDurableSend).setBlockOnNonDurableSend(blockOnNonDurableSend).setAutoGroup(autoGroup).setPreAcknowledge(preAcknowledge).setLoadBalancingPolicyClassName(loadBalancingPolicyClassName).setTransactionBatchSize(transactionBatchSize).setDupsOKBatchSize(dupsOKBatchSize).setUseGlobalPools(useGlobalPools).setScheduledThreadPoolMaxSize(scheduledThreadPoolMaxSize).setThreadPoolMaxSize(threadPoolMaxSize).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setMaxRetryInterval(maxRetryInterval).setReconnectAttempts(reconnectAttempts).setFailoverOnInitialConnection(failoverOnInitialConnection);
createConnectionFactory(true, configuration, registryBindings);
}
}
Expand Down Expand Up @@ -1212,6 +1214,7 @@ protected ActiveMQConnectionFactory internalCreateCFPOJO(final ConnectionFactory
cf.setReconnectAttempts(cfConfig.getReconnectAttempts());
cf.setFailoverOnInitialConnection(cfConfig.isFailoverOnInitialConnection());
cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
cf.setCompressionLevel(cfConfig.getCompressionLevel());
cf.setGroupID(cfConfig.getGroupID());
cf.setProtocolManagerFactoryStr(cfConfig.getProtocolManagerFactoryStr());
cf.setDeserializationBlackList(cfConfig.getDeserializationBlackList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,14 @@ public void setCompressLargeMessage(final Boolean compressLargeMessage) {
mcfProperties.setCompressLargeMessage(compressLargeMessage);
}

public Integer getCompressionLevel() {
return mcfProperties.getCompressionLevel();
}

public void setCompressionLevel(final Integer compressionLevel) {
mcfProperties.setCompressionLevel(compressionLevel);
}

public Integer getInitialConnectAttempts() {
return mcfProperties.getInitialConnectAttempts();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,29 @@ public void setCompressLargeMessage(final Boolean compressLargeMessage) {
raProperties.setCompressLargeMessage(compressLargeMessage);
}

/**
* Get compressionLevel
*
* @return The value
*/
public Integer getCompressionLevel() {
logger.trace("getCompressionLevel()");

return raProperties.getCompressionLevel();
}

/**
* Sets what compressionLevel to use when compressing messages
* Value must be -1 (default) or 0-9
*
* @param compressionLevel The value
*/
public void setCompressionLevel(final Integer compressionLevel) {
logger.trace("setCompressionLevel({})", compressionLevel);

raProperties.setCompressionLevel(compressionLevel);
}

/**
* Get call timeout
*
Expand Down Expand Up @@ -1828,6 +1851,10 @@ private void setParams(final ActiveMQConnectionFactory cf, final ConnectionFacto
if (val2 != null) {
cf.setInitialMessagePacketSize(val2);
}
val2 = overrideProperties.getCompressionLevel() != null ? overrideProperties.getCompressionLevel() : raProperties.getCompressionLevel();
if (val2 != null) {
cf.setCompressionLevel(val2);
}

Long val3 = overrideProperties.getClientFailureCheckPeriod() != null ? overrideProperties.getClientFailureCheckPeriod() : raProperties.getClientFailureCheckPeriod();
if (val3 != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class ConnectionFactoryProperties implements ConnectionFactoryOptions {

private Boolean compressLargeMessage;

private Integer compressionLevel;

private Integer consumerWindowSize;

private Integer producerWindowSize;
Expand Down Expand Up @@ -178,6 +180,15 @@ public void setCompressLargeMessage(Boolean compressLargeMessage) {
this.compressLargeMessage = compressLargeMessage;
}

public Integer getCompressionLevel() {
return compressionLevel;
}

public void setCompressionLevel(Integer compressionLevel) {
hasBeenUpdated = true;
this.compressionLevel = compressionLevel;
}

public String getConnectionLoadBalancingPolicyClassName() {
logger.trace("getConnectionLoadBalancingPolicyClassName()");

Expand Down
9 changes: 9 additions & 0 deletions docs/user-manual/en/large-messages.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,20 @@ is transferred to the server's side. Notice that there's no special treatment
at the server's side, all the compressing and uncompressing is done at the
client.

This behavior can be tuned further by setting an optional parameter: `compressionLevel`.
This will decide how much the message body should be compressed. `compressionLevel`
accepts an integer of `-1` or a value between `0-9`. The default value is `-1` which
corresponds to around level 6-7.

If the compressed size of a large message is below `minLargeMessageSize`, it is
sent to server as regular messages. This means that the message won't be
written into the server's large-message data directory, thus reducing the disk
I/O.

**Note:** A higher `compressionLevel` means the message body will get further compressed,
but this is at the cost of speed and computational overhead. Make sure to tune this value
according to its specific use-case.

## Streaming large messages from Core Protocol

Apache ActiveMQ Artemis supports setting the body of messages using input and
Expand Down

0 comments on commit 582a689

Please sign in to comment.