Skip to content

Commit

Permalink
Search: Make simple-field-mappings configurable; make parsed query av…
Browse files Browse the repository at this point in the history
…ailable to query criteria validators.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 21, 2022
1 parent 44a10be commit 4d697f4
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 64 deletions.
Expand Up @@ -12,8 +12,10 @@
*/
package org.eclipse.ditto.thingsearch.service.common.config;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
Expand All @@ -36,6 +38,8 @@
import org.eclipse.ditto.internal.utils.persistence.operations.PersistenceOperationsConfig;
import org.eclipse.ditto.internal.utils.tracing.config.TracingConfig;

import com.typesafe.config.Config;

/**
* This class is the default implementation of {@link SearchConfig}.
*/
Expand All @@ -54,6 +58,7 @@ public final class DittoSearchConfig implements SearchConfig, WithConfigPath {
private final IndexInitializationConfig indexInitializationConfig;
private final PersistenceOperationsConfig persistenceOperationsConfig;
private final MongoDbConfig mongoDbConfig;
private final Map<String, String> simpleFieldMappings;

private DittoSearchConfig(final ScopedConfig dittoScopedConfig) {
dittoServiceConfig = DittoServiceConfig.of(dittoScopedConfig, CONFIG_PATH);
Expand All @@ -69,6 +74,8 @@ private DittoSearchConfig(final ScopedConfig dittoScopedConfig) {
searchUpdateObserver = configWithFallback.getStringOrNull(SearchConfigValue.SEARCH_UPDATE_OBSERVER);
updaterConfig = DefaultUpdaterConfig.of(configWithFallback);
indexInitializationConfig = DefaultIndexInitializationConfig.of(configWithFallback);
simpleFieldMappings =
convertToMap(configWithFallback.getConfig(SearchConfigValue.SIMPLE_FIELD_MAPPINGS.getConfigPath()));
}

/**
Expand Down Expand Up @@ -111,6 +118,11 @@ public UpdaterConfig getUpdaterConfig() {
return updaterConfig;
}

@Override
public Map<String, String> getSimpleFieldMappings() {
return simpleFieldMappings;
}

@Override
public ClusterConfig getClusterConfig() {
return dittoServiceConfig.getClusterConfig();
Expand Down Expand Up @@ -175,15 +187,15 @@ public boolean equals(final Object o) {
Objects.equals(healthCheckConfig, that.healthCheckConfig) &&
Objects.equals(indexInitializationConfig, that.indexInitializationConfig) &&
Objects.equals(persistenceOperationsConfig, that.persistenceOperationsConfig) &&
Objects.equals(mongoDbConfig, that.mongoDbConfig);
Objects.equals(mongoDbConfig, that.mongoDbConfig) &&
Objects.equals(simpleFieldMappings, that.simpleFieldMappings);
}

@Override
public int hashCode() {
return Objects.hash(mongoHintsByNamespace, queryCriteriaValidator, searchUpdateMapper, searchUpdateObserver,
updaterConfig, dittoServiceConfig, healthCheckConfig, indexInitializationConfig,
persistenceOperationsConfig,
mongoDbConfig);
persistenceOperationsConfig, mongoDbConfig, simpleFieldMappings);
}

@Override
Expand All @@ -199,6 +211,7 @@ public String toString() {
", indexInitializationConfig=" + indexInitializationConfig +
", persistenceOperationsConfig=" + persistenceOperationsConfig +
", mongoDbConfig=" + mongoDbConfig +
", simpleFieldMappings=" + simpleFieldMappings +
"]";
}

Expand All @@ -207,4 +220,12 @@ public String getConfigPath() {
return CONFIG_PATH;
}

private static Map<String, String> convertToMap(final Config config) {
return config.root()
.unwrapped()
.entrySet()
.stream()
.filter(entry -> entry.getValue() instanceof String)
.collect(Collectors.toUnmodifiableMap(Map.Entry::getKey, entry -> (String) entry.getValue()));
}
}
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.thingsearch.service.common.config;

import java.util.Map;
import java.util.Optional;

import javax.annotation.Nullable;
Expand All @@ -24,6 +25,9 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.config.WithMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.operations.WithPersistenceOperationsConfig;

import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;

/**
* Provides the configuration settings of the Search service.
*/
Expand Down Expand Up @@ -66,6 +70,14 @@ public interface SearchConfig extends ServiceSpecificConfig, WithHealthCheckConf
*/
UpdaterConfig getUpdaterConfig();

/**
* Returns how simple fields are mapped during query parsing.
*
* @return the simple field mapping.
* @since 2.5.0
*/
Map<String, String> getSimpleFieldMappings();

/**
* An enumeration of the known config path expressions and their associated default values for SearchConfig.
*/
Expand Down Expand Up @@ -99,7 +111,22 @@ enum SearchConfigValue implements KnownConfigValue {
* @since 2.3.0
*/
SEARCH_UPDATE_OBSERVER("search-update-observer.implementation",
"org.eclipse.ditto.thingsearch.service.updater.actors.DefaultSearchUpdateObserver");
"org.eclipse.ditto.thingsearch.service.updater.actors.DefaultSearchUpdateObserver"),

/**
* How simple fields are mapped during query parsing.
*
* @since 2.5.0
*/
SIMPLE_FIELD_MAPPINGS("simple-field-mappings", ConfigValueFactory.fromMap(Map.of(
"thingId", "_id",
"namespace", "_namespace",
"policyId", "/policyId",
"_revision", "/_revision",
"_modified", "/_modified",
"_created", "/_created",
"definition", "/definition"
)));

private final String path;
private final Object defaultValue;
Expand Down
Expand Up @@ -17,24 +17,24 @@
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.rql.model.ParserException;
import org.eclipse.ditto.rql.model.predicates.PredicateParser;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.parser.thingsearch.RqlOptionParser;
import org.eclipse.ditto.rql.query.Query;
import org.eclipse.ditto.rql.query.QueryBuilder;
import org.eclipse.ditto.rql.query.QueryBuilderFactory;
import org.eclipse.ditto.rql.query.criteria.Criteria;
import org.eclipse.ditto.rql.query.criteria.CriteriaFactory;
import org.eclipse.ditto.rql.query.expression.ThingsFieldExpressionFactory;
import org.eclipse.ditto.rql.query.filter.QueryFilterCriteriaFactory;
import org.eclipse.ditto.rql.model.ParserException;
import org.eclipse.ditto.rql.model.predicates.PredicateParser;
import org.eclipse.ditto.rql.parser.RqlPredicateParser;
import org.eclipse.ditto.rql.parser.thingsearch.RqlOptionParser;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoCountThings;
import org.eclipse.ditto.thingsearch.api.query.filter.ParameterOptionVisitor;
import org.eclipse.ditto.thingsearch.service.persistence.query.validation.QueryCriteriaValidator;
import org.eclipse.ditto.thingsearch.model.signals.commands.exceptions.InvalidOptionException;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.StreamThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand;
import org.eclipse.ditto.thingsearch.service.persistence.query.validation.QueryCriteriaValidator;

/**
* Create Query objects from search commands.
Expand Down Expand Up @@ -78,28 +78,26 @@ public static QueryParser of(final ThingsFieldExpressionFactory fieldExpressionF
/**
* Parses a search command into a query.
*
* @param commandToValidate the search command.
* @param command the search command.
* @return the query.
*/
public CompletionStage<Query> parse(final ThingSearchQueryCommand<?> commandToValidate) {
final Criteria criteria = parseCriteria(commandToValidate);
return queryCriteriaValidator.validateCommand(commandToValidate).thenApply(command -> {
if (command instanceof QueryThings) {
final QueryThings queryThings = (QueryThings) command;
final QueryBuilder queryBuilder = queryBuilderFactory.newBuilder(criteria);
queryThings.getOptions()
.map(optionStrings -> String.join(",", optionStrings))
.ifPresent(options -> setOptions(options, queryBuilder, command.getDittoHeaders()));
return queryBuilder.build();
} else if (command instanceof StreamThings) {
final StreamThings streamThings = (StreamThings) command;
final QueryBuilder queryBuilder = queryBuilderFactory.newUnlimitedBuilder(criteria);
streamThings.getSort().ifPresent(sort -> setOptions(sort, queryBuilder, command.getDittoHeaders()));
return queryBuilder.build();
} else {
return queryBuilderFactory.newUnlimitedBuilder(criteria).build();
}
});
public CompletionStage<Query> parse(final ThingSearchQueryCommand<?> command) {
final Criteria criteria = parseCriteria(command);
final Query query;
if (command instanceof final QueryThings queryThings) {
final QueryBuilder queryBuilder = queryBuilderFactory.newBuilder(criteria);
queryThings.getOptions()
.map(optionStrings -> String.join(",", optionStrings))
.ifPresent(options -> setOptions(options, queryBuilder, command.getDittoHeaders()));
query = queryBuilder.build();
} else if (command instanceof final StreamThings streamThings) {
final QueryBuilder queryBuilder = queryBuilderFactory.newUnlimitedBuilder(criteria);
streamThings.getSort().ifPresent(sort -> setOptions(sort, queryBuilder, command.getDittoHeaders()));
query = queryBuilder.build();
} else {
query = queryBuilderFactory.newUnlimitedBuilder(criteria).build();
}
return queryCriteriaValidator.validateQuery(command, query);
}

private Criteria parseCriteria(final ThingSearchQueryCommand<?> command) {
Expand Down
Expand Up @@ -15,6 +15,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.rql.query.Query;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand;

import akka.actor.ActorSystem;
Expand All @@ -34,7 +35,7 @@ public DefaultQueryCriteriaValidator(final ActorSystem actorSystem) {
}

@Override
public CompletionStage<ThingSearchQueryCommand<?>> validateCommand(final ThingSearchQueryCommand<?> command) {
return CompletableFuture.completedStage(command);
public CompletionStage<Query> validateQuery(final ThingSearchQueryCommand<?> command, final Query query) {
return CompletableFuture.completedStage(query);
}
}
Expand Up @@ -17,6 +17,7 @@

import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.rql.query.Query;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;
Expand All @@ -42,16 +43,13 @@ protected QueryCriteriaValidator(final ActorSystem actorSystem) {
}

/**
* Gets the criteria of a {@link org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand} and
* validates it.
* <p>
* May throw an exception depending on the implementation in the used QueryCriteriaValidator.
* Validate a parsed query of a
* {@link org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand}.
*
* @param command the command to validate.
* @return the validated command in a future if it is valid, or a failed future if it is not.
* @param query The query.
* @return The validated query in a future if it is valid, or a failed future if it is not.
*/
public abstract CompletionStage<ThingSearchQueryCommand<?>> validateCommand(
final ThingSearchQueryCommand<?> command);
public abstract CompletionStage<Query> validateQuery(final ThingSearchQueryCommand<?> command, final Query query);

/**
* Load a {@code QueryCriteriaValidator} dynamically according to the search configuration.
Expand Down
Expand Up @@ -69,7 +69,7 @@ private SearchRootActor(final SearchConfig searchConfig, final ActorRef pubSubMe
final DittoMongoClient mongoDbClient = MongoClientExtension.get(actorSystem).getSearchClient();

final var thingsSearchPersistence = getThingsSearchPersistence(searchConfig, mongoDbClient);
final ActorRef searchActor = initializeSearchActor(searchConfig.getLimitsConfig(), thingsSearchPersistence);
final ActorRef searchActor = initializeSearchActor(searchConfig, thingsSearchPersistence);
pubSubMediator.tell(DistPubSubAccess.put(searchActor), getSelf());

final TimestampPersistence backgroundSyncPersistence =
Expand All @@ -84,8 +84,9 @@ private SearchRootActor(final SearchConfig searchConfig, final ActorRef pubSubMe
bindHttpStatusRoute(searchConfig.getHttpConfig(), healthCheckingActor);
}

static QueryParser getQueryParser(final LimitsConfig limitsConfig, final ActorSystem actorSystem) {
final var fieldExpressionFactory = getThingsFieldExpressionFactory();
static QueryParser getQueryParser(final SearchConfig searchConfig, final ActorSystem actorSystem) {
final var limitsConfig = searchConfig.getLimitsConfig();
final var fieldExpressionFactory = getThingsFieldExpressionFactory(searchConfig);
final QueryBuilderFactory queryBuilderFactory = new MongoQueryBuilderFactory(limitsConfig);
final var queryCriteriaValidator = QueryCriteriaValidator.get(actorSystem);
return QueryParser.of(fieldExpressionFactory, queryBuilderFactory, queryCriteriaValidator);
Expand Down Expand Up @@ -139,24 +140,14 @@ public static Props props(final SearchConfig searchConfig, final ActorRef pubSub
return Props.create(SearchRootActor.class, searchConfig, pubSubMediator);
}

private static ThingsFieldExpressionFactory getThingsFieldExpressionFactory() {
// Not possible to use ModelBasedThingsFieldExpressionFactory
// because the field expression factory is supposed to map 'thingId' to '_id', which is only meaningful for MongoDB
final Map<String, String> mappings = new HashMap<>(6);
mappings.put(FieldExpressionUtil.FIELD_NAME_THING_ID, FieldExpressionUtil.FIELD_ID);
mappings.put(FieldExpressionUtil.FIELD_NAME_NAMESPACE, FieldExpressionUtil.FIELD_NAMESPACE);
addMapping(mappings, Thing.JsonFields.POLICY_ID); // also present as top-level field in search collection, however not indexed
addMapping(mappings, Thing.JsonFields.REVISION); // also present as top-level field in search collection, however not indexed
addMapping(mappings, Thing.JsonFields.MODIFIED);
addMapping(mappings, Thing.JsonFields.CREATED);
addMapping(mappings, Thing.JsonFields.DEFINITION);
return ThingsFieldExpressionFactory.of(mappings);
private static ThingsFieldExpressionFactory getThingsFieldExpressionFactory(final SearchConfig searchConfig) {
return ThingsFieldExpressionFactory.of(searchConfig.getSimpleFieldMappings());
}

private ActorRef initializeSearchActor(final LimitsConfig limitsConfig,
private ActorRef initializeSearchActor(final SearchConfig searchConfig,
final ThingsSearchPersistence thingsSearchPersistence) {

final var queryParser = getQueryParser(limitsConfig, getContext().getSystem());
final var queryParser = getQueryParser(searchConfig, getContext().getSystem());
return startChildActor(SearchActor.ACTOR_NAME, SearchActor.props(queryParser, thingsSearchPersistence));
}

Expand Down
11 changes: 11 additions & 0 deletions thingsearch/service/src/main/resources/search.conf
Expand Up @@ -61,6 +61,17 @@ ditto {
enabled = ${?INDEX_INITIALIZATION_ENABLED}
}

# How simple fields are mapped during query parsing
simple-field-mappings {
thingId = "_id"
namespace = "_namespace"
policyId = "/policyId"
_revision = "/_revision"
_modified = "/_modified"
_created = "/_created"
definition = "/definition"
}

updater {
max-idle-time = 25h
max-idle-time = ${?ACTIVITY_CHECK_INTERVAL}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.eclipse.ditto.base.model.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.config.limits.DefaultLimitsConfig;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.DittoMongoClient;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.test.mongo.MongoDbResource;
Expand All @@ -54,6 +55,7 @@
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThingsResponse;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.StreamThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
import org.eclipse.ditto.thingsearch.service.persistence.query.QueryParser;
import org.eclipse.ditto.thingsearch.service.persistence.read.MongoThingsSearchPersistence;
Expand Down Expand Up @@ -108,7 +110,8 @@ public static void startMongoResource() {
""");
actorsTestConfig = ConfigFactory.load("actors-test.conf").withFallback(dispatcherConfig);

queryParser = SearchRootActor.getQueryParser(DefaultLimitsConfig.of(ConfigFactory.empty()),
queryParser = SearchRootActor.getQueryParser(
DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(actorsTestConfig)),
ActorSystem.create(SearchActorIT.class.getSimpleName(), actorsTestConfig));
mongoClient = provideClientWrapper();
policy = createPolicy();
Expand Down
Expand Up @@ -132,9 +132,8 @@ public void cursorForCompositeSortValue() {

final var command =
ThingsSearchCursor.adjust(Optional.of(underTest), QueryThings.of(DittoHeaders.empty()));
final var limitsConfig =
DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(config)).getLimitsConfig();
final var parser = SearchRootActor.getQueryParser(limitsConfig, actorSystem);
final var searchConfig = DittoSearchConfig.of(DefaultScopedConfig.dittoScoped(config));
final var parser = SearchRootActor.getQueryParser(searchConfig, actorSystem);
final Query query = parser.parse(command).toCompletableFuture().join();
final Query result = ThingsSearchCursor.adjust(Optional.of(underTest), query, parser.getCriteriaFactory());
final var bson = CreateBsonVisitor.sudoApply(result.getCriteria())
Expand Down

0 comments on commit 4d697f4

Please sign in to comment.