Skip to content

Commit

Permalink
[#1023] Add connection attempt outcomes for failure scenarios
Browse files Browse the repository at this point in the history
Connections attempts of devices with protocol adapters may fail due to
arbitrary reasons. The "outcome" tag of the "hono.connections.attempts"
meter has been extended with additional values to reflect the concrete
error that caused the failure to establish the connection.

Fixes #1023

Signed-off-by: Kai Hudalla <kai.hudalla@bosch.io>
  • Loading branch information
sophokles73 committed Aug 6, 2020
1 parent 860c3d1 commit a24e26d
Show file tree
Hide file tree
Showing 29 changed files with 827 additions and 170 deletions.
Expand Up @@ -12,6 +12,7 @@
*******************************************************************************/
package org.eclipse.hono.adapter.amqp;

import java.net.HttpURLConnection;
import java.security.cert.Certificate;
import java.security.cert.CertificateEncodingException;
import java.security.cert.TrustAnchor;
Expand All @@ -29,19 +30,23 @@
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSession;
import javax.security.auth.login.CredentialException;
import javax.security.auth.login.LoginException;
import javax.security.auth.x500.X500Principal;

import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
import org.apache.qpid.proton.engine.Transport;
import org.eclipse.hono.auth.Device;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.TenantClientFactory;
import org.eclipse.hono.config.ProtocolAdapterProperties;
import org.eclipse.hono.service.AuthorizationException;
import org.eclipse.hono.service.auth.DeviceUser;
import org.eclipse.hono.service.auth.device.DeviceCertificateValidator;
import org.eclipse.hono.service.auth.device.HonoClientBasedAuthProvider;
import org.eclipse.hono.service.auth.device.SubjectDnCredentials;
import org.eclipse.hono.service.auth.device.UsernamePasswordCredentials;
import org.eclipse.hono.service.metric.MetricsTags.ConnectionAttemptOutcome;
import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.AuthenticationConstants;
import org.eclipse.hono.util.CredentialsConstants;
Expand Down Expand Up @@ -76,6 +81,7 @@ public class AmqpAdapterSaslAuthenticatorFactory implements ProtonSaslAuthentica

private final ProtocolAdapterProperties config;
private final TenantClientFactory tenantClientFactory;
private final AmqpAdapterMetrics metrics;
private final Supplier<Span> spanFactory;
private final DeviceCertificateValidator certValidator;
private final HonoClientBasedAuthProvider<UsernamePasswordCredentials> usernamePasswordAuthProvider;
Expand All @@ -87,6 +93,7 @@ public class AmqpAdapterSaslAuthenticatorFactory implements ProtonSaslAuthentica
*
* @param tenantClientFactory The factory to use for creating a Tenant service client.
* @param config The protocol adapter configuration object.
* @param metrics The object to use for reporting metrics.
* @param spanFactory The factory to use for creating and starting an OpenTracing span to
* trace the authentication of the device.
* @param usernamePasswordAuthProvider The authentication provider to use for validating device credentials.
Expand All @@ -102,13 +109,15 @@ public class AmqpAdapterSaslAuthenticatorFactory implements ProtonSaslAuthentica
public AmqpAdapterSaslAuthenticatorFactory(
final TenantClientFactory tenantClientFactory,
final ProtocolAdapterProperties config,
final AmqpAdapterMetrics metrics,
final Supplier<Span> spanFactory,
final HonoClientBasedAuthProvider<UsernamePasswordCredentials> usernamePasswordAuthProvider,
final HonoClientBasedAuthProvider<SubjectDnCredentials> clientCertAuthProvider,
final BiFunction<SaslResponseContext, Span, Future<Void>> preAuthenticationHandler) {

this.tenantClientFactory = Objects.requireNonNull(tenantClientFactory, "Tenant client factory cannot be null");
this.config = Objects.requireNonNull(config, "configuration cannot be null");
this.metrics = Objects.requireNonNull(metrics);
this.spanFactory = Objects.requireNonNull(spanFactory);
this.certValidator = new DeviceCertificateValidator();
this.usernamePasswordAuthProvider = usernamePasswordAuthProvider;
Expand Down Expand Up @@ -183,35 +192,52 @@ public void process(final Handler<Boolean> completionHandler) {
sasl.recv(saslResponse, 0, saslResponse.length);

buildSaslResponseContext(remoteMechanism, saslResponse)
.compose(saslResponseContext -> invokePreAuthenticationHandler(saslResponseContext, currentSpan))
.compose(saslResponseContext -> verify(saslResponseContext))
.onComplete(outcome -> {
if (outcome.succeeded()) {
currentSpan.log("credentials verified successfully");
// add span to connection so that it can be used during the
// remaining connection establishment process
protonConnection.attachments().set(AmqpAdapterConstants.KEY_CURRENT_SPAN, Span.class,
currentSpan);
final Device authenticatedDevice = outcome.result();
protonConnection.attachments().set(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class,
authenticatedDevice);
succeeded = true;
sasl.done(SaslOutcome.PN_SASL_OK);

} else {
TracingHelper.logError(currentSpan, outcome.cause());
currentSpan.finish();
LOG.debug("SASL handshake failed: {}", outcome.cause().getMessage());
.compose(saslResponseContext -> invokePreAuthenticationHandler(saslResponseContext, currentSpan))
.compose(saslResponseContext -> verify(saslResponseContext))
.onSuccess(deviceUser -> {
currentSpan.log("credentials verified successfully");
// do not finish span here
// instead, we add the span to the connection so that it can be used during the
// remaining connection establishment process
protonConnection.attachments().set(AmqpAdapterConstants.KEY_CURRENT_SPAN, Span.class,
currentSpan);
protonConnection.attachments().set(AmqpAdapterConstants.KEY_CLIENT_DEVICE, Device.class,
deviceUser);
// we do not report a succeeded authentication here already because some
// additional checks regarding resource limits need to be passed
// before we can consider connection establishment a success
succeeded = true;
sasl.done(SaslOutcome.PN_SASL_OK);
})
.onFailure(t -> {
TracingHelper.logError(currentSpan, t);
currentSpan.finish();
LOG.debug("SASL handshake failed", t);
if (t instanceof ClientErrorException) {
metrics.reportConnectionAttempt(
ConnectionAttemptOutcome.UNAUTHORIZED,
((ClientErrorException) t).getTenant());
sasl.done(SaslOutcome.PN_SASL_AUTH);
} else if (t instanceof LoginException) {
metrics.reportConnectionAttempt(
ConnectionAttemptOutcome.UNAUTHORIZED,
null);
sasl.done(SaslOutcome.PN_SASL_AUTH);
}

if (currentContext == null) {
completionHandler.handle(Boolean.TRUE);
} else {
// invoke the completion handler on the calling context.
currentContext.runOnContext(action -> completionHandler.handle(Boolean.TRUE));
}
});
} else {
metrics.reportConnectionAttempt(
ConnectionAttemptOutcome.UNAVAILABLE,
null);
sasl.done(SaslOutcome.PN_SASL_TEMP);
}
})
.onComplete(outcome -> {
if (currentContext == null) {
completionHandler.handle(Boolean.TRUE);
} else {
// invoke the completion handler on the calling context.
currentContext.runOnContext(action -> completionHandler.handle(Boolean.TRUE));
}
});
}

private Future<SaslResponseContext> buildSaslResponseContext(final String remoteMechanism,
Expand Down Expand Up @@ -262,7 +288,8 @@ private Future<DeviceUser> verify(final SaslResponseContext ctx) {
} else if (AuthenticationConstants.MECHANISM_EXTERNAL.equals(ctx.getRemoteMechanism())) {
return verifyExternal(ctx.getPeerCertificateChain());
} else {
return Future.failedFuture("Unsupported SASL mechanism: " + ctx.getRemoteMechanism());
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_BAD_REQUEST,
"Unsupported SASL mechanism: " + ctx.getRemoteMechanism()));
}
}

