Skip to content

Commit

Permalink
[#629] Closed command links are only logged and not reopened anymore.
Browse files Browse the repository at this point in the history
Other strategies can be implemented by overwriting the provided method
in subclasses.

Signed-off-by: Karsten Frank <Karsten.Frank@bosch-si.com>
  • Loading branch information
sysexcontrol committed Jun 12, 2018
1 parent 068f0e9 commit 1f1dd5a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ private Future<Handler<Void>> openCommandReceiverLink(final RoutingContext ctx,
deviceId, commandHandler);

createCommandConsumer(tenant, deviceId, commandMessageConsumer, v ->
this.onCloseCommandConsumer(tenant, deviceId, commandMessageConsumer, messageConsumerRef)
onCloseCommandConsumer(tenant, deviceId, commandMessageConsumer)
).map(messageConsumer -> {
// remember message consumer for later usage
messageConsumerRef.set(messageConsumer);
Expand Down Expand Up @@ -746,36 +746,6 @@ private Future<Handler<Void>> openCommandReceiverLink(final RoutingContext ctx,
return resultWithLinkCloseHandler;
}

/**
* Recreate the command consumer for receiving commands.
* <p>
* Intended to be invoked when the close handler of the {@link org.eclipse.hono.service.command.CommandConsumer} was called.
*
* @param tenant The tenant of the device for that a command may be received.
* @param deviceId The id of the device for that a command may be received.
* @param commandMessageConsumer The Handler that will be called for each command to the device.
* @param messageConsumerRef The reference that will be set to the new message consumer that is returned for the recreated link.
*/
private void onCloseCommandConsumer(final String tenant, final String deviceId,
final BiConsumer<ProtonDelivery, Message> commandMessageConsumer,
final AtomicReference<MessageConsumer> messageConsumerRef) {

LOG.debug("Command consumer was closed.");

vertx.setTimer(DEFAULT_REOPEN_COMMAND_CONSUMER_TIMEOUT_MILLIS, reconnect -> {
LOG.debug("Attempting to recreate command consumer ...");
createCommandConsumer(tenant, deviceId, commandMessageConsumer, v -> {
this.onCloseCommandConsumer(tenant, deviceId, commandMessageConsumer, messageConsumerRef);
}).map(messageConsumer -> {
// remember message consumer for later usage
messageConsumerRef.set(messageConsumer);
// let only one command reach the adapter (may change in the future)
messageConsumer.flow(1);
LOG.debug("Recreated command consumer.");
return null;
});
});
}

private static Integer getQoSLevel(final String qosValue) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -898,4 +898,22 @@ protected boolean isPayloadOfIndicatedType(final Buffer payload, final String co
return !EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION.equals(contentType);
}
}

/**
* This method may be set as the close handler of the {@link org.eclipse.hono.service.command.CommandConsumer}.
* <p>
* The implementation only logs that the link was closed and does not try to reopen it. Any other functionality must be
* implemented by overwriting the method in a subclass.
*
* @param tenant The tenant of the device for that a command may be received.
* @param deviceId The id of the device for that a command may be received.
* @param commandMessageConsumer The Handler that will be called for each command to the device.
*/
protected void onCloseCommandConsumer(final String tenant, final String deviceId,
final BiConsumer<ProtonDelivery, Message> commandMessageConsumer
) {
LOG.debug("Command consumer was closed for [tenantId: {}, deviceId: {}] - no command will be received for this request anymore.",
tenant, deviceId);
}

}

0 comments on commit 1f1dd5a

Please sign in to comment.