Skip to content

Commit

Permalink
Add mapping.partition-buffer-size to connectivity.conf; reduce mappin…
Browse files Browse the repository at this point in the history
…g buffer size across the board; fix compile error

Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Feb 24, 2020
1 parent dac28b2 commit 5ab8fcf
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 6 deletions.
Expand Up @@ -148,14 +148,17 @@ ditto {
mapping {

# the buffer size used for the queue in the message mapping processor actor
buffer-size = 500
buffer-size = 32
buffer-size = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_BUFFER_SIZE}

# parallelism to use for processing messages in parallel in the message mapping processor actor
# when configured too low, throughput of messages which perform blocking operations will be bad
parallelism = 64
parallelism = 16
parallelism = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_PARALLELISM}

partition-buffer-size = 16
partition-buffer-size = ${?CONNECTIVITY_SIGNAL_ENRICHMENT_PARTITION_BUFFER_SIZE}

javascript {
# the maximum script size in bytes of a mapping script to run
# prevents loading big JS dependencies into the script (e.g. jQuery which has ~250kB)
Expand Down
Expand Up @@ -220,7 +220,7 @@ private <T> Flow<T, T, NotUsed> partitionById(final Graph<FlowShape<T, T>, NotUs
(builder, partition, merge) -> {
for (int i = 0; i < parallelismWithSpecialLane; i++) {
builder.from(partition.out(i))
.via(builder.add(new ErrorRespondingBuffer<T>(partitionBufferSize)))
.via(builder.add(ErrorRespondingBuffer.<T>withSize(partitionBufferSize)))
.via(builder.add(flowToPartition))
.toInlet(merge.in(i));
}
Expand Down
Expand Up @@ -59,7 +59,7 @@ public void bufferRespondsWithTooManyRequestsExceptionIfBufferIsFull() {
Source.from(Arrays.asList(msg1, msg2, msg3));

sourceUnderTest
.via(new ErrorRespondingBuffer<>(1))
.via(ErrorRespondingBuffer.withSize(1))
.runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
.request(1)
.expectNext(msg1);
Expand All @@ -74,7 +74,7 @@ public void bufferDropsNonWithSenderMessageIfBufferIsFull() {
final Source<Integer, NotUsed> sourceUnderTest = Source.from(Arrays.asList(1, 2, 4));

sourceUnderTest
.via(new ErrorRespondingBuffer<>(1))
.via(ErrorRespondingBuffer.withSize(1))
.runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
.request(1)
.expectNext(1);
Expand All @@ -96,7 +96,7 @@ public void bufferWorksOffIncomingMessagesWhenTheyAreAllConsumed() {
Source.from(Arrays.asList(msg1, msg2, msg3));

sourceUnderTest
.via(new ErrorRespondingBuffer<>(1))
.via(ErrorRespondingBuffer.withSize(1))
.runWith(TestSink.probe(actorSystem), ActorMaterializer.create(actorSystem))
.request(3)
.expectNext(msg1, msg2, msg3);
Expand Down

0 comments on commit 5ab8fcf

Please sign in to comment.