Skip to content

Commit

Permalink
Another refactoring of the PR (also according to Kai's comments on Ju…
Browse files Browse the repository at this point in the history
…ly 24)

Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>
  • Loading branch information
Alfusainey committed Jul 24, 2018
1 parent 0ade23d commit a02d085
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
Expand All @@ -30,22 +29,22 @@ public class AmqpContext {
private final Message message;
private ResourceIdentifier resource;
private final Device authenticatedDevice;
private final Target receiverTarget;

AmqpContext(final ProtonDelivery delivery, final Message message, final Target receiverTarget, final Device authenticatedDevice) {
this.delivery = delivery;
this.message = message;
this.authenticatedDevice = authenticatedDevice;
this.receiverTarget = receiverTarget;
if (receiverTarget == null || receiverTarget.getAddress() == null) {
// link possibly opened by an "authenticated" gateway component via anonymous relay
validateMessageProperty(message.getProperties().getTo()).compose(ok -> {
resource = ResourceIdentifier.fromString(message.getProperties().getTo());
return Future.succeededFuture();
}).recover(t -> {
handleFailure((ServiceInvocationException) t);
return Future.failedFuture(t);
});
try {
validateMessageAddress(message.getAddress());
this.resource = ResourceIdentifier.fromString(message.getAddress());
} catch (ServiceInvocationException sie) {
handleFailure(sie);
}

} else if (isDeviceAuthenticated()) {
this.resource = ResourceIdentifier.from(receiverTarget.getAddress(), authenticatedDevice.getTenantId(), authenticatedDevice.getDeviceId());
} else {
this.resource = ResourceIdentifier.fromString(receiverTarget.getAddress());
}
Expand Down Expand Up @@ -79,25 +78,21 @@ ProtonDelivery delivery() {
}

/**
* Gets the tenant identifier for this context.
* Gets the tenant identifier of this context's resource.
*
* @return The tenant identifier.
*/
String getTenantId() {
return isDeviceAuthenticated() ? authenticatedDevice.getTenantId() : resource.getTenantId();
return resource.getTenantId();
}

/**
* Gets the device identifier for this context.
* Gets the device identifier of this context's resource.
*
* @return The device identifier.
*/
String getDeviceId() {
if (receiverTarget == null) {
// device authenticates as a gateway -> get actual device-id from message's _to property
return resource.getResourceId();
}
return isDeviceAuthenticated() ? authenticatedDevice.getDeviceId() : resource.getResourceId();
return resource.getResourceId();
}

/**
Expand Down Expand Up @@ -141,16 +136,19 @@ boolean isDeviceAuthenticated() {
* case of a <em>ClientErrorException</em>. In the REJECTED case, the supplied exception will provide
* the error condition value and description as reason for rejection.
*
* @param error The service invocation exception.
* @param t The service invocation exception.
* @throws NullPointerException if error is {@code null}.
*/
void handleFailure(final ServiceInvocationException error) {
Objects.requireNonNull(error);
if (ServerErrorException.class.isInstance(error)) {
ProtonHelper.released(delivery, true);
} else {
final ErrorCondition condition = getErrorCondition(error);
MessageHelper.rejected(delivery, condition);
void handleFailure(final Throwable t) {
Objects.requireNonNull(t);
if (ServiceInvocationException.class.isInstance(t)) {
final ServiceInvocationException error = (ServiceInvocationException) t;
if (ServerErrorException.class.isInstance(error)) {
ProtonHelper.released(delivery, true);
} else {
final ErrorCondition condition = getErrorCondition(error);
MessageHelper.rejected(delivery, condition);
}
}
}

Expand Down Expand Up @@ -185,42 +183,44 @@ boolean isRemotelySettled() {

//---------------------------------------------< private methods >---
/**
* Creates an ErrorCondition using the given service invocation error to provide an error condition value
* and description.
* Creates an ErrorCondition using the given throwable to provide an error condition value and
* description. All throwables that are not service invocation exceptions will be mapped to {@link AmqpError#PRECONDITION_FAILED}.
*
* @param error The service invocation error.
* @param t The throwable to map to an error condition.
* @return The ErrorCondition.
*/
static ErrorCondition getErrorCondition(final ServiceInvocationException error) {
switch (error.getErrorCode()) {
case HttpURLConnection.HTTP_BAD_REQUEST:
return ProtonHelper.condition(Constants.AMQP_BAD_REQUEST, error.getMessage());
case HttpURLConnection.HTTP_FORBIDDEN:
return ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS, error.getMessage());
default:
return ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, error.getMessage());
static ErrorCondition getErrorCondition(final Throwable t) {
if (ServiceInvocationException.class.isInstance(t)) {
final ServiceInvocationException error = (ServiceInvocationException) t;
switch (error.getErrorCode()) {
case HttpURLConnection.HTTP_BAD_REQUEST:
return ProtonHelper.condition(Constants.AMQP_BAD_REQUEST, error.getMessage());
case HttpURLConnection.HTTP_FORBIDDEN:
return ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS, error.getMessage());
default:
return ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, error.getMessage());
}
} else {
return ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, t.getMessage());
}
}

/**
* Validates the address contained in an AMQP 1.0 message property.
* Validates the address contained in an AMQP 1.0 message.
*
* @param to The property to validate.
* @param address The message address to validate.
* @return A future indicating the outcome of the validation operation.
* @throws ServiceInvocationException if the message address is not valid.
*/
Future<Void> validateMessageProperty(final String to) {
final Future<Void> propertyCheck = Future.future();
if (to == null || to.isEmpty()) {
propertyCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Message _to propery cannot be null or empty"));
void validateMessageAddress(final String address) throws ServiceInvocationException {
if (address == null || address.isEmpty()) {
throw new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Message _to propery cannot be null or empty");
} else {
final ResourceIdentifier resourceCheck = ResourceIdentifier.fromString(to);
final ResourceIdentifier resourceCheck = ResourceIdentifier.fromString(address);
if (resourceCheck.getEndpoint() == null || resourceCheck.getTenantId() == null || resourceCheck.getResourceId() == null) {
propertyCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Invalid address contained in message _to property"));
} else {
propertyCheck.complete();
throw new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Invalid address contained in message _to property");
}
}
return propertyCheck;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.MessageSender;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.auth.device.Device;
Expand Down Expand Up @@ -288,7 +287,7 @@ protected void handleRemoteReceiverOpen(final ProtonReceiver receiver, final Pro
"fail to establish a receiving link with client [{}] due to an invalid endpoint [{}]."
+ " closing the link",
receiver.getName(), receiver.getRemoteQoS());
receiver.setCondition(AmqpContext.getErrorCondition((ServiceInvocationException) t));
receiver.setCondition(AmqpContext.getErrorCondition(t));
receiver.close();
return Future.failedFuture(t);
}).compose(ok -> {
Expand Down Expand Up @@ -349,7 +348,7 @@ protected void uploadMessage(final AmqpContext context) {
}).recover(t -> {
if (!context.isRemotelySettled()) {
// client wants to be informed that the message cannot be processed.
context.handleFailure((ServiceInvocationException) t);
context.handleFailure(t);
}
return Future.failedFuture(t);
});
Expand Down Expand Up @@ -433,16 +432,16 @@ private void onLinkDetach(final ProtonReceiver receiver) {
*/
Future<Void> validateEndpoint(final ProtonReceiver receiver, final Device authenticatedDevice) {

final Future<ResourceIdentifier> result = Future.future();
if (receiver.getRemoteTarget() == null || receiver.getRemoteTarget().getAddress() == null) {
if (authenticatedDevice == null) {
// anonymous relay only supported for authenticated clients (gateways in particular)
result.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
"anonymous relay ONLY supported for authenticated clients"));
} else {
result.complete();
return Future.succeededFuture();
}
} else {
final Future<ResourceIdentifier> result = Future.future();
final ResourceIdentifier address = ResourceIdentifier.fromString(receiver.getRemoteTarget().getAddress());
switch (EndpointType.fromString(address.getEndpoint())) {
case TELEMETRY:
Expand All @@ -469,38 +468,34 @@ Future<Void> validateEndpoint(final ProtonReceiver receiver, final Device authen
break;
}

return result.compose(validResource -> {
return validateTargetAddress(validResource, authenticatedDevice);
});
}
return result.compose(address -> {
return validateAddress(address, authenticatedDevice);
});
}

private Future<Void> validateAddress(final ResourceIdentifier address, final Device authenticatedDevice) {
private Future<Void> validateTargetAddress(final ResourceIdentifier address, final Device authenticatedDevice) {
final Future<Void> addressCheck = Future.future();
if (address == null) {
addressCheck.complete();
if (authenticatedDevice == null) {
if (address.getTenantId() == null || address.getResourceId() == null) {
addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
String.format("Invalid Address [%s] for unauthenticated device", address)));
} else {
addressCheck.complete();
}
} else {
if (authenticatedDevice == null) {
if (address.getTenantId() == null || address.getResourceId() == null) {
addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
String.format("Invalid Address [%s] for unauthenticated device", address)));
} else {

if (address.getTenantId() == null) {
if (address.getResourceId() == null) {
addressCheck.complete();
}
} else {

if (address.getTenantId() == null) {
if (address.getResourceId() == null) {
addressCheck.complete();
}
// authenticated device with tenant present in the address
if (!address.getTenantId().equals(authenticatedDevice.getTenantId())) {
addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN,
"cannot publish data for device of other tenant"));
} else {
// authenticated device with tenant present in the address
if (!address.getTenantId().equals(authenticatedDevice.getTenantId())) {
addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN,
"cannot publish data for device of other tenant"));
} else {
addressCheck.complete();
}
addressCheck.complete();
}
}
}
Expand Down

0 comments on commit a02d085

Please sign in to comment.