Skip to content

Commit

Permalink
Make SearchUpdateMapper asynchronous.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 25, 2021
1 parent 9afd80c commit b0bce92
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;

/**
* Default {@code SearchUpdateMapper} for custom search update processing.
Expand All @@ -32,8 +34,8 @@ protected DefaultSearchUpdateMapper(final ActorSystem actorSystem) {
}

@Override
public List<AbstractWriteModel> processWriteModels(final List<AbstractWriteModel> writeModels) {
return writeModels;
public Source<List<AbstractWriteModel>, NotUsed> processWriteModels(final List<AbstractWriteModel> writeModels) {
return Source.single(writeModels);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.eclipse.ditto.thingsearch.service.persistence.write.model.WriteResultAndErrors;

import com.mongodb.MongoBulkWriteException;
import com.mongodb.bulk.BulkWriteResult;
import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.reactivestreams.client.MongoCollection;
Expand Down Expand Up @@ -110,16 +109,15 @@ public Flow<Source<AbstractWriteModel, NotUsed>, WriteResultAndErrors, NotUsed>

final Flow<List<AbstractWriteModel>, WriteResultAndErrors, NotUsed> writeFlow =
Flow.<List<AbstractWriteModel>>create()
.map(writeModels -> {
try {
return searchUpdateMapper.processWriteModels(writeModels);
} catch (final Exception e) {
LOGGER.error(
"Skipping mapping of search update write models because an unexpected error occurred.",
e);
return writeModels;
}
})
.flatMapConcat(writeModels -> searchUpdateMapper.processWriteModels(writeModels)
.recoverWithRetries(1,
new PFBuilder<Throwable, Source<List<AbstractWriteModel>, NotUsed>>()
.matchAny(e -> {
LOGGER.error("Skipping mapping of search update write models " +
"because an unexpected error occurred.", e);
return Source.single(writeModels);
})
.build()))
.mapAsync(1, MongoSearchUpdaterFlow::toIncrementalMongo)
.flatMapMerge(parallelism, writeModels ->
executeBulkWrite(shouldAcknowledge, writeModels).async(DISPATCHER_NAME, parallelism));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import org.eclipse.ditto.thingsearch.service.common.config.SearchConfig;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;

import akka.NotUsed;
import akka.actor.AbstractExtensionId;
import akka.actor.ActorSystem;
import akka.actor.ExtendedActorSystem;
import akka.actor.Extension;
import akka.stream.javadsl.Source;

/**
* Search Update Mapper to be loaded by reflection.
Expand All @@ -48,7 +50,7 @@ protected SearchUpdateMapper(final ActorSystem actorSystem) {
* Should not throw an exception. If a exception is thrown, the mapping is ignored.
* If no search update should be executed an empty list can be returned.
*/
public abstract List<AbstractWriteModel> processWriteModels(final List<AbstractWriteModel> writeModels);
public abstract Source<List<AbstractWriteModel>, NotUsed> processWriteModels(final List<AbstractWriteModel> writeModels);

/**
* Load a {@code SearchUpdateListener} dynamically according to the search configuration.
Expand Down

0 comments on commit b0bce92

Please sign in to comment.