Skip to content

Commit

Permalink
Little change
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Oct 25, 2021
1 parent d12fdb5 commit d8c14d1
Showing 1 changed file with 5 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.streaming;

import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -118,15 +119,15 @@ public Flow<Source<AbstractWriteModel, NotUsed>, WriteResultAndErrors, NotUsed>
return writeModels;
}
})
.mapAsync(1, this::toIncrementalMongo)
.mapAsync(1, MongoSearchUpdaterFlow::toIncrementalMongo)
.flatMapMerge(parallelism, writeModels ->
executeBulkWrite(shouldAcknowledge, writeModels).async(DISPATCHER_NAME, parallelism));

return batchFlow.via(writeFlow);
}

private CompletionStage<List<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>> toIncrementalMongo(
final List<AbstractWriteModel> models) {
private static CompletionStage<List<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>> toIncrementalMongo(
final Collection<AbstractWriteModel> models) {

final var writeModelFutures = models.stream()
.<CompletionStage<List<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>>>map(model ->
Expand Down Expand Up @@ -158,7 +159,7 @@ private CompletionStage<List<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>
}

private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(final boolean shouldAcknowledge,
final List<Pair<AbstractWriteModel, WriteModel<BsonDocument>>> pairs) {
final Collection<Pair<AbstractWriteModel, WriteModel<BsonDocument>>> pairs) {

final MongoCollection<BsonDocument> theCollection;
if (shouldAcknowledge) {
Expand Down

0 comments on commit d8c14d1

Please sign in to comment.