Skip to content

Commit

Permalink
[#1081] make query parser asynchronous.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jun 27, 2021
1 parent bd36c10 commit 2042389
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 48 deletions.
Expand Up @@ -13,6 +13,8 @@
package org.eclipse.ditto.thingsearch.service.persistence.query;

import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.rql.query.Query;
Expand Down Expand Up @@ -76,27 +78,28 @@ public static QueryParser of(final ThingsFieldExpressionFactory fieldExpressionF
/**
* Parses a search command into a query.
*
* @param command the search command.
* @param commandToValidate the search command.
* @return the query.
*/
public Query parse(final ThingSearchQueryCommand<?> command) {
final Criteria criteria = parseCriteria(command);
queryCriteriaValidator.validateCommand(command);
if (command instanceof QueryThings) {
final QueryThings queryThings = (QueryThings) command;
final QueryBuilder queryBuilder = queryBuilderFactory.newBuilder(criteria);
queryThings.getOptions()
.map(optionStrings -> String.join(",", optionStrings))
.ifPresent(options -> setOptions(options, queryBuilder, command.getDittoHeaders()));
return queryBuilder.build();
} else if (command instanceof StreamThings) {
final StreamThings streamThings = (StreamThings) command;
final QueryBuilder queryBuilder = queryBuilderFactory.newUnlimitedBuilder(criteria);
streamThings.getSort().ifPresent(sort -> setOptions(sort, queryBuilder, command.getDittoHeaders()));
return queryBuilder.build();
} else {
return queryBuilderFactory.newUnlimitedBuilder(criteria).build();
}
public CompletionStage<Query> parse(final ThingSearchQueryCommand<?> commandToValidate) {
final Criteria criteria = parseCriteria(commandToValidate);
return queryCriteriaValidator.validateCommand(commandToValidate).thenApply(command -> {
if (command instanceof QueryThings) {
final QueryThings queryThings = (QueryThings) command;
final QueryBuilder queryBuilder = queryBuilderFactory.newBuilder(criteria);
queryThings.getOptions()
.map(optionStrings -> String.join(",", optionStrings))
.ifPresent(options -> setOptions(options, queryBuilder, command.getDittoHeaders()));
return queryBuilder.build();
} else if (command instanceof StreamThings) {
final StreamThings streamThings = (StreamThings) command;
final QueryBuilder queryBuilder = queryBuilderFactory.newUnlimitedBuilder(criteria);
streamThings.getSort().ifPresent(sort -> setOptions(sort, queryBuilder, command.getDittoHeaders()));
return queryBuilder.build();
} else {
return queryBuilderFactory.newUnlimitedBuilder(criteria).build();
}
});
}

private Criteria parseCriteria(final ThingSearchQueryCommand<?> command) {
Expand All @@ -116,11 +119,11 @@ private Criteria parseCriteria(final ThingSearchQueryCommand<?> command) {
* @param sudoCountThings the command.
* @return the query.
*/
public Query parseSudoCountThings(final SudoCountThings sudoCountThings) {
public CompletionStage<Query> parseSudoCountThings(final SudoCountThings sudoCountThings) {
final DittoHeaders headers = sudoCountThings.getDittoHeaders();
final String filters = sudoCountThings.getFilter().orElse(null);
final Criteria criteria = queryFilterCriteriaFactory.filterCriteria(filters, headers);
return queryBuilderFactory.newUnlimitedBuilder(criteria).build();
return CompletableFuture.completedStage(queryBuilderFactory.newUnlimitedBuilder(criteria).build());
}

/**
Expand Down
Expand Up @@ -12,6 +12,9 @@
*/
package org.eclipse.ditto.thingsearch.service.persistence.query.validation;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand;

import akka.actor.ActorSystem;
Expand All @@ -31,7 +34,7 @@ public DefaultQueryCriteriaValidator(final ActorSystem actorSystem) {
}

@Override
public void validateCommand(final ThingSearchQueryCommand<?> command) {
// do nothing
public CompletionStage<ThingSearchQueryCommand<?>> validateCommand(final ThingSearchQueryCommand<?> command) {
return CompletableFuture.completedStage(command);
}
}
Expand Up @@ -13,12 +13,13 @@
package org.eclipse.ditto.thingsearch.service.persistence.query.validation;

import java.util.List;
import java.util.concurrent.CompletionStage;

import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;
import org.eclipse.ditto.internal.utils.akka.AkkaClassLoader;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.ThingSearchQueryCommand;
import org.eclipse.ditto.thingsearch.service.common.config.DittoSearchConfig;
import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;

import akka.actor.AbstractExtensionId;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -49,8 +50,12 @@ protected QueryCriteriaValidator(final ActorSystem actorSystem) {
* validates it.
* <p>
* May throw an exception depending on the implementation in the used QueryCriteriaValidator.
*
* @param command the command to validate.
* @return the validated command in a future if it is valid, or a failed future if it is not.
*/
public abstract void validateCommand(final ThingSearchQueryCommand<?> command);
public abstract CompletionStage<ThingSearchQueryCommand<?>> validateCommand(
final ThingSearchQueryCommand<?> command);

/**
* Load a {@code QueryCriteriaValidator} dynamically according to the search configuration.
Expand Down
Expand Up @@ -15,40 +15,42 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.rql.query.Query;
import org.eclipse.ditto.things.model.Thing;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.thingsearch.model.SearchModelFactory;
import org.eclipse.ditto.thingsearch.model.SearchResult;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoCountThings;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoRetrieveNamespaceReport;
import org.eclipse.ditto.thingsearch.service.common.model.ResultList;
import org.eclipse.ditto.thingsearch.service.persistence.query.QueryParser;
import org.eclipse.ditto.thingsearch.service.persistence.read.ThingsSearchPersistence;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.commands.exceptions.GatewayInternalErrorException;
import org.eclipse.ditto.thingsearch.model.SearchModelFactory;
import org.eclipse.ditto.thingsearch.model.SearchResult;
import org.eclipse.ditto.thingsearch.model.signals.commands.ThingSearchCommand;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.CountThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.CountThingsResponse;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThingsResponse;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.StreamThings;
import org.eclipse.ditto.thingsearch.service.common.model.ResultList;
import org.eclipse.ditto.thingsearch.service.persistence.query.QueryParser;
import org.eclipse.ditto.thingsearch.service.persistence.read.ThingsSearchPersistence;

import akka.NotUsed;
import akka.actor.AbstractActor;
Expand Down Expand Up @@ -156,8 +158,8 @@ private void sudoCount(final SudoCountThings sudoCountThings) {
executeCount(sudoCountThings, queryParser::parseSudoCountThings, true);
}

private <T extends Command> void executeCount(final T countCommand,
final Function<T, Query> queryParseFunction,
private <T extends Command<?>> void executeCount(final T countCommand,
final Function<T, CompletionStage<Query>> queryParseFunction,
final boolean isSudo) {
final DittoHeaders dittoHeaders = countCommand.getDittoHeaders();
log.withCorrelationId(dittoHeaders)
Expand Down Expand Up @@ -193,7 +195,8 @@ private <T extends Command> void executeCount(final T countCommand,
.via(stopTimerAndHandleError(countTimer, countCommand));

Materializer.createMaterializer(this::getContext);
Patterns.pipe(replySource.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()), getContext().dispatcher()).to(sender);
Patterns.pipe(replySource.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()),
getContext().dispatcher()).to(sender);
}

private void stream(final StreamThings streamThings) {
Expand Down Expand Up @@ -223,7 +226,9 @@ private void stream(final StreamThings streamThings) {
final Source<Object, NotUsed> replySourceWithErrorHandling =
sourceRefSource.via(stopTimerAndHandleError(searchTimer, streamThings));

Patterns.pipe(replySourceWithErrorHandling.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()), getContext().dispatcher())
Patterns.pipe(
replySourceWithErrorHandling.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()),
getContext().dispatcher())
.to(sender);
}

Expand Down Expand Up @@ -272,7 +277,9 @@ private void query(final QueryThings queryThings) {
final Source<Object, ?> replySourceWithErrorHandling =
replySource.via(stopTimerAndHandleError(searchTimer, queryThings));

Patterns.pipe(replySourceWithErrorHandling.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()), getContext().dispatcher())
Patterns.pipe(
replySourceWithErrorHandling.runWith(Sink.head(), SystemMaterializer.get(getSystem()).materializer()),
getContext().dispatcher())
.to(sender);
}

Expand Down Expand Up @@ -353,11 +360,14 @@ private static StartedTimer startNewTimer(final JsonSchemaVersion version, final
.start();
}

private static <T> Source<Query, NotUsed> createQuerySource(final Function<T, Query> parser,
private static <T> Source<Query, NotUsed> createQuerySource(final Function<T, CompletionStage<Query>> parser,
final T command) {

try {
return Source.single(parser.apply(command));
return Source.fromCompletionStage(parser.apply(command))
.recoverWithRetries(1, new PFBuilder<Throwable, Source<Query, NotUsed>>()
.match(CompletionException.class, e -> Source.failed(e.getCause()))
.build());
} catch (final Throwable e) {
return Source.failed(e);
}
Expand Down

0 comments on commit 2042389

Please sign in to comment.