From 42a2e4637f9061689a0a383067543a9895fcd234 Mon Sep 17 00:00:00 2001 From: Robbie Gemmell Date: Thu, 15 Feb 2024 15:54:24 +0000 Subject: [PATCH] ARTEMIS-4644: convert some broker-connection tests to use the ProtonJ2 test peer Changes from myself and Tim Bish --- pom.xml | 1 - tests/integration-tests/pom.xml | 13 - .../amqp/connect/AMQPConnectSaslTest.java | 469 ++++---------- .../integration/amqp/connect/MockServer.java | 81 --- .../amqp/connect/ValidateAMQPErrorsTest.java | 580 +++++++----------- 5 files changed, 343 insertions(+), 801 deletions(-) delete mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MockServer.java diff --git a/pom.xml b/pom.xml index 264c780a0da..d208f015f46 100644 --- a/pom.xml +++ b/pom.xml @@ -187,7 +187,6 @@ 4.0.13 - 4.5.3 3.3.1 5.15.0 diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 15d115d79de..198cc8e4a37 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -358,19 +358,6 @@ ${netty-tcnative-version} - - io.vertx - vertx-proton - ${vertx.version} - test - - - io.vertx - vertx-core - ${vertx.version} - test - - org.apache.activemq diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java index 7140d75bfe5..f8b14b6bf39 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java @@ -16,47 +16,27 @@ */ package org.apache.activemq.artemis.tests.integration.amqp.connect; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.util.Objects; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; +import java.lang.invoke.MethodHandles; +import java.net.URI; import java.util.concurrent.TimeUnit; -import javax.crypto.Mac; -import javax.security.auth.login.LoginException; - import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; -import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; -import org.apache.activemq.artemis.protocol.amqp.sasl.scram.SCRAMServerSASL; -import org.apache.activemq.artemis.spi.core.security.scram.SCRAM; -import org.apache.activemq.artemis.spi.core.security.scram.ScramUtils; -import org.apache.activemq.artemis.spi.core.security.scram.UserData; import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; -import org.apache.qpid.proton.engine.Sasl; -import org.apache.qpid.proton.engine.Sasl.SaslOutcome; -import org.apache.qpid.proton.engine.Transport; -import org.junit.After; -import org.junit.Before; +import org.apache.qpid.protonj2.test.driver.ProtonTestServer; +import org.apache.qpid.protonj2.test.driver.ProtonTestServerOptions; +import org.apache.qpid.protonj2.test.driver.codec.security.SaslCode; import org.junit.Test; - -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.core.http.ClientAuth; -import io.vertx.core.net.JksOptions; -import io.vertx.core.net.NetSocket; -import io.vertx.proton.ProtonConnection; -import io.vertx.proton.ProtonServerOptions; -import io.vertx.proton.sasl.ProtonSaslAuthenticator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * See the tests/security-resources/build.sh script for details on the security resources used. */ public class AMQPConnectSaslTest extends AmqpClientTestSupport { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final int BROKER_PORT_NUM = AMQP_PORT + 1; private static final String SERVER_KEYSTORE_NAME = "server-keystore.jks"; @@ -74,9 +54,7 @@ public class AMQPConnectSaslTest extends AmqpClientTestSupport { private static final String PLAIN = "PLAIN"; private static final String ANONYMOUS = "ANONYMOUS"; private static final String EXTERNAL = "EXTERNAL"; - - private Vertx vertx; - private MockServer mockServer; + private static final String SCRAM_SHA_512 = "SCRAM-SHA-512"; @Override protected ActiveMQServer createServer() throws Exception { @@ -85,366 +63,155 @@ protected ActiveMQServer createServer() throws Exception { return createServer(BROKER_PORT_NUM, false); } - @Before - @Override - public void setUp() throws Exception { - super.setUp(); - vertx = Vertx.vertx(); - } - - @After - @Override - public void tearDown() throws Exception { - try { - super.tearDown(); - } finally { - if (mockServer != null) { - mockServer.close(); - } - - CountDownLatch closeLatch = new CountDownLatch(1); - vertx.close(x -> closeLatch.countDown()); - assertTrue("Vert.x instant not closed in alotted time", closeLatch.await(5, TimeUnit.SECONDS)); - } - } - - @Test(timeout = 20000) + @Test(timeout = 20_000) public void testConnectsWithAnonymous() throws Exception { - CountDownLatch serverConnectionOpen = new CountDownLatch(1); - TestAuthenticator authenticator = new TestAuthenticator(true, PLAIN, ANONYMOUS); - - mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> { - serverConnection.openHandler(serverSender -> { - serverConnectionOpen.countDown(); - serverConnection.closeHandler(x -> serverConnection.close()); - serverConnection.open(); - }); - }); + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLAnonymousConnect(PLAIN, ANONYMOUS); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); - // No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered - AMQPBrokerConnectConfiguration amqpConnection = - new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://localhost:" + mockServer.actualPort()); - amqpConnection.setReconnectAttempts(0);// No reconnects + final URI remoteURI = peer.getServerURI(); + logger.debug("Connect test started, peer listening on: {}", remoteURI); - server.getConfiguration().addAMQPConnection(amqpConnection); + // No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered + AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects - server.start(); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); - boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS); - assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen); - - assertEquals(ANONYMOUS, authenticator.getChosenMech()); - assertArrayEquals(new byte[0], authenticator.getInitialResponse()); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } } - @Test(timeout = 20000) + @Test(timeout = 20_000) public void testConnectsWithPlain() throws Exception { - CountDownLatch serverConnectionOpen = new CountDownLatch(1); - TestAuthenticator authenticator = new TestAuthenticator(true, PLAIN, ANONYMOUS); - - mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> { - serverConnection.openHandler(serverSender -> { - serverConnectionOpen.countDown(); - serverConnection.closeHandler(x -> serverConnection.close()); - serverConnection.open(); - }); - }); + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSASLPlainConnect(USER, PASSWD, PLAIN, ANONYMOUS); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); - // User and pass are given, it will select PLAIN - AMQPBrokerConnectConfiguration amqpConnection = - new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://localhost:" + mockServer.actualPort()); - amqpConnection.setReconnectAttempts(0);// No reconnects - amqpConnection.setUser(USER); - amqpConnection.setPassword(PASSWD); + final URI remoteURI = peer.getServerURI(); + logger.debug("Connect test started, peer listening on: {}", remoteURI); - server.getConfiguration().addAMQPConnection(amqpConnection); + // User and pass are given, it will select PLAIN + AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser(USER); + amqpConnection.setPassword(PASSWD); - server.start(); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); - boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS); - assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } + } - assertEquals(PLAIN, authenticator.getChosenMech()); - assertArrayEquals(expectedPlainInitialResponse(USER, PASSWD), authenticator.getInitialResponse()); + @Test(timeout = 20_000) + public void testAnonymousSelectedWhenNoCredentialsSupplied() throws Exception { + doMechanismSelectedTestImpl(null, null, ANONYMOUS, new String[]{SCRAM_SHA_512, PLAIN, ANONYMOUS}); } - @Test(timeout = 200000) - public void testConnectsWithSCRAM() throws Exception { - CountDownLatch serverConnectionOpen = new CountDownLatch(1); - SCRAMTestAuthenticator authenticator = new SCRAMTestAuthenticator(SCRAM.SHA512); + @Test(timeout = 20_000) + public void testSelectsSCRAMWhenCredentialsPresent() throws Exception { + doMechanismSelectedTestImpl(USER, PASSWD, SCRAM_SHA_512, new String[]{SCRAM_SHA_512, PLAIN, ANONYMOUS}); + } - mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> { - serverConnection.openHandler(serverSender -> { - serverConnectionOpen.countDown(); - serverConnection.closeHandler(x -> serverConnection.close()); - serverConnection.open(); - }); - }); + private void doMechanismSelectedTestImpl(String user, String passwd, String selectedMechanism, String[] offeredMechanisms) throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + peer.expectSaslConnectThatAlwaysFailsAuthentication(offeredMechanisms, selectedMechanism); + peer.start(); - AMQPBrokerConnectConfiguration amqpConnection = - new AMQPBrokerConnectConfiguration("testSScramConnect", "tcp://localhost:" + mockServer.actualPort()); - amqpConnection.setReconnectAttempts(0);// No reconnects - amqpConnection.setUser(USER); - amqpConnection.setPassword(PASSWD); + final URI remoteURI = peer.getServerURI(); + logger.debug("Connect test started, peer listening on: {}", remoteURI); - server.getConfiguration().addAMQPConnection(amqpConnection); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), + "tcp://localhost:" + remoteURI.getPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + if (user != null) { + amqpConnection.setUser(user); + } + if (passwd != null) { + amqpConnection.setPassword(passwd); + } - server.start(); + server.getConfiguration().addAMQPConnection(amqpConnection); - boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS); - assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen); - assertEquals(SCRAM.SHA512.getName(), authenticator.chosenMech); - assertTrue(authenticator.succeeded()); + server.start(); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } } - @Test(timeout = 20000) + @Test(timeout = 20_000) public void testConnectsWithExternal() throws Exception { doConnectWithExternalTestImpl(true); } - @Test(timeout = 20000) + @Test(timeout = 20_000) public void testExternalIgnoredWhenNoClientCertSupplied() throws Exception { doConnectWithExternalTestImpl(false); } - private void doConnectWithExternalTestImpl(boolean requireClientCert) throws ExecutionException, - InterruptedException, Exception { - CountDownLatch serverConnectionOpen = new CountDownLatch(1); - // The test server always offers EXTERNAL, i.e sometimes mistakenly, to verify that the broker - // only selects it when it actually - // has a client-cert. Real servers shouldnt actually offer the mechanism to a client that - // didnt have to provide a cert. - TestAuthenticator authenticator = new TestAuthenticator(true, EXTERNAL, PLAIN); - + private void doConnectWithExternalTestImpl(boolean requireClientCert) throws Exception { final String keyStorePath = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile(); - JksOptions jksKeyStoreOptions = new JksOptions().setPath(keyStorePath).setPassword(SERVER_KEYSTORE_PASSWORD); - - ProtonServerOptions serverOptions = new ProtonServerOptions(); - serverOptions.setSsl(true); - serverOptions.setKeyStoreOptions(jksKeyStoreOptions); - - if (requireClientCert) { - final String trustStorePath = this.getClass().getClassLoader().getResource(CLIENT_TRUSTSTORE_NAME).getFile(); - JksOptions jksTrustStoreOptions = new JksOptions().setPath(trustStorePath).setPassword(CLIENT_TRUSTSTORE_PASSWORD); - - serverOptions.setTrustStoreOptions(jksTrustStoreOptions); - serverOptions.setClientAuth(ClientAuth.REQUIRED); - } - - mockServer = new MockServer(vertx, serverOptions, () -> authenticator, serverConnection -> { - serverConnection.openHandler(serverSender -> { - serverConnectionOpen.countDown(); - serverConnection.closeHandler(x -> serverConnection.close()); - serverConnection.open(); - }); - }); - - String amqpServerConnectionURI = "tcp://localhost:" + mockServer.actualPort() + - "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME + ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD; - if (requireClientCert) { - amqpServerConnectionURI += - ";keyStorePath=" + CLIENT_KEYSTORE_NAME + ";keyStorePassword=" + CLIENT_KEYSTORE_PASSWORD; - } + final String trustStorePath = this.getClass().getClassLoader().getResource(CLIENT_TRUSTSTORE_NAME).getFile(); - AMQPBrokerConnectConfiguration amqpConnection = - new AMQPBrokerConnectConfiguration("testSimpleConnect", amqpServerConnectionURI); - amqpConnection.setReconnectAttempts(0);// No reconnects - amqpConnection.setUser(USER); // Wont matter if EXTERNAL is offered and a client-certificate - // is provided, but will otherwise. - amqpConnection.setPassword(PASSWD); - - server.getConfiguration().addAMQPConnection(amqpConnection); - - server.start(); - - boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS); - assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen); + ProtonTestServerOptions serverOptions = new ProtonTestServerOptions(); + serverOptions.setSecure(true); + serverOptions.setKeyStoreLocation(keyStorePath); + serverOptions.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD); + serverOptions.setVerifyHost(false); if (requireClientCert) { - assertEquals(EXTERNAL, authenticator.getChosenMech()); - assertArrayEquals(new byte[0], authenticator.getInitialResponse()); - } else { - assertEquals(PLAIN, authenticator.getChosenMech()); - assertArrayEquals(expectedPlainInitialResponse(USER, PASSWD), authenticator.getInitialResponse()); - } - } - - private static byte[] expectedPlainInitialResponse(String username, String password) { - Objects.requireNonNull(username); - Objects.requireNonNull(password); - if (username.isEmpty() || password.isEmpty()) { - throw new IllegalArgumentException("Must provide at least 1 character in user and pass"); - } - - byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); - byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); - - byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2]; - System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length); - System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length); - - return data; - } - - private static final class TestAuthenticator implements ProtonSaslAuthenticator { - private Sasl sasl; - private final boolean succeed; - private final String[] offeredMechs; - String chosenMech = null; - byte[] initialResponse = null; - boolean done = false; - - TestAuthenticator(boolean succeed, String... offeredMechs) { - if (offeredMechs.length == 0) { - throw new IllegalArgumentException("Must provide at least 1 mechanism to offer"); - } - - this.offeredMechs = offeredMechs; - this.succeed = succeed; - } - - @Override - public void init(NetSocket socket, ProtonConnection protonConnection, Transport transport) { - this.sasl = transport.sasl(); - sasl.server(); - sasl.allowSkip(false); - sasl.setMechanisms(offeredMechs); - } - - @Override - public void process(Handler processComplete) { - if (!done) { - String[] remoteMechanisms = sasl.getRemoteMechanisms(); - if (remoteMechanisms.length > 0) { - chosenMech = remoteMechanisms[0]; - - initialResponse = new byte[sasl.pending()]; - sasl.recv(initialResponse, 0, initialResponse.length); - if (succeed) { - sasl.done(SaslOutcome.PN_SASL_OK); - } else { - sasl.done(SaslOutcome.PN_SASL_AUTH); - } - - done = true; - } - } - - processComplete.handle(done); - } - - @Override - public boolean succeeded() { - return succeed; - } - - public String getChosenMech() { - return chosenMech; - } - - public byte[] getInitialResponse() { - return initialResponse; - } - } - - private static final class SCRAMTestAuthenticator implements ProtonSaslAuthenticator { - - private final SCRAM mech; - private Sasl sasl; - private TestSCRAMServerSASL serverSASL; - private String chosenMech; - - SCRAMTestAuthenticator(SCRAM mech) { - this.mech = mech; - } - - @Override - public void init(NetSocket socket, ProtonConnection protonConnection, Transport transport) { - this.sasl = transport.sasl(); - sasl.server(); - sasl.allowSkip(false); - sasl.setMechanisms(mech.getName(), PLAIN, ANONYMOUS); - try { - serverSASL = new TestSCRAMServerSASL(mech); - } catch (NoSuchAlgorithmException e) { - throw new AssertionError(e); - } - - } - - @Override - public void process(Handler completionHandler) { - String[] remoteMechanisms = sasl.getRemoteMechanisms(); - int pending = sasl.pending(); - if (remoteMechanisms.length == 0 || pending == 0) { - completionHandler.handle(false); - return; - } - chosenMech = remoteMechanisms[0]; - byte[] msg = new byte[pending]; - sasl.recv(msg, 0, msg.length); - byte[] result = serverSASL.processSASL(msg); - if (result != null) { - sasl.send(result, 0, result.length); - } - boolean ended = serverSASL.isEnded(); - if (ended) { - if (succeeded()) { - sasl.done(SaslOutcome.PN_SASL_OK); - } else { - sasl.done(SaslOutcome.PN_SASL_AUTH); - } - completionHandler.handle(true); + serverOptions.setNeedClientAuth(true); + serverOptions.setTrustStoreLocation(trustStorePath); + serverOptions.setTrustStorePassword(CLIENT_TRUSTSTORE_PASSWORD); + } + + try (ProtonTestServer peer = new ProtonTestServer(serverOptions)) { + // The test server always offers EXTERNAL, i.e sometimes mistakenly, to verify that the broker + // only selects it when it actually has a client-cert. Real servers shouldn't actually offer + // the mechanism to a client that didn't have to provide a cert. + peer.expectSASLHeader().respondWithSASLHeader(); + peer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue(); + if (requireClientCert) { + peer.expectSaslInit().withMechanism(EXTERNAL).withInitialResponse(new byte[0]); } else { - completionHandler.handle(false); + peer.expectSaslInit().withMechanism(PLAIN).withInitialResponse(peer.saslPlainInitialResponse(USER, PASSWD)); + } + peer.remoteSaslOutcome().withCode(SaslCode.OK).queue(); + peer.expectAMQPHeader().respondWithAMQPHeader(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.debug("Connect test started, peer listening on: {}", remoteURI); + + String amqpServerConnectionURI = "tcp://localhost:" + remoteURI.getPort() + + "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME + ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD; + if (requireClientCert) { + amqpServerConnectionURI += + ";keyStorePath=" + CLIENT_KEYSTORE_NAME + ";keyStorePassword=" + CLIENT_KEYSTORE_PASSWORD; } - } - - @Override - public boolean succeeded() { - SASLResult result = serverSASL.result(); - return result != null && result.isSuccess() && serverSASL.e == null; - } - - } - - private static final class TestSCRAMServerSASL extends SCRAMServerSASL { - private Exception e; + AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser(USER); // Wont matter if EXTERNAL is offered and a client-certificate + // is provided, but will otherwise. + amqpConnection.setPassword(PASSWD); - TestSCRAMServerSASL(SCRAM mechanism) throws NoSuchAlgorithmException { - super(mechanism); - } - - @Override - public void done() { - // nothing to do - } + server.getConfiguration().addAMQPConnection(amqpConnection); - @Override - protected UserData aquireUserData(String userName) throws LoginException { - if (!USER.equals(userName)) { - throw new LoginException("invalid username"); - } - byte[] salt = new byte[32]; - new SecureRandom().nextBytes(salt); - try { - MessageDigest digest = MessageDigest.getInstance(mechanism.getDigest()); - Mac hmac = Mac.getInstance(mechanism.getHmac()); - ScramUtils.NewPasswordStringData data = - ScramUtils.byteArrayToStringData(ScramUtils.newPassword(PASSWD, salt, 4096, digest, hmac)); - return new UserData(data.salt, data.iterations, data.serverKey, data.storedKey); - } catch (Exception e) { - throw new LoginException(e.getMessage()); - } - } + server.start(); - @Override - protected void failed(Exception e) { - this.e = e; + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } - } - } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MockServer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MockServer.java deleted file mode 100644 index 94680f1c419..00000000000 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MockServer.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.integration.amqp.connect; - -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.proton.ProtonConnection; -import io.vertx.proton.ProtonServer; -import io.vertx.proton.ProtonServerOptions; -import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory; - -import java.util.Objects; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.atomic.AtomicReference; - -public class MockServer { - private ProtonServer server; - - public MockServer(Vertx vertx, Handler connectionHandler) throws ExecutionException, InterruptedException { - this(vertx, new ProtonServerOptions(), null, connectionHandler); - } - - public MockServer(Vertx vertx, ProtonSaslAuthenticatorFactory authFactory, Handler connectionHandler) throws ExecutionException, InterruptedException { - this(vertx, new ProtonServerOptions(), authFactory, connectionHandler); - } - - public MockServer(Vertx vertx, ProtonServerOptions options, ProtonSaslAuthenticatorFactory authFactory, Handler connectionHandler) throws ExecutionException, InterruptedException { - Objects.requireNonNull(options, "options must not be null"); - - server = ProtonServer.create(vertx, options); - server.connectHandler(connectionHandler); - if (authFactory != null) { - server.saslAuthenticatorFactory(authFactory); - } - - AtomicReference failure = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - - // Passing port 0 to have the server choose port at bind. - // Use actualPort() to discover port used. - server.listen(0, res -> { - if (!res.succeeded()) { - failure.set(res.cause()); - } - latch.countDown(); - }); - - latch.await(); - - if (failure.get() != null) { - throw new ExecutionException(failure.get()); - } - } - - public int actualPort() { - return server.actualPort(); - } - - public void close() { - server.close(); - } - - ProtonServer getProtonServer() { - return server; - } -} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/ValidateAMQPErrorsTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/ValidateAMQPErrorsTest.java index 00737373a86..e45d5904871 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/ValidateAMQPErrorsTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/ValidateAMQPErrorsTest.java @@ -16,26 +16,21 @@ */ package org.apache.activemq.artemis.tests.integration.amqp.connect; +import static java.util.EnumSet.of; +import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; +import static org.hamcrest.CoreMatchers.equalTo; +import java.lang.invoke.MethodHandles; +import java.net.URI; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import io.vertx.core.Vertx; -import io.vertx.proton.ProtonConnection; -import io.vertx.proton.ProtonServerOptions; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement; @@ -49,62 +44,35 @@ import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; import org.apache.activemq.artemis.tests.util.CFUtil; import org.apache.activemq.artemis.tests.util.Wait; -import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; -import org.apache.activemq.transport.amqp.client.AmqpClient; -import org.apache.activemq.transport.amqp.client.AmqpConnection; -import org.apache.activemq.transport.amqp.client.AmqpSession; -import org.apache.activemq.transport.amqp.client.AmqpValidator; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.transport.AmqpError; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.amqp.transport.Target; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.impl.ConnectionImpl; -import org.junit.After; +import org.apache.qpid.protonj2.test.driver.ProtonTestClient; +import org.apache.qpid.protonj2.test.driver.ProtonTestPeer; +import org.apache.qpid.protonj2.test.driver.ProtonTestServer; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.ApplicationPropertiesMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.DeliveryAnnotationsMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.HeaderMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.MessageAnnotationsMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.messaging.PropertiesMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.transport.TransferPayloadCompositeMatcher; +import org.apache.qpid.protonj2.test.driver.matchers.types.EncodedAmqpValueMatcher; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; - -import static java.util.EnumSet.of; -import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; /** - * This test will make sure the Broker connection will react accordingly to a few misconfigs and possible errors on the network of brokers and eventually qipd-dispatch. + * This test will make sure the Broker Connection will react accordingly to a few + * misconfigs and possible errors on either side of the connection. */ public class ValidateAMQPErrorsTest extends AmqpClientTestSupport { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected static final int AMQP_PORT_2 = 5673; - protected Vertx vertx; - - protected MockServer mockServer; - - public void startVerx() { - vertx = Vertx.vertx(); - } - - @After - public void stop() throws Exception { - if (mockServer != null) { - mockServer.close(); - mockServer = null; - } - if (vertx != null) { - try { - CountDownLatch latch = new CountDownLatch(1); - vertx.close((x) -> latch.countDown()); - Assert.assertTrue(latch.await(10, TimeUnit.SECONDS)); - } finally { - vertx = null; - } - } - } @Override protected ActiveMQServer createServer() throws Exception { @@ -115,11 +83,12 @@ protected ActiveMQServer createServer() throws Exception { * Connecting to itself should issue an error. * and the max retry should still be counted, not just keep connecting forever. */ - @Test + @Test(timeout = 30_000) public void testConnectItself() throws Exception { try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(10).setRetryInterval(1); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + AMQP_PORT); + amqpConnection.setReconnectAttempts(10).setRetryInterval(1); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); server.getConfiguration().addAMQPConnection(amqpConnection); @@ -127,23 +96,24 @@ public void testConnectItself() throws Exception { Assert.assertEquals(1, server.getBrokerConnections().size()); server.getBrokerConnections().forEach((t) -> Wait.assertFalse(t::isStarted)); - Wait.assertTrue(() -> loggerHandler.findText("AMQ111001")); // max retry + Wait.assertTrue(() -> loggerHandler.findText("AMQ111001"), 5000, 25); // max retry } try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { - Thread.sleep(100); + Thread.sleep(50); Assert.assertFalse(loggerHandler.findText("AMQ111002")); // there shouldn't be a retry after the last failure Assert.assertFalse(loggerHandler.findText("AMQ111003")); // there shouldn't be a retry after the last failure } } - @Test + @Test(timeout = 30_000) public void testCloseLinkOnMirror() throws Exception { try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { ActiveMQServer server2 = createServer(AMQP_PORT_2, false); - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(10); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + AMQP_PORT_2); + amqpConnection.setReconnectAttempts(1000).setRetryInterval(10); amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); server.getConfiguration().addAMQPConnection(amqpConnection); @@ -197,33 +167,33 @@ public void testCloseLinkOnMirror() throws Exception { Assert.assertEquals("message " + i, ((TextMessage) consumer.receive(5000)).getText()); } } - } } - @Test + @Test(timeout = 30_000) public void testCloseLinkOnSender() throws Exception { - testCloseLink(true); + doCloseLinkTestImpl(true); } - @Test + @Test(timeout = 30_000) public void testCloseLinkOnReceiver() throws Exception { - testCloseLink(false); + doCloseLinkTestImpl(false); } - public void testCloseLink(boolean isSender) throws Exception { - + private void doCloseLinkTestImpl(boolean isSender) throws Exception { AtomicInteger errors = new AtomicInteger(0); try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { ActiveMQServer server2 = createServer(AMQP_PORT_2, false); if (isSender) { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(-1).setRetryInterval(10); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + AMQP_PORT_2); + amqpConnection.setReconnectAttempts(1000).setRetryInterval(10); amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER)); server.getConfiguration().addAMQPConnection(amqpConnection); } else { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(10); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), "tcp://localhost:" + AMQP_PORT); + amqpConnection.setReconnectAttempts(1000).setRetryInterval(10); amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.RECEIVER)); server2.getConfiguration().addAMQPConnection(amqpConnection); } @@ -296,176 +266,119 @@ public void testCloseLink(boolean isSender) throws Exception { } Assert.assertEquals(0, errors.get()); - } - @Test + @Test(timeout = 30_000) public void testTimeoutOnSenderOpen() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + // Initial attempt + expectConnectionButDontRespondToSenderAttach(peer); + // Second attempt (reconnect) + expectConnectionButDontRespondToSenderAttach(peer); - startVerx(); + peer.start(); - ProtonServerOptions serverOptions = new ProtonServerOptions(); - - mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> { - serverConnection.openHandler(serverSender -> { - serverConnection.closeHandler(x -> serverConnection.close()); - serverConnection.open(); - }); - serverConnection.sessionOpenHandler((s) -> { - s.open(); - }); - serverConnection.senderOpenHandler((x) -> { - x.open(); - }); - serverConnection.receiverOpenHandler((x) -> { - //x.open(); // I'm missing the open, so it won't ever connect - }); - }); + final URI remoteURI = peer.getServerURI(); + logger.debug("Connect test started, peer listening on: {}", remoteURI); - try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=20").setReconnectAttempts(5).setRetryInterval(10); - amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER)); - amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); - server.getConfiguration().addAMQPConnection(amqpConnection); - server.start(); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), + "tcp://localhost:" + remoteURI.getPort() + "?connect-timeout-millis=100"); + amqpConnection.setReconnectAttempts(1).setRetryInterval(100); + amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress(getQueueName()).setType(AMQPBrokerConnectionAddressType.SENDER)); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); - Wait.assertTrue(() -> loggerHandler.findText("AMQ111001")); - Wait.assertEquals(6, () -> loggerHandler.countText("AMQ119020")); // 0..5 == 6 + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - } finally { - mockServer.close(); + assertEquals(2, loggerHandler.countText("AMQ119020")); // Initial + reconnect + assertEquals(1, loggerHandler.countText("AMQ111001")); + } } } - @Test + @Test(timeout = 30_000) public void testReconnectAfterSenderOpenTimeout() throws Exception { + try (ProtonTestServer peer = new ProtonTestServer()) { + // Initial attempt, times out + expectConnectionButDontRespondToSenderAttach(peer); + // Second attempt, times out (reconnect) + expectConnectionButDontRespondToSenderAttach(peer); + + // Third attempt, succeeds (reconnect) + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().respondInKind() + .withProperty(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + peer.remoteFlow().withLinkCredit(1000).queue(); + peer.expectTransfer().accept(); // Notification address create + peer.expectTransfer().accept(); // Address create for odd MQTT address + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.debug("Connect test started, peer listening on: {}", remoteURI); + + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), + "tcp://localhost:" + remoteURI.getPort() + "?connect-timeout-millis=100"); + amqpConnection.setReconnectAttempts(10).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); - startVerx(); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); - try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + int msgCount = 10; - ProtonServerOptions serverOptions = new ProtonServerOptions(); - - AtomicInteger countOpen = new AtomicInteger(0); - CyclicBarrier startFlag = new CyclicBarrier(2); - CountDownLatch blockBeforeOpen = new CountDownLatch(1); - AtomicInteger disconnects = new AtomicInteger(0); - AtomicInteger messagesReceived = new AtomicInteger(0); - AtomicInteger errors = new AtomicInteger(0); - - ConcurrentHashSet connections = new ConcurrentHashSet<>(); - - mockServer = new MockServer(vertx, serverOptions, null, serverConnection -> { - serverConnection.disconnectHandler(c -> { - disconnects.incrementAndGet(); // number of retries - connections.remove(c); - }); - serverConnection.openHandler(serverSender -> { - serverConnection.closeHandler(x -> { - serverConnection.close(); - connections.remove(serverConnection); - }); - serverConnection.open(); - connections.add(serverConnection); - }); - serverConnection.sessionOpenHandler((s) -> { - s.open(); - }); - serverConnection.senderOpenHandler((x) -> { - x.open(); - }); - serverConnection.receiverOpenHandler((x) -> { - if (countOpen.incrementAndGet() > 2) { - if (countOpen.get() == 3) { - try { - startFlag.await(10, TimeUnit.SECONDS); - blockBeforeOpen.await(10, TimeUnit.SECONDS); - return; - } catch (Throwable ignored) { - } - } - HashMap brokerIDProperties = new HashMap<>(); - brokerIDProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id"); - x.setProperties(brokerIDProperties); - x.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); - x.setTarget(x.getRemoteTarget()); - x.open(); - x.handler((del, msg) -> { - if (msg.getApplicationProperties() != null) { - Map map = msg.getApplicationProperties().getValue(); - Object value = map.get("sender"); - if (value != null) { - if (messagesReceived.get() != ((Integer) value).intValue()) { - logger.warn("Message out of order. Expected {} but received {}", messagesReceived.get(), value); - errors.incrementAndGet(); - } - messagesReceived.incrementAndGet(); - } - } - }); - } - }); - }); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=1000").setReconnectAttempts(10).setRetryInterval(10); - amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); - server.getConfiguration().addAMQPConnection(amqpConnection); - server.start(); - - startFlag.await(10, TimeUnit.SECONDS); - blockBeforeOpen.countDown(); + peer.expectTransfer().accept(); // Address create + peer.expectTransfer().accept(); // Queue create + for (int i = 0; i < msgCount; ++i) { + expectMirroredJMSMessage(peer, i); + } - Wait.assertEquals(2, disconnects::intValue); - Wait.assertEquals(1, connections::size); + Wait.assertEquals(2, () -> loggerHandler.countText("AMQ119020"), 2000, 25); - Wait.assertEquals(3, () -> loggerHandler.countText("AMQ119020")); + sendJMSMessage(msgCount, getQueueName()); - ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); - try (Connection connection = factory.createConnection()) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); - for (int i = 0; i < 100; i++) { - TextMessage message = session.createTextMessage("hello"); - message.setIntProperty("sender", i); - producer.send(message); - } + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } - - Wait.assertEquals(100, messagesReceived::intValue, 5000); - Assert.assertEquals(0, errors.get(), 5000); } } - @Test + @Test(timeout = 30_000) public void testNoServerOfferedMirrorCapability() throws Exception { - startVerx(); + try (ProtonTestServer peer = new ProtonTestServer()) { + for (int i = 0; i < 3; ++i) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond(); // Omits mirror capabilities + peer.expectConnectionToDrop(); + } + peer.start(); - mockServer = new MockServer(vertx, serverConnection -> { - serverConnection.openHandler(serverSender -> { - serverConnection.open(); - }); - serverConnection.sessionOpenHandler((s) -> { - s.open(); - }); - serverConnection.senderOpenHandler((x) -> { - x.open(); - }); - serverConnection.receiverOpenHandler((x) -> { - x.setTarget(x.getRemoteTarget()); - x.open(); - }); - }); + final URI remoteURI = peer.getServerURI(); + logger.debug("Connect test started, peer listening on: {}", remoteURI); - try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + final AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration( + getTestName(), "tcp://localhost:" + remoteURI.getPort() + "?connect-timeout-millis=3000"); + amqpConnection.setReconnectAttempts(2).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort() + "?connect-timeout-millis=100").setReconnectAttempts(5).setRetryInterval(10); - amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); - server.getConfiguration().addAMQPConnection(amqpConnection); - server.start(); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); - Wait.assertTrue(() -> loggerHandler.findText("AMQ111001")); - Assert.assertEquals(6, loggerHandler.countText("AMQ119018")); // 0..5 = 6 + Wait.assertTrue(() -> loggerHandler.findText("AMQ111001")); + Assert.assertEquals(3, loggerHandler.countText("AMQ119018")); + + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } } } @@ -474,173 +387,130 @@ public void testNoServerOfferedMirrorCapability() throws Exception { * * @throws Exception */ - @Test + @Test(timeout = 30_000) public void testReconnectAfterMirrorLinkRefusal() throws Exception { - startVerx(); - - AtomicInteger errors = new AtomicInteger(0); - - AtomicInteger messagesReceived = new AtomicInteger(0); - - List connections = Collections.synchronizedList(new ArrayList()); - List disconnected = Collections.synchronizedList(new ArrayList()); - AtomicInteger refusedLinkMessageCount = new AtomicInteger(); - AtomicInteger linkOpens = new AtomicInteger(0); - - mockServer = new MockServer(vertx, serverConnection -> { - serverConnection.disconnectHandler(c -> { - disconnected.add(serverConnection); - }); - - serverConnection.openHandler(c -> { - connections.add(serverConnection); - serverConnection.open(); - }); - - serverConnection.closeHandler(c -> { - serverConnection.close(); - connections.remove(serverConnection); - }); - - serverConnection.sessionOpenHandler(session -> { - session.open(); - }); - - serverConnection.receiverOpenHandler(serverReceiver -> { - Target remoteTarget = serverReceiver.getRemoteTarget(); - String remoteAddress = remoteTarget == null ? null : remoteTarget.getAddress(); - if (remoteAddress == null || !remoteAddress.startsWith(ProtonProtocolManager.MIRROR_ADDRESS)) { - errors.incrementAndGet(); - logger.warn("Receiving address as {}", remoteAddress); - return; - } - if (linkOpens.incrementAndGet() != 2) { - logger.debug("Link Opens::{}", linkOpens); - logger.debug("ServerReceiver = {}", serverReceiver.getTarget()); - serverReceiver.setTarget(null); - - serverReceiver.handler((del, msg) -> { - refusedLinkMessageCount.incrementAndGet(); - logger.debug("Should not have got message on refused link: {}", msg); - }); - - serverReceiver.open(); - - vertx.setTimer(20, x -> { - serverReceiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, "Testing refusal of mirror link for $reasons")); - serverReceiver.close(); - }); - } else { - serverReceiver.setTarget(serverReceiver.getRemoteTarget()); - HashMap linkProperties = new HashMap<>(); - linkProperties.put(AMQPMirrorControllerSource.BROKER_ID, "fake-id"); - - serverReceiver.setProperties(linkProperties); - serverReceiver.setOfferedCapabilities(new Symbol[]{AMQPMirrorControllerSource.MIRROR_CAPABILITY}); - - serverReceiver.handler((del, msg) -> { - logger.debug("prefetch = {}, Got message: {}", serverReceiver.getPrefetch(), msg); - if (msg.getApplicationProperties() != null) { - Map map = msg.getApplicationProperties().getValue(); - Object value = map.get("sender"); - if (value != null) { - if (messagesReceived.get() != ((Integer) value).intValue()) { - logger.warn("Message out of order. Expected {} but received {}", messagesReceived.get(), value); - errors.incrementAndGet(); - } - messagesReceived.incrementAndGet(); - } - } - del.disposition(Accepted.getInstance(), true); - if (serverReceiver.getPrefetch() == 0) { - serverReceiver.flow(1); - } - }); - - serverReceiver.open(); - } - }); - }); + try (ProtonTestServer peer = new ProtonTestServer()) { + // First attempt, refuse + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender().respond().withNullTarget(); + peer.remoteDetach().withErrorCondition(AmqpError.ILLEGAL_STATE.toString(), "Testing refusal of mirror link for $reasons").queue(); + peer.expectDetach().optional(); + peer.expectClose().optional(); + peer.expectConnectionToDrop(); + + // Second attempt, succeeds + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().respondInKind() + .withProperty(AMQPMirrorControllerSource.BROKER_ID.toString(), "Test-Broker"); + peer.remoteFlow().withLinkCredit(1000).queue(); + peer.expectTransfer().accept(); // Notification address create + peer.expectTransfer().accept(); // Address create for odd MQTT address + peer.start(); + + final URI remoteURI = peer.getServerURI(); + logger.debug("Connect test started, peer listening on: {}", remoteURI); + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(getTestName(), + "tcp://localhost:" + remoteURI.getPort() + "?connect-timeout-millis=3000"); + amqpConnection.setReconnectAttempts(1).setRetryInterval(100); + amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + mockServer.actualPort()).setReconnectAttempts(3).setRetryInterval(10); - amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement()); - server.getConfiguration().addAMQPConnection(amqpConnection); - server.start(); + server.getConfiguration().addAMQPConnection(amqpConnection); + server.start(); - Wait.assertEquals(1, disconnected::size, 6000); - Wait.assertEquals(2, connections::size, 6000); + int msgCount = 10; - assertSame(connections.get(0), disconnected.get(0)); - assertFalse(connections.get(1).isDisconnected()); + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); + peer.expectTransfer().accept(); // Address create + peer.expectTransfer().accept(); // Queue create + for (int i = 0; i < msgCount; ++i) { + expectMirroredJMSMessage(peer, i); + } - assertEquals("Should not have got any message on refused link", 0, refusedLinkMessageCount.get()); - assertEquals(0, errors.get()); + sendJMSMessage(msgCount, getQueueName()); - ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); - try (Connection connection = factory.createConnection()) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(getQueueName())); - for (int i = 0; i < 100; i++) { - TextMessage message = session.createTextMessage("hello"); - message.setIntProperty("sender", i); - producer.send(message); - } + peer.waitForScriptToComplete(5, TimeUnit.SECONDS); } - - Wait.assertEquals(100, messagesReceived::intValue); - assertEquals(0, errors.get()); // Meant to check again. the errors check before was because of connection issues. This one is about duplicates on receiving } - @Test + @Test(timeout = 30_000) public void testNoClientDesiredMirrorCapability() throws Exception { try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { server.start(); - AmqpClient client = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null); - client.setValidator(new AmqpValidator() { - - @Override - public void inspectOpenedResource(Sender sender) { - ErrorCondition condition = sender.getRemoteCondition(); + final String address = ProtonProtocolManager.getMirrorAddress(getTestName()); + + try (ProtonTestClient receivingPeer = new ProtonTestClient()) { + receivingPeer.queueClientSaslAnonymousConnect(); + receivingPeer.connect("localhost", AMQP_PORT); + receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS); + + receivingPeer.expectOpen(); + receivingPeer.expectBegin(); + receivingPeer.expectAttach().withNullTarget(); + receivingPeer.expectDetach().withError(AmqpError.ILLEGAL_STATE.toString(), + Matchers.containsString("AMQ119024")) + .respond(); + receivingPeer.remoteOpen().withContainerId("test-sender").now(); + receivingPeer.remoteBegin().now(); + receivingPeer.remoteAttach().ofSender() + .withInitialDeliveryCount(0) + .withName("mirror-test") + .withTarget().withAddress(address).also() + .withSource().and() + .now(); + receivingPeer.waitForScriptToComplete(5, TimeUnit.SECONDS); + } - if (condition != null && condition.getCondition() != null) { - if (!condition.getCondition().equals(AmqpError.ILLEGAL_STATE)) { - markAsInvalid("Should have been closed with an illegal state error, but error was: " + condition); - } + Wait.assertTrue(() -> loggerHandler.findText("AMQ119024")); + } + } - if (!condition.getDescription().contains("AMQ119024")) { - markAsInvalid("should have indicated the error code about missing a desired capability"); - } + private static void expectConnectionButDontRespondToSenderAttach(ProtonTestServer peer) { + peer.expectSASLAnonymousConnect(); + peer.expectOpen().respond(); + peer.expectBegin().respond(); + peer.expectAttach().ofSender(); //No response, causes timeout + peer.expectConnectionToDrop(); + } - if (!condition.getDescription().contains(AMQPMirrorControllerSource.MIRROR_CAPABILITY)) { - markAsInvalid("should have indicated the error code about missing a desired capability"); - } - } else { - markAsInvalid("Sender should have been detached with an error"); - } - } - }); + private static void sendJMSMessage(int msgCount, String queueName) throws JMSException { + final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT); - String address = ProtonProtocolManager.getMirrorAddress(getTestName()); + try (Connection connection = factory.createConnection()) { + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(session.createQueue(queueName)); - AmqpConnection connection = client.connect(); - try { - AmqpSession session = connection.createSession(); + for (int i = 0; i < msgCount; i++) { + final TextMessage message = session.createTextMessage("hello"); - try { - session.createSender(address); - fail("Link should have been refused."); - } catch (Exception ex) { - Assert.assertTrue(ex.getMessage().contains("AMQ119024")); - logger.debug("Caught expected exception"); - } + message.setIntProperty("sender", i); - connection.getStateInspector().assertValid(); - } finally { - connection.close(); + producer.send(message); } - - Wait.assertTrue(() -> loggerHandler.findText("AMQ119024")); } } + + private static void expectMirroredJMSMessage(ProtonTestPeer peer, int sequence) { + final HeaderMatcher headerMatcher = new HeaderMatcher(true); + final PropertiesMatcher properties = new PropertiesMatcher(true); + final DeliveryAnnotationsMatcher daMatcher = new DeliveryAnnotationsMatcher(true); + final MessageAnnotationsMatcher annotationsMatcher = new MessageAnnotationsMatcher(true); + final ApplicationPropertiesMatcher apMatcher = new ApplicationPropertiesMatcher(true); + apMatcher.withEntry("sender", equalTo(sequence)); + final EncodedAmqpValueMatcher bodyMatcher = new EncodedAmqpValueMatcher("hello"); + final TransferPayloadCompositeMatcher matcher = new TransferPayloadCompositeMatcher(); + matcher.setHeadersMatcher(headerMatcher); + matcher.setPropertiesMatcher(properties); + matcher.setDeliveryAnnotationsMatcher(daMatcher); + matcher.setMessageAnnotationsMatcher(annotationsMatcher); + matcher.setApplicationPropertiesMatcher(apMatcher); + matcher.addMessageContentMatcher(bodyMatcher); + + peer.expectTransfer().withPayload(matcher).accept(); + } }