fixed parallelism issues in search-updater bulk-writes coming from the same thing

* by creating SubSources via "groupBy(bulkWriteSize)" in EnforcementFlow so that updates for a given thingId are always performed by the same "BulkWriter" (see the sketch below)
* by removing the behavior changes in ThingUpdater again so that it is no longer blocked unnecessarily

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
thjaeckle committed Mar 10, 2022
1 parent a67e56e commit a7af56e
Showing 12 changed files with 106 additions and 246 deletions.
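The core idea of this change is that every update for a given thingId must always land in the same bulk-write substream. A minimal, self-contained sketch of that partition step follows; the class and method names are illustrative, only the Math.floorMod(thingId.hashCode(), maxBulkSize) expression appears in the actual diff, and a plain String stands in for the ThingId:

import java.util.Objects;

// Illustrative only: shows why Math.floorMod(hashCode, maxBulkSize) yields a stable,
// non-negative bucket per id, so every update for the same thing is routed to the
// same substream ("BulkWriter").
public final class BucketDemo {

    // hypothetical helper mirroring the groupBy key function used in EnforcementFlow
    static int bucketFor(final String thingId, final int maxBulkSize) {
        return Math.floorMod(Objects.requireNonNull(thingId).hashCode(), maxBulkSize);
    }

    public static void main(final String[] args) {
        final int maxBulkSize = 4;
        final String thingId = "org.eclipse.ditto:my-thing-1";

        // the same id always maps to the same bucket, no matter when the update arrives
        System.out.println(bucketFor(thingId, maxBulkSize) == bucketFor(thingId, maxBulkSize)); // true
        System.out.println(bucketFor(thingId, maxBulkSize) < maxBulkSize);                      // true
    }
}

Because the bucket depends only on the id's hash and the configured maxBulkSize, two updates to the same thing can never end up in two competing bulk writers.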
3 changes: 2 additions & 1 deletion .run/SearchService.run.xml
@@ -18,6 +18,7 @@
<env name="BACKGROUND_SYNC_QUIET_PERIOD" value="1h" />
<env name="THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_PARALLELISM" value="4" />
<env name="THINGS_SEARCH_UPDATER_STREAM_WRITE_INTERVAL" value="30s" />
<env name="THINGS_SEARCH_UPDATER_STREAM_PERSISTENCE_MAX_BULK_SIZE" value="2" />
</envs>
<option name="MAIN_CLASS_NAME" value="org.eclipse.ditto.thingsearch.service.starter.SearchService" />
<module name="ditto-thingsearch-service" />
@@ -32,4 +33,4 @@
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
</component>
@@ -14,9 +14,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
@@ -59,10 +57,12 @@
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Scheduler;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SubSource;

/**
* Converts Thing changes into write models by retrieving data and applying enforcement via an enforcer cache.
@@ -158,48 +158,36 @@ private static boolean shouldReloadCache(@Nullable final Entry<?> entry, final M
* Create a flow from Thing changes to write models by retrieving data from Things shard region and enforcer cache.
*
* @param parallelism how many thing retrieves to perform in parallel to the caching facade.
* @param maxBulkSize the maximum configured bulk size which is used in this context to create this amount of
* subSources.
* @return the flow.
*/
public Flow<Map<ThingId, Metadata>, Source<AbstractWriteModel, NotUsed>, NotUsed> create(final int parallelism) {
public Flow<Map<ThingId, Metadata>, SubSource<AbstractWriteModel, NotUsed>, NotUsed> create(
final int parallelism,
final int maxBulkSize) {

return Flow.<Map<ThingId, Metadata>>create()
.map(changeMap -> {
log.info("Updating search index for <{}> changed things", changeMap.size());
return retrieveThingJsonsFromCachingFacade(parallelism, changeMap).flatMapConcat(responseMap ->
Source.fromIterator(changeMap.values()::iterator)
.flatMapMerge(parallelism, metadataRef -> {
final JsonObject thing = responseMap.get(metadataRef.getThingId());
return Source.fromIterator(changeMap.values()::iterator)
.groupBy(maxBulkSize, m -> Math.floorMod(m.getThingId().hashCode(), maxBulkSize))
.flatMapMerge(parallelism, changedMetadata ->
retrieveThingFromCachingFacade(changedMetadata.getThingId(), changedMetadata)
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism)
.map(pair -> {
final Metadata metadataRef = changeMap.get(pair.first());
final JsonObject thing = pair.second();
searchUpdateObserver.process(metadataRef, thing);
return computeWriteModel(metadataRef, thing)
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism);
}
)
);
})
).flatMapConcat(source -> source);
});

}

private Source<Map<ThingId, JsonObject>, NotUsed> retrieveThingJsonsFromCachingFacade(
final int parallelism, final Map<ThingId, Metadata> changeMap) {

return Source.fromIterator(changeMap.entrySet()::iterator)
.flatMapMerge(parallelism, entry -> retrieveThingFromCachingFacade(entry)
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism))
.<Map<ThingId, JsonObject>>fold(new HashMap<>(), (map, entry) -> {
map.put(entry.getKey(), entry.getValue());
return map;
})
.map(result -> {
log.debug("Got things from caching facade with size: <{}>", result.size());
return result;
});
}

private Source<Map.Entry<ThingId, JsonObject>, NotUsed> retrieveThingFromCachingFacade(
final Map.Entry<ThingId, Metadata> entry) {

final var thingId = entry.getKey();
final var metadata = entry.getValue();
private Source<Pair<ThingId, JsonObject>, NotUsed> retrieveThingFromCachingFacade(final ThingId thingId,
final Metadata metadata) {
ConsistencyLag.startS3RetrieveThing(metadata);
final CompletionStage<JsonObject> thingFuture;
if (metadata.shouldInvalidateThing()) {
@@ -210,8 +198,8 @@ private Source<Map.Entry<ThingId, JsonObject>, NotUsed> retrieveThingFromCaching

return Source.completionStage(thingFuture)
.filter(thing -> !thing.isEmpty())
.<Map.Entry<ThingId, JsonObject>>map(thing -> new AbstractMap.SimpleImmutableEntry<>(thingId, thing))
.recoverWithRetries(1, new PFBuilder<Throwable, Source<Map.Entry<ThingId, JsonObject>, NotUsed>>()
.map(thing -> Pair.create(thingId, thing))
.recoverWithRetries(1, new PFBuilder<Throwable, Source<Pair<ThingId, JsonObject>, NotUsed>>()
.match(Throwable.class, error -> {
log.error("Unexpected response for SudoRetrieveThing via cache: <{}>", thingId, error);
return Source.empty();
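A rough sketch of the new groupBy-based routing in EnforcementFlow#create, with simplified element types (String ids instead of Metadata; the caching-facade retrieval and write-model computation are omitted): groupBy opens at most maxBulkSize substreams keyed by the id's hash bucket, so all elements with the same id stay in one substream and keep their relative order.

import java.util.List;

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SubSource;

// Sketch with simplified types: route String "thingIds" into stable substreams, analogous to
// EnforcementFlow grouping Metadata by Math.floorMod(metadata.getThingId().hashCode(), maxBulkSize).
public final class GroupByRoutingSketch {

    public static void main(final String[] args) {
        final ActorSystem system = ActorSystem.create("sketch");
        final int maxBulkSize = 4;

        final SubSource<String, NotUsed> subSource =
                Source.from(List.of("ns:thing-a", "ns:thing-b", "ns:thing-a", "ns:thing-c"))
                        // same id -> same hash bucket -> same substream; at most maxBulkSize substreams
                        .groupBy(maxBulkSize, id -> Math.floorMod(id.hashCode(), maxBulkSize));

        subSource
                .map(id -> "processed " + id) // per-substream processing preserves per-id ordering
                .mergeSubstreams()            // merge back into a single stream for downstream stages
                .runWith(Sink.foreach(System.out::println), system);
        // in a real program, terminate the ActorSystem once the stream has completed
    }
}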
@@ -43,6 +43,7 @@
import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SubSource;

/**
* Flow mapping write models to write results via the search persistence.
@@ -103,22 +104,18 @@ public static MongoSearchUpdaterFlow of(final MongoDatabase database,
* @param maxBulkSize How many writes to perform in one bulk.
* @return the sink.
*/
public Flow<Source<AbstractWriteModel, NotUsed>, WriteResultAndErrors, NotUsed> start(
public Flow<SubSource<AbstractWriteModel, NotUsed>, WriteResultAndErrors, NotUsed> start(
final boolean shouldAcknowledge,
final int parallelism,
final int maxBulkSize) {

final Flow<Source<AbstractWriteModel, NotUsed>, List<AbstractWriteModel>, NotUsed> batchFlow =
Flow.<Source<AbstractWriteModel, NotUsed>>create()
.flatMapConcat(source -> source.grouped(maxBulkSize));

final Flow<List<AbstractWriteModel>, WriteResultAndErrors, NotUsed> writeFlow =
Flow.<List<AbstractWriteModel>>create()
return Flow.<SubSource<AbstractWriteModel, NotUsed>>create()
.flatMapConcat(source -> source.grouped(maxBulkSize)
.flatMapConcat(searchUpdateMapper::processWriteModels)
.flatMapMerge(parallelism, writeModels ->
executeBulkWrite(shouldAcknowledge, writeModels).async(DISPATCHER_NAME, parallelism));

return batchFlow.via(writeFlow);
.flatMapMerge(parallelism, writeModels -> executeBulkWrite(shouldAcknowledge, writeModels))
.mergeSubstreamsWithParallelism(maxBulkSize)
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, Math.max(parallelism, maxBulkSize))
);
}

private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(final boolean shouldAcknowledge,
@@ -146,14 +143,17 @@ private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(final boolean sho

if (LOGGER.isDebugEnabled()) {
LOGGER.withCorrelationId(bulkWriteCorrelationId)
.debug("Executing BulkWrite containing [<thingId>:{correlationIds}:<filter>]: {}", pairs.stream()
.map(writeModelPair -> "<" + writeModelPair.first().getMetadata().getThingId() + ">:" +
writeModelPair.first().getMetadata().getEventsCorrelationIds()
.stream()
.collect(Collectors.joining(",", "{", "}"))
+ ":<" + extractFilterBson(writeModelPair.second()) + ">"
)
.toList());
.debug("Executing BulkWrite containing <{}> things: [<thingId>:{correlationIds}:<filter>]: {}",
pairs.size(),
pairs.stream()
.map(writeModelPair -> "<" + writeModelPair.first().getMetadata().getThingId() +
">:" +
writeModelPair.first().getMetadata().getEventsCorrelationIds()
.stream()
.collect(Collectors.joining(",", "{", "}"))
+ ":<" + extractFilterBson(writeModelPair.second()) + ">"
)
.toList());

// only log the complete MongoDB writeModels on "TRACE" as they get really big and almost crash the logging backend:
LOGGER.withCorrelationId(bulkWriteCorrelationId)
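A condensed sketch of the batching pattern MongoSearchUpdaterFlow#start now applies per substream (types simplified; bulkWrite stands in for executeBulkWrite, and the searchUpdateMapper step as well as the dispatcher hand-off are omitted): each substream batches up to maxBulkSize write models, writes each batch, and the substreams are merged back with bounded parallelism.

import java.util.List;

import akka.NotUsed;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SubSource;

// Sketch with simplified types: batches are formed inside each substream, so write models
// for the same key (thingId) are never split across competing bulk writers.
public final class PerSubstreamBulkWriteSketch {

    // stand-in for MongoSearchUpdaterFlow#executeBulkWrite
    private static Source<String, NotUsed> bulkWrite(final List<String> batch) {
        return Source.single("wrote " + batch.size() + " write models");
    }

    static Flow<SubSource<String, NotUsed>, String, NotUsed> start(final int parallelism,
            final int maxBulkSize) {

        return Flow.<SubSource<String, NotUsed>>create()
                .flatMapConcat(subSource -> subSource
                        .grouped(maxBulkSize)                          // batch up to maxBulkSize per substream
                        .flatMapMerge(parallelism, PerSubstreamBulkWriteSketch::bulkWrite)
                        .mergeSubstreamsWithParallelism(maxBulkSize)); // bounded number of concurrent substreams
    }
}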
@@ -40,6 +40,7 @@
import akka.stream.javadsl.RestartSource;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SubSource;

/**
* Stream from the cache of Thing changes to the persistence of the search index.
@@ -117,17 +118,18 @@ public static SearchUpdaterStream of(final UpdaterConfig updaterConfig,
* @return kill-switch to terminate the stream.
*/
public KillSwitch start(final ActorContext actorContext) {
final Source<Source<AbstractWriteModel, NotUsed>, NotUsed> restartSource = createRestartSource();
final Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> restartSink = createRestartSink();
final Source<SubSource<AbstractWriteModel, NotUsed>, NotUsed> restartSource = createRestartSource();
final Sink<SubSource<AbstractWriteModel, NotUsed>, NotUsed> restartSink = createRestartSink();

return restartSource.viaMat(KillSwitches.single(), Keep.right())
.toMat(restartSink, Keep.left())
.run(actorContext.system());
}

private Source<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource() {
private Source<SubSource<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource() {
final var streamConfig = updaterConfig.getStreamConfig();
final StreamStageConfig retrievalConfig = streamConfig.getRetrievalConfig();
final PersistenceStreamConfig persistenceConfig = streamConfig.getPersistenceConfig();

final var acknowledgedSource =
ChangeQueueActor.createSource(changeQueueActor, true, streamConfig.getWriteInterval());
@@ -137,9 +139,10 @@ private Source<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource
final var mergedSource =
acknowledgedSource.mergePrioritized(unacknowledgedSource, 1023, 1, true);

final Source<Source<AbstractWriteModel, NotUsed>, NotUsed> source =
final Source<SubSource<AbstractWriteModel, NotUsed>, NotUsed> source =
mergedSource.via(filterMapKeysByBlockedNamespaces())
.via(enforcementFlow.create(retrievalConfig.getParallelism()))
.via(enforcementFlow.create(
retrievalConfig.getParallelism(), persistenceConfig.getMaxBulkSize()))
.map(writeModelSource ->
writeModelSource.via(blockNamespaceFlow(SearchUpdaterStream::namespaceOfWriteModel)));

@@ -150,14 +153,14 @@
() -> source);
}

private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink() {
private Sink<SubSource<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink() {
final var streamConfig = updaterConfig.getStreamConfig();
final PersistenceStreamConfig persistenceConfig = streamConfig.getPersistenceConfig();

final int parallelism = persistenceConfig.getParallelism();
final int maxBulkSize = persistenceConfig.getMaxBulkSize();
final String logName = "SearchUpdaterStream/BulkWriteResult";
final Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> sink =
final Sink<SubSource<AbstractWriteModel, NotUsed>, NotUsed> sink =
mongoSearchUpdaterFlow.start(true, parallelism, maxBulkSize)
.via(bulkWriteResultAckFlow.start(persistenceConfig.getAckDelay()))
.log(logName)
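For context, the kill-switch wiring in SearchUpdaterStream#start is unchanged by this commit; a stand-alone sketch with simplified types shows how only the materialized UniqueKillSwitch is kept so the running stream can be terminated from the outside:

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.KillSwitches;
import akka.stream.UniqueKillSwitch;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

// Sketch: only the kill switch is kept as materialized value so the running stream
// can be shut down from the outside (e.g. on service shutdown).
public final class KillSwitchWiringSketch {

    public static void main(final String[] args) {
        final ActorSystem system = ActorSystem.create("sketch");
        final Source<Integer, NotUsed> source = Source.range(1, 1000);

        final UniqueKillSwitch killSwitch = source
                .viaMat(KillSwitches.single(), Keep.right())            // materialize the kill switch
                .toMat(Sink.foreach(System.out::println), Keep.left())  // drop the sink's materialized value
                .run(system);

        killSwitch.shutdown(); // completes the stream downstream and cancels upstream
    }
}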
@@ -75,7 +75,7 @@ public Source<WriteResultAndErrors, NotUsed> write(final Thing thing,
final AbstractWriteModel writeModel = EnforcedThingMapper.toWriteModel(thingJson, enforcer, policyRevision, -1,
null);

return Source.single(Source.single(writeModel))
return Source.single(Source.single(writeModel).groupBy(1, foo -> 0))
.via(mongoSearchUpdaterFlow.start(false, 1, 1));
}

@@ -101,7 +101,7 @@ public Source<WriteResultAndErrors, NotUsed> delete(final ThingId thingId, final
*/
private Source<WriteResultAndErrors, NotUsed> delete(final Metadata metadata) {
final AbstractWriteModel writeModel = ThingDeleteModel.of(metadata);
return Source.single(Source.single(writeModel))
return Source.single(Source.single(writeModel).groupBy(1, foo -> 0))
.via(mongoSearchUpdaterFlow.start(false, 1, 1));
}

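The groupBy(1, foo -> 0) calls above are only adapters: they wrap a single-element Source as a SubSource with exactly one substream so it can be fed into the SubSource-based MongoSearchUpdaterFlow. A sketch of the same trick with a simplified element type:

import akka.NotUsed;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.SubSource;

// Sketch: a degenerate groupBy with a constant key wraps a single element as a SubSource,
// matching the input element type that mongoSearchUpdaterFlow.start(...) now expects.
public final class SingleElementSubSourceSketch {

    static SubSource<String, NotUsed> asSubSource(final String writeModel) {
        return Source.single(writeModel).groupBy(1, ignored -> 0); // constant key -> exactly one substream
    }
}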
@@ -71,7 +71,6 @@
import akka.actor.Props;
import akka.actor.ReceiveTimeout;
import akka.cluster.sharding.ShardRegion;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.Patterns;
import akka.stream.javadsl.Sink;

@@ -86,9 +85,6 @@ final class ThingUpdater extends AbstractActorWithStashWithTimers {
AcknowledgementRequest.of(DittoAcknowledgementLabel.SEARCH_PERSISTED);

static final String FORCE_UPDATE_AFTER_START = "FORCE_UPDATE_AFTER_START";
static final String BULK_RESULT_AWAITING_TIMEOUT = "BULK_RESULT_AWAITING_TIMEOUT";

private static final Duration BULK_RESULT_AWAITING_TIMEOUT_DURATION = Duration.ofMinutes(2);

private static final Counter INCORRECT_PATCH_UPDATE_COUNT = DittoMetrics.counter("search_incorrect_patch_updates");
private static final Counter UPDATE_FAILURE_COUNT = DittoMetrics.counter("search_update_failures");
@@ -209,53 +205,25 @@ private void recoveryComplete(final AbstractWriteModel writeModel) {
}

private Receive recoveredBehavior() {
return recoveredBehavior("recoveredBehavior");
}

private Receive recoveredBehavior(final String currentBehaviorHintForLogging) {
return shutdownBehaviour.createReceive()
.match(ThingEvent.class, this::processThingEvent)
.match(AbstractWriteModel.class, this::onNextWriteModel)
.match(PolicyReferenceTag.class, this::processPolicyReferenceTag)
.match(UpdateThing.class, this::updateThing)
.match(UpdateThingResponse.class, this::processUpdateThingResponse)
.match(BulkWriteComplete.class, bwc -> log
.withCorrelationId(bwc.getBulkWriteCorrelationId().orElse(null))
.debug("Received BulkWriteComplete")
)
.match(ReceiveTimeout.class, this::stopThisActor)
.matchEquals(FORCE_UPDATE_AFTER_START, this::forceUpdateAfterStart)
.matchAny(m -> {
log.warning("Unknown message in '{}': {}", currentBehaviorHintForLogging, m);
log.warning("Unknown message in 'recoveredBehavior': {}", m);
unhandled(m);
})
.build();
}

private Receive recoveredAwaitingBulkWriteResultBehavior() {
getTimers().startSingleTimer(BULK_RESULT_AWAITING_TIMEOUT, BULK_RESULT_AWAITING_TIMEOUT,
BULK_RESULT_AWAITING_TIMEOUT_DURATION);
return ReceiveBuilder.create()
.match(AbstractWriteModel.class, writeModel -> {
log.info("Stashing received writeModel while being in " +
"'recoveredAwaitingBulkWriteResultBehavior': <{}> with revision: <{}>",
writeModel.getClass().getSimpleName(), writeModel.getMetadata().getThingRevision());
stash();
})
.match(BulkWriteComplete.class, bulkWriteComplete -> {
log.withCorrelationId(bulkWriteComplete.getBulkWriteCorrelationId().orElse(null))
.debug("Got confirmation bulkWrite was performed - switching to 'recoveredBehavior'");
getTimers().cancel(BULK_RESULT_AWAITING_TIMEOUT);
getContext().become(recoveredBehavior(), true);
unstashAll();
})
.matchEquals(BULK_RESULT_AWAITING_TIMEOUT, bra -> {
log.warning("Encountered timeout being in 'recoveredAwaitingBulkWriteResultBehavior' - " +
"switching back to 'recoveredBehavior'");
getTimers().cancel(BULK_RESULT_AWAITING_TIMEOUT);
getContext().become(recoveredBehavior(), true);
unstashAll();
})
.build()
.orElse(recoveredBehavior("recoveredAwaitingBulkWriteResultBehavior"));
}

private void matchAnyDuringRecovery(final Object message) {
log.debug("Stashing during initialization: <{}>", message);
stash();
@@ -318,9 +286,6 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
}
getSender().tell(mongoWriteModel, getSelf());
lastWriteModel = nextWriteModel;

log.debug("Responded with mongoWriteModel - switching to 'recoveredAwaitingBulkWriteResultBehavior'");
getContext().become(recoveredAwaitingBulkWriteResultBehavior(), true);
}

private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final BsonDocument subtrahend) {
8 changes: 1 addition & 7 deletions thingsearch/service/src/main/resources/things-search.conf
@@ -312,14 +312,8 @@ thing-cache-dispatcher {
}

search-updater-dispatcher {
type = "Dispatcher"
type = "PinnedDispatcher"
executor = "thread-pool-executor"
thread-pool-executor {
keep-alive-time = 60s
fixed-pool-size = off
max-pool-size-max = 256
max-pool-size-max = ${?SEARCH_UPDATER_DISPATCHER_POOL_SIZE_MAX}
}
}

include "things-search-extension.conf"
