Skip to content

Commit

Permalink
ARTEMIS-4224 Optimizing memory consumption from MQTT5SoakTest
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Apr 6, 2023
1 parent 301aadb commit d9d727b
Showing 1 changed file with 30 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@

package org.apache.activemq.artemis.tests.soak.mqtt;

import java.nio.charset.StandardCharsets;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
Expand All @@ -36,10 +38,16 @@
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTT5SoakTest extends SoakTestBase {

private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

public static final String SERVER_NAME_0 = "mqtt";

private static final String JMX_SERVER_HOSTNAME = "localhost";
Expand All @@ -65,21 +73,26 @@ public void before() throws Exception {
public void testMaxMessageSize() throws Throwable {

final String TOPIC = "MQTT_SOAK";
// subtract a little to leave room for the header
final int SIZE = MQTTUtil.MAX_PACKET_SIZE - 48;
StringBuilder builder = new StringBuilder(SIZE);

for (int i = 0; i < SIZE; i++) {
builder.append("=");
}
byte[] bytes = builder.toString().getBytes(StandardCharsets.UTF_8);
AtomicInteger errors = new AtomicInteger(0);

final CountDownLatch latch = new CountDownLatch(1);

MqttClient consumer = createPahoClient("consumer");
consumer.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
assertEqualsByteArrays(bytes.length, bytes, message.getPayload());
byte[] payload = message.getPayload();
if (payload.length != SIZE) {
logger.error("payload.length != {}", SIZE);
errors.incrementAndGet();
}
for (int i = 0; i < payload.length; i++) {
if (payload[i] != (byte)'=') {
logger.error("unexpected byte {} at position {} on received payload", payload[i], i);
errors.incrementAndGet();
}
}
latch.countDown();
}

Expand All @@ -97,7 +110,12 @@ public void mqttErrorOccurred(MqttException exception) {

MqttClient producer = createPahoClient("producer");
producer.connect();
producer.publish(TOPIC, bytes, 1, false);

{ // using a little context to make sure the bytes reference goes away asap saving some memory
byte[] bytes = new byte[SIZE];
Arrays.fill(bytes, (byte) '=');
producer.publish(TOPIC, bytes, 1, false);
}

assertTrue(latch.await(30, TimeUnit.SECONDS));

Expand All @@ -110,6 +128,8 @@ public void mqttErrorOccurred(MqttException exception) {
producer.close();

consumer.close();

Assert.assertEquals(0, errors.get());
}

protected interface DefaultMqttCallback extends MqttCallback {
Expand Down

0 comments on commit d9d727b

Please sign in to comment.