Skip to content

Commit

Permalink
[#1761] Add tests, documentation and example for the AMQP Adapter Cli…
Browse files Browse the repository at this point in the history
…ent.

Signed-off-by: Abel Buechner-Mihaljevic <abel.buechner-mihaljevic@bosch.io>
  • Loading branch information
b-abel committed Mar 23, 2020
1 parent 7ae1740 commit 636840e
Show file tree
Hide file tree
Showing 9 changed files with 564 additions and 350 deletions.
Expand Up @@ -15,21 +15,29 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.Collections;
import java.util.Map;

import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.impl.HonoClientUnitTestHelper;
import org.eclipse.hono.client.impl.VertxMockSupport;
import org.eclipse.hono.util.MessageHelper;
import org.junit.jupiter.api.BeforeEach;
import org.mockito.ArgumentCaptor;

import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.Tracer;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonSender;

/**
Expand All @@ -38,50 +46,82 @@
*/
public abstract class AbstractAmqpAdapterClientDownstreamSenderTestBase {

protected final String tenantId = "test-tenant";
protected final String deviceId = "test-device";
protected final String contentType = "text/plain";
protected final byte[] payload = "test-value".getBytes();
protected final String testPropertyKey = "test-key";
protected final String testPropertyValue = "test-value";
protected final Map<String, ?> applicationProperties = Collections.singletonMap(testPropertyKey, testPropertyValue);
protected static final String TENANT_ID = "test-tenant";
protected static final String DEVICE_ID = "test-device";
protected static final String CONTENT_TYPE = "text/plain";
protected static final byte[] PAYLOAD = "test-value".getBytes();
protected static final String TEST_PROPERTY_KEY = "test-key";
protected static final String TEST_PROPERTY_VALUE = "test-value";
protected static final Map<String, String> APPLICATION_PROPERTIES = Collections.singletonMap(TEST_PROPERTY_KEY,
TEST_PROPERTY_VALUE);

protected ProtonSender sender;
protected HonoConnection connection;
protected ArgumentCaptor<Message> messageArgumentCaptor;
protected ProtonDelivery protonDelivery;
protected Tracer.SpanBuilder spanBuilder;

/**
* Sets up fixture.
*/
@BeforeEach
public void setUp() {
sender = HonoClientUnitTestHelper.mockProtonSender();
final Vertx vertx = mock(Vertx.class);
connection = HonoClientUnitTestHelper.mockHonoConnection(vertx);

protonDelivery = mock(ProtonDelivery.class);
when(protonDelivery.remotelySettled()).thenReturn(true);
final Accepted deliveryState = new Accepted();
when(protonDelivery.getRemoteState()).thenReturn(deliveryState);

when(sender.send(any(Message.class), VertxMockSupport.anyHandler())).thenReturn(protonDelivery);

final Span span = mock(Span.class);
when(span.context()).thenReturn(mock(SpanContext.class));
spanBuilder = HonoClientUnitTestHelper.mockSpanBuilder(span);

final Tracer tracer = mock(Tracer.class);
when(tracer.buildSpan(anyString())).thenReturn(spanBuilder);

connection = HonoClientUnitTestHelper.mockHonoConnection(mock(Vertx.class));

when(connection.getTracer()).thenReturn(tracer);
when(connection.createSender(any(), any(), any())).thenReturn(Future.succeededFuture(sender));
messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);

}

/**
* Updates the disposition for the {@link ProtonSender#send(Message, Handler)} operation.
*/
@SuppressWarnings("unchecked")
protected void updateDisposition() {
final ArgumentCaptor<Handler<ProtonDelivery>> dispositionHandlerCaptor = ArgumentCaptor.forClass(Handler.class);
verify(sender).send(any(Message.class), dispositionHandlerCaptor.capture());
dispositionHandlerCaptor.getValue().handle(protonDelivery);
}

/**
* Executes the assertions that checkthat the message produced conforms to the expectations of the AMQP adapter.
*
* Executes the assertions that check that the message created by the client conforms to the expectations of the
* AMQP adapter.
*
* @param expectedAddress The expected target address.
* @return The captured message.
*/
protected void assertMessageConformsAmqpAdapterSpec(final String expectedAddress) {
protected Message assertMessageConformsAmqpAdapterSpec(final String expectedAddress) {

final ArgumentCaptor<Message> messageArgumentCaptor = ArgumentCaptor.forClass(Message.class);
verify(sender).send(messageArgumentCaptor.capture(), any());

final Message message = messageArgumentCaptor.getValue();

assertThat(message.getAddress()).isEqualTo(expectedAddress);

assertThat(MessageHelper.getPayloadAsString(message)).isEqualTo(new String(payload));
assertThat(message.getContentType()).isEqualTo(contentType);
assertThat(MessageHelper.getPayloadAsString(message)).isEqualTo(new String(PAYLOAD));
assertThat(message.getContentType()).isEqualTo(CONTENT_TYPE);

assertThat(message.getApplicationProperties().getValue().get(TEST_PROPERTY_KEY)).isEqualTo(TEST_PROPERTY_VALUE);

final Map<String, Object> applicationProperties = message.getApplicationProperties().getValue();
assertThat(applicationProperties.get(testPropertyKey)).isEqualTo(testPropertyValue);
assertThat(message.getApplicationProperties().getValue().get("device_id")).isEqualTo(DEVICE_ID);

assertThat(applicationProperties.get("device_id")).isEqualTo(deviceId);
return message;
}

}
Expand Up @@ -14,43 +14,120 @@
package org.eclipse.hono.client.device.amqp;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.AbstractAmqpAdapterClientDownstreamSenderTestBase;
import org.eclipse.hono.client.device.amqp.internal.AmqpAdapterClientCommandResponseSender;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.opentracing.SpanContext;
import io.vertx.core.Future;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.proton.ProtonDelivery;

/**
* Verifies behavior of {@link CommandResponder}.
*
*/
@ExtendWith(VertxExtension.class)
public class CommandResponderTest extends AbstractAmqpAdapterClientDownstreamSenderTestBase {

private static final String ADDRESS = "command_response/" + TENANT_ID + "/" + DEVICE_ID + "/123";
private static final String CORRELATION_ID = "0";
private static final int STATUS = 200;

/**
* Verifies that the message produced conforms to the expectations of the AMQP adapter.
* Verifies that the message created by the client conforms to the expectations of the AMQP adapter.
*
* @param ctx The test context to use for running asynchronous tests.
*/
@Test
public void testMessageIsValid() {
public void testSendCommandResponseCreatesValidMessage(final VertxTestContext ctx) {

// GIVEN a CommandResponder instance
final CommandResponder commandResponder = AmqpAdapterClientCommandResponseSender
.createWithAnonymousLinkAddress(connection, tenantId, s -> {
}).result();
final CommandResponder commandResponder = createCommandResponder();

// WHEN sending a message using the API...
final Future<ProtonDelivery> deliveryFuture = commandResponder.sendCommandResponse(DEVICE_ID,
ADDRESS, CORRELATION_ID, STATUS, PAYLOAD, CONTENT_TYPE, APPLICATION_PROPERTIES);

// ...AND WHEN the disposition is updated by the peer
updateDisposition();

deliveryFuture.setHandler(ctx.succeeding(delivery -> {
// THEN the AMQP message conforms to the expectations of the AMQP protocol adapter
ctx.verify(this::assertMessageConformsAmqpAdapterSpec);
ctx.completeNow();
}));
}

/**
* Verifies that {@link TraceableCommandResponder} uses the given SpanContext.
*
* @param ctx The test context to use for running asynchronous tests.
*/
@Test
public void testSendCommandResponseWithTracing(final VertxTestContext ctx) {

// GIVEN a TraceableCommandResponder instance
final TraceableCommandResponder commandResponder = ((TraceableCommandResponder) createCommandResponder());

// WHEN sending a message using the API...
final SpanContext spanContext = mock(SpanContext.class);
final Future<ProtonDelivery> deliveryFuture = commandResponder.sendCommandResponse(DEVICE_ID,
ADDRESS, CORRELATION_ID, STATUS, PAYLOAD, CONTENT_TYPE, APPLICATION_PROPERTIES, spanContext);

// ...AND WHEN the disposition is updated by the peer
updateDisposition();

deliveryFuture.setHandler(ctx.succeeding(delivery -> {
// THEN the given SpanContext is used
ctx.verify(() -> {
verify(spanBuilder).addReference(any(), eq(spanContext));
assertMessageConformsAmqpAdapterSpec();
});
ctx.completeNow();
}));
}

/**
* Verifies that sending the command response waits for the disposition update from the peer.
*
* @param ctx The test context to use for running asynchronous tests.
* @throws InterruptedException if test is interrupted while waiting.
*/
@Test
public void testSendingWaitsForDispositionUpdate(final VertxTestContext ctx) throws InterruptedException {

// GIVEN a CommandResponder instance
final CommandResponder commandResponder = createCommandResponder();

// WHEN sending a message using the API
final String targetAddress = "command_response/test-tenant/test-device/123";
final String correlationId = "0";
final int status = 200;
final Future<ProtonDelivery> deliveryFuture = commandResponder.sendCommandResponse(deviceId, targetAddress,
correlationId, status, payload, contentType,
applicationProperties);
final Future<ProtonDelivery> deliveryFuture = commandResponder.sendCommandResponse(DEVICE_ID, ADDRESS,
CORRELATION_ID, STATUS, PAYLOAD, CONTENT_TYPE, APPLICATION_PROPERTIES);

// THEN the AMQP message produces by the client conforms to the expectations of the AMQP protocol adapter
assertThat(deliveryFuture.succeeded());
deliveryFuture.setHandler(ctx.completing());

assertMessageConformsAmqpAdapterSpec(targetAddress);
// THEN the future waits for the disposition to be updated by the peer
Thread.sleep(100L);
assertThat(deliveryFuture.isComplete()).isFalse();
updateDisposition();
}

private CommandResponder createCommandResponder() {
return AmqpAdapterClientCommandResponseSender.createWithAnonymousLinkAddress(connection, TENANT_ID, s -> {
}).result();
}

private void assertMessageConformsAmqpAdapterSpec() {
final Message message = assertMessageConformsAmqpAdapterSpec(ADDRESS);
assertThat(message.getCorrelationId()).isEqualTo(CORRELATION_ID);
assertThat(message.getApplicationProperties().getValue().get("status")).isEqualTo(STATUS);
}
}
Expand Up @@ -14,40 +14,113 @@
package org.eclipse.hono.client.device.amqp;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import org.eclipse.hono.client.AbstractAmqpAdapterClientDownstreamSenderTestBase;
import org.eclipse.hono.client.device.amqp.internal.AmqpAdapterClientEventSenderImpl;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import io.opentracing.SpanContext;
import io.vertx.core.Future;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import io.vertx.proton.ProtonDelivery;

/**
* Verifies behavior of {@link EventSender}.
*
*/
@ExtendWith(VertxExtension.class)
public class EventSenderTest extends AbstractAmqpAdapterClientDownstreamSenderTestBase {

private static final String ADDRESS = "event/" + TENANT_ID + "/" + DEVICE_ID;

/**
* Verifies that the message produced conforms to the expectations of the AMQP adapter.
* Verifies that the message created by the client conforms to the expectations of the AMQP adapter.
*
* @param ctx The test context to use for running asynchronous tests.
*/
@Test
public void testMessageIsValid() {
public void testSendCreatesValidMessage(final VertxTestContext ctx) {

// GIVEN a EventSender instance
final EventSender eventSender = AmqpAdapterClientEventSenderImpl
.createWithAnonymousLinkAddress(connection, tenantId, s -> {
}).result();
final EventSender eventSender = createEventSender();

// WHEN sending a message using the API...
final Future<ProtonDelivery> deliveryFuture = eventSender.send(DEVICE_ID, PAYLOAD, CONTENT_TYPE,
APPLICATION_PROPERTIES);

// ...AND WHEN the disposition is updated by the peer
updateDisposition();

deliveryFuture.setHandler(ctx.succeeding(delivery -> {
// THEN the AMQP message conforms to the expectations of the AMQP protocol adapter
ctx.verify(() -> assertMessageConformsAmqpAdapterSpec(ADDRESS));
ctx.completeNow();
}));
}

/**
* Verifies that {@link TraceableEventSender} uses the given SpanContext.
*
* @param ctx The test context to use for running asynchronous tests.
*/
@Test
public void testSendWithTracing(final VertxTestContext ctx) {

// GIVEN a EventSender instance
final TraceableEventSender eventSender = ((TraceableEventSender) createEventSender());

// WHEN sending a message using the API
final Future<ProtonDelivery> deliveryFuture = eventSender.send(deviceId, payload, contentType,
applicationProperties);
// WHEN sending a message using the API...
final SpanContext spanContext = mock(SpanContext.class);
final Future<ProtonDelivery> deliveryFuture = eventSender.send(DEVICE_ID, PAYLOAD, CONTENT_TYPE,
APPLICATION_PROPERTIES, spanContext);

// THEN the AMQP message produces by the client conforms to the expectations of the AMQP protocol adapter
assertThat(deliveryFuture.succeeded());
// ...AND WHEN the disposition is updated by the peer
updateDisposition();

assertMessageConformsAmqpAdapterSpec("event" + "/" + tenantId + "/" + deviceId);
deliveryFuture.setHandler(ctx.succeeding(delivery -> {
// THEN the given SpanContext is used
ctx.verify(() -> {
verify(spanBuilder).addReference(any(), eq(spanContext));
assertMessageConformsAmqpAdapterSpec(ADDRESS);
});
ctx.completeNow();
}));
}

/**
* Verifies that sending the message waits for the disposition update from the peer.
*
* @param ctx The test context to use for running asynchronous tests.
* @throws InterruptedException if test is interrupted while waiting.
*/
@Test
public void testSendWaitsForDispositionUpdate(final VertxTestContext ctx)
throws InterruptedException {

// GIVEN a EventSender instance
final EventSender eventSender = createEventSender();

// WHEN sending a message using the API...
final Future<ProtonDelivery> deliveryFuture = eventSender.send(DEVICE_ID, PAYLOAD, CONTENT_TYPE,
APPLICATION_PROPERTIES);

deliveryFuture.setHandler(ctx.completing());

// THEN the future waits for the disposition to be updated by the peer
Thread.sleep(100L);
assertThat(deliveryFuture.isComplete()).isFalse();
updateDisposition();
}

private EventSender createEventSender() {
return AmqpAdapterClientEventSenderImpl.createWithAnonymousLinkAddress(connection, TENANT_ID, s -> {
}).result();
}

}

0 comments on commit 636840e

Please sign in to comment.