Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#930] Add tenant-scoped command consumer #1145

Merged
merged 1 commit into from May 7, 2019

Conversation

Projects
4 participants
@calohmn
Copy link
Contributor

commented Apr 5, 2019

For #930:
This introduces a new command consumer on a tenant-scoped address.

This consumer handler will map the device id in the command message 'To' address to a corresponding 'via' device id (if applicable) and delegate the command either to a local command handler or to a consumer on another protocol adapter instance by way of the AMQP network.
As the existing command consumer address scheme isn't changed (yet), this change doesn't cause any incompatibilities.

Additional changes for getting this to work are in #1156 and #1157.

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from 3b15bb0 to ede7818 Apr 8, 2019

@codecov

This comment has been minimized.

Copy link

commented Apr 8, 2019

Codecov Report

Merging #1145 into master will decrease coverage by 1.55%.
The diff coverage is 37.67%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1145      +/-   ##
============================================
- Coverage     69.34%   67.78%   -1.56%     
  Complexity      383      383              
============================================
  Files           276      283       +7     
  Lines         12447    12739     +292     
  Branches       1060     1073      +13     
============================================
+ Hits           8631     8635       +4     
- Misses         3032     3324     +292     
+ Partials        784      780       -4
Impacted Files Coverage Δ Complexity Δ
...e/hono/client/impl/DelegatedCommandSenderImpl.java 0% <0%> (ø) 0 <0> (?)
...n/java/org/eclipse/hono/client/CommandContext.java 63.63% <0%> (-1.99%) 0 <0> (ø)
...main/java/org/eclipse/hono/util/MessageHelper.java 72.18% <0%> (-4.58%) 0 <0> (ø)
...ono/client/impl/DeviceSpecificCommandConsumer.java 100% <100%> (ø) 0 <0> (?)
...rg/eclipse/hono/client/CommandConsumerFactory.java 100% <100%> (ø) 0 <0> (ø) ⬇️
...rg/eclipse/hono/service/AbstractAdapterConfig.java 86% <100%> (+0.28%) 0 <0> (ø) ⬇️
.../org/eclipse/hono/client/impl/CommandConsumer.java 100% <100%> (ø) 0 <0> (ø) ⬇️
...a/org/eclipse/hono/client/impl/AbstractSender.java 67.25% <100%> (-1.5%) 0 <0> (ø)
...hono/client/impl/GatewayMappingCommandHandler.java 13.15% <13.15%> (ø) 0 <0> (?)
...rg/eclipse/hono/client/impl/GatewayMapperImpl.java 23.07% <23.07%> (ø) 0 <0> (?)
... and 21 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update c800d54...d225b02. Read the comment docs.

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from ede7818 to f34ee2e Apr 8, 2019

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from f34ee2e to 42ab5a0 Apr 10, 2019

@calohmn calohmn requested review from ctron, dejanb and sophokles73 as code owners Apr 10, 2019

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from 42ab5a0 to a40205b Apr 16, 2019

@calohmn

This comment has been minimized.

Copy link
Contributor Author

commented Apr 16, 2019

Rebased and resolved merge conflicts.
I'll still refactor this a bit (aiming to replace the new registrationClientFactory param in the CommandConsumerFactory).

@BobClaerhout

This comment has been minimized.

Copy link
Contributor

commented Apr 26, 2019

Any particular reason why this is not yet merged?
@calohmn: is the refactor big?

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from a40205b to eb94407 Apr 26, 2019

@calohmn

This comment has been minimized.

Copy link
Contributor Author

commented Apr 26, 2019

I've done some refactoring:

  • The mapping logic is now in a separate class (GatewayMapper and GatewayMapperImpl) that also implements the ConnectionLifecycle interface. An object of this class is exposed as a bean and autowired in AbstractProtocolAdapterBase. That allows for potentially exchanging the implementation.
  • The GatewayMapperImpl uses the RegistrationClientFactory internally to query the device registry. The HonoConnection for that is a separate one compared to the connection for the existing AbstractProtocolAdapterBase.commandConsumerFactory instance.
  • The command handling logic for mapping the device id and for delegating the command request back to the AMQP network is in separate Handler classes now (GatewayMappingCommandHandler and DelegatingCommandHandler). I'll add unit tests for these once we have agreed on the general class structure.
  • I had to add a new init(GatewayMapper) method to the CommandConsumerFactory interface. I wanted to add the GatewayMapper instance in the constructor, but that constructor is called from the AbstractAdapterConfig class - the AbstractProtocolAdapterBase.gatewayMapper instance can't be used from there.

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from eb94407 to 08dfcff Apr 30, 2019

