From 7b058ee39c975563b529c8b3e6a7870b4c2a6642 Mon Sep 17 00:00:00 2001 From: Alfusainey Jallow Date: Thu, 14 Jun 2018 05:17:47 +0200 Subject: [PATCH] [#643] Add tests for AMQP Adapter implementation (Unauthenticated devices) Signed-off-by: Alfusainey Jallow --- ...asedAmqpProtocolAdapterWithClientTest.java | 414 ++++++++++++++++++ ...dAmqpProtocolAdapterWithoutClientTest.java | 270 ++++++++++++ 2 files changed, 684 insertions(+) create mode 100644 adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithClientTest.java create mode 100644 adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithoutClientTest.java diff --git a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithClientTest.java b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithClientTest.java new file mode 100644 index 00000000000..38f27e999e0 --- /dev/null +++ b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithClientTest.java @@ -0,0 +1,414 @@ +package org.eclipse.hono.adapter.amqp; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.client.HonoClient; +import org.eclipse.hono.client.MessageSender; +import org.eclipse.hono.client.RegistrationClient; +import org.eclipse.hono.client.ServerErrorException; +import org.eclipse.hono.client.TenantClient; +import org.eclipse.hono.config.ProtocolAdapterProperties; +import org.eclipse.hono.service.command.CommandConnection; +import org.eclipse.hono.util.RegistrationConstants; +import org.eclipse.hono.util.TenantConstants; +import org.eclipse.hono.util.TenantObject; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; + +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.proton.ProtonClient; +import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonHelper; +import io.vertx.proton.ProtonQoS; +import io.vertx.proton.ProtonSender; + +/** + * Verifies the behaviour of {@link VertxBasedAmqpProtocolAdapter}. In this setup, the AMQP adapter is deployed and run + * as a Verticle. A ProtonClient, which simulates a device, is used to communicate with the adapter. + */ +@RunWith(VertxUnitRunner.class) +public class VertxBasedAmqpProtocolAdapterWithClientTest { + + @Rule + public Timeout timeout = new Timeout(2, TimeUnit.SECONDS); + + private static final String ADAPTER_HOST = "localhost"; + private static final int ADAPTER_PORT = 4040; + + private static HonoClient tenantServiceClient; + private static HonoClient messagingServiceClient; + private static HonoClient credentialsServiceClient; + private static HonoClient registrationServiceClient; + private static CommandConnection commandConnection; + + private static RegistrationClient registrationClient; + private static TenantClient tenantClient; + + private static Vertx vertx; + private static VertxBasedAmqpProtocolAdapter adapter; + + private static ProtocolAdapterProperties config = new ProtocolAdapterProperties(); + + /** + * Sets up the fixture for this test. + * + * @param context The Vert.x context for running asynchronous tests. + */ + @SuppressWarnings("unchecked") + @BeforeClass + public static void before(final TestContext context) { + vertx = Vertx.vertx(); + + tenantClient = mock(TenantClient.class); + when(tenantClient.get(anyString())).thenAnswer(answer -> { + return Future.succeededFuture(TenantObject.from(answer.getArgument(0), true)); + }); + + tenantServiceClient = mock(HonoClient.class); + when(tenantServiceClient.connect(any(Handler.class))).thenReturn(Future.succeededFuture(tenantServiceClient)); + when(tenantServiceClient.getOrCreateTenantClient()).thenReturn(Future.succeededFuture(tenantClient)); + + messagingServiceClient = mock(HonoClient.class, "Hono Messaging Service client"); + when(messagingServiceClient.connect(any(Handler.class))) + .thenReturn(Future.succeededFuture(messagingServiceClient)); + when(messagingServiceClient.getOrCreateTelemetrySender(anyString())) + .thenReturn(Future.succeededFuture(mock(MessageSender.class))); + when(messagingServiceClient.getOrCreateEventSender(anyString())) + .thenReturn(Future.succeededFuture(mock(MessageSender.class))); + + credentialsServiceClient = mock(HonoClient.class); + when(credentialsServiceClient.connect(any(Handler.class))) + .thenReturn(Future.succeededFuture(credentialsServiceClient)); + + registrationClient = mock(RegistrationClient.class); + final JsonObject regAssertion = new JsonObject().put(RegistrationConstants.FIELD_ASSERTION, "assert-device"); + when(registrationClient.assertRegistration(anyString(), any())) + .thenReturn(Future.succeededFuture(regAssertion)); + + registrationServiceClient = mock(HonoClient.class, "registration service client"); + when(registrationServiceClient.connect(any(Handler.class))) + .thenReturn(Future.succeededFuture(registrationServiceClient)); + when(registrationServiceClient.getOrCreateRegistrationClient(anyString())) + .thenReturn(Future.succeededFuture(registrationClient)); + + commandConnection = mock(CommandConnection.class); + when(commandConnection.connect(any(Handler.class))).thenReturn(Future.succeededFuture(commandConnection)); + + config.setInsecurePort(ADAPTER_PORT); + config.setInsecurePortBindAddress(ADAPTER_HOST); + config.setAuthenticationRequired(false); + + adapter = new VertxBasedAmqpProtocolAdapter(); + adapter.setConfig(config); + adapter.setTenantServiceClient(tenantServiceClient); + adapter.setCredentialsServiceClient(credentialsServiceClient); + adapter.setRegistrationServiceClient(registrationServiceClient); + adapter.setHonoMessagingClient(messagingServiceClient); + adapter.setCommandConnection(commandConnection); + + vertx.deployVerticle(adapter, context.asyncAssertSuccess()); + } + + /** + * Closes the Vert.x instance. + */ + @AfterClass + public static void after() { + vertx.close(); + } + + /** + * Verifies that a request to upload a message to an invalid endpoint results in the adapter closing the link used + * for sending the message. + * + * @param context The context for running asynchronous test on Vert.x + */ + @Test + public void testUploadMessageToInvalidEndpoint(final TestContext context) { + final Async completionTracker = context.async(); + + // GIVEN a client ready to send a request. + final ProtonClient client = ProtonClient.create(vertx); + final String targetAddress = "UNKNOWN/DEFAULT_TENANT/4712"; + client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { + if (ar.succeeded()) { + final ProtonConnection unOpenedConnection = ar.result(); + unOpenedConnection.openHandler(remoteOpen -> { + if (remoteOpen.succeeded()) { + final ProtonConnection openedConnection = remoteOpen.result(); + // IF the client opens a sender link with the adapter to upload + // a message to an invalid endpoint. + openedConnection.createSender(targetAddress) + .closeHandler(context.asyncAssertFailure(failed -> { + // THEN the sender link is closed by the adapter. + completionTracker.complete(); + })).open(); + } + }).open(); + } else { + context.fail(ar.cause()); + } + }); + completionTracker.awaitSuccess(); + } + + /** + * Verifies that a request to upload an event message using AT_MOST_ONCE QoS leads to the adapter rejecting the + * message and closing the sender link. + * + * @param context The Vert.x context for running asynchronous tests. + */ + @Test + public void testUploadEventWithAnInvalidQoS(final TestContext context) { + + // GIVEN a client ready to upload an event. + final ProtonClient client = ProtonClient.create(vertx); + final String targetAddress = "event/DEFAULT_TENANT/4712"; + + final Async completionTracker = context.async(); + client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { + if (ar.succeeded()) { + final ProtonConnection connection = ar.result(); + connection.openHandler(remoteOpen -> { + if (remoteOpen.succeeded()) { + final ProtonConnection openedConnection = remoteOpen.result(); + + // IF the client opens a sender link with the adapter to upload + // an event to a valid endpoint. + openedConnection.createSender(targetAddress) + // USING AT_MOST_ONCE quality of service + .setQoS(ProtonQoS.AT_MOST_ONCE) + .closeHandler(context.asyncAssertFailure(failed -> { + // THEN the adapter rejects the request and closes the sender link. + completionTracker.complete(); + })).open(); + } + }).open(); + } else { + context.fail(ar.cause()); + } + }); + + completionTracker.awaitSuccess(); + } + + /** + * Verifies that the AMQP adapter responds with the REJECTED disposition frame when a device uploads telemetry data + * (with AT_LEAST_ONCE QoS) for a tenant for which the adapter is disabled. + * + * When Transfer(settled=false) frame is sent, the device will always gets a response when the message cannot be + * processed. + * + * @param context The Vert.x context for running asynchronous tests. + */ + @Test + public void testUploadMessageForDisabledTenant(final TestContext context) { + + // GIVEN a client ready to upload an event. + final ProtonClient client = ProtonClient.create(vertx); + final String targetAddress = "telemetry/disabled-tenant/4712"; + + final TenantObject tenantConfig = TenantObject.from("disabled-tenant", false); + tenantConfig.addAdapterConfiguration(new JsonObject() + .put(TenantConstants.FIELD_ADAPTERS_TYPE, "amqp") + .put(TenantConstants.FIELD_ENABLED, false)); + when(tenantClient.get("disabled-tenant")).thenReturn(Future.succeededFuture(tenantConfig)); + + final Async completionTracker = context.async(); + client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { + if (ar.succeeded()) { + final ProtonConnection unOpenedConnection = ar.result(); + unOpenedConnection.openHandler(remoteOpen -> { + if (remoteOpen.succeeded()) { + final ProtonConnection openedConnection = remoteOpen.result(); + + // IF the client opens a sender link with the adapter + // to upload an event to a valid endpoint. + openedConnection.createSender(targetAddress) + // AND wants to be notified when the message cannot be processed + .setQoS(ProtonQoS.AT_LEAST_ONCE) + .openHandler(remoteAttach -> { + if (remoteAttach.succeeded()) { + final ProtonSender sender = remoteAttach.result(); + final JsonObject body = new JsonObject().put("temp", 5); + final Message msg = ProtonHelper.message(body.encode()); + msg.setContentType("application/json"); + sender.send(msg, delivery -> { + // THEN the adapter notifies the device by sending back the + // REJECTED disposition frame. + context.assertEquals(new Rejected().getClass(), + delivery.getRemoteState().getClass()); + + sender.close(); + openedConnection.close(); + completionTracker.complete(); + }); + } + }).open(); + + } + }).open(); + } else { + context.fail(ar.cause()); + } + }); + + completionTracker.awaitSuccess(); + } + + /** + * Verifies that a BAD inbound message, (i.e when the message content-type does not match the payload), gets + * rejected by the adapter i.e the adapter sends back a REJECTED disposition frame to the client device. + * + * @param context The Vert.x context for running asynchronous tests. + */ + @Test + public void testAdapterRejectsBadInboundMessage(final TestContext context) { + + // GIVEN a client ready to upload an event. + final ProtonClient client = ProtonClient.create(vertx); + final String targetAddress = "telemetry/disabled-tenant/4712"; + final Async completionTracker = context.async(); + + client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { + if (ar.succeeded()) { + final ProtonConnection unOpenedConnection = ar.result(); + unOpenedConnection.openHandler(remoteOpen -> { + if (remoteOpen.succeeded()) { + final ProtonConnection openedConnection = remoteOpen.result(); + + // IF the client opens a sender link with the adapter + // to upload an event to a valid endpoint. + openedConnection.createSender(targetAddress) + .openHandler(remoteAttach -> { + if (remoteAttach.succeeded()) { + final ProtonSender sender = remoteAttach.result(); + final JsonObject body = new JsonObject().put("temp", 5); + final Message msg = ProtonHelper.message(body.encode()); + msg.setContentType("application/vnd.eclipse-hono-empty-notification"); + sender.send(msg, delivery -> { + // THEN the adapter notifies the device by sending back the + // REJECTED disposition frame. + context.assertEquals(new Rejected().getClass(), + delivery.getRemoteState().getClass()); + + sender.close(); + openedConnection.close(); + completionTracker.complete(); + }); + } + }).open(); + } + }).open(); + + } else { + context.fail(ar.cause()); + } + }); + completionTracker.awaitSuccess(); + } + + /** + * Verifies that the adapter does not accept a link for sending messages to client devices. + * + * @param context The Vert.x context for asynchronous testing. + */ + @Test + public void testAdapterDoesNotSendMessagesToDevices(final TestContext context) { + final Async completionTracker = context.async(); + // IF a client device + final ProtonClient client = ProtonClient.create(vertx); + client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { + if (ar.succeeded()) { + final ProtonConnection unOpenedConnection = ar.result(); + unOpenedConnection.openHandler(remoteOpen -> { + if (remoteOpen.succeeded()) { + final ProtonConnection openedConnection = remoteOpen.result(); + // OPENS an AMQP link for receiving messages from the adapter + openedConnection.createReceiver(anyString()) + .closeHandler(remoteDetach -> { + // THEN the adapter closes the link by sending + // back a DETACH frame to the device + completionTracker.complete(); + }).open(); + } + }).open(); + } else { + context.fail(ar.cause()); + } + }); + completionTracker.awaitSuccess(); + } + + /** + * Verifies that the adapter responds with the RELEASED disposition frame if the adapter cannot process a message + * (due to a server error) and the client uses AT_LEAST_ONCE delivery to be notified if the message cannot be + * delivered. + * + * @param context The Vert.x context for asynchronous testing. + */ + @Test + public void testAdapterSendsReleasedDispositionWhenMessageCannotBeProcessed(final TestContext context) { + final Async completionTracker = context.async(); + + // GIVEN a client that wants to upload telemetry message + final ProtonClient client = ProtonClient.create(vertx); + + // WHICH the adapter cannot be processed due to an internal server error. + final MessageSender honoSender = mock(MessageSender.class); + when(messagingServiceClient.getOrCreateTelemetrySender(anyString())) + .thenReturn(Future.succeededFuture(honoSender)); + when(honoSender.sendAndWaitForOutcome(any(Message.class))) + .thenReturn(Future.failedFuture(mock(ServerErrorException.class))); + + client.connect(ADAPTER_HOST, ADAPTER_PORT, ar -> { + if (ar.succeeded()) { + final ProtonConnection unopenedConnection = ar.result(); + unopenedConnection.openHandler(remoteOpen -> { + if (remoteOpen.succeeded()) { + final ProtonConnection openConnection = remoteOpen.result(); + + // IF a client opens a sender link with + // AT_LEAST_ONCE QoS (the default in proton) + openConnection.createSender("telemetry/DEFAULT_TENANT/4712") + .openHandler(remoteAttach -> { + if (remoteAttach.succeeded()) { + final ProtonSender sender = remoteAttach.result(); + final JsonObject body = new JsonObject().put("temp", 5); + final Message msg = ProtonHelper.message(body.encode()); + sender.send(msg, delivery -> { + // THEN the adapter notifies the device by sending + // the RELEASED disposition frame to the client. + context.assertEquals(Released.getInstance(), delivery.getRemoteState()); + completionTracker.complete(); + }); + } + }).open(); + } + }).open(); + } else { + context.fail(ar.cause()); + } + }); + completionTracker.awaitSuccess(); + } + +} diff --git a/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithoutClientTest.java b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithoutClientTest.java new file mode 100644 index 00000000000..14b055a0d66 --- /dev/null +++ b/adapters/amqp-vertx/src/test/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapterWithoutClientTest.java @@ -0,0 +1,270 @@ +package org.eclipse.hono.adapter.amqp; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.message.Message; +import org.eclipse.hono.client.HonoClient; +import org.eclipse.hono.client.MessageSender; +import org.eclipse.hono.client.RegistrationClient; +import org.eclipse.hono.client.TenantClient; +import org.eclipse.hono.config.ProtocolAdapterProperties; +import org.eclipse.hono.service.command.CommandConnection; +import org.eclipse.hono.util.RegistrationConstants; +import org.eclipse.hono.util.ResourceIdentifier; +import org.eclipse.hono.util.TenantConstants; +import org.eclipse.hono.util.TenantObject; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.junit.runner.RunWith; + +import io.vertx.core.AsyncResult; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.unit.Async; +import io.vertx.ext.unit.TestContext; +import io.vertx.ext.unit.junit.VertxUnitRunner; +import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonDelivery; +import io.vertx.proton.ProtonReceiver; +import io.vertx.proton.ProtonServer; + +/** + * Verifies the behaviour of {@link VertxBasedAmqpProtocolAdapter}. In this test setup, the AMQP adapter is not deployed + * as a running Verticle and does not use an AMQP client to simulate a client device. + * Rather, the setup manually instantiates the adapter and gives it a user defined ProtonServer. + */ +@RunWith(VertxUnitRunner.class) +public class VertxBasedAmqpProtocolAdapterWithoutClientTest { + + @Rule + public Timeout globalTimeout = new Timeout(2, TimeUnit.SECONDS); + + private HonoClient tenantServiceClient; + private HonoClient credentialsServiceClient; + private HonoClient messagingServiceClient; + private HonoClient registrationServiceClient; + private CommandConnection commandConnection; + + private RegistrationClient registrationClient; + private TenantClient tenantClient; + + private ProtocolAdapterProperties config; + + /** + * Setups the protocol adapter. + * + * @param context The vert.x test context. + */ + @SuppressWarnings("unchecked") + @Before + public void setup(final TestContext context) { + + tenantClient = mock(TenantClient.class); + + tenantServiceClient = mock(HonoClient.class); + when(tenantServiceClient.connect(any(Handler.class))).thenReturn(Future.succeededFuture(tenantServiceClient)); + when(tenantServiceClient.getOrCreateTenantClient()).thenReturn(Future.succeededFuture(tenantClient)); + + credentialsServiceClient = mock(HonoClient.class); + when(credentialsServiceClient.connect(any(Handler.class))) + .thenReturn(Future.succeededFuture(credentialsServiceClient)); + + messagingServiceClient = mock(HonoClient.class); + when(messagingServiceClient.connect(any(Handler.class))) + .thenReturn(Future.succeededFuture(messagingServiceClient)); + + registrationClient = mock(RegistrationClient.class); + final JsonObject regAssertion = new JsonObject().put(RegistrationConstants.FIELD_ASSERTION, "assert-token"); + when(registrationClient.assertRegistration(anyString(), any())) + .thenReturn(Future.succeededFuture(regAssertion)); + + registrationServiceClient = mock(HonoClient.class); + when(registrationServiceClient.connect(any(Handler.class))) + .thenReturn(Future.succeededFuture(registrationServiceClient)); + when(registrationServiceClient.getOrCreateRegistrationClient(anyString())) + .thenReturn(Future.succeededFuture(registrationClient)); + + commandConnection = mock(CommandConnection.class); + when(commandConnection.connect(any(Handler.class))).thenReturn(Future.succeededFuture(commandConnection)); + + config = new ProtocolAdapterProperties(); + config.setAuthenticationRequired(false); + config.setInsecurePort(4040); + } + + /** + * Verifies that a client provided Proton server instance is used and started by the adapter instead of + * creating/starting a new one. + * + * @param ctx The test context to use for running asynchronous tests. + */ + @SuppressWarnings("unchecked") + @Test + public void testStartUsesClientProvidedAmqpServer(final TestContext ctx) { + // GIVEN an adapter with a client provided Amqp Server + final ProtonServer server = getAmqpServer(); + final VertxBasedAmqpProtocolAdapter adapter = getAdapter(server); + + // WHEN starting the adapter + final Async startup = ctx.async(); + final Future startupTracker = Future.future(); + startupTracker.setHandler(ctx.asyncAssertSuccess(result -> { + startup.complete(); + })); + adapter.start(startupTracker); + + // THEN the client provided server is started + startup.await(); + verify(server).connectHandler(any(Handler.class)); + verify(server).listen(any(Handler.class)); + } + + /** + * Verifies that the AMQP Adapter does not support anonymous relay. + */ + @Test + public void testAnonymousRelayNotSupported() { + // GIVEN an AMQP adapter with a configured server. + final ProtonServer server = getAmqpServer(); + final VertxBasedAmqpProtocolAdapter adapter = getAdapter(server); + + // WHEN the adapter receives a request from a link + // that does not specify a target address + final ProtonReceiver receiver = mock(ProtonReceiver.class); + adapter.handleRemoteReceiverOpen(receiver, mock(ProtonConnection.class)); + + // THEN the adapter closes the link. + verify(receiver).close(); + } + + /** + * Verifies that a request to upload telemetry message (with settled=true) results in the sender sending the message + * without waiting for a response from the downstream peer. + * + * AT_MOST_ONCE delivery semantics + */ + @Test + public void uploadTelemetryMessageWithSettledDeliverySemantics() { + // GIVEN an AMQP adapter with a configured server + final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter(); + final MessageSender sender = givenAmessageSenderForAnyTenant(); + + // which is enabled for a tenant + givenAConfiguredTenant("some-tenant", true); + + // IF a device sends a 'fire and forget' telemetry message + final ProtonDelivery delivery = mock(ProtonDelivery.class); + when(delivery.remotelySettled()).thenReturn(true); + + final ResourceIdentifier resource = ResourceIdentifier.from("telemetry", "some-tenant", "the-device"); + adapter.uploadMessage(new AmqpContext(delivery, getFakeMessage(), resource)); + + // THEN the adapter sends the message and does not wait for response from the peer. + verify(sender).send(any(Message.class)); + } + + /** + * Verifies that a request to upload telemetry message (with settled=false) results in the sender sending the + * message and waits for a response from the downstream peer. + * + * AT_LEAST_ONCE delivery semantics. + */ + @Test + public void uploadTelemetryMessageWithNotSettledDeliverySemantics() { + // GIVEN an adapter configured to use a user-define server. + final VertxBasedAmqpProtocolAdapter adapter = givenAnAmqpAdapter(); + final MessageSender sender = givenAmessageSenderForAnyTenant(); + + // which is enabled for a tenant + givenAConfiguredTenant("enabled-tenant", true); + + // IF a device send telemetry data (with un-settled delivery) + final ProtonDelivery delivery = mock(ProtonDelivery.class); + when(delivery.remotelySettled()).thenReturn(false); + final ResourceIdentifier resource = ResourceIdentifier.from("telemetry", "enabled-tenant", "the-device"); + adapter.uploadMessage(new AmqpContext(delivery, getFakeMessage(), resource)); + + // THEN the sender sends the message and waits for the outcome from the downstream peer + verify(sender).sendAndWaitForOutcome(any(Message.class)); + } + + private void givenAConfiguredTenant(final String tenantId, final boolean enabled) { + final TenantObject tenantConfig = TenantObject.from(tenantId, enabled); + if (!enabled) { + tenantConfig.addAdapterConfiguration(new JsonObject() + .put(TenantConstants.FIELD_ADAPTERS_TYPE, "amqp") + .put(TenantConstants.FIELD_ENABLED, enabled)); + } + when(tenantClient.get(tenantId)).thenReturn(Future.succeededFuture(tenantConfig)); + } + + private Message getFakeMessage() { + final Message message = mock(Message.class); + when(message.getContentType()).thenReturn("application/text"); + when(message.getBody()).thenReturn(new AmqpValue("some payload")); + return message; + } + + private MessageSender givenAmessageSenderForAnyTenant() { + final MessageSender sender = mock(MessageSender.class); + when(messagingServiceClient.getOrCreateTelemetrySender(anyString())).thenReturn(Future.succeededFuture(sender)); + return sender; + } + + /** + * Gets an AMQP adapter configured to use a given server. + * + * @return The AMQP adapter instance. + */ + private VertxBasedAmqpProtocolAdapter givenAnAmqpAdapter() { + final ProtonServer server = getAmqpServer(); + return getAdapter(server); + } + + /** + * Creates a protocol adapter for a given AMQP Proton server. + * + * @param server The AMQP Proton server. + * @return The AMQP adapter instance. + */ + private VertxBasedAmqpProtocolAdapter getAdapter(final ProtonServer server) { + final VertxBasedAmqpProtocolAdapter adapter = new VertxBasedAmqpProtocolAdapter(); + + adapter.setConfig(config); + adapter.setInsecureAmqpServer(server); + adapter.setTenantServiceClient(tenantServiceClient); + adapter.setHonoMessagingClient(messagingServiceClient); + adapter.setRegistrationServiceClient(registrationServiceClient); + adapter.setCredentialsServiceClient(credentialsServiceClient); + adapter.setCommandConnection(commandConnection); + return adapter; + } + + /** + * Creates and sets up a ProtonServer instance. + * + * @return The configured server instance. + */ + @SuppressWarnings("unchecked") + private ProtonServer getAmqpServer() { + final ProtonServer server = mock(ProtonServer.class); + when(server.actualPort()).thenReturn(0, 4040); + when(server.connectHandler(any(Handler.class))).thenReturn(server); + when(server.listen(any(Handler.class))).then(invocation -> { + final Handler> handler = (Handler>) invocation.getArgument(0); + handler.handle(Future.succeededFuture(server)); + return server; + }); + return server; + } +}