Skip to content

Commit

Permalink
Merge pull request #883 from bosch-io/feature/weak-ack
Browse files Browse the repository at this point in the history
Send weak acknowledgements for dropped signals
  • Loading branch information
thjaeckle authored Nov 17, 2020
2 parents 04f7d04 + 093c82c commit 9ddab07
Show file tree
Hide file tree
Showing 162 changed files with 2,949 additions and 1,735 deletions.
2 changes: 1 addition & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
<jmh.version>1.21</jmh.version>

<scalatest.version>3.1.2</scalatest.version>
<flapdoodle.version>2.2.0</flapdoodle.version>
<flapdoodle.version>3.0.0</flapdoodle.version>
<system-rules.version>1.19.0</system-rules.version>
</properties>

Expand Down
4 changes: 2 additions & 2 deletions documentation/src/main/resources/_data/authors.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
thomas_jaeckle:
name: Thomas Jäckle
email: thomas.jaeckle@bosch.io
web: http://twitter.com/thjaeckle
web: https://github.com/thjaeckle

florian_fendt:
name: Florian Fendt
Expand Down Expand Up @@ -38,7 +38,7 @@ yannic_klem:
email: yannic.klem@bosch.io
web: https://github.com/Yannic92

joos_david:
david_joos:
name: David Joos
email: david.joos@bosch.io
web: https://github.com/joosdavid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,10 @@ So the device can provide its alternative options to open the barrier after 5 se
I hope I could demonstrate the power of the new acknowledgement feature properly and could make it clear how it can be used.
Maybe you did recognize some of your use cases in the given examples or maybe you have another use case which can or cannot be solved by this feature.

We would love to get your feedback.

We would love to get your [feedback](feedback.html).

<br/>
<br/>
{% include image.html file="ditto.svg" alt="Ditto" max-width=500 %}
--<br/>
The Eclipse Ditto team
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ With the upcoming release of Eclipse Ditto **version 1.5.0**
**API versions later than 1**. The _desired properties_ for features are added on the same level of the model as
the feature properties and can reflect for example feature property updates ,which are intended, but not yet applied.