package org.eclipse.hono.client;

/**
* A sender to send delegated command messages.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

what is a delegated command message?

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

That is a command message, first retrieved from the tenant-scoped consumer in the protocol adapter, and then delegated back to the AMQP message network so that it can be consumed by the device-specific consumer later.
I can extend the javadoc there.

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

See commit below.

* The value of the returned Future can be either
* <ul>
* <li>the gateway device id</li>
* <li>{@code null} if the device is configured to be accessed via a gateway but none is set</li>

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

what does that mean?

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

That is the case where the device is configured with a via property (ie. it is configured to be accessed via a gateway), but the last-via property is not set.
That means that there hasn't been a message sent for this device yet.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

Ah, can you somehow incorporate that into the JavaDoc?

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

See commit below.

import io.vertx.core.Future;

/**
* A mapping component that determines the corresponding gateway device id for a given device id.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

a component that is called Mapper surely maps one thing to the corresponding other one. The interesting information that I would expect in this comment is what is the relation between these things?

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

Ok, I would rephrase it like this:
A component that maps a given device to the gateway through which data was last published for the given device.

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

See commit below.


/**
* A wrapper around an AMQP receiver link for consuming commands that will be delegated to the appropriated
* handler/consumer.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

which one is the appropriate handler/consumer? Why would I need to delegate the command to a handler?

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

Right, the info is rather vague here.
I would change the javadoc to:

A wrapper around an AMQP receiver link for consuming commands on a tenant-scoped address.
<p>
This class is used by the default {@link CommandConsumerFactory} implementation to receive commands from northbound applications.

What is done with the command message afterwards is out of the scope of this class.
I can extend the javadoc of the CommandConsumerFactoryImpl to describe the handling of command messages there.

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

See commit below.

* @param message The message to copy.
* @return The message copy.
*/
public static Message getMessageCopyWithCopiedProperties(final Message message) {

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

IMHO this belongs i MessageHelper instead. I would also better like a shorter name ...

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

Ok, yes, will move it there.
Concerning the name: Well, we could use getCopyWithCopiedProperties (or getCopyWithPropertiesCopy). Not the prettiest, but I'd rather keep the withCopiedProperties part in there to emphasize what the method is doing.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

why not simply getShallowCopy and let users read the JavaDoc? in the end that is why you wrote it, isn't it?

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

ok, I can live with that. See commit below.

* @param gatewayMapper The component mapping a command device id to the corresponding gateway device id.
* @throws NullPointerException if gatewayMapper is {@code null}.
*/
void init(GatewayMapper gatewayMapper);

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

why can't we pass in the gateway mapper to the factory method and the CommandConsumerFactoryImpl constructor? You can then override AbstractHonoClientFactory's connect() and disconnect() method to start/sop both connections, or wouldn't that work?

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

I thought about this, but decided against it.
This would mean that all the ConnectionLifecycle members of AbstractProtocolAdapterBase are connected in the AbstractProtocolAdapterBase.connectToService method - except the gatewayMapper field, where the connect invocation is hidden in the CommandConsumerFactoryImpl class.
This fact plus needing to override the AbstractHonoClientFactory methods makes this solution IMHO inferior to just adding the init(GatewayMapper) method.

This comment has been minimized.

Copy link
@calohmn

calohmn Apr 30, 2019

Author Contributor

Or do you mean this along with removing the gatewayMapper field from the AbstractProtocolAdapterBase class?
I guess that would make exchanging the GatewayMapper implementation in a protocol adapter different from the way it would be done for the other factories.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 Apr 30, 2019

Member

I guess we do not need (direct) access to the gateway mapper from the protocol adapters anyway, do we? Why not make it an implementation detail of the CommandConsumerFactory?

This comment has been minimized.

Copy link
@calohmn

calohmn May 2, 2019

Author Contributor

I've explored this solution a bit more and think we can go with it. I've amended the commit with the corresponding changes.

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch 3 times, most recently from 1349617 to 0a4527f Apr 30, 2019

@calohmn

This comment has been minimized.

Copy link
Contributor Author

commented May 2, 2019

Rebased to resolve merge conflict.

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from 0a4527f to ce00716 May 3, 2019

@@ -63,6 +64,7 @@ static CommandConsumerFactory create(final HonoConnection connection) {
* the cause of the failure</li>
* </ul>
* @throws NullPointerException if any of tenant, device ID or command handler are {@code null}.
* @throws IllegalStateException if init() hasn't been called before.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

is this still required now?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

No, I've removed it.

@@ -106,6 +108,7 @@ static CommandConsumerFactory create(final HonoConnection connection) {
* </ul>
* @throws NullPointerException if tenant, device ID or command handler are {@code null}.
* @throws IllegalArgumentException if the checkInterval is negative.
* @throws IllegalStateException if init() hasn't been called before.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

is this still required now?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

No, I've removed it.

* This usually involves command messages first retrieved via a tenant-scoped consumer and then delegated back to the
* downstream peer so that they can be consumed by the device-specific consumer.
*/
public interface DelegatedCommandSender extends MessageSender {

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

is there a reason why this is empty?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

Good point, I've added sendCommandMessage(Command, SpanContext). Better structure now.

*
* @param tenantId The tenant identifier.
* @param deviceId The device identifier.
* @return Future containing the mapped gateway device id or {@code null}.

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

can the future also be failed?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

I've adapted the javadoc.

span.finish();
result.fail(e);
} else {
sendMessageAndWaitForOutcome(rawMessage, span).setHandler(result.completer());

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

Future.completer() will be removed in vert.x 4 so I wouldn't use it anymore ...

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

Ok, I've changed that.

CommandConstants.COMMAND_ENDPOINT, tenantId, command.getReplyToId());
final Future<ProtonDelivery> protonDeliveryFuture = sender.sendAndWaitForOutcome(
createDelegatedCommandMessage(command.getCommandMessage(), targetAddress, replyToAddress),
commandContext.getTracingContext());

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

Isn't this the specific command context handling that DelegatedCommandSender could/should encapsulate? Why don't we define a method DelegatedCommandSender.send(CommandContext)?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

Yes, added a method in the interface (see above).

// failed to create sender
LOG.error("failed to create sender for sending command message to downstream peer", cmdSenderResult.cause());
TracingHelper.logError(commandContext.getCurrentSpan(),
new Exception("failed to create sender for sending command message to downstream peer",

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

I do not really like using Exception ...

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

I've removed creating an Exception there.

public void handle(final CommandContext originalCommandContext) {
final Command originalCommand = originalCommandContext.getCommand();
if (!originalCommand.isValid()) {
originalCommandContext.reject(new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed command message"));

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

do we still need to execute the rest of the method if the command is being rejected?
I think we also should re-issue some credit so that the application can send another command, don't you think?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

Oh, thanks for spotting, missed the return statement there.
As for the credits: The TenantScopedCommandConsumer has its receiver link being created with the default (ie. non-empty) prefetchSize, so the credit handling should be automatic (in contrast to the DeviceSpecificCommandConsumer with prefetchSize=0).
And that's because for the tenant-scoped consumer, there are messages received from not just one command sender but multiple senders (for all devices of the tenant).

// lastViaDeviceId is null - device hasn't connected yet
LOG.error("last-via for device {} is not set", originalDeviceId);
TracingHelper.logError(originalCommandContext.getCurrentSpan(), "last-via for device " + originalDeviceId + " is not set");
originalCommandContext.release();

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

I think we also should re-issue some credit so that the application can send another command, don't you think?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

See above, credit-handling being automatic on that link.

LOG.error("error getting last-via for device {}", originalDeviceId, deviceIdFutureResult.cause());
TracingHelper.logError(originalCommandContext.getCurrentSpan(),
new Exception("error getting last-via for device", deviceIdFutureResult.cause()));
originalCommandContext.release();

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 3, 2019

Member

I think we also should re-issue some credit so that the application can send another command, don't you think?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

See above, credit-handling being automatic on that link.

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from ce00716 to d30c4cb May 6, 2019

@calohmn

This comment has been minimized.

Copy link
Contributor Author

commented May 6, 2019

I've pushed the corresponding changes.
I've also added a TODO in DelegatedCommandSenderImpl.runSendAndWaitForOutcomeOnContext concerning error-handling. To not further increase the size of this PR, I would fix this in a separate PR.

LOG.warn("got unexpected delivery outcome; remote state: {}", delegatedMsgDelivery.getRemoteState());
TracingHelper.logError(commandContext.getCurrentSpan(), "got unexpected delivery outcome; remote state: "
+ delegatedMsgDelivery.getRemoteState());
commandContext.getCurrentSpan().finish();

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 6, 2019

Member

My understanding is that at this point the application that sent the message is still expecting a disposition frame indicating the outcome of the transfer, isn't it? IMHO we need to invoke any of the CommandContext's methods in order to finish the original message transfer here, i.e. even if we received an unexpected outcome, we should send a disposition with a supported outcome to the application, don't you think?.

It also feels strange that we finish the Span in this case but not in the other cases ...

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

Right, I've put a commandContext.release() in there.
Besides the transaction related outcomes (Declared and Transactional), this block can only be reached with a Received delivery state, indicating partial transfer. I guess the latter case would be handled by the proton-j layer below and not be exposed here.

It also feels strange that we finish the Span in this case but not in the other cases ...

In the other cases the span is finished by the CommandContext#accept/reject/etc. methods.

// failed to get last-via
LOG.error("error getting last-via for device {}", originalDeviceId, deviceIdFutureResult.cause());
TracingHelper.logError(originalCommandContext.getCurrentSpan(),
new Exception("error getting last-via for device", deviceIdFutureResult.cause()));

This comment has been minimized.

Copy link
@sophokles73

sophokles73 May 6, 2019

Member

Exception?

My understanding is that we run into this block also, if the GatewayMapper`s connection to the Device Registration service is lost, right?

This comment has been minimized.

Copy link
@calohmn

calohmn May 6, 2019

Author Contributor

Yes, that would be a common cause for an error here. I've added a comment indicating this.

[#930] Add tenant-scoped command consumer.
This introduces a new command consumer on a tenant-scoped address.
This consumer handler will map the device id in the command message
'To' address to a corresponding 'via' device id (if applicable) and
delegate the command either to a local command handler or to a consumer
on another protocol adapter instance by way of the AMQP network.
As the existing command consumer address scheme isn't changed (yet),
this change doesn't cause any incompatibilities.

Signed-off-by: Carsten Lohmann <carsten.lohmann@bosch-si.com>

@calohmn calohmn force-pushed the bsinno:PR/command_delegator branch from d30c4cb to d225b02 May 6, 2019

@sophokles73
Copy link
Member

left a comment

LGTM

@sophokles73

This comment has been minimized.

Copy link
Member

commented May 7, 2019

@dejanb do you also want to take a look?

@sophokles73 sophokles73 added this to In progress in 1.0-M4 via automation May 7, 2019

@sophokles73 sophokles73 added this to the 1.0.0 milestone May 7, 2019

@dejanb

dejanb approved these changes May 7, 2019

@sophokles73 sophokles73 merged commit 8d841a1 into eclipse:master May 7, 2019

2 of 4 checks passed

codecov/patch 37.67% of diff hit (target 69.34%)
Details
codecov/project 67.78% (-1.56%) compared to c800d54
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
eclipsefdn/eca The author(s) of the pull request is covered by necessary legal agreements in order to proceed!
Details

1.0-M4 automation moved this from In progress to Done May 7, 2019

@calohmn calohmn deleted the bsinno:PR/command_delegator branch Jun 11, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.