From a02d0850c002ae228cedb2deb6aeef34e94fd4f6 Mon Sep 17 00:00:00 2001 From: Alfusainey Jallow Date: Tue, 24 Jul 2018 20:19:21 +0200 Subject: [PATCH] Another refactoring of the PR (also according to Kai's comments on July 24) Signed-off-by: Alfusainey Jallow --- .../hono/adapter/amqp/AmqpContext.java | 96 +++++++++---------- .../amqp/VertxBasedAmqpProtocolAdapter.java | 53 +++++----- 2 files changed, 72 insertions(+), 77 deletions(-) diff --git a/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/AmqpContext.java b/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/AmqpContext.java index 5dc080d64d..42b45bf9e0 100644 --- a/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/AmqpContext.java +++ b/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/AmqpContext.java @@ -15,7 +15,6 @@ import org.eclipse.hono.util.MessageHelper; import org.eclipse.hono.util.ResourceIdentifier; -import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.proton.ProtonDelivery; import io.vertx.proton.ProtonHelper; @@ -30,22 +29,22 @@ public class AmqpContext { private final Message message; private ResourceIdentifier resource; private final Device authenticatedDevice; - private final Target receiverTarget; AmqpContext(final ProtonDelivery delivery, final Message message, final Target receiverTarget, final Device authenticatedDevice) { this.delivery = delivery; this.message = message; this.authenticatedDevice = authenticatedDevice; - this.receiverTarget = receiverTarget; if (receiverTarget == null || receiverTarget.getAddress() == null) { // link possibly opened by an "authenticated" gateway component via anonymous relay - validateMessageProperty(message.getProperties().getTo()).compose(ok -> { - resource = ResourceIdentifier.fromString(message.getProperties().getTo()); - return Future.succeededFuture(); - }).recover(t -> { - handleFailure((ServiceInvocationException) t); - return Future.failedFuture(t); - }); + try { + validateMessageAddress(message.getAddress()); + this.resource = ResourceIdentifier.fromString(message.getAddress()); + } catch (ServiceInvocationException sie) { + handleFailure(sie); + } + + } else if (isDeviceAuthenticated()) { + this.resource = ResourceIdentifier.from(receiverTarget.getAddress(), authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId()); } else { this.resource = ResourceIdentifier.fromString(receiverTarget.getAddress()); } @@ -79,25 +78,21 @@ ProtonDelivery delivery() { } /** - * Gets the tenant identifier for this context. + * Gets the tenant identifier of this context's resource. * * @return The tenant identifier. */ String getTenantId() { - return isDeviceAuthenticated() ? authenticatedDevice.getTenantId() : resource.getTenantId(); + return resource.getTenantId(); } /** - * Gets the device identifier for this context. + * Gets the device identifier of this context's resource. * * @return The device identifier. */ String getDeviceId() { - if (receiverTarget == null) { - // device authenticates as a gateway -> get actual device-id from message's _to property - return resource.getResourceId(); - } - return isDeviceAuthenticated() ? authenticatedDevice.getDeviceId() : resource.getResourceId(); + return resource.getResourceId(); } /** @@ -141,16 +136,19 @@ boolean isDeviceAuthenticated() { * case of a ClientErrorException. In the REJECTED case, the supplied exception will provide * the error condition value and description as reason for rejection. * - * @param error The service invocation exception. + * @param t The service invocation exception. * @throws NullPointerException if error is {@code null}. */ - void handleFailure(final ServiceInvocationException error) { - Objects.requireNonNull(error); - if (ServerErrorException.class.isInstance(error)) { - ProtonHelper.released(delivery, true); - } else { - final ErrorCondition condition = getErrorCondition(error); - MessageHelper.rejected(delivery, condition); + void handleFailure(final Throwable t) { + Objects.requireNonNull(t); + if (ServiceInvocationException.class.isInstance(t)) { + final ServiceInvocationException error = (ServiceInvocationException) t; + if (ServerErrorException.class.isInstance(error)) { + ProtonHelper.released(delivery, true); + } else { + final ErrorCondition condition = getErrorCondition(error); + MessageHelper.rejected(delivery, condition); + } } } @@ -185,42 +183,44 @@ boolean isRemotelySettled() { //---------------------------------------------< private methods >--- /** - * Creates an ErrorCondition using the given service invocation error to provide an error condition value - * and description. + * Creates an ErrorCondition using the given throwable to provide an error condition value and + * description. All throwables that are not service invocation exceptions will be mapped to {@link AmqpError#PRECONDITION_FAILED}. * - * @param error The service invocation error. + * @param t The throwable to map to an error condition. * @return The ErrorCondition. */ - static ErrorCondition getErrorCondition(final ServiceInvocationException error) { - switch (error.getErrorCode()) { - case HttpURLConnection.HTTP_BAD_REQUEST: - return ProtonHelper.condition(Constants.AMQP_BAD_REQUEST, error.getMessage()); - case HttpURLConnection.HTTP_FORBIDDEN: - return ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS, error.getMessage()); - default: - return ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, error.getMessage()); + static ErrorCondition getErrorCondition(final Throwable t) { + if (ServiceInvocationException.class.isInstance(t)) { + final ServiceInvocationException error = (ServiceInvocationException) t; + switch (error.getErrorCode()) { + case HttpURLConnection.HTTP_BAD_REQUEST: + return ProtonHelper.condition(Constants.AMQP_BAD_REQUEST, error.getMessage()); + case HttpURLConnection.HTTP_FORBIDDEN: + return ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS, error.getMessage()); + default: + return ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, error.getMessage()); + } + } else { + return ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, t.getMessage()); } } /** - * Validates the address contained in an AMQP 1.0 message property. + * Validates the address contained in an AMQP 1.0 message. * - * @param to The property to validate. + * @param address The message address to validate. * @return A future indicating the outcome of the validation operation. + * @throws ServiceInvocationException if the message address is not valid. */ - Future validateMessageProperty(final String to) { - final Future propertyCheck = Future.future(); - if (to == null || to.isEmpty()) { - propertyCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Message _to propery cannot be null or empty")); + void validateMessageAddress(final String address) throws ServiceInvocationException { + if (address == null || address.isEmpty()) { + throw new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Message _to propery cannot be null or empty"); } else { - final ResourceIdentifier resourceCheck = ResourceIdentifier.fromString(to); + final ResourceIdentifier resourceCheck = ResourceIdentifier.fromString(address); if (resourceCheck.getEndpoint() == null || resourceCheck.getTenantId() == null || resourceCheck.getResourceId() == null) { - propertyCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Invalid address contained in message _to property")); - } else { - propertyCheck.complete(); + throw new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Invalid address contained in message _to property"); } } - return propertyCheck; } } diff --git a/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapter.java b/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapter.java index 1454efc15e..20d2515604 100644 --- a/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapter.java +++ b/adapters/amqp-vertx/src/main/java/org/eclipse/hono/adapter/amqp/VertxBasedAmqpProtocolAdapter.java @@ -8,7 +8,6 @@ import org.apache.qpid.proton.message.Message; import org.eclipse.hono.client.ClientErrorException; import org.eclipse.hono.client.MessageSender; -import org.eclipse.hono.client.ServiceInvocationException; import org.eclipse.hono.config.ProtocolAdapterProperties; import org.eclipse.hono.service.AbstractProtocolAdapterBase; import org.eclipse.hono.service.auth.device.Device; @@ -288,7 +287,7 @@ protected void handleRemoteReceiverOpen(final ProtonReceiver receiver, final Pro "fail to establish a receiving link with client [{}] due to an invalid endpoint [{}]." + " closing the link", receiver.getName(), receiver.getRemoteQoS()); - receiver.setCondition(AmqpContext.getErrorCondition((ServiceInvocationException) t)); + receiver.setCondition(AmqpContext.getErrorCondition(t)); receiver.close(); return Future.failedFuture(t); }).compose(ok -> { @@ -349,7 +348,7 @@ protected void uploadMessage(final AmqpContext context) { }).recover(t -> { if (!context.isRemotelySettled()) { // client wants to be informed that the message cannot be processed. - context.handleFailure((ServiceInvocationException) t); + context.handleFailure(t); } return Future.failedFuture(t); }); @@ -433,16 +432,16 @@ private void onLinkDetach(final ProtonReceiver receiver) { */ Future validateEndpoint(final ProtonReceiver receiver, final Device authenticatedDevice) { - final Future result = Future.future(); if (receiver.getRemoteTarget() == null || receiver.getRemoteTarget().getAddress() == null) { if (authenticatedDevice == null) { // anonymous relay only supported for authenticated clients (gateways in particular) - result.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, + return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "anonymous relay ONLY supported for authenticated clients")); } else { - result.complete(); + return Future.succeededFuture(); } } else { + final Future result = Future.future(); final ResourceIdentifier address = ResourceIdentifier.fromString(receiver.getRemoteTarget().getAddress()); switch (EndpointType.fromString(address.getEndpoint())) { case TELEMETRY: @@ -469,38 +468,34 @@ Future validateEndpoint(final ProtonReceiver receiver, final Device authen break; } + return result.compose(validResource -> { + return validateTargetAddress(validResource, authenticatedDevice); + }); } - return result.compose(address -> { - return validateAddress(address, authenticatedDevice); - }); } - private Future validateAddress(final ResourceIdentifier address, final Device authenticatedDevice) { + private Future validateTargetAddress(final ResourceIdentifier address, final Device authenticatedDevice) { final Future addressCheck = Future.future(); - if (address == null) { - addressCheck.complete(); + if (authenticatedDevice == null) { + if (address.getTenantId() == null || address.getResourceId() == null) { + addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, + String.format("Invalid Address [%s] for unauthenticated device", address))); + } else { + addressCheck.complete(); + } } else { - if (authenticatedDevice == null) { - if (address.getTenantId() == null || address.getResourceId() == null) { - addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, - String.format("Invalid Address [%s] for unauthenticated device", address))); - } else { + + if (address.getTenantId() == null) { + if (address.getResourceId() == null) { addressCheck.complete(); } } else { - - if (address.getTenantId() == null) { - if (address.getResourceId() == null) { - addressCheck.complete(); - } + // authenticated device with tenant present in the address + if (!address.getTenantId().equals(authenticatedDevice.getTenantId())) { + addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN, + "cannot publish data for device of other tenant")); } else { - // authenticated device with tenant present in the address - if (!address.getTenantId().equals(authenticatedDevice.getTenantId())) { - addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN, - "cannot publish data for device of other tenant")); - } else { - addressCheck.complete(); - } + addressCheck.complete(); } } }