New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#643] AMQP adapter - support only anonymous relay for publishing messages #727
[#643] AMQP adapter - support only anonymous relay for publishing messages #727
Conversation
* | ||
* @return The tenant identifier. | ||
*/ | ||
String getTenantId() { | ||
return resource.getTenantId(); | ||
return isDeviceAuthenticated() ? authenticatedDevice.getTenantId() : resource.getTenantId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While this will probably yield the intended result, I would rather try to make sure in the receiver open handler that the receiver link's target address actually contains a tenant and device if the device is not authenticated. For an authenticated device, IMHO it should also check, if the target address contains either no tenant and device ID at all or a tenant ID that matches the authenticated device (this is relevant for gateways connecting to the adapter).
The receiver open handler seems to suggest, that anonymous relay mode is not supported. However, I think that for gateway mode, it would actually be very helpful to support this because otherwise a gateway would need to create separate links for each device it wants to publish data for.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, I think that for gateway mode, it would actually be very helpful to support this because otherwise a gateway would need to create separate links for each device it wants to publish data for.
But shouldn't the gateway not indicate at least the endpoint
that it wants to publish data to (either telemetry or event)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what would be the benefit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to identify if the gateway wants to publish an event or a telemetry message. or maybe am missing something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the endpoint, tenant and device ID will be contained in the message's to property in that case, i.e. you can determine with each message being received, whether it is a telemetry message or an event ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok, i see now.. thanks!
i think i will have to find a different CLI client for testing with gateways, since qpid-send
mandates that the link address be provided via the -a
option
* | ||
* @return The device identifier. | ||
*/ | ||
String getDeviceId() { | ||
return resource.getResourceId(); | ||
return isDeviceAuthenticated() ? authenticatedDevice.getDeviceId() : resource.getResourceId(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For a gateway connecting to the adapter, this will probably not return the intended value. A gateway usually will publish data on behalf of another device (of the same tenant), therefore the device ID from the link's target address will usually not be the same as the device ID of the authenticated device.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
humm.. ok, i will have to think about this a little bit
@sophokles73: PR updated with a17fab9. i plan to add 3+ test cases once this looks okay |
if (receiver.getRemoteTarget() == null) { | ||
final Device authenticatedDevice = conn.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE, | ||
Device.class); | ||
if (authenticatedDevice == null && receiver.getRemoteTarget() == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
based on my understanding of the AMQP 1.0 spec, the target might actually be non-null but its address property might be. I think you also need to cover that case.
IMHO it would also improve readabilty if all checks regarding the endpoint would be in one place only, in this case in validateEndpoint... IIRC you already have created a mapping of ServiceInvocationExceptions to AmqpErrors, haven't you?
if (address.getTenantId() == null || address.getResourceId() == null) { | ||
receiver.setCondition(ProtonHelper.condition(AmqpError.NOT_ALLOWED, String.format("Invalid Address [%s] for unauthenticated device", address))); | ||
receiver.close(); | ||
addressCheck.fail("Invalid Address for unauthenticated device"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we usually fail futures with an exception, e.g. a ClientErrorException in this case ..
receiver.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS, | ||
"cannot publish data for device of other tenant")); | ||
receiver.close(); | ||
addressCheck.fail("cannot publish data for device of other tenant"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see above
"Event endpoint only supports AT_LEAST_ONCE delivery semantics.")); | ||
} else { | ||
result.complete(address); | ||
if (receiver.getRemoteTarget() == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
again, I think that it might also happen that the Target is not null but its address property is
@@ -77,6 +86,10 @@ String getTenantId() { | |||
* @return The device identifier. | |||
*/ | |||
String getDeviceId() { | |||
if (receiverTarget == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't we have all necessary information available in the constructor to properly initialize the fields? Why do we need to do additional computations in the accessors? This class is supposed to be immutable, isn't it?
Doing all initializaiton and computation in the constructor would help very much with documenting and understanding the different cases that are possible, don't you think?
if (receiverTarget == null) { | ||
// link possibly opened by an "authenticated" gateway component via anonymous relay | ||
// TODO: validate the address in the message's _to property | ||
this.resource = ResourceIdentifier.fromString(message.getProperties().getTo()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if to is empty?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@sophokles73: this is really an amazing feedback, thanks! i will not respond to all the comments since I agree with everything said :-) will update the PR
@sophokles73: PR updated with 0ade23d |
}); | ||
} | ||
|
||
private Future<Void> validateAddress(final ResourceIdentifier address, final Device authenticatedDevice) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be validateTargetAddress
private Future<Void> validateAddress(final ResourceIdentifier address, final Device authenticatedDevice) { | ||
final Future<Void> addressCheck = Future.future(); | ||
if (address == null) { | ||
addressCheck.complete(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is only here for the case that the device didn't provide an address at all. IMHO this should be handled in validateEndpoint already.
"fail to establish a receiving link with client [{}] due to an invalid endpoint [{}]." | ||
+ " closing the link", | ||
receiver.getName(), receiver.getRemoteQoS()); | ||
receiver.setCondition(AmqpContext.getErrorCondition((ServiceInvocationException) t)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how do you know that it is a ServiceInvocationException
?
AmqpContext.getErrorCondition()
should be able to do this check ...
* @param to The property to validate. | ||
* @return A future indicating the outcome of the validation operation. | ||
*/ | ||
Future<Void> validateMessageProperty(final String to) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this doesn't seem to be a generic method for validating all sorts of properties but instead it looks like it is specifically written to check the message's address, right?
// TODO: validate the address in the message's _to property | ||
this.resource = ResourceIdentifier.fromString(message.getProperties().getTo()); | ||
validateMessageProperty(message.getProperties().getTo()).compose(ok -> { | ||
resource = ResourceIdentifier.fromString(message.getProperties().getTo()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
message.getAddress()
resource = ResourceIdentifier.fromString(message.getProperties().getTo()); | ||
return Future.succeededFuture(); | ||
}).recover(t -> { | ||
handleFailure((ServiceInvocationException) t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't handleFailure make a check if it's a ServiceInvocationException
?
@@ -184,7 +191,7 @@ boolean isRemotelySettled() { | |||
* @param error The service invocation error. | |||
* @return The ErrorCondition. | |||
*/ | |||
private ErrorCondition getErrorCondition(final ServiceInvocationException error) { | |||
static ErrorCondition getErrorCondition(final ServiceInvocationException error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
param should better be a Throwable
...
* @param to The property to validate. | ||
* @return A future indicating the outcome of the validation operation. | ||
*/ | ||
Future<Void> validateMessageProperty(final String to) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this method should not return a Future. The instance created by the constructor should be initialized once the constructor returns, shouldn't it?
why not simply return void and declare a ServiceInvocationException
being thrown in case of a failing check?
@sophokles73: thanks! a02d085 incorporates your latest comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having followed this trail of changes now for I while, I now think that it doesn't make much sense to allow devices to open a link with a target address set at all. It only seems to complicate things when trying to distinguish the different cases and roles that the device might be in. It seems to be much easier to follow the pattern that the MQTT adapter uses, i.e. a device connects and is either authenticated or not. When it publishes a message, we check if the device is allowed to do so or not. However, we always do the check based on the message's identity and the message address (analogously to the topic name in MQTT) instead of also checking the link's target address. IMHO that would make the code for authorizing a message much easier to implement and comprehend. Take some time to chew on this and let me know what you think...
} | ||
|
||
} else if (isDeviceAuthenticated()) { | ||
this.resource = ResourceIdentifier.from(receiverTarget.getAddress(), authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason why the target address can only contain the endpoint name in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, for authenticated devices(non-gateway), the endpointName
is specified in the address
} else { | ||
final ErrorCondition condition = getErrorCondition(error); | ||
MessageHelper.rejected(delivery, condition); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if it's not a ServiceInvocationException
? Don't we need to do anything in that case, e.g. reject the message?
final Future<Void> propertyCheck = Future.future(); | ||
if (to == null || to.isEmpty()) { | ||
propertyCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Message _to propery cannot be null or empty")); | ||
void validateMessageAddress(final String address) throws ServiceInvocationException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason for this method to be package private instead of private?
I am asking because if this is package private then the JavaDocs are basically useless because they do not give any clue regarding the validation that is actually taking place. In particular, the first null check only makes sense in the context of the target address being null
as well, right? So, for this method to return a reasonable result, you actually need to have done some checks before as well, right? This would probably be fine, though, if the method was private, so that it cannot be invoked out of the context of this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry, this method was intended to be a private method of the class
Okay, actually this was what I thought we were trying to achieve using the link's target address. for me, the link's target address was treated like a topic name..
why the message's identity? shouldn't we only consider the message's address?
am confused by this statement. you mentioned previously that we should not allow the link's target to be set at all, right? so why also check the link's target? |
typo, meant device's identity |
That doesn't match the MQTT pattern, though. In MQTT there is no such thing as a link. There is a connection (not scoped to any topic) and there are messages being published to arbitrary topics. So, in order to get closer to the MQTT pattern, we should try to get the link's target address out of the way, agreed? |
You are currently checking the target address, aren't you? I was suggesting that we should not do that anymore, thus, instead of checking the link's target address (as we do now), we should only check the device's identity and the message address instead. |
👍
Okay, but the another thing: if the link's target is specified, we simply ignore but not reject a link? or should we reject and close the link when the target address is specified? |
Yes, for an unauthenticated device we take the tenant and device ID from the message address. We have no way of authorizing the message in any way in this case. But this is what the operator of the protocol adapter actually wanted to do: allow every device to publish data on behalf of every device. So be it.
Yes, but we still need to authorize the publishing in the context of the authenticated device's identity (which in this case is the gateway). We already do this in the other protocol adapters as well ... |
what's your opinion on that? |
i think we can ignore and log a message (at info level) that the target address is not the address used for publishing messages. we only reject if the message's address is empty (or null). if both the target and message address is specified, we only pick the message address.. i think this will simplify the check on the adapter side. WDYT? let's summarize the different cases:
Is this correct for all three cases? |
I don't get it. when a device opens a link, there is no message (yet). Do you want to accept (open) a link that contains a target address or reject (close) the link? |
yes, we accept (open) the link even if it does not contain a target address. we simply ignore whatever address was set as the target address |
@sophokles73: maybe we should not completely ignore the link's target address. what am deciding now is this:
WDYT? |
FMPOV it should be the other way around: we reject links that contain a target address because accepting such a link might encourage the client (device) to believe that the given target address will have any meaning. I think we should only accept links that do not contain a target address. |
Ok, thanks! you can safely ignore my previous comment :-) |
@sophokles73: i just realized that using this patch might grow a bit larger if it should include those changes. should i create a separate PR for the ITs (and remove the ITs for this patch)? |
yes, that would probably make sense. |
50d0e10
to
adc1344
Compare
@sophokles73: PR updated with adc1344 . i still have to add unit tests for the message address for authenticated clients |
adc1344
to
00aae3e
Compare
if (receiver.getRemoteTarget().getAddress() != null && !receiver.getRemoteTarget().getAddress().isEmpty()) { | ||
LOG.debug("Closing link due to the present of Target [address : {}]", receiver.getRemoteTarget().getAddress()); | ||
} | ||
receiver.setCondition( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why don't you allow a Target without an address?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i see now... its just for debugging purposes (just to log the address at debug level).. so if the target is not null (and the address is provided), then the log message should contain the specified target address
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
now i understand what you mean 👍
receiver.setQoS(receiver.getRemoteQoS()); | ||
if (ProtonQoS.AT_LEAST_ONCE.equals(receiver.getRemoteQoS())) { | ||
// disable auto-accept for this transfer model. | ||
// in this case, the adapter will apply the required disposition | ||
receiver.setAutoAccept(false); | ||
} | ||
receiver.handler((delivery, message) -> { | ||
uploadMessage(new AmqpContext(delivery, message, receiver.getRemoteTarget(), authenticatedDevice)); | ||
uploadMessage(receiver.getRemoteQoS(), new AmqpContext(delivery, message, authenticatedDevice)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the device/gateway will use this link to send telemetry and/or event messages using both at least once and at most once. The QoS indicated on link establishment therefore is not very useful. Instead, you need to determine the QoS for each message based on whether the message is already settled or not ...
@sophokles73: PR updated with 0939fae |
@sophokles73: thanks for all the feedback! would be nice if we can have this patch out of the way before end of working day today :-) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe you can get some inspiration from the way AbstractVertxBasedMqttProtocolAdapter and VertxBasedMqttProtocolAdapter do the address/topic checking before a message is being processed. My feeling is that the AMQP adapter can basically do the same thing ...
validateMessageAddress(message.getAddress()); | ||
this.resource = ResourceIdentifier.fromString(message.getAddress()); | ||
} catch (ServiceInvocationException sie) { | ||
handleFailure(sie); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this means that the result of invoking the constructor is the message having been rejected/released already without the calling code being able to know about this fact, right? That seems very strange.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, indeed! will update the PR
@sophokles73: PR updated with 3904c2e |
432e12b
to
ede8f3d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks much, much cleaner and easier to follow through :-) Big improvement!
} | ||
|
||
formalCheck.compose(ok -> { | ||
CompositeFuture.all(contentTypeCheck, validateEndpoint(context)).compose(ok -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there a reason why you want to validate the endpoint as part of the uploadMessage method instead of doing it as part of the message address check like the MQTT adapter does it? It helps people to understand the different adapters more easily if we use similar patterns in the adapters. So, unless there are any compelling reasons to do otherwise, IMHO you should follow the pattern used in the MQTT adapter, i.e. check the endpoint (and QoS) of the incoming message and then validate the message's address before handing it over to the uploadMessage method. WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the problem is that validating the endpoint requires a context instance which is already created after validating the address. let me see how to refactor that
@sophokles73 : PR updated with caa7de0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks really good now, don't you think? Clean, concise, easy to understand 👍
just some small issues ...
receiver.setCondition( | ||
ProtonHelper.condition(AmqpError.NOT_ALLOWED, "anonymous relay not supported by this adapter")); | ||
AmqpContext.getErrorCondition(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be in an else
branch?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
humm.. i don't think so. once we enter that block, then a target address is already set
@@ -390,7 +390,7 @@ protected void uploadMessage(final AmqpContext context) { | |||
if (tenantObject.isAdapterEnabled(getTypeName())) { | |||
|
|||
final MessageSender sender = senderFuture.result(); | |||
final ResourceIdentifier resource = ResourceIdentifier.from(endpointName, context.getTenantId(), deviceId); | |||
final ResourceIdentifier resource = ResourceIdentifier.from(endpointName, context.getTenantId(), context.getDeviceId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not simply use context.getResourceIdentifier()
instead of instantiating a new one?
@@ -428,7 +428,7 @@ protected void uploadMessage(final AmqpContext context) { | |||
}).recover(t -> { | |||
LOG.debug("Cannot process message for Device [tenantId: {}, deviceId: {}, endpoint: {}] due to Exception [Class: {}, Message: {}]", | |||
context.getTenantId(), | |||
deviceId, | |||
context.getDeviceId(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
omit Class and Message and simply log the exception:
LOG.debug("Cannot process message for Device [tenantId: {}, deviceId: {}, endpoint: {}]",
context.getTenantId(), context.getDeviceId(), context.getEndpoint(), e);
@@ -65,7 +68,7 @@ | |||
public class VertxBasedAmqpProtocolAdapterTest { | |||
|
|||
@Rule | |||
public Timeout globalTimeout = new Timeout(5, TimeUnit.SECONDS); | |||
public Timeout globalTimeout = new Timeout(5, TimeUnit.HOURS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seconds
*/ | ||
@Test | ||
public void testAnonymousRelayNotSupported() { | ||
public void testAnonymousRelaySupportedForAllClients() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMHO this should be testAnonymousRelayRequired
and verify that a client cannot open a link that contains a target address
yes, its coming up 👍 |
@sophokles73: patch updated with commit 160abb0 |
@Alfusainey can you squash and rebase? |
@sophokles73 squashed and rebased 🕺 i will edit the title of this PR to reflect the changes |
160abb0
to
6b0a4a4
Compare
…lishing messages. Previous implementation of the AMQP adapter does not support anonymous relay and, in addition, uses the link's target address to determine where messages will be published to. After a review of the code changes with Kai Hudalla, we decided that a better approach would be to: * Support only anonymous relay (has advantages for gateways acting on behave of other devices). * Disable target ĺink addresses and support message addresses only. * Adapter now uses a message's address to determine where the message should be published. Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>
6b0a4a4
to
3c8dd28
Compare
@sophokles73: the travis build passes for this PR |
Add check for empty tenant identifier in amqp address for unauthenticated devices. I notice this as I was manually testing using the qpid-send command.
@sophokles73
Signed-off-by: Alfusainey Jallow alf.jallow@gmail.com