Skip to content

Commit

Permalink
ARTEMIS-3522 Implement performance tool for RUL benchmarks
Browse files Browse the repository at this point in the history
Co-authored-by: gtully
  • Loading branch information
franz1981 committed Mar 2, 2022
1 parent cf49bfa commit f8b045b
Show file tree
Hide file tree
Showing 36 changed files with 3,415 additions and 11 deletions.
4 changes: 4 additions & 0 deletions artemis-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.hdrhistogram</groupId>
<artifactId>HdrHistogram</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-configuration2</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import org.apache.activemq.artemis.cli.commands.check.NodeCheck;
import org.apache.activemq.artemis.cli.commands.check.QueueCheck;
import org.apache.activemq.artemis.cli.commands.messages.Transfer;
import org.apache.activemq.artemis.cli.commands.messages.perf.PerfClientCommand;
import org.apache.activemq.artemis.cli.commands.messages.perf.PerfConsumerCommand;
import org.apache.activemq.artemis.cli.commands.messages.perf.PerfProducerCommand;
import org.apache.activemq.artemis.cli.commands.queue.StatQueue;
import org.apache.activemq.artemis.cli.commands.Run;
import org.apache.activemq.artemis.cli.commands.Stop;
Expand Down Expand Up @@ -163,6 +166,10 @@ private static Cli.CliBuilder<Action> builder(File artemisInstance) {
withCommand(HelpAction.class).withCommand(Producer.class).withCommand(Transfer.class).withCommand(Consumer.class).
withCommand(Browse.class).withCommand(Mask.class).withCommand(PrintVersion.class).withDefaultCommand(HelpAction.class);

builder.withGroup("perf").withDescription("Perf tools group (example ./artemis perf client)")
.withDefaultCommand(PerfClientCommand.class)
.withCommands(PerfProducerCommand.class, PerfConsumerCommand.class, PerfClientCommand.class);

builder.withGroup("check").withDescription("Check tools group (node|queue) (example ./artemis check node)").
withDefaultCommand(HelpCheck.class).withCommands(NodeCheck.class, QueueCheck.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ public class ConnectionAbstract extends InputAbstract {
protected String password;

@Option(name = "--clientID", description = "ClientID to be associated with connection")
String clientID;
protected String clientID;

@Option(name = "--protocol", description = "Protocol used. Valid values are amqp or core. Default=core.")
String protocol = "core";
protected String protocol = "core";

public String getBrokerURL() {
return brokerURL;
Expand Down Expand Up @@ -126,16 +126,27 @@ public Object execute(ActionContext context) throws Exception {
}

protected ConnectionFactory createConnectionFactory() throws Exception {
return createConnectionFactory(brokerURL, user, password, clientID, protocol);
}

protected ConnectionFactory createConnectionFactory(String brokerURL,
String user,
String password,
String clientID,
String protocol) throws Exception {
if (protocol.equals("core")) {
return createCoreConnectionFactory();
return createCoreConnectionFactory(brokerURL, user, password, clientID);
} else if (protocol.equals("amqp")) {
return createAMQPConnectionFactory();
return createAMQPConnectionFactory(brokerURL, user, password, clientID);
} else {
throw new IllegalStateException("protocol " + protocol + " not supported");
}
}

private ConnectionFactory createAMQPConnectionFactory() {
private ConnectionFactory createAMQPConnectionFactory(String brokerURL,
String user,
String password,
String clientID) {
if (brokerURL.startsWith("tcp://")) {
// replacing tcp:// by amqp://
brokerURL = "amqp" + brokerURL.substring(3);
Expand Down Expand Up @@ -172,8 +183,14 @@ private ConnectionFactory createAMQPConnectionFactory() {
}

protected ActiveMQConnectionFactory createCoreConnectionFactory() {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
return createCoreConnectionFactory(brokerURL, user, password, clientID);
}

protected ActiveMQConnectionFactory createCoreConnectionFactory(String brokerURL,
String user,
String password,
String clientID) {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerURL, user, password);
if (clientID != null) {
System.out.println("Consumer:: clientID = " + clientID);
cf.setClientID(clientID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,17 @@ protected MessageSerializer getMessageSerializer() {
}

protected Destination getDestination(Session session) throws JMSException {
return getDestination(session, destination);
}

public static Destination getDestination(Session session, String destination) throws JMSException {
if (destination.startsWith(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX)) {
return session.createTopic(stripPrefix(destination));
}
return session.createQueue(stripPrefix(destination));
}

private String stripPrefix(String destination) {
public static String stripPrefix(String destination) {
int index = destination.indexOf("://");
if (index != -1) {
return destination.substring(index + 3);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.messages.perf;

import javax.jms.BytesMessage;
import javax.jms.CompletionListener;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import static java.util.Objects.requireNonNull;

public final class AsyncJms2ProducerFacade {

private final long id;
protected final Session session;
private final MessageProducer producer;

/*
* maxPending limits the number of in-flight sent messages
* in a way that if the limit is reached and a single completion arrive,
* a subsequent send attempt will succeed.
*/
private long pending;
private final long maxPending;

/*
* Tracking sent messages in transaction requires using 2 separate counters
* ie pendingMsgInTransaction, completedMsgInTransaction
* because, using just one won't allow tracking completions of previously sent messages in order to commit
* the transaction while there are no more in-flight ones.
*/
private final long transactionCapacity;
private long pendingMsgInTransaction;
private long completedMsgInTransaction;

private final List<Runnable> availableObservers;
private final List<Runnable> closedObservers;

private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> MESSAGE_SENT_UPDATER = AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "messageSent");
private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> MESSAGE_COMPLETED_UPDATER = AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "messageCompleted");
private static final AtomicLongFieldUpdater<AsyncJms2ProducerFacade> NOT_AVAILABLE_UPDATER = AtomicLongFieldUpdater.newUpdater(AsyncJms2ProducerFacade.class, "notAvailable");

private volatile long messageSent;
private volatile long messageCompleted;
private volatile long notAvailable;

private boolean closing;
private boolean closed;
private final Destination destination;

public AsyncJms2ProducerFacade(final long id,
final Session session,
final MessageProducer producer,
final Destination destination,
final long maxPending,
final long transactionCapacity) {
this.id = id;
this.session = requireNonNull(session);
this.producer = requireNonNull(producer);
this.destination = destination;
this.pending = 0;
this.maxPending = transactionCapacity > 0 && maxPending > 0 ? Math.max(maxPending, transactionCapacity) : maxPending;
this.availableObservers = new ArrayList<>(1);
this.closedObservers = new ArrayList<>(1);
this.messageSent = 0;
this.messageCompleted = 0;
this.notAvailable = 0;
try {
if (transactionCapacity < 0) {
throw new IllegalStateException("transactionCapacity must be >= 0");
}
if (transactionCapacity > 0) {
if (!session.getTransacted()) {
throw new IllegalStateException("session must be transacted with transactionCapacity != 0");
}
} else {
if (session.getTransacted()) {
throw new IllegalStateException("session cannot be transacted with transactionCapacity = 0");
}
}
} catch (final JMSException ex) {
throw new IllegalStateException(ex);
}
this.transactionCapacity = transactionCapacity;
this.pendingMsgInTransaction = 0;
this.completedMsgInTransaction = 0;
this.closing = false;
this.closed = false;
}

public long getId() {
return id;
}

public Destination getDestination() {
return destination;
}

BytesMessage createBytesMessage() throws JMSException {
return session.createBytesMessage();
}

private void addedPendingSend() {
if (transactionCapacity > 0 && pendingMsgInTransaction == transactionCapacity) {
throw new IllegalStateException("reached max in-flight transacted sent messages");
}
if (maxPending > 0 && pending == maxPending) {
throw new IllegalStateException("reached max in-flight sent messages");
}
pending++;
pendingMsgInTransaction++;
}

/**
* if {@code true}, a subsequent {@link #trySend} would return {@link SendAttemptResult#Success}.<br>
* Otherwise, a subsequent {@link #trySend} would return {@link SendAttemptResult#NotAvailable}.
*/
private boolean isAvailable() {
if (maxPending > 0 && pending == maxPending) {
return false;
}
return transactionCapacity == 0 || pendingMsgInTransaction != transactionCapacity;
}

public enum SendAttemptResult {
Closing, Closed, NotAvailable, Success
}

public SendAttemptResult trySend(final Message message,
final CompletionListener completionListener,
final Runnable availableObserver) throws JMSException {
if (closing) {
return SendAttemptResult.Closing;
}
if (closed) {
return SendAttemptResult.Closed;
}
if (!isAvailable()) {
availableObservers.add(availableObserver);
orderedIncrementNotAvailable();
return SendAttemptResult.NotAvailable;
}
producer.send(message, completionListener);
orderedIncrementSent();
addedPendingSend();
return SendAttemptResult.Success;
}

public void onSendErrored() {
if (closed) {
return;
}
availableObservers.clear();
closedObservers.forEach(Runnable::run);
closedObservers.clear();
closed = true;
}

public JMSException onSendCompleted() {
if (closed) {
return null;
}
JMSException completionError = null;
orderedIncrementCompleted();
if (transactionCapacity > 0 && completedMsgInTransaction == transactionCapacity) {
throw new IllegalStateException("cannot complete more send");
}
if (pending == 0) {
throw new IllegalStateException("cannot complete more send");
}
pending--;
completedMsgInTransaction++;
if (transactionCapacity > 0) {
if (completedMsgInTransaction == transactionCapacity || (closing && pending == 0)) {
completedMsgInTransaction = 0;
pendingMsgInTransaction = 0;
try {
session.commit();
} catch (final JMSException fatal) {
completionError = fatal;
closing = true;
}
if (closing) {
closing = false;
closed = true;
closedObservers.forEach(Runnable::run);
closedObservers.clear();
} else if (isAvailable()) {
availableObservers.forEach(Runnable::run);
availableObservers.clear();
}
}
} else {
if (closing && pending == 0) {
closing = false;
closed = true;
closedObservers.forEach(Runnable::run);
closedObservers.clear();
} else if (isAvailable()) {
availableObservers.forEach(Runnable::run);
availableObservers.clear();
}
}
return completionError;
}

public long getMessageSent() {
return messageSent;
}

private void orderedIncrementSent() {
MESSAGE_SENT_UPDATER.lazySet(this, messageSent + 1);
}

public long getMessageCompleted() {
return messageCompleted;
}

private void orderedIncrementCompleted() {
MESSAGE_COMPLETED_UPDATER.lazySet(this, messageCompleted + 1);
}

public long getNotAvailable() {
return notAvailable;
}

private void orderedIncrementNotAvailable() {
NOT_AVAILABLE_UPDATER.lazySet(this, notAvailable + 1);
}

public void requestClose() {
requestClose(() -> {
});
}

public void requestClose(final Runnable onClosed) {
if (closed) {
onClosed.run();
return;
}
if (closing) {
closedObservers.add(onClosed);
return;
}
availableObservers.clear();
if (pending > 0) {
closing = true;
closedObservers.add(onClosed);
} else {
closed = true;
onClosed.run();
}
}
}

0 comments on commit f8b045b

Please sign in to comment.