Skip to content

Commit

Permalink
Improve Command & Control related tracing.
Browse files Browse the repository at this point in the history
The HTTP adapter now tracks the reception of a
command in a separate span.

Signed-off-by: Carsten Lohmann <carsten.lohmann@bosch.io>
  • Loading branch information
calohmn committed Sep 4, 2020
1 parent c349b86 commit a4e6a96
Showing 1 changed file with 55 additions and 39 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.eclipse.hono.client.CommandResponse;
import org.eclipse.hono.client.DownstreamSender;
import org.eclipse.hono.client.ProtocolAdapterCommandConsumer;
import org.eclipse.hono.client.impl.CommandConsumer;
import org.eclipse.hono.service.AbstractProtocolAdapterBase;
import org.eclipse.hono.service.http.ComponentMetaDataDecorator;
import org.eclipse.hono.service.http.DefaultFailureHandler;
Expand All @@ -55,6 +56,7 @@

import io.micrometer.core.instrument.Timer.Sample;
import io.opentracing.Span;
import io.opentracing.SpanContext;
import io.opentracing.tag.Tags;
import io.vertx.core.AsyncResult;
import io.vertx.core.CompositeFuture;
Expand Down Expand Up @@ -700,13 +702,19 @@ private void doUploadMessage(
}
})
.map(proceed -> {
// downstream message sent and (if ttd was set) command was received or ttd has timed out
final Future<Void> commandConsumerClosedTracker = commandConsumerTracker.result() != null
? commandConsumerTracker.result().close(currentSpan.context())
: Future.succeededFuture();

if (ctx.response().closed()) {
log.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]: response already closed",
endpoint, tenant, deviceId);
TracingHelper.logError(currentSpan, "failed to send HTTP response to device: response already closed");
currentSpan.finish();
ctx.response().end(); // close the response here, ensuring that the TracingHandler bodyEndHandler gets called
commandConsumerClosedTracker.onComplete(res -> {
currentSpan.finish();
ctx.response().end(); // close the response here, ensuring that the TracingHandler bodyEndHandler gets called
});
} else {
final CommandContext commandContext = ctx.get(CommandContext.KEY_COMMAND_CONTEXT);
setResponsePayload(ctx.response(), commandContext, currentSpan);
Expand All @@ -733,19 +741,14 @@ private void doUploadMessage(
payloadSize,
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()));
// Before Hono 1.2, closing the consumer needed to be done AFTER having accepted a command (consumer used for single request only);
// now however, this isn't needed anymore (consumer.close() doesn't actually close the link anymore). So, this could be changed here to close the consumer earlier already.
Optional.ofNullable(commandConsumerTracker.result()).ifPresentOrElse(
consumer -> consumer.close(currentSpan.context())
.onComplete(res -> currentSpan.finish()),
currentSpan::finish);
commandConsumerClosedTracker.onComplete(res -> currentSpan.finish());
});
ctx.response().exceptionHandler(t -> {
log.debug("failed to send http response for [{}] message from device [tenantId: {}, deviceId: {}]",
endpoint, tenant, deviceId, t);
if (commandContext != null) {
commandContext.getTracingSpan().log("failed to forward command to device in HTTP response body");
TracingHelper.logError(commandContext.getTracingSpan(), t);
TracingHelper.logError(commandContext.getTracingSpan(),
"failed to forward command to device in HTTP response body", t);
commandContext.release();
metrics.reportCommand(
commandContext.getCommand().isOneWay() ? Direction.ONE_WAY : Direction.REQUEST,
Expand All @@ -757,12 +760,7 @@ private void doUploadMessage(
}
currentSpan.log("failed to send HTTP response to device");
TracingHelper.logError(currentSpan, t);
// Before Hono 1.2, closing the consumer needed to be done AFTER having released a command (consumer used for single request only);
// now however, this isn't needed anymore (consumer.close() doesn't actually close the link anymore). So, this could be changed here to close the consumer earlier already.
Optional.ofNullable(commandConsumerTracker.result()).ifPresentOrElse(
consumer -> consumer.close(currentSpan.context())
.onComplete(res -> currentSpan.finish()),
currentSpan::finish);
commandConsumerClosedTracker.onComplete(res -> currentSpan.finish());
});
ctx.response().end();
}
Expand All @@ -775,9 +773,15 @@ private void doUploadMessage(
log.debug("cannot process [{}] message from device [tenantId: {}, deviceId: {}]",
endpoint, tenant, deviceId, t);
final boolean responseClosedPrematurely = ctx.response().closed();
final Future<Void> commandConsumerClosedTracker = commandConsumerTracker.result() != null
? commandConsumerTracker.result().close(currentSpan.context())
: Future.succeededFuture();
final CommandContext commandContext = ctx.get(CommandContext.KEY_COMMAND_CONTEXT);
if (commandContext != null) {
TracingHelper.logError(commandContext.getTracingSpan(),
"command won't be forwarded to device in HTTP response body, HTTP request handling failed", t);
commandContext.release();
currentSpan.log("released command for device");
}
final ProcessingOutcome outcome;
if (ClientErrorException.class.isInstance(t)) {
Expand All @@ -802,12 +806,7 @@ private void doUploadMessage(
ctx.getTtdStatus(),
getMicrometerSample(ctx.getRoutingContext()));
TracingHelper.logError(currentSpan, t);
// Before Hono 1.2, closing the consumer needed to be done AFTER having released a command (consumer used for single request only);
// now however, this isn't needed anymore (consumer.close() doesn't actually close the link anymore). So, this could be changed here to close the consumer earlier already.
Optional.ofNullable(commandConsumerTracker.result()).ifPresentOrElse(
consumer -> consumer.close(currentSpan.context())
.onComplete(res -> currentSpan.finish()),
currentSpan::finish);
commandConsumerClosedTracker.onComplete(res -> currentSpan.finish());
return Future.failedFuture(t);
});
}
Expand Down Expand Up @@ -940,7 +939,7 @@ protected void setNonEmptyResponsePayload(final HttpServerResponse response, fin
* completed or</li>
* <li>the ttd has expired</li>
* </ul>
* @param currentSpan The OpenTracing Span to use for tracking the processing
* @param uploadMessageSpan The OpenTracing Span used for tracking the processing
* of the request.
* @return A future indicating the outcome of the operation.
* <p>
Expand All @@ -958,13 +957,13 @@ protected final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(
final String gatewayId,
final RoutingContext ctx,
final Handler<AsyncResult<Void>> responseReady,
final Span currentSpan) {
final Span uploadMessageSpan) {

Objects.requireNonNull(tenantObject);
Objects.requireNonNull(deviceId);
Objects.requireNonNull(ctx);
Objects.requireNonNull(responseReady);
Objects.requireNonNull(currentSpan);
Objects.requireNonNull(uploadMessageSpan);

final AtomicBoolean requestProcessed = new AtomicBoolean(false);

Expand All @@ -975,24 +974,34 @@ protected final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(
}
return Future.succeededFuture();
}
uploadMessageSpan.setTag(MessageHelper.APP_PROPERTY_DEVICE_TTD, ttdSecs);

final Span waitForCommandSpan = TracingHelper
.buildChildSpan(tracer, uploadMessageSpan.context(),
"wait for command", getTypeName())
.withTag(Tags.SPAN_KIND.getKey(), Tags.SPAN_KIND_CLIENT)
.withTag(TracingHelper.TAG_TENANT_ID, tenantObject.getTenantId())
.withTag(TracingHelper.TAG_DEVICE_ID, deviceId)
.start();

currentSpan.setTag(MessageHelper.APP_PROPERTY_DEVICE_TTD, ttdSecs);
final Handler<CommandContext> commandHandler = commandContext -> {

Tags.COMPONENT.set(commandContext.getTracingSpan(), getTypeName());
final Command command = commandContext.getCommand();
CommandConsumer.logReceivedCommandToSpan(command, waitForCommandSpan);
final Sample commandSample = getMetrics().startTimer();
if (isCommandValid(command, currentSpan)) {
if (isCommandValid(command, waitForCommandSpan)) {

if (requestProcessed.compareAndSet(false, true)) {
checkMessageLimit(tenantObject, command.getPayloadSize(), currentSpan.context())
checkMessageLimit(tenantObject, command.getPayloadSize(), waitForCommandSpan.context())
.onComplete(result -> {
if (result.succeeded()) {
addMicrometerSample(commandContext, commandSample);
// put command context to routing context and notify
ctx.put(CommandContext.KEY_COMMAND_CONTEXT, commandContext);
} else {
commandContext.reject(getErrorCondition(result.cause()));
TracingHelper.logError(waitForCommandSpan, "rejected command for device", result.cause());
metrics.reportCommand(
command.isOneWay() ? Direction.ONE_WAY : Direction.REQUEST,
tenantObject.getTenantId(),
Expand All @@ -1006,16 +1015,17 @@ protected final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(
responseReady.handle(Future.succeededFuture());
});
} else {
// the timer has already fired, release the command
log.debug("waiting time for command has elapsed or another command has already been processed [tenantId: {}, deviceId: {}]",
tenantObject.getTenantId(), deviceId);
getMetrics().reportCommand(
command.isOneWay() ? Direction.ONE_WAY : Direction.REQUEST,
tenantObject.getTenantId(),
tenantObject,
ProcessingOutcome.UNDELIVERABLE,
command.getPayloadSize(),
commandSample);
log.debug("command for device has already fired [tenantId: {}, deviceId: {}]",
tenantObject.getTenantId(), deviceId);
TracingHelper.logError(commandContext.getTracingSpan(),
"waiting time for command has elapsed or another command has already been processed");
commandContext.release();
}

Expand All @@ -1030,9 +1040,6 @@ protected final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(
log.debug("command message is invalid: {}", command);
commandContext.reject(new ErrorCondition(Constants.AMQP_BAD_REQUEST, "malformed command message"));
}
// we do not issue any new credit because the
// consumer is supposed to deliver a single command
// only per HTTP request
};

final Future<ProtocolAdapterCommandConsumer> commandConsumerFuture;
Expand All @@ -1044,22 +1051,28 @@ protected final Future<ProtocolAdapterCommandConsumer> createCommandConsumer(
gatewayId,
commandHandler,
Duration.ofSeconds(ttdSecs),
currentSpan.context());
waitForCommandSpan.context());
} else {
commandConsumerFuture = getCommandConsumerFactory().createCommandConsumer(
tenantObject.getTenantId(),
deviceId,
commandHandler,
Duration.ofSeconds(ttdSecs),
currentSpan.context());
waitForCommandSpan.context());
}
return commandConsumerFuture
.map(consumer -> {
if (!requestProcessed.get()) {
// if the request was not responded already, add a timer for triggering an empty response
addCommandReceptionTimer(ctx, requestProcessed, responseReady, ttdSecs);
addCommandReceptionTimer(ctx, requestProcessed, responseReady, ttdSecs, waitForCommandSpan);
}
return consumer;
// wrap the consumer so that when it is closed, the waitForCommandSpan will be finished as well
return new ProtocolAdapterCommandConsumer() {
@Override
public Future<Void> close(final SpanContext ignored) {
return consumer.close(waitForCommandSpan.context()).onComplete(ar -> waitForCommandSpan.finish());
}
};
});
}

Expand Down Expand Up @@ -1087,12 +1100,14 @@ protected boolean isCommandValid(final Command command, final Span currentSpan)
* @param ctx The device's currently executing HTTP request.
* @param responseReady The future to complete when the time has expired.
* @param delaySecs The number of seconds to wait for a command.
* @param waitForCommandSpan The span tracking the command reception.
*/
private void addCommandReceptionTimer(
final RoutingContext ctx,
final AtomicBoolean requestProcessed,
final Handler<AsyncResult<Void>> responseReady,
final long delaySecs) {
final long delaySecs,
final Span waitForCommandSpan) {

final Long timerId = ctx.vertx().setTimer(delaySecs * 1000L, id -> {

Expand All @@ -1102,6 +1117,7 @@ private void addCommandReceptionTimer(
// no command to be sent,
// send empty response
setTtdStatus(ctx, TtdStatus.EXPIRED);
waitForCommandSpan.log(String.format("time to wait for command expired (%ds)", delaySecs));
responseReady.handle(Future.succeededFuture());
} else {
// a command has been sent to the device already
Expand Down

0 comments on commit a4e6a96

Please sign in to comment.