Skip to content

Commit

Permalink
Add option to configure transactional AMQP consumer from the UI (see j…
Browse files Browse the repository at this point in the history
  • Loading branch information
aliesbelik committed Oct 29, 2021
1 parent 3cb0083 commit 5e36592
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 5 deletions.
22 changes: 21 additions & 1 deletion src/main/com/zeroclue/jmeter/protocol/amqp/AMQPConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public class AMQPConsumer extends AMQPSampler implements Interruptible, TestStat
public static final String ROUTING_KEY_PARAMETER = "Routing Key";
public static final String DELIVERY_TAG_PARAMETER = "Delivery Tag";

public static boolean DEFAULT_USE_TX = false;
private final static String USE_TX = "AMQPConsumer.UseTx";

private transient Channel channel;
private transient QueueingConsumer consumer;
private transient String consumerTag;
Expand Down Expand Up @@ -109,10 +112,15 @@ public SampleResult sample(Entry entry) {
result.setSamplerData("Read response is false.");
}

if(!autoAck())
if (!autoAck())
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

// commit the sample
if (getUseTx()) {
channel.txCommit();
}

/*
* Set up the sample result details
*/
Expand Down Expand Up @@ -234,6 +242,14 @@ public int getPrefetchCountAsInt() {
return getPropertyAsInt(PREFETCH_COUNT);
}

public Boolean getUseTx() {
return getPropertyAsBoolean(USE_TX, DEFAULT_USE_TX);
}

public void setUseTx(Boolean tx) {
setProperty(USE_TX, tx);
}

/**
* set whether the sampler should read the response or not
*
Expand Down Expand Up @@ -325,6 +341,10 @@ protected boolean initChannel() throws IOException, NoSuchAlgorithmException, Ke
boolean ret = super.initChannel();
channel.basicQos(getPrefetchCountAsInt());

if (getUseTx()) {
channel.txSelect();
}

return ret;
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/com/zeroclue/jmeter/protocol/amqp/AMQPPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ public class AMQPPublisher extends AMQPSampler implements Interruptible {
private final static String HEADERS = "AMQPPublisher.Headers";

public static boolean DEFAULT_PERSISTENT = false;
private final static String PERSISTENT = "AMQPConsumer.Persistent";
private final static String PERSISTENT = "AMQPPublisher.Persistent";

public static boolean DEFAULT_USE_TX = false;
private final static String USE_TX = "AMQPConsumer.UseTx";
private final static String USE_TX = "AMQPPublisher.UseTx";

public static final int DEFAULT_MESSAGE_PRIORITY = 0;
public static final String DEFAULT_RESPONSE_CODE = "500";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.zeroclue.jmeter.protocol.amqp.gui;

import java.awt.*;
import javax.swing.JCheckBox;
import javax.swing.JPanel;

Expand All @@ -14,9 +15,11 @@ public class AMQPConsumerGui extends AMQPSamplerGui {

protected JLabeledTextField receiveTimeout = new JLabeledTextField("Receive Timeout");
protected JLabeledTextField prefetchCount = new JLabeledTextField(" Prefetch Count");

private final JCheckBox purgeQueue = new JCheckBox("Purge Queue", false);
private final JCheckBox autoAck = new JCheckBox("Auto ACK", true);
private final JCheckBox readResponse = new JCheckBox("Read Response", AMQPConsumer.DEFAULT_READ_RESPONSE);
private final JCheckBox useTx = new JCheckBox("Use Transactions", AMQPConsumer.DEFAULT_USE_TX);

private JPanel mainPanel;

Expand All @@ -30,11 +33,15 @@ public AMQPConsumerGui() {
protected void init() {
super.init();

mainPanel.add(readResponse);
mainPanel.add(prefetchCount);
prefetchCount.setPreferredSize(new Dimension(100,25));
useTx.setPreferredSize(new Dimension(100,25));

mainPanel.add(receiveTimeout);
mainPanel.add(prefetchCount);
mainPanel.add(purgeQueue);
mainPanel.add(autoAck);
mainPanel.add(readResponse);
mainPanel.add(useTx);
}

@Override
Expand All @@ -56,6 +63,7 @@ public void configure(TestElement element) {
receiveTimeout.setText(sampler.getReceiveTimeout());
purgeQueue.setSelected(sampler.purgeQueue());
autoAck.setSelected(sampler.autoAck());
useTx.setSelected(sampler.getUseTx());
}

/**
Expand All @@ -66,6 +74,7 @@ public void clearGui() {
super.clearGui();
readResponse.setSelected(AMQPConsumer.DEFAULT_READ_RESPONSE);
prefetchCount.setText(AMQPConsumer.DEFAULT_PREFETCH_COUNT_STRING);
useTx.setSelected(AMQPConsumer.DEFAULT_USE_TX);
receiveTimeout.setText("");
purgeQueue.setSelected(false);
autoAck.setSelected(true);
Expand Down Expand Up @@ -97,6 +106,7 @@ public void modifyTestElement(TestElement te) {
sampler.setReceiveTimeout(receiveTimeout.getText());
sampler.setPurgeQueue(purgeQueue.isSelected());
sampler.setAutoAck(autoAck.isSelected());
sampler.setUseTx(useTx.isSelected());
}

/**
Expand Down

0 comments on commit 5e36592

Please sign in to comment.