_Further logics for desired feature properties might be implemented in future Ditto versions._
{% include note.html content="Further logics for desired feature properties might be implemented in future Ditto
versions." %}

A fully-fledged JSON representation of a feature with desired properties is shown below:

Expand Down Expand Up @@ -67,7 +68,7 @@ A fully-fledged JSON representation of a feature with desired properties is show
events you want to receive, for changes done to the desired properties.

### Executing CRUD operations on desired feature properties
CRUD operations can be executed either via the [Ditto HTTP API](httpapi-concepts.html) <b>versions later than 1</b> or via
CRUD operations can be executed either via the [Ditto HTTP API](httpapi-concepts.html) **versions later than 1** or via
[ditto-protocol](protocol-overview.html) messages.

_Possible CRUD operations for desired feature properties via ditto-protocol_:
Expand Down Expand Up @@ -110,6 +111,12 @@ final Adaptable modifyFeatureDesiredProperties =
}));
```

## Feedback?

Please [get in touch](feedback.html) if you have feedback or questions towards this new concept of desired properties.

<br/>
<br/>
{% include image.html file="ditto.svg" alt="Ditto" max-width=500 %}
--<br/>
The Eclipse Ditto team
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
---
title: "Weak acknowledgments to decouple signal publishers and subscribers"
published: true
permalink: 2020-11-16-weak-acknowledgements.html
layout: post
author: yufei_cai
tags: [blog]
hide_sidebar: true
sidebar: false
toc: true
---

## Motivation

[Ditto 1.2.0](2020-08-31-release-announcement-120.html) introduced at-least-once delivery via
[acknowledgement requests](basic-acknowledgements.html).<br/>
It increased coupling between the publisher and the subscriber of signals in that the subscriber is no longer at the
liberty to filter for signals it is interested in. Instead, the subscriber must consume all signals in order to
fulfill acknowledgement requests and prevent endless redelivery.

To combat the problem,
[Ditto 1.4.0](2020-10-28-release-announcement-140.html) made acknowledgement labels unique and introduced the requirement
to manage [_declared acknowledgements_](basic-acknowledgements.html#issuing-acknowledgements), identifying of each
subscriber.<br/>
It is now possible for Ditto to issue
[_weak acknowledgements_](basic-acknowledgements.html#weak-acknowledgements-wacks) on behalf of the subscriber
whenever it decides to not consume a signal. That allows subscribers to configure RQL and namespace filters freely
without causing any futile redelivery.

{% include note.html content="Weak acknowledgements are available since Ditto 1.5.0." %}

## What it is

A [_weak acknowledgement_](basic-acknowledgements.html#weak-acknowledgements-wacks) is issued by Ditto for any
[acknowledgement request](basic-acknowledgements.html#requesting-acks) that will not be fulfilled now or ever without
configuration change.<br/>
A weak acknowledgement is identified by the header `ditto-weak-ack: true`.

The status code of weak acknowledgements is `200 OK`; it signifies that any redelivery is not to be made on their
account.

A weak acknowledgement may look like this in Ditto protocol:
```json
{
"topic": "com.acme/xdk_53/things/twin/acks/my-mqtt-connection:my-mqtt-topic",
"headers": {
"ditto-weak-ack": true
},
"path": "/",
"value": "Acknowledgement was issued automatically, because the subscriber is not authorized to receive the signal.",
"status": 200
}
```

## How it works

Since Ditto 1.4.0, subscribers of _twin events_ or _live signals_ are required to declare unique acknowledgement labels
they are allowed to send. The labels of acknowledgement requests are then identifying the intended subscribers.<br/>
If the intended subscriber exists but does not receive the signal for non-transient reasons, Ditto issues
a weak acknowledgement for that subscriber.<br/>
Such reasons may be:
- The intended subscriber **is not authorized** to receive the signal by policy;
- The intended subscriber did not subscribe for the signal type (_twin event, live command, live event or live message_);
- The intended subscriber filtered the signal out by its [namespace or RQL filter](basic-changenotifications.html#filtering);
- The intended subscriber dropped the signal because its [payload mapper](connectivity-mapping.html) produced nothing.

## Limitation

The distributed nature of cluster pub/sub means that weak acknowledgements are not always issued correctly.<br/>
They are only _eventually correct_ in the sense that some time after a change to the publisher-subscriber pair,
the issued weak acknowledgements will reflect the change.<br/>
Such changes include:
- Opening and closing of Websocket or other connections acting as the subscriber;
- Subscribing and unsubscribing for different signal types via Websocket;
- Modification of connections via the [connectivity API](connectivity-manage-connections.html);
- Migration of a connection from one Ditto cluster member to another due to load balancing.

## Feedback?

Please [get in touch](feedback.html) if you have feedback or questions towards this new concept of weak
acknowledgements.

<br/>
<br/>
{% include image.html file="ditto.svg" alt="Ditto" max-width=500 %}
--<br/>
The Eclipse Ditto team
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,17 @@ event which was consumed by an application integrating with Ditto, or that a liv
without any live or message response, [request the acknowledgement](#requesting-acks) for a
[custom acknowledgement label](#custom-acknowledgement-labels).

## Weak Acknowledgements (WACKs)

Since there are scenarios where the subscriber of events or live messages has defined an RQL filter or is not allowed to receive an event by a policy, it is not always possible that an acknowledgement can be provided.
To avoid that a command fails because of a missing acknowledgement for those reasons, we introduced weak acknowledgements.

These weak acknowledgements are issued automatically by ditto, in case a message or an event is filtered by a subscriber which declared to provide one or more of the requested acknowledgements for the command.
A weak acknowledgement can be identified by checking the header with value `ditto-weak-ack`.
Weak acknowledgements have this header set to `true`.

These weak acknowledgements do not cause redelivery of messages consumed by a Connection.

## Interaction between headers
Three headers control how Ditto responds to a command: `response-required`, `requested-acks`, `timeout`.
* `response-required`: `true` or `false`.<br/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ There are some pre-defined headers, which have a special meaning for Ditto:
| `If-None-Match` | Has the same semantics as defined for the [HTTP API](httpapi-concepts.html#conditional-requests). | `String` |
| `response-required` | Configures for a **command** whether or not a **response** should be sent back. | `Boolean` - default: `true` |
| `requested-acks` | Defines which [acknowledgements](basic-acknowledgements.html) are requested for a command processed by Ditto. | `JsonArray` of `String` - default: `["twin-persisted"]` |
| `ditto-weak-ack` | Marks [weak acknowledgements](basic-acknowledgements.html) issued by Ditto. | `Boolean` - default: `false` |
| `timeout` | Defines how long the Ditto server should wait, e.g. applied when waiting for requested acknowledgements. | `String` - e.g.: `42s` or `250ms` or `1m` - default: `60s`|
| `version` | Determines in which schema version the `payload` should be interpreted. | `Number` - currently: \[1,2\] |
| `put-metadata` | Determines which Metadata information is stored in the thing. | `JsonArray` of `JsonObject`s containing [metadata](basic-metadata.html) to apply. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,16 @@ public enum DittoHeaderDefinition implements HeaderDefinition {
* @since 1.3.0
*/
ALLOW_POLICY_LOCKOUT("allow-policy-lockout", boolean.class, true, false,
HeaderValueValidators.getBooleanValidator());
HeaderValueValidators.getBooleanValidator()),

/**
* Header definition to identify a weak acknowledgement.
* Weak acknowledgements are issued by the service in case a subscriber could not provide the acknowledgement
* because of missing permissions or rql filtering.
*
* @since 1.5.0
*/
WEAK_ACK("ditto-weak-ack", boolean.class, false, true, HeaderValueValidators.getBooleanValidator());

/**
* Map to speed up lookup of header definition by key.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public final class ImmutableDittoHeadersTest {
private static final JsonValue KNOWN_METADATA_VALUE = JsonValue.of("knownMetadata");
private static final MetadataHeaders KNOWN_METADATA_HEADERS;
private static final boolean KNOWN_ALLOW_POLICY_LOCKOUT = true;
private static final boolean KNOWN_IS_WEAK_ACK = false;

static {
KNOWN_METADATA_HEADERS = MetadataHeaders.newInstance();
Expand Down Expand Up @@ -165,6 +166,7 @@ public void settingAllKnownHeadersWorksAsExpected() {
.putHeader(DittoHeaderDefinition.CONNECTION_ID.getKey(), KNOWN_CONNECTION_ID)
.expectedResponseTypes(KNOWN_EXPECTED_RESPONSE_TYPES)
.allowPolicyLockout(KNOWN_ALLOW_POLICY_LOCKOUT)
.putHeader(DittoHeaderDefinition.WEAK_ACK.getKey(), String.valueOf(KNOWN_IS_WEAK_ACK))
.build();

assertThat(underTest).isEqualTo(expectedHeaderMap);
Expand Down Expand Up @@ -384,6 +386,7 @@ public void toJsonReturnsExpected() {
charSequencesToJsonArray(KNOWN_EXPECTED_RESPONSE_TYPES))
.set(DittoHeaderDefinition.PUT_METADATA.getKey(), KNOWN_METADATA_HEADERS.toJson())
.set(DittoHeaderDefinition.ALLOW_POLICY_LOCKOUT.getKey(), KNOWN_ALLOW_POLICY_LOCKOUT)
.set(DittoHeaderDefinition.WEAK_ACK.getKey(), KNOWN_IS_WEAK_ACK)
.build();
final Map<String, String> allKnownHeaders = createMapContainingAllKnownHeaders();

Expand Down Expand Up @@ -591,6 +594,7 @@ private static Map<String, String> createMapContainingAllKnownHeaders() {
charSequencesToJsonArray(KNOWN_EXPECTED_RESPONSE_TYPES).toString());
result.put(DittoHeaderDefinition.PUT_METADATA.getKey(), KNOWN_METADATA_HEADERS.toJsonString());
result.put(DittoHeaderDefinition.ALLOW_POLICY_LOCKOUT.getKey(), String.valueOf(KNOWN_ALLOW_POLICY_LOCKOUT));
result.put(DittoHeaderDefinition.WEAK_ACK.getKey(), String.valueOf(KNOWN_IS_WEAK_ACK));

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
Expand All @@ -37,6 +37,7 @@
* <p>
* This actor will never survive beyond the given timeout duration.
* </ol>
*
* @param <T> the type of the messages this aggregator aggregates
*/
final class MessageAggregator<T> extends AbstractActorWithTimers {
Expand All @@ -46,7 +47,7 @@ final class MessageAggregator<T> extends AbstractActorWithTimers {
*/
static final Object TIMEOUT = new AskTimeoutException("MessageAggregator.TIMEOUT");

private final DiagnosticLoggingAdapter log = LogUtil.obtain(this);
private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);

private final ActorRef initialReceiver;
private final Class<T> messageClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
import org.eclipse.ditto.services.models.concierge.pubsub.LiveSignalPub;
import org.eclipse.ditto.services.models.concierge.streaming.StreamingType;
import org.eclipse.ditto.services.models.policies.Permission;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.EntityIdWithResourceType;
import org.eclipse.ditto.services.utils.cache.entry.Entry;
import org.eclipse.ditto.services.utils.pubsub.DistributedPub;
import org.eclipse.ditto.services.utils.pubsub.extractors.AckExtractor;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
Expand All @@ -50,7 +50,6 @@
import org.eclipse.ditto.signals.commands.things.ThingCommand;
import org.eclipse.ditto.signals.commands.things.exceptions.EventSendNotAllowedException;
import org.eclipse.ditto.signals.commands.things.exceptions.ThingNotAccessibleException;
import org.eclipse.ditto.signals.events.base.Event;
import org.eclipse.ditto.signals.events.things.ThingEvent;

import akka.actor.ActorRef;
Expand All @@ -61,6 +60,13 @@
*/
public final class LiveSignalEnforcement extends AbstractEnforcement<Signal<?>> {

private static final AckExtractor<ThingCommand<?>> THING_COMMAND_ACK_EXTRACTOR =
AckExtractor.of(ThingCommand::getEntityId, ThingCommand::getDittoHeaders);
private static final AckExtractor<ThingEvent<?>> THING_EVENT_ACK_EXTRACTOR =
AckExtractor.of(ThingEvent::getEntityId, ThingEvent::getDittoHeaders);
private static final AckExtractor<MessageCommand<?, ?>> MESSAGE_COMMAND_ACK_EXTRACTOR =
AckExtractor.of(MessageCommand::getEntityId, MessageCommand::getDittoHeaders);

private final EnforcerRetriever enforcerRetriever;
private final LiveSignalPub liveSignalPub;

Expand Down Expand Up @@ -130,7 +136,6 @@ public AbstractEnforcement<Signal<?>> createEnforcement(final Contextual<Signal<
@Override
public CompletionStage<Contextual<WithDittoHeaders>> enforce() {
final Signal<?> liveSignal = signal();
LogUtil.enhanceLogWithCorrelationIdOrRandom(liveSignal);
return enforcerRetriever.retrieve(entityId(), (enforcerKeyEntry, enforcerEntry) -> {
try {
return doEnforce(liveSignal, enforcerEntry);
Expand Down Expand Up @@ -224,10 +229,10 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveSignal(final St
} else {
ThingCommandEnforcement.authorizeByPolicyOrThrow(enforcer, (ThingCommand<?>) liveSignal);
}
final Command<?> withReadSubjects =
addEffectedReadSubjectsToThingSignal((Command<?>) liveSignal, enforcer);
final ThingCommand<?> withReadSubjects =
addEffectedReadSubjectsToThingSignal((ThingCommand<?>) liveSignal, enforcer);
log(withReadSubjects).info("Live Command was authorized: <{}>", withReadSubjects);
return publishLiveSignal(withReadSubjects, liveSignalPub.command());
return publishLiveSignal(withReadSubjects, THING_COMMAND_ACK_EXTRACTOR, liveSignalPub.command());
default:
log(liveSignal).warning("Ignoring unsupported command signal: <{}>", liveSignal);
throw UnknownCommandException.newBuilder(liveSignal.getName())
Expand All @@ -249,8 +254,9 @@ private CompletionStage<Contextual<WithDittoHeaders>> enforceLiveEvent(final Sig

if (authorized) {
log(liveSignal).info("Live Event was authorized: <{}>", liveSignal);
final Event<?> withReadSubjects = addEffectedReadSubjectsToThingSignal((Event<?>) liveSignal, enforcer);
return publishLiveSignal(withReadSubjects, liveSignalPub.event());
final ThingEvent<?> withReadSubjects =
addEffectedReadSubjectsToThingSignal((ThingEvent<?>) liveSignal, enforcer);
return publishLiveSignal(withReadSubjects, THING_EVENT_ACK_EXTRACTOR, liveSignalPub.event());
} else {
log(liveSignal).info("Live Event was NOT authorized: <{}>", liveSignal);
throw EventSendNotAllowedException.newBuilder(((ThingEvent<?>) liveSignal).getThingEntityId())
Expand Down Expand Up @@ -290,8 +296,8 @@ private CompletionStage<Contextual<WithDittoHeaders>> publishMessageCommand(fina
.readRevokedSubjects(effectedSubjects.getRevoked())
.build();

final MessageCommand<?, ?> commandWithReadSubjects = command.setDittoHeaders(headersWithReadSubjects);
return publishLiveSignal(commandWithReadSubjects, liveSignalPub.message());
final MessageCommand<?, ?> withReadSubjects = command.setDittoHeaders(headersWithReadSubjects);
return publishLiveSignal(withReadSubjects, MESSAGE_COMMAND_ACK_EXTRACTOR, liveSignalPub.message());
}

private MessageSendNotAllowedException rejectMessageCommand(final MessageCommand<?, ?> command) {
Expand All @@ -308,13 +314,16 @@ private MessageSendNotAllowedException rejectMessageCommand(final MessageCommand
}

@SuppressWarnings("unchecked")
private <T extends Signal<?>> CompletionStage<Contextual<WithDittoHeaders>> publishLiveSignal(final T signal,
private <T extends Signal<?>, S extends T> CompletionStage<Contextual<WithDittoHeaders>> publishLiveSignal(
final S signal,
final AckExtractor<S> ackExtractor,
final DistributedPub<T> pub) {

// using pub/sub to publish the command to any interested parties (e.g. a Websocket):
log(signal).debug("Publish message to pub-sub");
return addToResponseReceiver(signal).thenApply(newSignal ->
withMessageToReceiver(newSignal, pub.getPublisher(), obj -> pub.wrapForPublication((T) obj))
withMessageToReceiver(newSignal, pub.getPublisher(),
obj -> pub.wrapForPublicationWithAcks((S) obj, ackExtractor))
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.eclipse.ditto.model.policies.ResourceKey;
import org.eclipse.ditto.services.models.concierge.ConciergeMessagingConstants;
import org.eclipse.ditto.services.models.policies.Permission;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.EntityIdWithResourceType;
import org.eclipse.ditto.services.utils.cache.InvalidateCacheEntry;
Expand Down Expand Up @@ -177,8 +176,6 @@ private static DittoRuntimeException errorForPolicyCommand(final PolicyCommand p

@Override
public CompletionStage<Contextual<WithDittoHeaders>> enforce() {
final PolicyCommand command = signal();
LogUtil.enhanceLogWithCorrelationIdOrRandom(command);
return enforcerRetriever.retrieve(entityId(), (idEntry, enforcerEntry) -> {
try {
return CompletableFuture.completedFuture(doEnforce(enforcerEntry));
Expand Down
Loading

0 comments on commit 9ddab07

Please sign in to comment.