Skip to content

Commit

Permalink
Refactor PR as request by Kai on July 24th
Browse files Browse the repository at this point in the history
Signed-off-by: Alfusainey Jallow <alf.jallow@gmail.com>
  • Loading branch information
Alfusainey committed Jul 24, 2018
1 parent a17fab9 commit 0ade23d
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.Target;
import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.service.auth.device.Device;
import org.eclipse.hono.util.Constants;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.ResourceIdentifier;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonHelper;
Expand All @@ -26,7 +28,7 @@ public class AmqpContext {

private final ProtonDelivery delivery;
private final Message message;
private final ResourceIdentifier resource;
private ResourceIdentifier resource;
private final Device authenticatedDevice;
private final Target receiverTarget;

Expand All @@ -35,10 +37,15 @@ public class AmqpContext {
this.message = message;
this.authenticatedDevice = authenticatedDevice;
this.receiverTarget = receiverTarget;
if (receiverTarget == null) {
if (receiverTarget == null || receiverTarget.getAddress() == null) {
// link possibly opened by an "authenticated" gateway component via anonymous relay
// TODO: validate the address in the message's _to property
this.resource = ResourceIdentifier.fromString(message.getProperties().getTo());
validateMessageProperty(message.getProperties().getTo()).compose(ok -> {
resource = ResourceIdentifier.fromString(message.getProperties().getTo());
return Future.succeededFuture();
}).recover(t -> {
handleFailure((ServiceInvocationException) t);
return Future.failedFuture(t);
});
} else {
this.resource = ResourceIdentifier.fromString(receiverTarget.getAddress());
}
Expand Down Expand Up @@ -184,7 +191,7 @@ boolean isRemotelySettled() {
* @param error The service invocation error.
* @return The ErrorCondition.
*/
private ErrorCondition getErrorCondition(final ServiceInvocationException error) {
static ErrorCondition getErrorCondition(final ServiceInvocationException error) {
switch (error.getErrorCode()) {
case HttpURLConnection.HTTP_BAD_REQUEST:
return ProtonHelper.condition(Constants.AMQP_BAD_REQUEST, error.getMessage());
Expand All @@ -194,4 +201,26 @@ private ErrorCondition getErrorCondition(final ServiceInvocationException error)
return ProtonHelper.condition(AmqpError.PRECONDITION_FAILED, error.getMessage());
}
}

/**
* Validates the address contained in an AMQP 1.0 message property.
*
* @param to The property to validate.
* @return A future indicating the outcome of the validation operation.
*/
Future<Void> validateMessageProperty(final String to) {
final Future<Void> propertyCheck = Future.future();
if (to == null || to.isEmpty()) {
propertyCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Message _to propery cannot be null or empty"));
} else {
final ResourceIdentifier resourceCheck = ResourceIdentifier.fromString(to);
if (resourceCheck.getEndpoint() == null || resourceCheck.getTenantId() == null || resourceCheck.getResourceId() == null) {
propertyCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST, "Invalid address contained in message _to property"));
} else {
propertyCheck.complete();
}
}
return propertyCheck;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -283,79 +283,32 @@ private void handleSessionOpen(final ProtonConnection conn, final ProtonSession
protected void handleRemoteReceiverOpen(final ProtonReceiver receiver, final ProtonConnection conn) {
final Device authenticatedDevice = conn.attachments().get(AmqpAdapterConstants.KEY_CLIENT_DEVICE,
Device.class);
if (authenticatedDevice == null && receiver.getRemoteTarget() == null) {
// anonymous relay only supported for authenticated clients (gateways in particular)
receiver.setCondition(
ProtonHelper.condition(AmqpError.NOT_ALLOWED, "anonymous relay ONLY supported for authenticated clients"));
validateEndpoint(receiver, authenticatedDevice).recover(t -> {
LOG.debug(
"fail to establish a receiving link with client [{}] due to an invalid endpoint [{}]."
+ " closing the link",
receiver.getName(), receiver.getRemoteQoS());
receiver.setCondition(AmqpContext.getErrorCondition((ServiceInvocationException) t));
receiver.close();
} else {
validateEndpoint(receiver).recover(t -> {
LOG.debug(
"fail to establish a receiving link with client [{}] due to an invalid endpoint [{}]."
+ " closing the link",
receiver.getName(), receiver.getRemoteQoS());
if (ClientErrorException.class.isInstance(t)) {
final ClientErrorException error = (ClientErrorException) t;
receiver.setCondition(ProtonHelper.condition(AmqpError.NOT_FOUND, error.getMessage()));
receiver.close();
}
return Future.failedFuture(t);
}).compose(resource -> {
return validateAddress(resource, receiver, authenticatedDevice).compose(ok -> {
LOG.debug("Established receiver link at [address: {}]",
(receiver.getRemoteTarget() != null) ? receiver.getRemoteTarget().getAddress() : null);

receiver.setTarget(receiver.getRemoteTarget());
receiver.setQoS(receiver.getRemoteQoS());
if (ProtonQoS.AT_LEAST_ONCE.equals(receiver.getRemoteQoS())) {
// disable auto-accept for this transfer model.
// in this case, the adapter will apply the required disposition
receiver.setAutoAccept(false);
}
receiver.handler((delivery, message) -> {
uploadMessage(new AmqpContext(delivery, message, receiver.getRemoteTarget(), authenticatedDevice));
});
HonoProtonHelper.setCloseHandler(receiver, remoteDetach -> onLinkDetach(receiver));
receiver.open();
return Future.succeededFuture();
});
});
}
}

private Future<Void> validateAddress(final ResourceIdentifier address, final ProtonReceiver receiver, final Device authenticatedDevice) {
final Future<Void> addressCheck = Future.future();
if (address == null) {
addressCheck.complete();
} else {
if (authenticatedDevice == null) {
if (address.getTenantId() == null || address.getResourceId() == null) {
receiver.setCondition(ProtonHelper.condition(AmqpError.NOT_ALLOWED, String.format("Invalid Address [%s] for unauthenticated device", address)));
receiver.close();
addressCheck.fail("Invalid Address for unauthenticated device");
} else {
addressCheck.complete();
}
} else {

if (address.getTenantId() == null) {
if (address.getResourceId() == null) {
addressCheck.complete();
}
} else {
// authenticated device with tenant present in the address
if (!address.getTenantId().equals(authenticatedDevice.getTenantId())) {
receiver.setCondition(ProtonHelper.condition(AmqpError.UNAUTHORIZED_ACCESS,
"cannot publish data for device of other tenant"));
receiver.close();
addressCheck.fail("cannot publish data for device of other tenant");
} else {
addressCheck.complete();
}
}
return Future.failedFuture(t);
}).compose(ok -> {
LOG.debug("Established receiver link at [address: {}]",
(receiver.getRemoteTarget() != null) ? receiver.getRemoteTarget().getAddress() : null);

receiver.setTarget(receiver.getRemoteTarget());
receiver.setQoS(receiver.getRemoteQoS());
if (ProtonQoS.AT_LEAST_ONCE.equals(receiver.getRemoteQoS())) {
// disable auto-accept for this transfer model.
// in this case, the adapter will apply the required disposition
receiver.setAutoAccept(false);
}
}
return addressCheck;
receiver.handler((delivery, message) -> {
uploadMessage(new AmqpContext(delivery, message, receiver.getRemoteTarget(), authenticatedDevice));
});
HonoProtonHelper.setCloseHandler(receiver, remoteDetach -> onLinkDetach(receiver));
receiver.open();
return Future.succeededFuture();
});
}

/**
Expand Down Expand Up @@ -474,14 +427,21 @@ private void onLinkDetach(final ProtonReceiver receiver) {
* supported, the method returns a succeeded future containing the valid address.
*
* @param receiver The link with which the message is received.
* @param authenticatedDevice The authenticated device or {@code null} if the device is not authenticated.
*
* @return A future with the address upon success or a failed future.
*/
Future<ResourceIdentifier> validateEndpoint(final ProtonReceiver receiver) {
Future<Void> validateEndpoint(final ProtonReceiver receiver, final Device authenticatedDevice) {

final Future<ResourceIdentifier> result = Future.future();
if (receiver.getRemoteTarget() == null) {
// this adapter supports anonymous relay -> useful for gateway components
result.complete();
if (receiver.getRemoteTarget() == null || receiver.getRemoteTarget().getAddress() == null) {
if (authenticatedDevice == null) {
// anonymous relay only supported for authenticated clients (gateways in particular)
result.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
"anonymous relay ONLY supported for authenticated clients"));
} else {
result.complete();
}
} else {
final ResourceIdentifier address = ResourceIdentifier.fromString(receiver.getRemoteTarget().getAddress());
switch (EndpointType.fromString(address.getEndpoint())) {
Expand Down Expand Up @@ -510,7 +470,41 @@ Future<ResourceIdentifier> validateEndpoint(final ProtonReceiver receiver) {
}

}
return result;
return result.compose(address -> {
return validateAddress(address, authenticatedDevice);
});
}

private Future<Void> validateAddress(final ResourceIdentifier address, final Device authenticatedDevice) {
final Future<Void> addressCheck = Future.future();
if (address == null) {
addressCheck.complete();
} else {
if (authenticatedDevice == null) {
if (address.getTenantId() == null || address.getResourceId() == null) {
addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
String.format("Invalid Address [%s] for unauthenticated device", address)));
} else {
addressCheck.complete();
}
} else {

if (address.getTenantId() == null) {
if (address.getResourceId() == null) {
addressCheck.complete();
}
} else {
// authenticated device with tenant present in the address
if (!address.getTenantId().equals(authenticatedDevice.getTenantId())) {
addressCheck.fail(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN,
"cannot publish data for device of other tenant"));
} else {
addressCheck.complete();
}
}
}
}
return addressCheck;
}

// -------------------------------------------< AbstractServiceBase >---
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
public class VertxBasedAmqpProtocolAdapterTest {

@Rule
public Timeout globalTimeout = new Timeout(5, TimeUnit.SECONDS);
public Timeout globalTimeout = new Timeout(5, TimeUnit.HOURS);

/**
* A tenant identifier used for testing.
Expand Down Expand Up @@ -161,7 +161,7 @@ public void testAnonymousRelayNotSupportedForUnAuthenticatedClients() {
final ProtonConnection conn = getConnection(null);

// THAT does not specify a target address
final ProtonReceiver receiver = getReceiver(ProtonQoS.AT_LEAST_ONCE, null);
final ProtonReceiver receiver = getReceiver(ProtonQoS.AT_LEAST_ONCE, getTarget(null));

adapter.handleRemoteReceiverOpen(receiver, conn);

Expand All @@ -182,7 +182,7 @@ public void testAnonymousRelaySupportedForAuthenticatedClient() {
final ProtonConnection conn = getConnection(new Device(TEST_TENANT_ID, TEST_DEVICE));

// THAT does not specify a target address -> anonymous relay
final ProtonReceiver receiver = getReceiver(ProtonQoS.AT_LEAST_ONCE, null);
final ProtonReceiver receiver = getReceiver(ProtonQoS.AT_LEAST_ONCE, getTarget(null));

adapter.handleRemoteReceiverOpen(receiver, conn);

Expand All @@ -194,7 +194,7 @@ public void testAnonymousRelaySupportedForAuthenticatedClient() {
}

/**
* Verifies that a request to upload telemetry message (with settled=true) results in the sender sending the message
* Verifies that a request to upload a "settled" telemetry message results in the sender sending the message
* without waiting for a response from the downstream peer.
*
* AT_MOST_ONCE delivery semantics
Expand All @@ -213,14 +213,14 @@ public void uploadTelemetryMessageWithSettledDeliverySemantics() {
when(delivery.remotelySettled()).thenReturn(true);

final ResourceIdentifier resource = ResourceIdentifier.from(TelemetryConstants.TELEMETRY_ENDPOINT, TEST_TENANT_ID, TEST_DEVICE);
adapter.uploadMessage(new AmqpContext(delivery, getFakeMessage(), getTarget(resource), null));
adapter.uploadMessage(new AmqpContext(delivery, getFakeMessage(), getTarget(resource.toString()), null));

// THEN the adapter sends the message and does not wait for response from the peer.
verify(telemetrySender).send(any(Message.class));
}

/**
* Verifies that a request to upload telemetry message (with settled=false) results in the sender sending the
* Verifies that a request to upload an "unsettled" telemetry message results in the sender sending the
* message and waits for a response from the downstream peer.
*
* AT_LEAST_ONCE delivery semantics.
Expand All @@ -241,16 +241,15 @@ public void uploadTelemetryMessageWithUnsettledDeliverySemantics() {
final ResourceIdentifier resource = ResourceIdentifier.from(TelemetryConstants.TELEMETRY_ENDPOINT, TEST_TENANT_ID, TEST_DEVICE);

final Device authenticatedDevice = null;
adapter.uploadMessage(new AmqpContext(delivery, getFakeMessage(), getTarget(resource), authenticatedDevice));
adapter.uploadMessage(new AmqpContext(delivery, getFakeMessage(), getTarget(resource.toString()), authenticatedDevice));

// THEN the sender sends the message and waits for the outcome from the downstream peer
verify(telemetrySender).sendAndWaitForOutcome(any(Message.class));
}

/**
* Verifies that a request to upload a telemetry message from a device that belongs to a tenant for which the AMQP
* adapter is disabled fails and that the device is notified when the message cannot be processed (un-settled
* delivery).
* Verifies that a request to upload an "unsettled" telemetry message from a device that belongs to a tenant for which the AMQP
* adapter is disabled fails and that the device is notified when the message cannot be processed.
*
*/
@Test
Expand All @@ -266,7 +265,7 @@ public void testUploadTelemetryMessageFailsForDisabledTenant() {
final ProtonDelivery delivery = mock(ProtonDelivery.class);
when(delivery.remotelySettled()).thenReturn(false);
final ResourceIdentifier resource = ResourceIdentifier.from(TelemetryConstants.TELEMETRY_ENDPOINT, TEST_TENANT_ID, TEST_DEVICE);
final AmqpContext context = spy(new AmqpContext(delivery, getFakeMessage(), getTarget(resource), null));
final AmqpContext context = spy(new AmqpContext(delivery, getFakeMessage(), getTarget(resource.toString()), null));
adapter.uploadMessage(context);

// THEN the adapter does not send the message (regardless of the delivery mode).
Expand Down Expand Up @@ -304,17 +303,17 @@ public void testUploadFailsForInvalidAmqpAddress() {
final String invalidAddress = String.format("%s/%s", TelemetryConstants.TELEMETRY_ENDPOINT, TEST_TENANT_ID);
final ResourceIdentifier resource = ResourceIdentifier.fromString(invalidAddress);

final ProtonReceiver receiver = getReceiver(ProtonQoS.AT_LEAST_ONCE, getTarget(resource));
final ProtonReceiver receiver = getReceiver(ProtonQoS.AT_LEAST_ONCE, getTarget(resource.toString()));

adapter.handleRemoteReceiverOpen(receiver, conn);

// THEN the adapter closes the link.
verify(receiver).close();
}

private Target getTarget(final ResourceIdentifier resource) {
private Target getTarget(final String address) {
final Target target = new Target();
target.setAddress(resource.toString());
target.setAddress(address);
return target;
}

Expand Down

0 comments on commit 0ade23d

Please sign in to comment.