Skip to content

Commit

Permalink
ARTEMIS-2685 Not Block Netty Thread in any way for OpenWire
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Apr 1, 2020
1 parent 0624b08 commit bd77a53
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static void killServer(final Process server) throws Exception {
System.out.println("**********************************");
System.out.println("Killing server " + server);
System.out.println("**********************************");
server.destroy();
server.destroyForcibly();
server.waitFor();
Thread.sleep(1000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
Expand Down Expand Up @@ -191,6 +192,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private ConnectionEntry connectionEntry;
private boolean useKeepAlive;
private long maxInactivityDuration;
private Actor<Command> openWireActor;

private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();

Expand Down Expand Up @@ -270,17 +272,32 @@ private static void traceBufferReceived(Object connectionID, Command command) {
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
super.bufferReceived(connectionID, buffer);
try {

recoverOperationContext();

try {
Command command = (Command) inWireFormat.unmarshal(buffer);

// log the openwire command
if (logger.isTraceEnabled()) {
traceBufferReceived(connectionID, command);
}

if (openWireActor != null) {
openWireActor.act(command);
} else {
act(command);
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug(e);
sendException(e);
}

}


private void act(Command command) {
try {
recoverOperationContext();

boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();

Expand Down Expand Up @@ -734,6 +751,9 @@ public AMQConnectionContext initContext(ConnectionInfo info) throws Exception {

createInternalSession(info);

// the actor can only be used after the WireFormat has been initialized with versioning
this.openWireActor = new Actor<>(executor, this::act);

return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,45 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class SoakPagingTest extends SmokeTestBase {

String protocol;
String consumerType;
boolean transaction;
final String destination;

public SoakPagingTest(String protocol, String consumerType, boolean transaction) {
this.protocol = protocol;
this.consumerType = consumerType;
this.transaction = transaction;

if (consumerType.equals("queue")) {
destination = "exampleQueue";
} else {
destination = "exampleTopic";
}
}

@Parameterized.Parameters(name = "protocol={0}, type={1}, tx={2}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][]{{"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false},
{"AMQP", "shared", true}, {"AMQP", "queue", true}, {"OPENWIRE", "topic", true}, {"OPENWIRE", "queue", true}, {"CORE", "shared", true}, {"CORE", "queue", true}});
}

public static final String SERVER_NAME_0 = "replicated-static0";
public static final String SERVER_NAME_1 = "replicated-static1";

Expand All @@ -48,27 +78,55 @@ public void before() throws Exception {
cleanupData(SERVER_NAME_1);

server0 = startServer(SERVER_NAME_0, 0, 30000);
server1 = startServer(SERVER_NAME_1, 0, 30000);
}

final String destination = "exampleTopic";
static final int consumer_threads = 20;
static final int producer_threads = 20;
static AtomicInteger j = new AtomicInteger(0);

private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("AMQP")) {

if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
} else {
throw new IllegalStateException("Unkown:" + protocol);
}
}

public static void main(String[] arg) {
try {

if (arg.length != 4) {
System.err.println("You need to pass in protocol, consumerType, Time, transaction");
System.exit(0);
}

String protocol = arg[0];
String consumerType = arg[1];
int time = Integer.parseInt(arg[2]);
boolean tx = Boolean.parseBoolean(arg[3]);
if (time == 0) {
time = 15000;
}

final String host = "localhost";
final int port = 61616;

final ConnectionFactory factory = new org.apache.qpid.jms.JmsConnectionFactory("failover:(amqp://" + host + ":" + port + ")");
final ConnectionFactory factory = createConnectionFactory(protocol, "tcp://" + host + ":" + port);

for (int i = 0; i < producer_threads; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
SoakPagingTest app = new SoakPagingTest();
SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
app.produce(factory);
}
});
Expand All @@ -81,36 +139,43 @@ public void run() {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
SoakPagingTest app = new SoakPagingTest();
SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
app.consume(factory, j.getAndIncrement());
}
});
t.start();
}
Thread.sleep(15000);
Thread.sleep(time);

System.exit(consumed.get());
System.exit(consumed.get() > 0 ? 1 : 0);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
System.exit(0);
}

}

@Test
public void testPagingReplication() throws Throwable {
for (int i = 0; i < 3; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName());
Assert.assertTrue(process.waitFor() > 0);

Process queueProcess = null;
if (consumerType.equals("queue")) {
queueProcess = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "45000", "" + transaction);
}

server1.destroy();
for (int i = 0; i < 3; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "15000", "" + transaction);

server1 = startServer(SERVER_NAME_1, 0, 30000);
if (i == 0) {
server1 = startServer(SERVER_NAME_1, 0, 30000);
}

for (int i = 0; i < 2; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName());
Assert.assertTrue(process.waitFor() > 0);
int result = process.waitFor();
Assert.assertTrue(result > 0);
}

if (queueProcess != null) {
Assert.assertTrue(queueProcess.waitFor() > 0);
}
}

Expand All @@ -124,9 +189,22 @@ public void produce(ConnectionFactory factory) {
Connection connection = factory.createConnection("admin", "admin");

connection.start();
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
final Session session;

if (transaction) {
session = connection.createSession(true, Session.SESSION_TRANSACTED);
} else {
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
}

Destination address;

if (consumerType.equals("queue")) {
address = session.createQueue(destination);
} else {
address = session.createTopic(destination);
}

Destination address = session.createTopic(destination);
MessageProducer messageProducer = session.createProducer(address);

int i = 0;
Expand All @@ -142,8 +220,12 @@ public void produce(ConnectionFactory factory) {
messageProducer.send(message);
produced.incrementAndGet();
i++;
if (i % 100 == 0)
if (i % 100 == 0) {
System.out.println("Published " + i + " messages");
if (transaction) {
session.commit();
}
}
}
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -154,11 +236,30 @@ public void consume(ConnectionFactory factory, int j) {
try {
Connection connection = factory.createConnection("admin", "admin");

final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
final Session session;

if (transaction) {
session = connection.createSession(true, Session.SESSION_TRANSACTED);
} else {
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
}

Destination address;

if (consumerType.equals("queue")) {
address = session.createQueue(destination);
} else {
address = session.createTopic(destination);
}

Topic address = session.createTopic(destination);
String consumerId = "ss" + (j % 5);
MessageConsumer messageConsumer = session.createSharedConsumer(address, consumerId);
MessageConsumer messageConsumer;

if (protocol.equals("shared")) {
messageConsumer = session.createSharedConsumer((Topic)address, consumerId);
} else {
messageConsumer = session.createConsumer(address);
}

Thread.sleep(5000);
connection.start();
Expand All @@ -170,8 +271,12 @@ public void consume(ConnectionFactory factory, int j) {
if (m == null)
System.out.println("receive() returned null");
i++;
if (i % 100 == 0)
if (i % 100 == 0) {
System.out.println("Consumed " + i + " messages");
if (transaction) {
session.commit();
}
}
}
} catch (Exception e) {
e.printStackTrace();
Expand Down

0 comments on commit bd77a53

Please sign in to comment.