Skip to content

Commit

Permalink
Restore global live response dispatching and make it configurable.
Browse files Browse the repository at this point in the history
- Concierge may be configured to do global live response or not.

- Websocket and Connectivity forward unknown responses to concierge.

- Concierge performs best-effort correlation ID collision avoidance.
  Failing that (due to reused correlation ID by twin event e. g.),
  live commands with colliding correlation IDs can only be responded
  to by the same websocket/connection that received them.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Sep 8, 2020
1 parent 1733deb commit 1ff9f8f
Show file tree
Hide file tree
Showing 15 changed files with 230 additions and 80 deletions.
Expand Up @@ -299,7 +299,7 @@ public Collection<ResponseType> getExpectedResponseTypes() {
.map(ResponseType::fromName)
.filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toSet());
.collect(Collectors.toList()); // toList() to keep original order
}

@Override
Expand Down
Expand Up @@ -31,10 +31,13 @@ public final class DefaultEnforcementConfig implements EnforcementConfig {

private final Duration askTimeout;
private final int bufferSize;
private final boolean globalLiveResponseDispatching;

private DefaultEnforcementConfig(final ConfigWithFallback configWithFallback) {
askTimeout = configWithFallback.getDuration(EnforcementConfigValue.ASK_TIMEOUT.getConfigPath());
bufferSize = configWithFallback.getInt(EnforcementConfigValue.BUFFER_SIZE.getConfigPath());
globalLiveResponseDispatching =
configWithFallback.getBoolean(EnforcementConfigValue.GLOBAL_LIVE_RESPONSE_DISPATCHING.getConfigPath());
}

/**
Expand All @@ -59,6 +62,11 @@ public int getBufferSize() {
return bufferSize;
}

@Override
public boolean shouldDispatchLiveResponsesGlobally() {
return globalLiveResponseDispatching;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -68,19 +76,21 @@ public boolean equals(final Object o) {
return false;
}
final DefaultEnforcementConfig that = (DefaultEnforcementConfig) o;
return bufferSize == that.bufferSize && askTimeout.equals(that.askTimeout);
return bufferSize == that.bufferSize && askTimeout.equals(that.askTimeout) &&
globalLiveResponseDispatching == that.globalLiveResponseDispatching;
}

@Override
public int hashCode() {
return Objects.hash(askTimeout, bufferSize);
return Objects.hash(askTimeout, bufferSize, globalLiveResponseDispatching);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"askTimeout=" + askTimeout +
", bufferSize=" + bufferSize +
", globalLiveResponseDispatching=" + globalLiveResponseDispatching +
"]";
}

Expand Down
Expand Up @@ -38,6 +38,13 @@ public interface EnforcementConfig {
*/
int getBufferSize();

/**
* Returns whether live responses from channels other than their subscribers should be dispatched.
*
* @return whether global live response dispatching is enabled.
*/
boolean shouldDispatchLiveResponsesGlobally();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code EnforcementConfig}.
Expand All @@ -52,12 +59,17 @@ enum EnforcementConfigValue implements KnownConfigValue {
/**
* The buffer size used for the queue in the enforcer actor.
*/
BUFFER_SIZE("buffer-size", 1_000);
BUFFER_SIZE("buffer-size", 1_000),

/**
* Whether to enable dispatching live responses from channels other than the subscribers.
*/
GLOBAL_LIVE_RESPONSE_DISPATCHING("global-live-response-dispatching", false);

private final String path;
private final Object defaultValue;

private EnforcementConfigValue(final String thePath, final Object theDefaultValue) {
EnforcementConfigValue(final String thePath, final Object theDefaultValue) {
path = thePath;
defaultValue = theDefaultValue;
}
Expand Down
Expand Up @@ -51,7 +51,7 @@ public abstract class AbstractEnforcement<T extends Signal<?>> {
/**
* Context of the enforcement step: sender, self, signal and so forth.
*/
private final Contextual<T> context;
protected final Contextual<T> context;

/**
* Create an enforcement step from its context.
Expand Down
Expand Up @@ -12,14 +12,18 @@
*/
package org.eclipse.ditto.services.concierge.enforcement;

import java.time.Duration;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.model.enforcers.Enforcer;
import org.eclipse.ditto.services.concierge.common.ConciergeConfig;
import org.eclipse.ditto.services.concierge.common.DittoConciergeConfig;
import org.eclipse.ditto.services.concierge.common.EnforcementConfig;
import org.eclipse.ditto.services.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.CaffeineCache;
import org.eclipse.ditto.services.utils.cache.EntityIdWithResourceType;
import org.eclipse.ditto.services.utils.cache.InvalidateCacheEntry;
import org.eclipse.ditto.services.utils.cache.entry.Entry;
Expand All @@ -31,6 +35,8 @@
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.Command;

import com.github.benmanes.caffeine.cache.Caffeine;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.pf.ReceiveBuilder;
Expand Down Expand Up @@ -76,16 +82,18 @@ protected AbstractEnforcerActor(final ActorRef pubSubMediator,

super(WithDittoHeaders.class);

enforcementConfig = DittoConciergeConfig.of(
final ConciergeConfig conciergeConfig = DittoConciergeConfig.of(
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
).getEnforcementConfig();
);
enforcementConfig = conciergeConfig.getEnforcementConfig();

this.thingIdCache = thingIdCache;
this.aclEnforcerCache = aclEnforcerCache;
this.policyEnforcerCache = policyEnforcerCache;

contextual = Contextual.forActor(getSelf(), getContext().getSystem().deadLetters(),
pubSubMediator, conciergeForwarder, enforcementConfig.getAskTimeout(), logger
pubSubMediator, conciergeForwarder, enforcementConfig.getAskTimeout(), logger,
createResponseReceiverCache(conciergeConfig)
);

// register for sending messages via pub/sub to this enforcer
Expand Down Expand Up @@ -153,4 +161,13 @@ protected Contextual<WithDittoHeaders> mapMessage(final WithDittoHeaders message
return contextual.withReceivedMessage(message, getSender());
}

@Nullable
private static Cache<String, ActorRef> createResponseReceiverCache(final ConciergeConfig conciergeConfig) {
if (conciergeConfig.getEnforcementConfig().shouldDispatchLiveResponsesGlobally()) {
return CaffeineCache.of(Caffeine.newBuilder().expireAfterWrite(Duration.ofSeconds(120L)));
} else {
return null;
}
}

}
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.controlflow.WithSender;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.cache.Cache;
import org.eclipse.ditto.services.utils.cache.EntityIdWithResourceType;
import org.eclipse.ditto.services.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.signals.base.WithId;
Expand Down Expand Up @@ -68,6 +69,10 @@ public final class Contextual<T extends WithDittoHeaders> implements WithSender<
@Nullable
private final Function<Object, Object> receiverWrapperFunction;

// for live signal enforcement
@Nullable
private final Cache<String, ActorRef> responseReceivers;

@Nullable
private final Supplier<CompletionStage<Object>> askFuture;

Expand All @@ -80,6 +85,7 @@ private Contextual(@Nullable final T message, final ActorRef self, final ActorRe
@Nullable final StartedTimer startedTimer,
@Nullable final ActorRef receiver,
@Nullable final Function<Object, Object> receiverWrapperFunction,
@Nullable final Cache<String, ActorRef> responseReceivers,
@Nullable final Supplier<CompletionStage<Object>> askFuture,
final boolean changesAuthorization) {
this.message = message;
Expand All @@ -93,6 +99,7 @@ private Contextual(@Nullable final T message, final ActorRef self, final ActorRe
this.startedTimer = startedTimer;
this.receiver = receiver;
this.receiverWrapperFunction = receiverWrapperFunction;
this.responseReceivers = responseReceivers;
this.askFuture = askFuture;
this.changesAuthorization = changesAuthorization;
}
Expand All @@ -102,11 +109,12 @@ static <T extends WithDittoHeaders<T>> Contextual<T> forActor(final ActorRef sel
final ActorRef pubSubMediator,
final ActorRef conciergeForwarder,
final Duration askTimeout,
final DittoDiagnosticLoggingAdapter log) {
final DittoDiagnosticLoggingAdapter log,
@Nullable final Cache<String, ActorRef> responseReceivers) {

return new Contextual<T>(null, self, deadLetters, pubSubMediator, conciergeForwarder, askTimeout, log, null,
return new Contextual<>(null, self, deadLetters, pubSubMediator, conciergeForwarder, askTimeout, log, null,
null,
null, null, null, false);
null, null, responseReceivers, null, false);
}

/**
Expand All @@ -119,7 +127,7 @@ static <T extends WithDittoHeaders<T>> Contextual<T> forActor(final ActorRef sel
*/
Contextual<T> withAskFuture(final Supplier<CompletionStage<Object>> askFuture) {
return new Contextual<>(message, self, sender, pubSubMediator, conciergeForwarder, askTimeout, log, entityId,
startedTimer, receiver, receiverWrapperFunction, askFuture, changesAuthorization);
startedTimer, receiver, receiverWrapperFunction, responseReceivers, askFuture, changesAuthorization);
}

/**
Expand Down Expand Up @@ -221,6 +229,10 @@ Function<Object, Object> getReceiverWrapperFunction() {
return receiverWrapperFunction != null ? receiverWrapperFunction : Function.identity();
}

Optional<Cache<String, ActorRef>> getResponseReceivers() {
return Optional.ofNullable(responseReceivers);
}

boolean changesAuthorization() {
return changesAuthorization;
}
Expand All @@ -231,31 +243,31 @@ <S extends WithDittoHeaders> Optional<Contextual<S>> tryToMapMessage(final Funct

<S extends WithDittoHeaders> Contextual<S> withReceivedMessage(@Nullable final S message, final ActorRef sender) {
return new Contextual<>(message, self, sender, pubSubMediator, conciergeForwarder, askTimeout,
log, entityIdFor(message), startedTimer, receiver, receiverWrapperFunction,
log, entityIdFor(message), startedTimer, receiver, receiverWrapperFunction, responseReceivers,
askFuture, changesAuthorization);
}

Contextual<T> withTimer(final StartedTimer startedTimer) {
return new Contextual<>(message, self, sender, pubSubMediator, conciergeForwarder, askTimeout,
log, entityId, startedTimer, receiver, receiverWrapperFunction,
log, entityId, startedTimer, receiver, receiverWrapperFunction, responseReceivers,
askFuture, changesAuthorization);
}

Contextual<T> withReceiver(@Nullable final ActorRef receiver) {
return new Contextual<>(message, self, sender, pubSubMediator, conciergeForwarder, askTimeout,
log, entityId, startedTimer, receiver, receiverWrapperFunction,
log, entityId, startedTimer, receiver, receiverWrapperFunction, responseReceivers,
askFuture, changesAuthorization);
}

Contextual<T> withReceiverWrapperFunction(final Function<Object, Object> receiverWrapperFunction) {
return new Contextual<>(message, self, sender, pubSubMediator, conciergeForwarder, askTimeout,
log, entityId, startedTimer, receiver, receiverWrapperFunction,
log, entityId, startedTimer, receiver, receiverWrapperFunction, responseReceivers,
askFuture, changesAuthorization);
}

Contextual<T> changesAuthorization(final boolean changesAuthorization) {
return new Contextual<>(message, self, sender, pubSubMediator, conciergeForwarder, askTimeout,
log, entityId, startedTimer, receiver, receiverWrapperFunction,
log, entityId, startedTimer, receiver, receiverWrapperFunction, responseReceivers,
askFuture, changesAuthorization);
}

Expand Down Expand Up @@ -290,6 +302,7 @@ public String toString() {
", entityId=" + entityId +
", receiver=" + receiver +
", receiverWrapperFunction=" + receiverWrapperFunction +
", responseReceivers=" + responseReceivers +
", changesAuthorization=" + changesAuthorization +
"]";
}
Expand Down

0 comments on commit 1ff9f8f

Please sign in to comment.