Skip to content

Commit

Permalink
fixed EnforcementFlow parallelism
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Jun 14, 2021
1 parent f5493ff commit bfbba80
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public Flow<Map<ThingId, Metadata>, Source<AbstractWriteModel, NotUsed>, NotUsed
computeWriteModel(metadataRef, responseMap.get(metadataRef.getThingId()))
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism)
)
.withAttributes(Attributes.inputBuffer(1, 1))
);
})
.withAttributes(Attributes.inputBuffer(1, 1));
Expand All @@ -180,6 +181,7 @@ private Source<Map<ThingId, SudoRetrieveThingResponse>, NotUsed> sudoRetrieveThi
return Source.fromIterator(changeMap.entrySet()::iterator)
.flatMapMerge(parallelism, entry -> sudoRetrieveThing(entry)
.async(MongoSearchUpdaterFlow.DISPATCHER_NAME, parallelism))
.withAttributes(Attributes.inputBuffer(1, 1))
.<Map<ThingId, SudoRetrieveThingResponse>>fold(new HashMap<>(), (map, response) -> {
map.put(getThingId(response), response);
return map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void updateThingAndPolicyRevisions() {
sinkProbe.ensureSubscription();
sourceProbe.ensureSubscription();
sinkProbe.request(1);
assertThat(sourceProbe.expectRequest()).isEqualTo(16); // TODO TJ this changed from 1 to 16 with latest Akka update .. why??
assertThat(sourceProbe.expectRequest()).isEqualTo(1);
sourceProbe.sendNext(inputMap);
sourceProbe.sendComplete();

Expand Down

0 comments on commit bfbba80

Please sign in to comment.