Expand All @@ -286,7 +313,11 @@ private Future<DeviceUser> verifyPlain(final String[] saslResponseFields) {

final Promise<DeviceUser> authResult = Promise.promise();
usernamePasswordAuthProvider.authenticate(credentials, currentSpan.context(), authResult);
return authResult.future();
return authResult.future()
.recover(t -> Future.failedFuture(new AuthorizationException(
credentials.getTenantId(),
"validation of credentials using SASL PLAIN failed",
t)));
}

private Future<DeviceUser> verifyExternal(final Certificate[] peerCertificateChain) {
Expand Down Expand Up @@ -331,7 +362,11 @@ private Future<DeviceUser> verifyExternal(final Certificate[] peerCertificateCha
final SubjectDnCredentials credentials = SubjectDnCredentials.create(tenant.getTenantId(),
deviceCert.getSubjectX500Principal(), clientContext);
clientCertAuthProvider.authenticate(credentials, currentSpan.context(), authResult);
return authResult.future();
return authResult.future()
.recover(t -> Future.failedFuture(new AuthorizationException(
credentials.getTenantId(),
"validation of credentials using SASL EXTERNAL failed",
t)));
});
}

Expand Down
Expand Up @@ -50,6 +50,8 @@
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.AdapterConnectionsExceededException;
import org.eclipse.hono.service.AdapterDisabledException;
import org.eclipse.hono.service.auth.device.UsernamePasswordAuthProvider;
import org.eclipse.hono.service.auth.device.X509AuthProvider;
import org.eclipse.hono.service.limiting.ConnectionLimitManager;
Expand Down Expand Up @@ -175,6 +177,7 @@ protected void doStart(final Promise<Void> startPromise) {
authenticatorFactory = new AmqpAdapterSaslAuthenticatorFactory(
getTenantClientFactory(),
getConfig(),
getMetrics(),
() -> tracer.buildSpan("open connection")
.ignoreActiveSpan()
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_SERVER)
Expand Down Expand Up @@ -378,9 +381,6 @@ private void processRemoteOpen(final ProtonConnection con) {
}

checkConnectionLimitForAdapter()
.onFailure(ex -> {
metrics.reportConnectionAttempt(ConnectionAttemptOutcome.ADAPTER_CONNECTION_LIMIT_EXCEEDED);
})
.compose(ok -> checkAuthorizationAndResourceLimits(authenticatedDevice, con, span))
.compose(ok -> sendConnectedEvent(
Optional.ofNullable(con.getRemoteContainer()).orElse("unknown"),
Expand All @@ -391,12 +391,18 @@ private void processRemoteOpen(final ProtonConnection con) {
con.open();
log.debug("connection with device [container: {}] established", con.getRemoteContainer());
span.log("connection established");
metrics.reportConnectionAttempt(
ConnectionAttemptOutcome.SUCCEEDED,
Optional.ofNullable(authenticatedDevice).map(device -> device.getTenantId()).orElse(null));
return null;
})
.otherwise(t -> {
con.setCondition(getErrorCondition(t));
con.setCondition(AbstractProtocolAdapterBase.getErrorCondition(t));
con.close();
TracingHelper.logError(span, t);
metrics.reportConnectionAttempt(
AbstractProtocolAdapterBase.getOutcome(t),
Optional.ofNullable(authenticatedDevice).map(device -> device.getTenantId()).orElse(null));
return null;
})
.onComplete(s -> span.finish());
Expand Down Expand Up @@ -426,7 +432,11 @@ private Future<Void> checkAuthorizationAndResourceLimits(
checkDeviceRegistration(authenticatedDevice, span.context()),
getTenantConfiguration(authenticatedDevice.getTenantId(), span.context())
.compose(tenantConfig -> CompositeFuture.all(
isAdapterEnabled(tenantConfig),
isAdapterEnabled(tenantConfig).recover(t -> Future.failedFuture(
new AdapterDisabledException(
authenticatedDevice.getTenantId(),
"adapter is disabled for tenant",
t))),
checkConnectionLimit(tenantConfig, span.context()))))
.map(ok -> {
log.debug("{} is registered and enabled", authenticatedDevice);
Expand All @@ -438,8 +448,6 @@ private Future<Void> checkAuthorizationAndResourceLimits(

} else {
log.trace("received connection request from anonymous device [container: {}]", con.getRemoteContainer());
span.log(Map.of(Fields.EVENT, "connection request from anonymous device",
"container ID", con.getRemoteContainer()));
connectAuthorizationCheck.complete();
}

Expand Down Expand Up @@ -710,7 +718,7 @@ protected Future<ProtonDelivery> onMessageReceived(final AmqpContext ctx) {
return d;
}).recover(t -> {
if (t instanceof ClientErrorException) {
final ErrorCondition condition = getErrorCondition(t);
final ErrorCondition condition = AbstractProtocolAdapterBase.getErrorCondition(t);
MessageHelper.rejected(ctx.delivery(), condition);
} else {
ProtonHelper.released(ctx.delivery(), true);
Expand Down Expand Up @@ -1315,12 +1323,13 @@ private static OptionalInt getTraceSamplingPriority(final ProtonConnection con)
}

private Future<Void> checkConnectionLimitForAdapter() {
final Promise<Void> result = Promise.promise();
if (getConnectionLimitManager() != null && getConnectionLimitManager().isLimitExceeded()) {
//The error code is set so to be in sync with that of the tenant level connection limit.
return Future.failedFuture(new ClientErrorException(HttpURLConnection.HTTP_FORBIDDEN,
"connection limit for the adapter exceeded"));
result.fail(new AdapterConnectionsExceededException(null, "connection limit for the adapter exceeded", null));
} else {
result.complete();
}
return Future.succeededFuture();
return result.future();
}
// -------------------------------------------< AbstractServiceBase >---

Expand Down
Expand Up @@ -1138,6 +1138,9 @@ public void testConnectionFailsIfTenantLevelConnectionLimitIsExceeded() {
final ArgumentCaptor<ErrorCondition> errorConditionCaptor = ArgumentCaptor.forClass(ErrorCondition.class);
verify(deviceConnection).setCondition(errorConditionCaptor.capture());
assertEquals(AmqpError.UNAUTHORIZED_ACCESS, errorConditionCaptor.getValue().getCondition());
verify(metrics).reportConnectionAttempt(
ConnectionAttemptOutcome.TENANT_CONNECTIONS_EXCEEDED,
TEST_TENANT_ID);
}

/**
Expand Down Expand Up @@ -1166,6 +1169,9 @@ public void testConnectionFailsIfAdapterIsDisabled() {
final ArgumentCaptor<ErrorCondition> errorConditionCaptor = ArgumentCaptor.forClass(ErrorCondition.class);
verify(deviceConnection).setCondition(errorConditionCaptor.capture());
assertEquals(AmqpError.UNAUTHORIZED_ACCESS, errorConditionCaptor.getValue().getCondition());
verify(metrics).reportConnectionAttempt(
ConnectionAttemptOutcome.ADAPTER_DISABLED,
TEST_TENANT_ID);
}

/**
Expand Down Expand Up @@ -1205,7 +1211,7 @@ public void testConnectionFailsForAuthenticatedDeviceIfAdapterLevelConnectionLim
assertEquals(AmqpError.UNAUTHORIZED_ACCESS, errorConditionCaptor.getValue().getCondition());
// AND the connection count should be decremented accordingly when the connection is closed
metricsInOrderVerifier.verify(metrics).decrementConnections(TEST_TENANT_ID);
verify(metrics).reportConnectionAttempt(ConnectionAttemptOutcome.ADAPTER_CONNECTION_LIMIT_EXCEEDED);
verify(metrics).reportConnectionAttempt(ConnectionAttemptOutcome.ADAPTER_CONNECTIONS_EXCEEDED, TEST_TENANT_ID);
}

/**
Expand Down Expand Up @@ -1242,7 +1248,7 @@ public void testConnectionFailsForUnauthenticatedDeviceIfAdapterLevelConnectionL
assertEquals(AmqpError.UNAUTHORIZED_ACCESS, errorConditionCaptor.getValue().getCondition());
// AND the connection count should be decremented accordingly when the connection is closed
metricsInOrderVerifier.verify(metrics).decrementUnauthenticatedConnections();
verify(metrics).reportConnectionAttempt(ConnectionAttemptOutcome.ADAPTER_CONNECTION_LIMIT_EXCEEDED);
verify(metrics).reportConnectionAttempt(ConnectionAttemptOutcome.ADAPTER_CONNECTIONS_EXCEEDED, null);
}

private String getCommandEndpoint() {
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.qpid.proton.message.Message;
import org.eclipse.hono.adapter.http.HttpAdapterMetrics;
import org.eclipse.hono.adapter.http.HttpProtocolAdapterProperties;
import org.eclipse.hono.client.ClientErrorException;
import org.eclipse.hono.client.Command;
Expand Down Expand Up @@ -62,6 +63,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micrometer.core.instrument.Timer;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.vertx.core.AsyncResult;
Expand Down Expand Up @@ -180,6 +182,9 @@ public void deployAdapter(final VertxTestContext ctx) {

usernamePasswordAuthProvider = mock(HonoClientBasedAuthProvider.class);

final HttpAdapterMetrics metrics = mock(HttpAdapterMetrics.class);
when(metrics.startTimer()).thenReturn(Timer.start());

config = new HttpProtocolAdapterProperties();
config.setInsecurePort(0);
config.setInsecurePortBindAddress(HOST);
Expand All @@ -195,6 +200,7 @@ public void deployAdapter(final VertxTestContext ctx) {
httpAdapter.setDeviceConnectionClientFactory(deviceConnectionClientFactory);
httpAdapter.setCommandTargetMapper(commandTargetMapper);
httpAdapter.setUsernamePasswordAuthProvider(usernamePasswordAuthProvider);
httpAdapter.setMetrics(metrics);

vertx.deployVerticle(httpAdapter, ctx.succeeding(deploymentId -> {
final WebClientOptions options = new WebClientOptions()
Expand Down

0 comments on commit a24e26d

Please sign in to comment.