From dbe49c3924c041007b69c70b52d153b02bde7744 Mon Sep 17 00:00:00 2001 From: Alfusainey Jallow Date: Thu, 14 Jun 2018 15:05:57 +0200 Subject: [PATCH] [#643] AMQP Tests: adjust according to Kai comments on June 14th Signed-off-by: Alfusainey Jallow --- ...=> VertxBasedAmqpProtocolAdapterTest.java} | 18 +- ...asedAmqpProtocolAdapterWithClientTest.java | 414 ------------------ 2 files changed, 8 insertions(+), 424 deletions(-) rename adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/{VertxBasedAmqpProtocolAdapterWithoutClientTest.java => VertxBasedAmqpProtocolAdapterTest.java} (93%) delete mode 100644 adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithClientTest.java diff --git a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithoutClientTest.java b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterTest.java similarity index 93% rename from adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithoutClientTest.java rename to adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterTest.java index 14b055a0d66..3be4668b7d0 100644 --- a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithoutClientTest.java +++ b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterTest.java @@ -39,12 +39,10 @@ import io.vertx.proton.ProtonServer; /** - * Verifies the behaviour of {@link VertxBasedAmqpProtocolAdapter}. In this test setup, the AMQP adapter is not deployed - * as a running Verticle and does not use an AMQP client to simulate a client device. - * Rather, the setup manually instantiates the adapter and gives it a user defined ProtonServer. + * Verifies the behaviour of {@link VertxBasedAmqpProtocolAdapter}. */ @RunWith(VertxUnitRunner.class) -public class VertxBasedAmqpProtocolAdapterWithoutClientTest { +public class VertxBasedAmqpProtocolAdapterTest { @Rule public Timeout globalTimeout = new Timeout(2, TimeUnit.SECONDS); @@ -157,7 +155,7 @@ public void testAnonymousRelayNotSupported() { public void uploadTelemetryMessageWithSettledDeliverySemantics() { // GIVEN an AMQP adapter with a configured server final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter(); - final MessageSender sender = givenAmessageSenderForAnyTenant(); + final MessageSender telemetrySender = givenATelemetrySenderForAnyTenant(); // which is enabled for a tenant givenAConfiguredTenant("some-tenant", true); @@ -170,7 +168,7 @@ public void uploadTelemetryMessageWithSettledDeliverySemantics() { adapter.uploadMessage(new AmqpContext(delivery, getFakeMessage(), resource)); // THEN the adapter sends the message and does not wait for response from the peer. - verify(sender).send(any(Message.class)); + verify(telemetrySender).send(any(Message.class)); } /** @@ -180,10 +178,10 @@ public void uploadTelemetryMessageWithSettledDeliverySemantics() { * AT_LEAST_ONCE delivery semantics. */ @Test - public void uploadTelemetryMessageWithNotSettledDeliverySemantics() { + public void uploadTelemetryMessageWithUnsettledDeliverySemantics() { // GIVEN an adapter configured to use a user-define server. final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter(); - final MessageSender sender = givenAmessageSenderForAnyTenant(); + final MessageSender telemetrySender = givenATelemetrySenderForAnyTenant(); // which is enabled for a tenant givenAConfiguredTenant("enabled-tenant", true); @@ -195,7 +193,7 @@ public void uploadTelemetryMessageWithNotSettledDeliverySemantics() { adapter.uploadMessage(new AmqpContext(delivery, getFakeMessage(), resource)); // THEN the sender sends the message and waits for the outcome from the downstream peer - verify(sender).sendAndWaitForOutcome(any(Message.class)); + verify(telemetrySender).sendAndWaitForOutcome(any(Message.class)); } private void givenAConfiguredTenant(final String tenantId, final boolean enabled) { @@ -215,7 +213,7 @@ private Message getFakeMessage() { return message; } - private MessageSender givenAmessageSenderForAnyTenant() { + private MessageSender givenATelemetrySenderForAnyTenant() { final MessageSender sender = mock(MessageSender.class); when(messagingServiceClient.getOrCreateTelemetrySender(anyString())).thenReturn(Future.succeededFuture(sender)); return sender; diff --git a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithClientTest.java b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithClientTest.java deleted file mode 100644 index 38f27e999e0..00000000000 --- a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithClientTest.java +++ /dev/null @@ -1,414 +0,0 @@ -package org.eclipse.hono.adapter.amqp; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import java.util.concurrent.TimeUnit; - -import org.apache.qpid.proton.amqp.messaging.Rejected; -import org.apache.qpid.proton.amqp.messaging.Released; -import org.apache.qpid.proton.message.Message; -import org.eclipse.hono.client.HonoClient; -import org.eclipse.hono.client.MessageSender; -import org.eclipse.hono.client.RegistrationClient; -import org.eclipse.hono.client.ServerErrorException; -import org.eclipse.hono.client.TenantClient; -import org.eclipse.hono.config.ProtocolAdapterProperties; -import org.eclipse.hono.service.command.CommandConnection; -import org.eclipse.hono.util.RegistrationConstants; -import org.eclipse.hono.util.TenantConstants; -import org.eclipse.hono.util.TenantObject; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.Timeout; -import org.junit.runner.RunWith; - -import io.vertx.core.Future; -import io.vertx.core.Handler; -import io.vertx.core.Vertx; -import io.vertx.core.json.JsonObject; -import io.vertx.ext.unit.Async; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import io.vertx.proton.ProtonClient; -import io.vertx.proton.ProtonConnection; -import io.vertx.proton.ProtonHelper; -import io.vertx.proton.ProtonQoS; -import io.vertx.proton.ProtonSender; - -/** - * Verifies the behaviour of {@link VertxBasedAmqpProtocolAdapter}. In this setup, the AMQP adapter is deployed and run - * as a Verticle. A ProtonClient, which simulates a device, is used to communicate with the adapter. - */ -@RunWith(VertxUnitRunner.class) -public class VertxBasedAmqpProtocolAdapterWithClientTest { - - @Rule - public Timeout timeout = new Timeout(2, TimeUnit.SECONDS); - - private static final String ADAPTER_HOST = "localhost"; - private static final int ADAPTER_PORT = 4040; - - private static HonoClient tenantServiceClient; - private static HonoClient messagingServiceClient; - private static HonoClient credentialsServiceClient; - private static HonoClient registrationServiceClient; - private static CommandConnection commandConnection; - - private static RegistrationClient registrationClient; - private static TenantClient tenantClient; - - private static Vertx vertx; - private static VertxBasedAmqpProtocolAdapter adapter; - - private static ProtocolAdapterProperties config = new ProtocolAdapterProperties(); - - /** - * Sets up the fixture for this test. - * - * @param context The Vert.x context for running asynchronous tests. - */ - @SuppressWarnings("unchecked") - @BeforeClass - public static void before(final TestContext context) { - vertx = Vertx.vertx(); - - tenantClient = mock(TenantClient.class); - when(tenantClient.get(anyString())).thenAnswer(answer -> { - return Future.succeededFuture(TenantObject.from(answer.getArgument(0), true)); - }); - - tenantServiceClient = mock(HonoClient.class); - when(tenantServiceClient.connect(any(Handler.class))).thenReturn(Future.succeededFuture(tenantServiceClient)); - when(tenantServiceClient.getOrCreateTenantClient()).thenReturn(Future.succeededFuture(tenantClient)); - - messagingServiceClient = mock(HonoClient.class, "Hono Messaging Service client"); - when(messagingServiceClient.connect(any(Handler.class))) - .thenReturn(Future.succeededFuture(messagingServiceClient)); - when(messagingServiceClient.getOrCreateTelemetrySender(anyString())) - .thenReturn(Future.succeededFuture(mock(MessageSender.class))); - when(messagingServiceClient.getOrCreateEventSender(anyString())) - .thenReturn(Future.succeededFuture(mock(MessageSender.class))); - - credentialsServiceClient = mock(HonoClient.class); - when(credentialsServiceClient.connect(any(Handler.class))) - .thenReturn(Future.succeededFuture(credentialsServiceClient)); - - registrationClient = mock(RegistrationClient.class); - final JsonObject regAssertion = new JsonObject().put(RegistrationConstants.FIELD_ASSERTION, "assert-device"); - when(registrationClient.assertRegistration(anyString(), any())) - .thenReturn(Future.succeededFuture(regAssertion)); - - registrationServiceClient = mock(HonoClient.class, "registration service client"); - when(registrationServiceClient.connect(any(Handler.class))) - .thenReturn(Future.succeededFuture(registrationServiceClient)); - when(registrationServiceClient.getOrCreateRegistrationClient(anyString())) - .thenReturn(Future.succeededFuture(registrationClient)); - - commandConnection = mock(CommandConnection.class); - when(commandConnection.connect(any(Handler.class))).thenReturn(Future.succeededFuture(commandConnection)); - - config.setInsecurePort(ADAPTER_PORT); - config.setInsecurePortBindAddress(ADAPTER_HOST); - config.setAuthenticationRequired(false); - - adapter = new VertxBasedAmqpProtocolAdapter(); - adapter.setConfig(config); - adapter.setTenantServiceClient(tenantServiceClient); - adapter.setCredentialsServiceClient(credentialsServiceClient); - adapter.setRegistrationServiceClient(registrationServiceClient); - adapter.setHonoMessagingClient(messagingServiceClient); - adapter.setCommandConnection(commandConnection); - - vertx.deployVerticle(adapter, context.asyncAssertSuccess()); - } - - /** - * Closes the Vert.x instance. - */ - @AfterClass - public static void after() { - vertx.close(); - } - - /** - * Verifies that a request to upload a message to an invalid endpoint results in the adapter closing the link used - * for sending the message. - * - * @param context The context for running asynchronous test on Vert.x - */ - @Test - public void testUploadMessageToInvalidEndpoint(final TestContext context) { - final Async completionTracker = context.async(); - - // GIVEN a client ready to send a request. - final ProtonClient client = ProtonClient.create(vertx); - final String targetAddress = "UNKNOWN/DEFAULT_TENANT/4712"; - client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { - if (ar.succeeded()) { - final ProtonConnection unOpenedConnection = ar.result(); - unOpenedConnection.openHandler(remoteOpen -> { - if (remoteOpen.succeeded()) { - final ProtonConnection openedConnection = remoteOpen.result(); - // IF the client opens a sender link with the adapter to upload - // a message to an invalid endpoint. - openedConnection.createSender(targetAddress) - .closeHandler(context.asyncAssertFailure(failed -> { - // THEN the sender link is closed by the adapter. - completionTracker.complete(); - })).open(); - } - }).open(); - } else { - context.fail(ar.cause()); - } - }); - completionTracker.awaitSuccess(); - } - - /** - * Verifies that a request to upload an event message using AT_MOST_ONCE QoS leads to the adapter rejecting the - * message and closing the sender link. - * - * @param context The Vert.x context for running asynchronous tests. - */ - @Test - public void testUploadEventWithAnInvalidQoS(final TestContext context) { - - // GIVEN a client ready to upload an event. - final ProtonClient client = ProtonClient.create(vertx); - final String targetAddress = "event/DEFAULT_TENANT/4712"; - - final Async completionTracker = context.async(); - client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { - if (ar.succeeded()) { - final ProtonConnection connection = ar.result(); - connection.openHandler(remoteOpen -> { - if (remoteOpen.succeeded()) { - final ProtonConnection openedConnection = remoteOpen.result(); - - // IF the client opens a sender link with the adapter to upload - // an event to a valid endpoint. - openedConnection.createSender(targetAddress) - // USING AT_MOST_ONCE quality of service - .setQoS(ProtonQoS.AT_MOST_ONCE) - .closeHandler(context.asyncAssertFailure(failed -> { - // THEN the adapter rejects the request and closes the sender link. - completionTracker.complete(); - })).open(); - } - }).open(); - } else { - context.fail(ar.cause()); - } - }); - - completionTracker.awaitSuccess(); - } - - /** - * Verifies that the AMQP adapter responds with the REJECTED disposition frame when a device uploads telemetry data - * (with AT_LEAST_ONCE QoS) for a tenant for which the adapter is disabled. - * - * When Transfer(settled=false) frame is sent, the device will always gets a response when the message cannot be - * processed. - * - * @param context The Vert.x context for running asynchronous tests. - */ - @Test - public void testUploadMessageForDisabledTenant(final TestContext context) { - - // GIVEN a client ready to upload an event. - final ProtonClient client = ProtonClient.create(vertx); - final String targetAddress = "telemetry/disabled-tenant/4712"; - - final TenantObject tenantConfig = TenantObject.from("disabled-tenant", false); - tenantConfig.addAdapterConfiguration(new JsonObject() - .put(TenantConstants.FIELD_ADAPTERS_TYPE, "amqp") - .put(TenantConstants.FIELD_ENABLED, false)); - when(tenantClient.get("disabled-tenant")).thenReturn(Future.succeededFuture(tenantConfig)); - - final Async completionTracker = context.async(); - client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { - if (ar.succeeded()) { - final ProtonConnection unOpenedConnection = ar.result(); - unOpenedConnection.openHandler(remoteOpen -> { - if (remoteOpen.succeeded()) { - final ProtonConnection openedConnection = remoteOpen.result(); - - // IF the client opens a sender link with the adapter - // to upload an event to a valid endpoint. - openedConnection.createSender(targetAddress) - // AND wants to be notified when the message cannot be processed - .setQoS(ProtonQoS.AT_LEAST_ONCE) - .openHandler(remoteAttach -> { - if (remoteAttach.succeeded()) { - final ProtonSender sender = remoteAttach.result(); - final JsonObject body = new JsonObject().put("temp", 5); - final Message msg = ProtonHelper.message(body.encode()); - msg.setContentType("application/json"); - sender.send(msg, delivery -> { - // THEN the adapter notifies the device by sending back the - // REJECTED disposition frame. - context.assertEquals(new Rejected().getClass(), - delivery.getRemoteState().getClass()); - - sender.close(); - openedConnection.close(); - completionTracker.complete(); - }); - } - }).open(); - - } - }).open(); - } else { - context.fail(ar.cause()); - } - }); - - completionTracker.awaitSuccess(); - } - - /** - * Verifies that a BAD inbound message, (i.e when the message content-type does not match the payload), gets - * rejected by the adapter i.e the adapter sends back a REJECTED disposition frame to the client device. - * - * @param context The Vert.x context for running asynchronous tests. - */ - @Test - public void testAdapterRejectsBadInboundMessage(final TestContext context) { - - // GIVEN a client ready to upload an event. - final ProtonClient client = ProtonClient.create(vertx); - final String targetAddress = "telemetry/disabled-tenant/4712"; - final Async completionTracker = context.async(); - - client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { - if (ar.succeeded()) { - final ProtonConnection unOpenedConnection = ar.result(); - unOpenedConnection.openHandler(remoteOpen -> { - if (remoteOpen.succeeded()) { - final ProtonConnection openedConnection = remoteOpen.result(); - - // IF the client opens a sender link with the adapter - // to upload an event to a valid endpoint. - openedConnection.createSender(targetAddress) - .openHandler(remoteAttach -> { - if (remoteAttach.succeeded()) { - final ProtonSender sender = remoteAttach.result(); - final JsonObject body = new JsonObject().put("temp", 5); - final Message msg = ProtonHelper.message(body.encode()); - msg.setContentType("application/vnd.eclipse-hono-empty-notification"); - sender.send(msg, delivery -> { - // THEN the adapter notifies the device by sending back the - // REJECTED disposition frame. - context.assertEquals(new Rejected().getClass(), - delivery.getRemoteState().getClass()); - - sender.close(); - openedConnection.close(); - completionTracker.complete(); - }); - } - }).open(); - } - }).open(); - - } else { - context.fail(ar.cause()); - } - }); - completionTracker.awaitSuccess(); - } - - /** - * Verifies that the adapter does not accept a link for sending messages to client devices. - * - * @param context The Vert.x context for asynchronous testing. - */ - @Test - public void testAdapterDoesNotSendMessagesToDevices(final TestContext context) { - final Async completionTracker = context.async(); - // IF a client device - final ProtonClient client = ProtonClient.create(vertx); - client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { - if (ar.succeeded()) { - final ProtonConnection unOpenedConnection = ar.result(); - unOpenedConnection.openHandler(remoteOpen -> { - if (remoteOpen.succeeded()) { - final ProtonConnection openedConnection = remoteOpen.result(); - // OPENS an AMQP link for receiving messages from the adapter - openedConnection.createReceiver(anyString()) - .closeHandler(remoteDetach -> { - // THEN the adapter closes the link by sending - // back a DETACH frame to the device - completionTracker.complete(); - }).open(); - } - }).open(); - } else { - context.fail(ar.cause()); - } - }); - completionTracker.awaitSuccess(); - } - - /** - * Verifies that the adapter responds with the RELEASED disposition frame if the adapter cannot process a message - * (due to a server error) and the client uses AT_LEAST_ONCE delivery to be notified if the message cannot be - * delivered. - * - * @param context The Vert.x context for asynchronous testing. - */ - @Test - public void testAdapterSendsReleasedDispositionWhenMessageCannotBeProcessed(final TestContext context) { - final Async completionTracker = context.async(); - - // GIVEN a client that wants to upload telemetry message - final ProtonClient client = ProtonClient.create(vertx); - - // WHICH the adapter cannot be processed due to an internal server error. - final MessageSender honoSender = mock(MessageSender.class); - when(messagingServiceClient.getOrCreateTelemetrySender(anyString())) - .thenReturn(Future.succeededFuture(honoSender)); - when(honoSender.sendAndWaitForOutcome(any(Message.class))) - .thenReturn(Future.failedFuture(mock(ServerErrorException.class))); - - client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { - if (ar.succeeded()) { - final ProtonConnection unopenedConnection = ar.result(); - unopenedConnection.openHandler(remoteOpen -> { - if (remoteOpen.succeeded()) { - final ProtonConnection openConnection = remoteOpen.result(); - - // IF a client opens a sender link with - // AT_LEAST_ONCE QoS (the default in proton) - openConnection.createSender("telemetry/DEFAULT_TENANT/4712") - .openHandler(remoteAttach -> { - if (remoteAttach.succeeded()) { - final ProtonSender sender = remoteAttach.result(); - final JsonObject body = new JsonObject().put("temp", 5); - final Message msg = ProtonHelper.message(body.encode()); - sender.send(msg, delivery -> { - // THEN the adapter notifies the device by sending - // the RELEASED disposition frame to the client. - context.assertEquals(Released.getInstance(), delivery.getRemoteState()); - completionTracker.complete(); - }); - } - }).open(); - } - }).open(); - } else { - context.fail(ar.cause()); - } - }); - completionTracker.awaitSuccess(); - } - -}