Skip to content

Commit

Permalink
Set more detailed Response data; fix sub sampler message extra space
Browse files Browse the repository at this point in the history
  • Loading branch information
chongyuan committed Sep 10, 2018
1 parent 3dde245 commit 693cb36
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 14 deletions.
@@ -1,6 +1,11 @@
package net.xmeter.samplers;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.jmeter.samplers.AbstractSampler;
import org.fusesource.hawtbuf.UTF8Buffer;

import net.xmeter.Constants;

Expand All @@ -9,6 +14,9 @@ public abstract class AbstractMQTTSampler extends AbstractSampler implements Con
*
*/
private static final long serialVersionUID = 7163793218595455807L;

//<connection client id, topics>
protected static Map<UTF8Buffer, Set<String>> topicSubscribed = new HashMap<UTF8Buffer, Set<String>>();

public String getServer() {
return getPropertyAsString(SERVER, DEFAULT_SERVER);
Expand Down
16 changes: 13 additions & 3 deletions mqtt_jmeter/src/main/java/net/xmeter/samplers/ConnectSampler.java
@@ -1,6 +1,7 @@
package net.xmeter.samplers;

import java.text.MessageFormat;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

Expand All @@ -10,6 +11,7 @@
import org.apache.jmeter.threads.JMeterVariables;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.Tracer;

import net.xmeter.Util;

Expand All @@ -31,7 +33,7 @@ public SampleResult sample(Entry entry) {
result.sampleStart();
result.setSuccessful(false);
result.setResponseMessage(MessageFormat.format("Connection {0} is already established.", connection));
result.setResponseData("Failed.".getBytes());
result.setResponseData("Failed. Connection is already established.".getBytes());
result.setResponseCode("500");
result.sampleEnd(); // avoid endtime=0 exposed in trace log
return result;
Expand Down Expand Up @@ -64,6 +66,12 @@ public SampleResult sample(Entry entry) {
mqtt.setPassword(getPasswordAuth());
}
mqtt.setCleanSession(getConnCleanSession());

mqtt.setTracer(new Tracer() {
public void debug(String message, Object...args) {
logger.info("MQTT Tracer: " + message);
}
});

result.sampleStart();
// TODO: Optionally connection can subscribe to topics ??
Expand All @@ -78,22 +86,24 @@ public SampleResult sample(Entry entry) {

if (callback.isConnectionSucc()) {
vars.putObject("conn", connection); // save connection object as thread local variable !!
vars.putObject("clientId", mqtt.getClientId()); //save client id as thread local variable
topicSubscribed.put(mqtt.getClientId(), new HashSet<String>());
result.setSuccessful(true);
result.setResponseData("Successful.".getBytes());
result.setResponseMessage(MessageFormat.format("Connection {0} established successfully.", connection));
result.setResponseCodeOK();
} else {
result.setSuccessful(false);
result.setResponseMessage(MessageFormat.format("Failed to establish Connection {0}.", connection));
result.setResponseData("Failed.".getBytes());
result.setResponseData(MessageFormat.format("Client [{0}] failed. Couldn't establish connection.", mqtt.getClientId().toString()).getBytes());
result.setResponseCode("501");
}
} catch (Exception e) {
logger.severe(e.getMessage());
if (result.getEndTime() == 0) result.sampleEnd(); //avoid re-enter sampleEnd()
result.setSuccessful(false);
result.setResponseMessage(MessageFormat.format("Failed to establish Connection {0}.", connection));
result.setResponseData("Failed with exception.".getBytes());
result.setResponseData(MessageFormat.format("Client [{0}] failed with exception.", mqtt.getClientId().toString()).getBytes());
result.setResponseCode("502");
}

Expand Down
Expand Up @@ -7,6 +7,7 @@
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.threads.JMeterContextService;
import org.apache.jmeter.threads.JMeterVariables;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.CallbackConnection;

public class DisConnectSampler extends AbstractMQTTSampler {
Expand All @@ -22,11 +23,12 @@ public SampleResult sample(Entry entry) {

JMeterVariables vars = JMeterContextService.getContext().getVariables();
connection = (CallbackConnection) vars.getObject("conn");
UTF8Buffer clientId = (UTF8Buffer) vars.getObject("clientId");
if (connection == null) {
result.sampleStart();
result.setSuccessful(false);
result.setResponseMessage("Connection not found.");
result.setResponseData("Failed.".getBytes());
result.setResponseData("Failed. Connection not found.".getBytes());
result.setResponseCode("500");
result.sampleEnd(); // avoid endtime=0 exposed in trace log
return result;
Expand All @@ -39,6 +41,7 @@ public SampleResult sample(Entry entry) {
logger.info(MessageFormat.format("Disconnect connection {0}.", connection));
connection.disconnect(null);
vars.remove("conn"); // clean up thread local var as well
topicSubscribed.remove(clientId);
}

result.sampleEnd();
Expand All @@ -52,7 +55,7 @@ public SampleResult sample(Entry entry) {
if (result.getEndTime() == 0) result.sampleEnd(); //avoid re-enter sampleEnd()
result.setSuccessful(false);
result.setResponseMessage(MessageFormat.format("Failed to disconnect Connection {0}.", connection));
result.setResponseData("Failed.".getBytes());
result.setResponseData(MessageFormat.format("Client [{0}] failed. Couldn't disconnect connection.", (clientId == null ? "null" : clientId.toString())).getBytes());
result.setResponseCode("501");
}
return result;
Expand Down
14 changes: 11 additions & 3 deletions mqtt_jmeter/src/main/java/net/xmeter/samplers/PubSampler.java
Expand Up @@ -9,6 +9,7 @@
import org.apache.jmeter.samplers.SampleResult;
import org.apache.jmeter.threads.JMeterContextService;
import org.apache.jmeter.threads.JMeterVariables;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.QoS;

Expand Down Expand Up @@ -91,6 +92,7 @@ public SampleResult sample(Entry arg0) {

JMeterVariables vars = JMeterContextService.getContext().getVariables();
connection = (CallbackConnection) vars.getObject("conn");
UTF8Buffer clientId = (UTF8Buffer) vars.getObject("clientId");
if (connection == null) {
result.sampleStart();
result.setSuccessful(false);
Expand All @@ -101,8 +103,8 @@ public SampleResult sample(Entry arg0) {
return result;
}

byte[] toSend = new byte[]{};
try {
byte[] toSend = new byte[]{};
byte[] tmp = new byte[]{};

if (MESSAGE_TYPE_HEX_STRING.equals(getMessageType())) {
Expand Down Expand Up @@ -147,6 +149,7 @@ public SampleResult sample(Entry arg0) {
result.sampleStart();
final Object pubLock = new Object();
PubCallback pubCallback = new PubCallback(pubLock, qos_enum);
logger.fine("pub [topic]: " + topicName + ", [payload]: " + new String(toSend));

if(qos_enum == QoS.AT_MOST_ONCE) {
//For QoS == 0, the callback is the same thread with sampler thread, so it cannot use the lock object wait() & notify() in else block;
Expand All @@ -172,17 +175,22 @@ public SampleResult sample(Entry arg0) {
} else {
result.setSuccessful(false);
result.setResponseMessage(MessageFormat.format("Publish failed for connection {0}.", connection));
result.setResponseData(pubCallback.getErrorMessage().getBytes());
result.setResponseData(MessageFormat.format("Client [{0}] publish failed: {1}", (clientId == null ? "null" : clientId.toString()), pubCallback.getErrorMessage()).getBytes());
result.setResponseCode("501");
logger.info(MessageFormat.format("** [clientId: {0}, topic: {1}, payload: {2}] Publish failed for connection {3}.", (clientId == null ? "null" : clientId.toString()),
topicName, new String(toSend), connection));
logger.info(pubCallback.getErrorMessage());
}
} catch (Exception ex) {
logger.severe(ex.getMessage());
if (result.getEndTime() == 0) result.sampleEnd();
result.setLatency(result.getEndTime() - result.getStartTime());
result.setSuccessful(false);
result.setResponseMessage(MessageFormat.format("Publish failed for connection {0}.", connection));
result.setResponseData(ex.getMessage().getBytes());
result.setResponseData(MessageFormat.format("Client [{0}] publish failed: {1}", (clientId == null ? "null" : clientId.toString()), ex.getMessage()).getBytes());
result.setResponseCode("502");
logger.info(MessageFormat.format("** [clientId: {0}, topic: {1}, payload: {2}] Publish failed for connection {3}.", (clientId == null ? "null" : clientId.toString()),
topicName, new String(toSend), connection));
}
return result;
}
Expand Down
37 changes: 31 additions & 6 deletions mqtt_jmeter/src/main/java/net/xmeter/samplers/SubSampler.java
Expand Up @@ -3,7 +3,9 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
Expand All @@ -28,6 +30,7 @@ public class SubSampler extends AbstractMQTTSampler {
private static final Logger logger = Logger.getLogger(SubSampler.class.getCanonicalName());

private transient CallbackConnection connection = null;
private transient UTF8Buffer clientId;
private boolean subFailed = false;

private boolean sampleByTime = true; // initial values
Expand Down Expand Up @@ -104,6 +107,8 @@ public SampleResult sample(Entry arg0) {

JMeterVariables vars = JMeterContextService.getContext().getVariables();
connection = (CallbackConnection) vars.getObject("conn");
clientId = (UTF8Buffer) vars.getObject("clientId");
UTF8Buffer clientId = (UTF8Buffer) vars.getObject("clientId");
if (connection == null) {
return fillFailedResult(result, "500", "Subscribe failed because connection is not established.");
}
Expand All @@ -127,7 +132,21 @@ public SampleResult sample(Entry arg0) {

final String topicsName= getTopics();
setListener(sampleByTime, sampleCount);
listenToTopics(topicsName); // TODO: run once or multiple times ?
Set<String> topics = topicSubscribed.get(clientId);
if (topics == null) {
logger.severe("subscribed topics haven't been initiated. [clientId: " + (clientId == null ? "null" : clientId.toString()) + "]");
topics = new HashSet<String>();
topics.add(topicsName);
topicSubscribed.put(clientId, topics);
listenToTopics(topicsName); // TODO: run once or multiple times ?
} else {
if (!topics.contains(topicsName)) {
topics.add(topicsName);
topicSubscribed.put(clientId, topics);
logger.fine("Listen to topics: " + topicsName);
listenToTopics(topicsName); // TODO: run once or multiple times ?
}
}

if (subFailed) {
return fillFailedResult(result, "501", "Failed to subscribe to topic(s):" + topicsName);
Expand All @@ -141,7 +160,7 @@ public SampleResult sample(Entry arg0) {
}
synchronized (dataLock) {
result.sampleStart();
return produceResult(result);
return produceResult(result, topicsName);
}
} else {
synchronized (dataLock) {
Expand All @@ -159,12 +178,12 @@ public SampleResult sample(Entry arg0) {
}
}
result.sampleStart();
return produceResult(result);
return produceResult(result, topicsName);
}
}
}

private SampleResult produceResult(SampleResult result) {
private SampleResult produceResult(SampleResult result, String topicName) {
SubBean bean = batches.poll();
if(bean == null) { // In "elapsed time" mode, return "dummy" when time is reached
bean = new SubBean();
Expand All @@ -175,10 +194,11 @@ private SampleResult produceResult(SampleResult result) {
StringBuffer content = new StringBuffer("");
if (isDebugResponse()) {
for (int i = 0; i < contents.size(); i++) {
content.append(contents.get(i) + " \n");
content.append(contents.get(i) + "\n");
}
}
result = fillOKResult(result, bean.getReceivedMessageSize(), message, content.toString());
logger.fine("sub [topic]: " + topicName + ", [payload]: " + content.toString());

if(receivedCount == 0) {
result.setEndTime(result.getStartTime()); // dummy result, rectify sample time
Expand Down Expand Up @@ -228,6 +248,7 @@ public void onSuccess(byte[] value) {

@Override
public void onFailure(Throwable value) {
logger.info("subscribe failed: " + value.getMessage());
subFailed = true;
}
});
Expand Down Expand Up @@ -319,7 +340,11 @@ private SampleResult fillFailedResult(SampleResult result, String code, String m
result.setResponseCode(code); // 5xx means various failures
result.setSuccessful(false);
result.setResponseMessage(message);
result.setResponseData(message.getBytes());
if (clientId != null) {
result.setResponseData(MessageFormat.format("Client [{0}]: {1}", clientId.toString(), message).getBytes());
} else {
result.setResponseData(message.getBytes());
}
result.sampleEnd();

// avoid massive repeated "early stage" failures in a short period of time
Expand Down

0 comments on commit 693cb36

Please sign in to comment.