Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-2685 Not Block Netty Thread in any way for OpenWire #3057

Merged
merged 1 commit into from
Apr 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public static void killServer(final Process server) throws Exception {
System.out.println("**********************************");
System.out.println("Killing server " + server);
System.out.println("**********************************");
server.destroy();
server.destroyForcibly();
server.waitFor();
Thread.sleep(1000);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.actors.Actor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
Expand Down Expand Up @@ -191,6 +192,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private ConnectionEntry connectionEntry;
private boolean useKeepAlive;
private long maxInactivityDuration;
private Actor<Command> openWireActor;

private final Set<SimpleString> knownDestinations = new ConcurrentHashSet<>();

Expand Down Expand Up @@ -270,17 +272,32 @@ private static void traceBufferReceived(Object connectionID, Command command) {
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
super.bufferReceived(connectionID, buffer);
try {

recoverOperationContext();

try {
Command command = (Command) inWireFormat.unmarshal(buffer);

// log the openwire command
if (logger.isTraceEnabled()) {
traceBufferReceived(connectionID, command);
}

if (openWireActor != null) {
openWireActor.act(command);
} else {
act(command);
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug(e);
sendException(e);
}

}


private void act(Command command) {
try {
recoverOperationContext();

boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();

Expand Down Expand Up @@ -734,6 +751,9 @@ public AMQConnectionContext initContext(ConnectionInfo info) throws Exception {

createInternalSession(info);

// the actor can only be used after the WireFormat has been initialized with versioning
this.openWireActor = new Actor<>(executor, this::act);

return context;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,45 @@
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
public class SoakPagingTest extends SmokeTestBase {

String protocol;
String consumerType;
boolean transaction;
final String destination;

public SoakPagingTest(String protocol, String consumerType, boolean transaction) {
this.protocol = protocol;
this.consumerType = consumerType;
this.transaction = transaction;

if (consumerType.equals("queue")) {
destination = "exampleQueue";
} else {
destination = "exampleTopic";
}
}

@Parameterized.Parameters(name = "protocol={0}, type={1}, tx={2}")
public static Collection<Object[]> getParams() {
return Arrays.asList(new Object[][]{{"AMQP", "shared", false}, {"AMQP", "queue", false}, {"OPENWIRE", "topic", false}, {"OPENWIRE", "queue", false}, {"CORE", "shared", false}, {"CORE", "queue", false},
{"AMQP", "shared", true}, {"AMQP", "queue", true}, {"OPENWIRE", "topic", true}, {"OPENWIRE", "queue", true}, {"CORE", "shared", true}, {"CORE", "queue", true}});
}

public static final String SERVER_NAME_0 = "replicated-static0";
public static final String SERVER_NAME_1 = "replicated-static1";

Expand All @@ -48,27 +78,55 @@ public void before() throws Exception {
cleanupData(SERVER_NAME_1);

server0 = startServer(SERVER_NAME_0, 0, 30000);
server1 = startServer(SERVER_NAME_1, 0, 30000);
}

final String destination = "exampleTopic";
static final int consumer_threads = 20;
static final int producer_threads = 20;
static AtomicInteger j = new AtomicInteger(0);

private static ConnectionFactory createConnectionFactory(String protocol, String uri) {
if (protocol.toUpperCase().equals("OPENWIRE")) {
return new org.apache.activemq.ActiveMQConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("AMQP")) {

if (uri.startsWith("tcp://")) {
// replacing tcp:// by amqp://
uri = "amqp" + uri.substring(3);
}
return new JmsConnectionFactory(uri);
} else if (protocol.toUpperCase().equals("CORE") || protocol.toUpperCase().equals("ARTEMIS")) {
return new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(uri);
} else {
throw new IllegalStateException("Unkown:" + protocol);
}
}

public static void main(String[] arg) {
try {

if (arg.length != 4) {
System.err.println("You need to pass in protocol, consumerType, Time, transaction");
System.exit(0);
}

String protocol = arg[0];
String consumerType = arg[1];
int time = Integer.parseInt(arg[2]);
boolean tx = Boolean.parseBoolean(arg[3]);
if (time == 0) {
time = 15000;
}

final String host = "localhost";
final int port = 61616;

final ConnectionFactory factory = new org.apache.qpid.jms.JmsConnectionFactory("failover:(amqp://" + host + ":" + port + ")");
final ConnectionFactory factory = createConnectionFactory(protocol, "tcp://" + host + ":" + port);

for (int i = 0; i < producer_threads; i++) {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
SoakPagingTest app = new SoakPagingTest();
SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
app.produce(factory);
}
});
Expand All @@ -81,36 +139,43 @@ public void run() {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
SoakPagingTest app = new SoakPagingTest();
SoakPagingTest app = new SoakPagingTest(protocol, consumerType, tx);
app.consume(factory, j.getAndIncrement());
}
});
t.start();
}
Thread.sleep(15000);
Thread.sleep(time);

System.exit(consumed.get());
System.exit(consumed.get() > 0 ? 1 : 0);
} catch (Throwable e) {
e.printStackTrace();
System.exit(-1);
System.exit(0);
}

}

@Test
public void testPagingReplication() throws Throwable {
for (int i = 0; i < 3; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName());
Assert.assertTrue(process.waitFor() > 0);

Process queueProcess = null;
if (consumerType.equals("queue")) {
queueProcess = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "45000", "" + transaction);
}

server1.destroy();
for (int i = 0; i < 3; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName(), protocol, consumerType, "15000", "" + transaction);

server1 = startServer(SERVER_NAME_1, 0, 30000);
if (i == 0) {
server1 = startServer(SERVER_NAME_1, 0, 30000);
}

for (int i = 0; i < 2; i++) {
Process process = SpawnedVMSupport.spawnVM(SoakPagingTest.class.getName());
Assert.assertTrue(process.waitFor() > 0);
int result = process.waitFor();
Assert.assertTrue(result > 0);
}

if (queueProcess != null) {
Assert.assertTrue(queueProcess.waitFor() > 0);
}
}

Expand All @@ -124,9 +189,22 @@ public void produce(ConnectionFactory factory) {
Connection connection = factory.createConnection("admin", "admin");

connection.start();
final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
final Session session;

if (transaction) {
session = connection.createSession(true, Session.SESSION_TRANSACTED);
} else {
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
}

Destination address;

if (consumerType.equals("queue")) {
address = session.createQueue(destination);
} else {
address = session.createTopic(destination);
}

Destination address = session.createTopic(destination);
MessageProducer messageProducer = session.createProducer(address);

int i = 0;
Expand All @@ -142,8 +220,12 @@ public void produce(ConnectionFactory factory) {
messageProducer.send(message);
produced.incrementAndGet();
i++;
if (i % 100 == 0)
if (i % 100 == 0) {
System.out.println("Published " + i + " messages");
if (transaction) {
session.commit();
}
}
}
} catch (Exception e) {
e.printStackTrace();
Expand All @@ -154,11 +236,30 @@ public void consume(ConnectionFactory factory, int j) {
try {
Connection connection = factory.createConnection("admin", "admin");

final Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
final Session session;

if (transaction) {
session = connection.createSession(true, Session.SESSION_TRANSACTED);
} else {
session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
}

Destination address;

if (consumerType.equals("queue")) {
address = session.createQueue(destination);
} else {
address = session.createTopic(destination);
}

Topic address = session.createTopic(destination);
String consumerId = "ss" + (j % 5);
MessageConsumer messageConsumer = session.createSharedConsumer(address, consumerId);
MessageConsumer messageConsumer;

if (protocol.equals("shared")) {
messageConsumer = session.createSharedConsumer((Topic)address, consumerId);
} else {
messageConsumer = session.createConsumer(address);
}

Thread.sleep(5000);
connection.start();
Expand All @@ -170,8 +271,12 @@ public void consume(ConnectionFactory factory, int j) {
if (m == null)
System.out.println("receive() returned null");
i++;
if (i % 100 == 0)
if (i % 100 == 0) {
System.out.println("Consumed " + i + " messages");
if (transaction) {
session.commit();
}
}
}
} catch (Exception e) {
e.printStackTrace();
Expand Down