Skip to content

Commit

Permalink
Add pre-enforcement for search commands
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Jun 2, 2022
1 parent 1787b64 commit 3b85931
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 64 deletions.
Expand Up @@ -14,6 +14,7 @@

import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.regex.Pattern;

Expand All @@ -31,6 +32,10 @@
import org.eclipse.ditto.policies.enforcement.config.CreationRestrictionConfig;
import org.eclipse.ditto.policies.enforcement.config.DefaultEntityCreationConfig;
import org.eclipse.ditto.policies.enforcement.config.EntityCreationConfig;
import org.eclipse.ditto.policies.model.signals.commands.modify.CreatePolicy;
import org.eclipse.ditto.policies.model.signals.commands.modify.ModifyPolicy;
import org.eclipse.ditto.things.model.signals.commands.modify.CreateThing;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing;

import akka.actor.ActorSystem;

Expand All @@ -40,7 +45,8 @@
@Immutable
public class CreationRestrictionEnforcer implements PreEnforcer {

private static final ThreadSafeDittoLogger log = DittoLoggerFactory.getThreadSafeLogger(CreationRestrictionEnforcer.class);
private static final ThreadSafeDittoLogger log =
DittoLoggerFactory.getThreadSafeLogger(CreationRestrictionEnforcer.class);

private final EntityCreationConfig config;
private final ExistenceChecker existenceChecker;
Expand Down Expand Up @@ -136,16 +142,35 @@ public String toString() {

@Override
public CompletionStage<DittoHeadersSettable<?>> apply(final DittoHeadersSettable<?> dittoHeadersSettable) {
final CompletionStage<DittoHeadersSettable<?>> result;

final Signal<?> messageAsSignal = getMessageAsSignal(dittoHeadersSettable);
if (isCreatingCommand(messageAsSignal)) {
result = handleCreatingCommand(dittoHeadersSettable, messageAsSignal);
} else {
result = CompletableFuture.completedFuture(dittoHeadersSettable);
}
return result;
}

private static boolean isCreatingCommand(final Signal<?> signal) {
return signal instanceof CreateThing || signal instanceof ModifyThing || signal instanceof CreatePolicy ||
signal instanceof ModifyPolicy;
}

private CompletionStage<DittoHeadersSettable<?>> handleCreatingCommand(
final DittoHeadersSettable<?> dittoHeadersSettable, final Signal<?> signal) {

final WithEntityId withEntityId = getMessageAsWithEntityId(dittoHeadersSettable);
final NamespacedEntityId entityId = getEntityIdAsNamespacedEntityId(withEntityId.getEntityId());
final var context = new Context(messageAsSignal.getResourceType(), entityId.getNamespace(), messageAsSignal.getDittoHeaders());
return existenceChecker.checkExistence(messageAsSignal).thenApply(exists -> {
final var context = new Context(signal.getResourceType(), entityId.getNamespace(),
signal.getDittoHeaders());
return existenceChecker.checkExistence(signal).thenApply(exists -> {
if (Boolean.TRUE.equals(exists) || canCreate(context)) {
return dittoHeadersSettable;
} else {
throw EntityNotCreatableException.newBuilder(withEntityId.getEntityId())
.dittoHeaders(messageAsSignal.getDittoHeaders())
.dittoHeaders(signal.getDittoHeaders())
.build();
}
});
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.policies.enforcement.config.DefaultEnforcementConfig;
import org.eclipse.ditto.policies.enforcement.config.EnforcementConfig;
import org.eclipse.ditto.policies.enforcement.pre_enforcement.PreEnforcerProvider;
import org.eclipse.ditto.rql.query.Query;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
Expand Down Expand Up @@ -114,16 +115,18 @@ public final class SearchActor extends AbstractActor {

private final QueryParser queryParser;
private final ThingsSearchPersistence searchPersistence;
private final PreEnforcerProvider preEnforcer;

@SuppressWarnings("unused")
private SearchActor(final QueryParser queryParser,
final ThingsSearchPersistence searchPersistence) {

this.queryParser = queryParser;
this.searchPersistence = searchPersistence;
preEnforcer = PreEnforcerProvider.get(getSystem());

final DefaultScopedConfig dittoScopedConfig =
DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config());
DefaultScopedConfig.dittoScoped(getSystem().settings().config());

final EnforcementConfig enforcementConfig = DefaultEnforcementConfig.of(dittoScopedConfig);
enforcementConfig.getSpecialLoggingInspectedNamespaces()
Expand Down Expand Up @@ -173,50 +176,56 @@ private void namespaceReport(final SudoRetrieveNamespaceReport namespaceReport)
}

private void count(final CountThings countThings) {
final var sender = getSender();
performLogging(countThings);

final ThreadSafeDittoLoggingAdapter l = log.withCorrelationId(countThings);
l.info("Processing CountThings command with namespaces <{}> and filter: <{}>",
countThings.getNamespaces(), countThings.getFilter());
l.debug("Processing CountThings command: <{}>", countThings);
executeCount(countThings, queryParser::parse, false);
preEnforcer.apply(countThings).thenAccept(signal -> executeCount((CountThings) signal, queryParser::parse,
false, sender));
}

private void sudoCount(final SudoCountThings sudoCountThings) {
final var sender = getSender();
final ThreadSafeDittoLoggingAdapter l = log.withCorrelationId(sudoCountThings);
l.info("Processing SudoCountThings command with filter: <{}>", sudoCountThings.getFilter());
l.debug("Processing SudoCountThings command: <{}>", sudoCountThings);
executeCount(sudoCountThings, queryParser::parseSudoCountThings, true);
executeCount(sudoCountThings, queryParser::parseSudoCountThings, true, sender);
}

private <T extends Command<?>> void executeCount(final T countCommand,
final Function<T, CompletionStage<Query>> queryParseFunction,
final boolean isSudo) {
final boolean isSudo,
final ActorRef sender) {

final var dittoHeaders = countCommand.getDittoHeaders();
final JsonSchemaVersion version = countCommand.getImplementedSchemaVersion();
final var queryType = "count";
final StartedTimer countTimer = startNewTimer(version, queryType, countCommand);
final StartedTimer queryParsingTimer = countTimer.startNewSegment(QUERY_PARSING_SEGMENT_NAME);
final ActorRef sender = getSender();

final Source<CountThingsResponse, ?> countThingsResponseSource =
createQuerySource(queryParseFunction, countCommand)
.flatMapConcat(query -> {
stopTimer(queryParsingTimer);
final StartedTimer databaseAccessTimer =
countTimer.startNewSegment(DATABASE_ACCESS_SEGMENT_NAME);

final Source<Long, NotUsed> countResultSource = isSudo
? searchPersistence.sudoCount(query)
: searchPersistence.count(query,
countCommand.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds());

return processSearchPersistenceResult(countResultSource, dittoHeaders)
.via(Flow.fromFunction(result -> {
stopTimer(databaseAccessTimer);
return result;
}))
.map(count -> CountThingsResponse.of(count, dittoHeaders));
stopTimer(queryParsingTimer);
final StartedTimer databaseAccessTimer =
countTimer.startNewSegment(DATABASE_ACCESS_SEGMENT_NAME);

final Source<Long, NotUsed> countResultSource = isSudo
? searchPersistence.sudoCount(query)
: searchPersistence.count(query,
countCommand.getDittoHeaders()
.getAuthorizationContext()
.getAuthorizationSubjectIds());

return processSearchPersistenceResult(countResultSource, dittoHeaders)
.via(Flow.fromFunction(result -> {
stopTimer(databaseAccessTimer);
return result;
}))
.map(count -> CountThingsResponse.of(count, dittoHeaders));
});

final Source<Object, ?> replySourceWithErrorHandling =
Expand All @@ -229,29 +238,37 @@ private <T extends Command<?>> void executeCount(final T countCommand,
}

private void stream(final StreamThings streamThings) {
final var sender = getSender();
final ThreadSafeDittoLoggingAdapter l = log.withCorrelationId(streamThings);
l.info("Processing StreamThings command: {}", streamThings);
preEnforcer.apply(streamThings).thenAccept(stream -> performStream((StreamThings) stream, sender, l));

}

private void performStream(final StreamThings streamThings, final ActorRef sender,
final ThreadSafeDittoLoggingAdapter l) {

final JsonSchemaVersion version = streamThings.getImplementedSchemaVersion();
final var queryType = "query"; // same as queryThings
final StartedTimer searchTimer = startNewTimer(version, queryType, streamThings);
final StartedTimer queryParsingTimer = searchTimer.startNewSegment(QUERY_PARSING_SEGMENT_NAME);
final ActorRef sender = getSender();
final Set<String> namespaces = streamThings.getNamespaces().orElse(null);

final Source<SourceRef<String>, NotUsed> thingIdSourceRefSource =
ThingsSearchCursor.extractCursor(streamThings).flatMapConcat(cursor -> {
cursor.ifPresent(c -> c.logCursorCorrelationId(l));
return createQuerySource(queryParser::parse, streamThings).map(parsedQuery -> {
final var query = ThingsSearchCursor.adjust(cursor, parsedQuery, queryParser.getCriteriaFactory());
stopTimer(queryParsingTimer);
searchTimer.startNewSegment(DATABASE_ACCESS_SEGMENT_NAME); // segment stopped by stopTimerAndHandleError
final List<String> subjectIds =
streamThings.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds();
return searchPersistence.findAllUnlimited(query, subjectIds, namespaces)
.map(ThingId::toString) // for serialization???
.runWith(StreamRefs.sourceRef(), SystemMaterializer.get(getSystem()).materializer());
});
cursor.ifPresent(c -> c.logCursorCorrelationId(l));
return createQuerySource(queryParser::parse, streamThings).map(parsedQuery -> {
final var query =
ThingsSearchCursor.adjust(cursor, parsedQuery, queryParser.getCriteriaFactory());
stopTimer(queryParsingTimer);
searchTimer.startNewSegment(
DATABASE_ACCESS_SEGMENT_NAME); // segment stopped by stopTimerAndHandleError
final List<String> subjectIds =
streamThings.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds();
return searchPersistence.findAllUnlimited(query, subjectIds, namespaces)
.map(ThingId::toString) // for serialization???
.runWith(StreamRefs.sourceRef(), SystemMaterializer.get(getSystem()).materializer());
});
});

final Source<Object, NotUsed> replySourceWithErrorHandling =
Expand All @@ -264,6 +281,15 @@ private void stream(final StreamThings streamThings) {
}

private void query(final QueryThings queryThings) {
final var sender = getSender();
performLogging(queryThings);

final ThreadSafeDittoLoggingAdapter l = log.withCorrelationId(queryThings);
l.debug("Starting to process QueryThings command: {}", queryThings);
preEnforcer.apply(queryThings).thenAccept(query -> performQuery((QueryThings) query, sender));
}

private void performQuery(final QueryThings queryThings, final ActorRef sender) {
performLogging(queryThings);

final ThreadSafeDittoLoggingAdapter l = log.withCorrelationId(queryThings);
Expand All @@ -273,38 +299,40 @@ private void query(final QueryThings queryThings) {
final var queryType = "query";
final StartedTimer searchTimer = startNewTimer(version, queryType, queryThings);
final StartedTimer queryParsingTimer = searchTimer.startNewSegment(QUERY_PARSING_SEGMENT_NAME);
final ActorRef sender = getSender();
final Set<String> namespaces = queryThings.getNamespaces().orElse(null);

final Source<QueryThingsResponse, ?> queryThingsResponseSource =
ThingsSearchCursor.extractCursor(queryThings, getSystem()).flatMapConcat(cursor -> {
cursor.ifPresent(c -> c.logCursorCorrelationId(l));
final QueryThings command = ThingsSearchCursor.adjust(cursor, queryThings);
final var dittoHeaders = command.getDittoHeaders();
l.info("Processing QueryThings command with namespaces <{}> and filter: <{}>",
queryThings.getNamespaces(), queryThings.getFilter());
l.debug("Processing QueryThings command: <{}>", queryThings);
return createQuerySource(queryParser::parse, command)
.flatMapConcat(parsedQuery -> {
final var query =
ThingsSearchCursor.adjust(cursor, parsedQuery, queryParser.getCriteriaFactory());

stopTimer(queryParsingTimer);
final StartedTimer databaseAccessTimer =
searchTimer.startNewSegment(DATABASE_ACCESS_SEGMENT_NAME);

final List<String> subjectIds =
command.getDittoHeaders().getAuthorizationContext().getAuthorizationSubjectIds();
final Source<ResultList<TimestampedThingId>, NotUsed> findAllResult =
searchPersistence.findAll(query, subjectIds, namespaces);
return processSearchPersistenceResult(findAllResult, dittoHeaders)
.via(Flow.fromFunction(result -> {
stopTimer(databaseAccessTimer);
return result;
}))
.map(ids -> toQueryThingsResponse(command, cursor.orElse(null), ids));
});
});
cursor.ifPresent(c -> c.logCursorCorrelationId(l));
final QueryThings command = ThingsSearchCursor.adjust(cursor, queryThings);
final var dittoHeaders = command.getDittoHeaders();
l.info("Processing QueryThings command with namespaces <{}> and filter: <{}>",
queryThings.getNamespaces(), queryThings.getFilter());
l.debug("Processing QueryThings command: <{}>", queryThings);
return createQuerySource(queryParser::parse, command)
.flatMapConcat(parsedQuery -> {
final var query =
ThingsSearchCursor.adjust(cursor, parsedQuery,
queryParser.getCriteriaFactory());

stopTimer(queryParsingTimer);
final StartedTimer databaseAccessTimer =
searchTimer.startNewSegment(DATABASE_ACCESS_SEGMENT_NAME);

final List<String> subjectIds =
command.getDittoHeaders()
.getAuthorizationContext()
.getAuthorizationSubjectIds();
final Source<ResultList<TimestampedThingId>, NotUsed> findAllResult =
searchPersistence.findAll(query, subjectIds, namespaces);
return processSearchPersistenceResult(findAllResult, dittoHeaders)
.via(Flow.fromFunction(result -> {
stopTimer(databaseAccessTimer);
return result;
}))
.map(ids -> toQueryThingsResponse(command, cursor.orElse(null), ids));
});
});

final Source<Object, ?> replySourceWithErrorHandling =
queryThingsResponseSource.via(stopTimerAndHandleError(searchTimer, queryThings));
Expand Down
5 changes: 5 additions & 0 deletions thingsearch/service/src/main/resources/search.conf
Expand Up @@ -4,6 +4,11 @@ ditto {
root-child-actor-starter = "org.eclipse.ditto.base.service.NoOpRootChildActorStarter"
root-actor-starter = "org.eclipse.ditto.base.service.NoOpRootActorStarter"

pre-enforcers = [
"org.eclipse.ditto.policies.enforcement.pre_enforcement.HeaderSetter",
"org.eclipse.ditto.policies.enforcement.placeholders.PlaceholderSubstitution",
]

persistence.operations.delay-after-persistence-actor-shutdown = 5s
persistence.operations.delay-after-persistence-actor-shutdown = ${?DELAY_AFTER_PERSISTENCE_ACTOR_SHUTDOWN}

Expand Down
5 changes: 5 additions & 0 deletions thingsearch/service/src/test/resources/actors-test.conf
Expand Up @@ -2,6 +2,11 @@ ditto {
signal-enrichment.caching-signal-enrichment-facade.provider = org.eclipse.ditto.internal.models.signalenrichment.DittoCachingSignalEnrichmentFacadeProvider
mapping-strategy.implementation = "org.eclipse.ditto.thingsearch.api.ThingSearchMappingStrategies"

pre-enforcers = [
"org.eclipse.ditto.policies.enforcement.pre_enforcement.HeaderSetter",
"org.eclipse.ditto.policies.enforcement.placeholders.PlaceholderSubstitution",
]

persistence.operations.delay-after-persistence-actor-shutdown = 5s
persistence.operations.delay-after-persistence-actor-shutdown = ${?DELAY_AFTER_PERSISTENCE_ACTOR_SHUTDOWN}

Expand Down

0 comments on commit 3b85931

Please sign in to comment.