Skip to content

Commit

Permalink
added namespace scoped loggers for logging details on ThingCommandEnf…
Browse files Browse the repository at this point in the history
…orcement command forwarding

+ DispatcherActor SearchCommand processing
* make the namespaces to inspect/log configurable via config

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Nov 19, 2021
1 parent 6ef40b6 commit d0e60cc
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
*/
package org.eclipse.ditto.concierge.service.common;

import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

import javax.annotation.concurrent.Immutable;

Expand All @@ -34,12 +37,15 @@ public final class DefaultEnforcementConfig implements EnforcementConfig {
private final AskWithRetryConfig askWithRetryConfig;
private final int bufferSize;
private final boolean globalLiveResponseDispatching;
private final Set<String> specialLoggingInspectedNamespaces;

private DefaultEnforcementConfig(final ConfigWithFallback configWithFallback) {
askWithRetryConfig = DefaultAskWithRetryConfig.of(configWithFallback, ASK_WITH_RETRY_CONFIG_PATH);
bufferSize = configWithFallback.getPositiveIntOrThrow(EnforcementConfigValue.BUFFER_SIZE);
globalLiveResponseDispatching =
configWithFallback.getBoolean(EnforcementConfigValue.GLOBAL_LIVE_RESPONSE_DISPATCHING.getConfigPath());
specialLoggingInspectedNamespaces = Collections.unmodifiableSet(new HashSet<>(configWithFallback.getStringList(
EnforcementConfigValue.SPECIAL_LOGGING_INSPECTED_NAMESPACES.getConfigPath())));
}

/**
Expand Down Expand Up @@ -69,6 +75,11 @@ public boolean shouldDispatchLiveResponsesGlobally() {
return globalLiveResponseDispatching;
}

@Override
public Set<String> getSpecialLoggingInspectedNamespaces() {
return specialLoggingInspectedNamespaces;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -80,12 +91,14 @@ public boolean equals(final Object o) {
final DefaultEnforcementConfig that = (DefaultEnforcementConfig) o;
return bufferSize == that.bufferSize &&
globalLiveResponseDispatching == that.globalLiveResponseDispatching &&
askWithRetryConfig.equals(that.askWithRetryConfig);
askWithRetryConfig.equals(that.askWithRetryConfig) &&
specialLoggingInspectedNamespaces.equals(that.specialLoggingInspectedNamespaces);
}

@Override
public int hashCode() {
return Objects.hash(askWithRetryConfig, bufferSize, globalLiveResponseDispatching);
return Objects.hash(askWithRetryConfig, bufferSize, globalLiveResponseDispatching,
specialLoggingInspectedNamespaces);
}

@Override
Expand All @@ -94,6 +107,7 @@ public String toString() {
"askWithRetryConfig=" + askWithRetryConfig +
", bufferSize=" + bufferSize +
", globalLiveResponseDispatching=" + globalLiveResponseDispatching +
", specialLoggingInspectedNamespaces=" + specialLoggingInspectedNamespaces +
"]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
*/
package org.eclipse.ditto.concierge.service.common;

import java.util.List;
import java.util.Set;

import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.internal.utils.cacheloaders.config.AskWithRetryConfig;
Expand Down Expand Up @@ -45,6 +48,13 @@ public interface EnforcementConfig {
*/
boolean shouldDispatchLiveResponsesGlobally();

/**
* Returns a list of namespaces for which a special usage logging should be enabled in enforcement.
*
* @return list of namespaces which should be inspected.
*/
Set<String> getSpecialLoggingInspectedNamespaces();

/**
* An enumeration of the known config path expressions and their associated default values for
* {@code EnforcementConfig}.
Expand All @@ -59,7 +69,12 @@ enum EnforcementConfigValue implements KnownConfigValue {
/**
* Whether to enable dispatching live responses from channels other than the subscribers.
*/
GLOBAL_LIVE_RESPONSE_DISPATCHING("global-live-response-dispatching", false);
GLOBAL_LIVE_RESPONSE_DISPATCHING("global-live-response-dispatching", false),

/**
* List of namespaces for which a special usage logging should be enabled in enforcement.
*/
SPECIAL_LOGGING_INSPECTED_NAMESPACES("special-logging-inspected-namespaces", List.of());

private final String path;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ default boolean changesAuthorization(final T signal) {
* @return the stream.
*/
default Flow<Contextual<WithDittoHeaders>, EnforcementTask, NotUsed> createEnforcementTask(
final PreEnforcer preEnforcer
) {
final PreEnforcer preEnforcer) {
return Flow.<Contextual<WithDittoHeaders>, Optional<Contextual<T>>>fromFunction(
contextual -> contextual.tryToMapMessage(this::mapToHandledClass))
.filter(Optional::isPresent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.policies.model.enforcers.Enforcer;
import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.CacheKey;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.PolicyEnforcer;
import org.eclipse.ditto.policies.model.enforcers.Enforcer;

import akka.Done;
import akka.NotUsed;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
import static org.eclipse.ditto.policies.api.Permission.MIN_REQUIRED_POLICY_PERMISSIONS;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -40,6 +42,8 @@
import org.eclipse.ditto.base.model.signals.commands.CommandToExceptionRegistry;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.concierge.api.ConciergeMessagingConstants;
import org.eclipse.ditto.concierge.service.common.ConciergeConfig;
import org.eclipse.ditto.concierge.service.common.DittoConciergeConfig;
import org.eclipse.ditto.concierge.service.enforcement.placeholders.references.PolicyIdReferencePlaceholderResolver;
import org.eclipse.ditto.concierge.service.enforcement.placeholders.references.ReferencePlaceholder;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
Expand All @@ -51,6 +55,7 @@
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementContext;
import org.eclipse.ditto.internal.utils.cacheloaders.PolicyEnforcer;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
Expand Down Expand Up @@ -108,6 +113,7 @@
import org.eclipse.ditto.things.model.signals.commands.query.ThingQueryCommandResponse;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.pattern.AskTimeoutException;

/**
Expand All @@ -119,6 +125,8 @@ public final class ThingCommandEnforcement
private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(ThingCommandEnforcement.class);

private static final Map<String, ThreadSafeDittoLogger> NAMESPACE_INSPECTION_LOGGERS = new HashMap<>();

/**
* Label of default policy entry in default policy.
*/
Expand All @@ -140,6 +148,7 @@ public final class ThingCommandEnforcement
private final PolicyIdReferencePlaceholderResolver policyIdReferencePlaceholderResolver;

private ThingCommandEnforcement(final Contextual<ThingCommand<?>> data,
final ActorSystem actorSystem,
final ActorRef thingsShardRegion,
final ActorRef policiesShardRegion,
final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache,
Expand All @@ -150,6 +159,16 @@ private ThingCommandEnforcement(final Contextual<ThingCommand<?>> data,
this.thingsShardRegion = requireNonNull(thingsShardRegion);
this.policiesShardRegion = requireNonNull(policiesShardRegion);

final ConciergeConfig conciergeConfig = DittoConciergeConfig.of(
DefaultScopedConfig.dittoScoped(actorSystem.settings().config())
);

conciergeConfig.getEnforcementConfig().getSpecialLoggingInspectedNamespaces()
.forEach(loggedNamespace -> NAMESPACE_INSPECTION_LOGGERS.put(
loggedNamespace,
DittoLoggerFactory.getThreadSafeLogger(ThingCommandEnforcement.class.getName() +
".namespace." + loggedNamespace)));

this.thingIdCache = requireNonNull(thingIdCache);
this.policyEnforcerCache = requireNonNull(policyEnforcerCache);
this.preEnforcer = preEnforcer;
Expand Down Expand Up @@ -519,6 +538,21 @@ private Contextual<WithDittoHeaders> forwardToThingsShardRegion(final ThingComma
if (command instanceof ThingModifyCommand && ((ThingModifyCommand<?>) command).changesAuthorization()) {
invalidateThingCaches(command.getEntityId());
}

if (NAMESPACE_INSPECTION_LOGGERS.containsKey(command.getEntityId().getNamespace())) {
final ThreadSafeDittoLogger namespaceLogger = NAMESPACE_INSPECTION_LOGGERS
.get(command.getEntityId().getNamespace()).withCorrelationId(command);
if (command instanceof ThingModifyCommand) {
final JsonValue value = ((ThingModifyCommand<?>) command).getEntity().orElse(null);
if (null != value) {
final Set<ResourceKey> resourceKeys = calculateLeaves(command.getResourcePath(), value);
namespaceLogger.info("Forwarding modify command type <{}> with resourceKeys <{}>",
command.getType(),
resourceKeys);
}
}
namespaceLogger.debug("Forwarding command type <{}>: <{}>", command.getType(), command);
}
return withMessageToReceiver(command, thingsShardRegion);
}

Expand Down Expand Up @@ -1129,6 +1163,7 @@ private CreateThingWithEnforcer(final CreateThing createThing, final Enforcer en
*/
public static final class Provider implements EnforcementProvider<ThingCommand<?>> {

private final ActorSystem actorSystem;
private final ActorRef thingsShardRegion;
private final ActorRef policiesShardRegion;
private final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache;
Expand All @@ -1138,18 +1173,21 @@ public static final class Provider implements EnforcementProvider<ThingCommand<?
/**
* Constructor.
*
* @param actorSystem the ActorSystem for e.g. looking up config.
* @param thingsShardRegion the ActorRef to the Things shard region.
* @param policiesShardRegion the ActorRef to the Policies shard region.
* @param thingIdCache the thing-id-cache.
* @param policyEnforcerCache the policy-enforcer cache.
* @param preEnforcer pre-enforcer function to block undesirable messages to policies shard region.
*/
public Provider(final ActorRef thingsShardRegion,
public Provider(final ActorSystem actorSystem,
final ActorRef thingsShardRegion,
final ActorRef policiesShardRegion,
final Cache<EnforcementCacheKey, Entry<EnforcementCacheKey>> thingIdCache,
final Cache<EnforcementCacheKey, Entry<Enforcer>> policyEnforcerCache,
@Nullable final PreEnforcer preEnforcer) {

this.actorSystem = requireNonNull(actorSystem);
this.thingsShardRegion = requireNonNull(thingsShardRegion);
this.policiesShardRegion = requireNonNull(policiesShardRegion);
this.thingIdCache = requireNonNull(thingIdCache);
Expand Down Expand Up @@ -1177,7 +1215,7 @@ public boolean changesAuthorization(final ThingCommand<?> signal) {

@Override
public AbstractEnforcement<ThingCommand<?>> createEnforcement(final Contextual<ThingCommand<?>> context) {
return new ThingCommandEnforcement(context, thingsShardRegion, policiesShardRegion, thingIdCache,
return new ThingCommandEnforcement(context, actorSystem, thingsShardRegion, policiesShardRegion, thingIdCache,
policyEnforcerCache, preEnforcer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
import static org.eclipse.ditto.thingsearch.api.ThingsSearchConstants.SEARCH_ACTOR_PATH;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

Expand All @@ -28,15 +31,18 @@
import org.eclipse.ditto.concierge.service.common.DittoConciergeConfig;
import org.eclipse.ditto.concierge.service.common.EnforcementConfig;
import org.eclipse.ditto.concierge.service.enforcement.PreEnforcer;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.ThingSearchSudoCommand;
import org.eclipse.ditto.internal.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.internal.utils.akka.controlflow.Filter;
import org.eclipse.ditto.internal.utils.akka.controlflow.WithSender;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThings;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.ThingSearchSudoCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand;

import akka.Done;
import akka.NotUsed;
Expand All @@ -61,6 +67,8 @@ public final class DispatcherActor
*/
public static final String ACTOR_NAME = "dispatcherActor";

private static final Map<String, ThreadSafeDittoLogger> NAMESPACE_INSPECTION_LOGGERS = new HashMap<>();

private final Flow<ImmutableDispatch, ImmutableDispatch, NotUsed> handler;
private final ActorRef thingsAggregatorActor;
private final EnforcementConfig enforcementConfig;
Expand All @@ -76,6 +84,12 @@ private DispatcherActor(final ActorRef enforcerActor,
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config())
).getEnforcementConfig();

enforcementConfig.getSpecialLoggingInspectedNamespaces()
.forEach(loggedNamespace -> NAMESPACE_INSPECTION_LOGGERS.put(
loggedNamespace,
DittoLoggerFactory.getThreadSafeLogger(DispatcherActor.class.getName() +
".namespace." + loggedNamespace)));

this.handler = handler;
final Props props = ThingsAggregatorActor.props(enforcerActor);
thingsAggregatorActor = getContext().actorOf(props, ThingsAggregatorActor.ACTOR_NAME);
Expand Down Expand Up @@ -173,11 +187,32 @@ private static Graph<FanOutShape2<ImmutableDispatch, ImmutableDispatch, Immutabl
private static Sink<ImmutableDispatch, ?> searchActorSink(final ActorRef pubSubMediator,
final PreEnforcer preEnforcer) {
return Sink.foreach(dispatchToPreEnforce ->
preEnforce(dispatchToPreEnforce, preEnforcer, dispatch ->
pubSubMediator.tell(
DistPubSubAccess.send(SEARCH_ACTOR_PATH, dispatch.getMessage()),
dispatch.getSender())
)
preEnforce(dispatchToPreEnforce, preEnforcer, dispatch -> {
final DittoHeadersSettable<?> command = dispatch.message;
if (command instanceof ThingSearchCommand) {
final ThingSearchCommand<?> searchCommand = (ThingSearchCommand<?>) command;
final Set<String> namespaces = searchCommand.getNamespaces().orElseGet(Set::of);

NAMESPACE_INSPECTION_LOGGERS.entrySet().stream()
.filter(entry -> namespaces.contains(entry.getKey()))
.map(Map.Entry::getValue)
.forEach(l -> {
if (searchCommand instanceof ThingSearchQueryCommand) {
final String filter = ((ThingSearchQueryCommand<?>) searchCommand)
.getFilter().orElse(null);
l.withCorrelationId(command).info(
"Forwarding search query command type <{}> with filter <{}> and " +
"fields <{}>",
searchCommand.getType(),
filter,
searchCommand.getSelectedFields().orElse(null));
}
});
}
pubSubMediator.tell(
DistPubSubAccess.send(SEARCH_ACTOR_PATH, dispatch.getMessage()),
dispatch.getSender());
})
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.eclipse.ditto.concierge.service.starter.actors.DispatcherActor;
import org.eclipse.ditto.internal.utils.cache.Cache;
import org.eclipse.ditto.internal.utils.cache.CacheFactory;
import org.eclipse.ditto.internal.utils.cache.CacheKey;
import org.eclipse.ditto.internal.utils.cache.entry.Entry;
import org.eclipse.ditto.internal.utils.cacheloaders.EnforcementCacheKey;
import org.eclipse.ditto.internal.utils.cacheloaders.PolicyEnforcer;
Expand Down Expand Up @@ -117,7 +116,7 @@ public ActorRef startEnforcerActor(final ActorContext context, final ConciergeCo
final LiveSignalPub liveSignalPub = LiveSignalPub.of(context, distributedAcks);

final Set<EnforcementProvider<?>> enforcementProviders = new HashSet<>();
enforcementProviders.add(new ThingCommandEnforcement.Provider(thingsShardRegionProxy,
enforcementProviders.add(new ThingCommandEnforcement.Provider(actorSystem, thingsShardRegionProxy,
policiesShardRegionProxy, thingIdCache, projectedEnforcerCache, preEnforcer));
enforcementProviders.add(new PolicyCommandEnforcement.Provider(policiesShardRegionProxy, policyEnforcerCache));
enforcementProviders.add(
Expand Down
4 changes: 4 additions & 0 deletions concierge/service/src/main/resources/concierge.conf
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ ditto {
# whether to dispatch live response from channels other than subscribers of live messages.
global-live-response-dispatching = true
global-live-response-dispatching = ${?ENFORCEMENT_GLOBAL_LIVE_RESPONSE_DISPATCHING}

# list of namespaces for which a special usage logging should be enabled in enforcement
special-logging-inspected-namespaces = []
special-logging-inspected-namespaces = ${?ENFORCEMENT_SPECIAL_LOGGING_INSPECTED_NAMESPACES}
}

caches {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public ActorRef build() {
CaffeineCache.of(Caffeine.newBuilder(), thingEnforcementIdCacheLoader);

final Set<EnforcementProvider<?>> enforcementProviders = new HashSet<>();
enforcementProviders.add(new ThingCommandEnforcement.Provider(thingsShardRegion,
enforcementProviders.add(new ThingCommandEnforcement.Provider(system, thingsShardRegion,
policiesShardRegion, thingIdCache, projectedEnforcerCache, preEnforcer));
enforcementProviders.add(new PolicyCommandEnforcement.Provider(policiesShardRegion, policyEnforcerCache));
enforcementProviders.add(
Expand Down

0 comments on commit d0e60cc

Please sign in to comment.