Skip to content

Commit

Permalink
[#2392] Parameterize the DownstreamMessage with the MessageContext
Browse files Browse the repository at this point in the history
This will prevent client code from having to cast the message context to
the concrete context type specific to the underlying messaging
technology.

Signed-off-by: Kai Hudalla <kai.hudalla@bosch.io>
  • Loading branch information
sophokles73 committed Jan 26, 2021
1 parent e8bec82 commit 02cb919
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 76 deletions.
Expand Up @@ -13,58 +13,11 @@

package org.eclipse.hono.application.client.amqp;

import java.util.function.Consumer;

import org.eclipse.hono.application.client.ApplicationClientFactory;
import org.eclipse.hono.application.client.DownstreamMessage;
import org.eclipse.hono.application.client.MessageConsumer;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import org.eclipse.hono.util.Lifecycle;

/**
* A factory for creating clients for Hono's AMQP-based northbound APIs.
*/
public interface AmqpApplicationClientFactory extends ApplicationClientFactory {

/**
* Creates a client for consuming data from Hono's northbound <em>Telemetry API</em>.
*
* @param tenantId The tenant to consume data for.
* @param telemetryConsumer The handler to invoke with every message received.
* @param autoAccept {@code true} if received deliveries should be automatically accepted (and settled) after the
* message handler runs for them, if no other disposition has been applied during handling. NOTE: When
* using {@code false} here, make sure that deliveries (from {@link AmqpMessageContext#getDelivery()})
* are quickly updated and settled, so that the messages don't remain <em>in flight</em> for long.
* @param closeHandler The handler invoked when the peer detaches the link.
* @return A future that will complete with the consumer once the link has been established. The future will fail if
* the link cannot be established, e.g. because this factory is not connected.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<MessageConsumer<DownstreamMessage>> createTelemetryConsumer(
String tenantId,
Consumer<DownstreamMessage> telemetryConsumer,
boolean autoAccept,
Handler<Throwable> closeHandler);

/**
* Creates a client for consuming events from Hono's northbound <em>Event API</em>.
*
* @param tenantId The tenant to consume data for.
* @param eventConsumer The handler to invoke with every message received.
* @param autoAccept {@code true} if received deliveries should be automatically accepted (and settled) after the
* message handler runs for them, if no other disposition has been applied during handling. NOTE: When
* using {@code false} here, make sure that deliveries (from {@link AmqpMessageContext#getDelivery()})
* are quickly updated and settled, so that the messages don't remain <em>in flight</em> for long.
* @param closeHandler The handler invoked when the peer detaches the link.
* @return A future that will complete with the consumer once the link has been established. The future will fail if
* the link cannot be established, e.g. because this factory is not connected.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<MessageConsumer<DownstreamMessage>> createEventConsumer(
String tenantId,
Consumer<DownstreamMessage> eventConsumer,
boolean autoAccept,
Handler<Throwable> closeHandler);

public interface AmqpApplicationClientFactory extends ApplicationClientFactory<AmqpMessageContext>, Lifecycle {
}
Expand Up @@ -18,6 +18,6 @@
/**
* A factory for creating clients for Hono's Kafka-based northbound APIs.
*/
public interface KafkaApplicationClientFactory extends ApplicationClientFactory {
public interface KafkaApplicationClientFactory extends ApplicationClientFactory<KafkaMessageContext> {

}
Expand Up @@ -19,12 +19,14 @@
import io.vertx.core.Handler;

/**
* A factory for creating clients for Hono's northbound APIs.
* A factory for creating clients for Hono's north bound APIs.
*
* @param <T> The type of context that messages are being received in.
*/
public interface ApplicationClientFactory {
public interface ApplicationClientFactory<T extends MessageContext> {

/**
* Creates a client for consuming data from Hono's northbound <em>Telemetry API</em>.
* Creates a client for consuming data from Hono's north bound <em>Telemetry API</em>.
* <p>
* The messages passed in to the consumer will be acknowledged automatically if the consumer does not throw an
* exception.
Expand All @@ -38,9 +40,9 @@ public interface ApplicationClientFactory {
* cannot be started.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<MessageConsumer<DownstreamMessage>> createTelemetryConsumer(
Future<MessageConsumer<DownstreamMessage<T>>> createTelemetryConsumer(
String tenantId,
Consumer<DownstreamMessage> telemetryConsumer,
Consumer<DownstreamMessage<T>> telemetryConsumer,
Handler<Throwable> closeHandler);

/**
Expand All @@ -58,9 +60,9 @@ Future<MessageConsumer<DownstreamMessage>> createTelemetryConsumer(
* cannot be started.
* @throws NullPointerException if any of the parameters is {@code null}.
*/
Future<MessageConsumer<DownstreamMessage>> createEventConsumer(
Future<MessageConsumer<DownstreamMessage<T>>> createEventConsumer(
String tenantId,
Consumer<DownstreamMessage> eventConsumer,
Consumer<DownstreamMessage<T>> eventConsumer,
Handler<Throwable> closeHandler);

// TODO add methods for command & control
Expand Down
Expand Up @@ -18,9 +18,12 @@
import io.vertx.core.buffer.Buffer;

/**
* A message of Hono's northbound APIs, flowing from the messaging system to the backend application.
* A message being delivered to an application via Hono's north bound APIs.
*
* @param <T> The type of context that the message is being received in.
*/
public interface DownstreamMessage extends Message {
public interface DownstreamMessage<T extends MessageContext> extends Message<T> {

/**
* Gets the tenant that sent the message.
*
Expand Down Expand Up @@ -50,13 +53,7 @@ public interface DownstreamMessage extends Message {
String getContentType();

/**
* {@inheritDoc}
*/
@Override
MessageContext getMessageContext();

/**
* Gets the quality of service level that the device requested.
* Gets the quality-of-service level used by the device that this message originates from.
*
* @return The QoS.
*/
Expand Down
Expand Up @@ -20,15 +20,17 @@
import io.vertx.core.buffer.Buffer;

/**
* A downstream message of Hono's northbound APIs.
* A downstream message of Hono's north bound APIs.
*
* @param <T> The type of context that the message is being received in.
*/
public class DownstreamMessageImpl implements DownstreamMessage {
public class DownstreamMessageImpl<T extends MessageContext> implements DownstreamMessage<T> {

private final String tenantId;
private final String deviceId;
private final MessageProperties properties;
private final String contentType;
private final MessageContext messageContext;
private final T messageContext;
private final QoS qos;
private final Buffer payload;

Expand All @@ -45,7 +47,7 @@ public class DownstreamMessageImpl implements DownstreamMessage {
* @throws NullPointerException if any of the parameters, except payload, is {@code null}.
*/
public DownstreamMessageImpl(final String tenantId, final String deviceId, final MessageProperties properties,
final String contentType, final MessageContext messageContext, final QoS qos, final Buffer payload) {
final String contentType, final T messageContext, final QoS qos, final Buffer payload) {

Objects.requireNonNull(tenantId);
Objects.requireNonNull(deviceId);
Expand Down Expand Up @@ -84,7 +86,7 @@ public final String getContentType() {
}

@Override
public final MessageContext getMessageContext() {
public final T getMessageContext() {
return messageContext;
}

Expand Down
Expand Up @@ -14,15 +14,16 @@
package org.eclipse.hono.application.client;

/**
* A message of Hono's northbound APIs, exchanged between the messaging system and the backend application.
* A message of Hono's north bound APIs, exchanged between the messaging system and the back end application.
*
* @param <T> The type of context that the message is being received in.
*/
public interface Message {
public interface Message<T extends MessageContext> {

/**
* Gets the message context which is specific for the messaging system in use.
*
* @return The context.
*/
MessageContext getMessageContext();

T getMessageContext();
}
Expand Up @@ -20,7 +20,7 @@
*
* @param <T> The type of messages consumed by this client.
*/
public interface MessageConsumer<T extends Message> {
public interface MessageConsumer<T extends Message<? extends MessageContext>> {

/**
* Closes the client.
Expand Down

0 comments on commit 02cb919

Please sign in to comment.