Skip to content

Commit

Permalink
Add test to validate preservation of bulk size.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Mar 20, 2022
1 parent b6f9cfc commit 4a98d57
Showing 1 changed file with 77 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -671,18 +671,91 @@ public TestActor.AutoPilot run(final ActorRef sender, final Object msg) {
}};
}


@Test
public void thereCanBeMultipleUpdatesPerBulk() {
new TestKit(system) {{
final DittoHeaders headers = DittoHeaders.empty();
final PolicyId policyId = PolicyId.of("policy:id");
final Thing thing = Thing.newBuilder().setPolicyId(policyId).build();
final var policy = Policy.newBuilder(policyId).setRevision(1).build();

final List<List<Metadata>> changeMaps = List.of(IntStream.range(1, 5).mapToObj(i -> {
final ThingId thingId = ThingId.of("thing:" + i);
final Thing ithThing = thing.toBuilder().setId(thingId).setRevision(i).build();
final List<ThingEvent<?>> events = List.of(ThingModified.of(ithThing, i, null, headers, null));
return Metadata.of(thingId, i, policyId, 1L, events, null, null);
}).toList());

final TestProbe thingsProbe = TestProbe.apply(system);
final TestProbe policiesProbe = TestProbe.apply(system);

final StreamConfig streamConfig = DefaultStreamConfig.of(ConfigFactory.parseString(
"stream.ask-with-retry.ask-timeout=30s"));
final EnforcementFlow underTest = EnforcementFlow.of(system, streamConfig, thingsProbe.ref(),
policiesProbe.ref(), system.getScheduler());

materializeTestProbes(underTest, 16, 1, 16);

sinkProbe.ensureSubscription();
sourceProbe.ensureSubscription();
sinkProbe.request(4);
assertThat(sourceProbe.expectRequest()).isEqualTo(16);
changeMaps.forEach(sourceProbe::sendNext);
sourceProbe.sendComplete();

final var sudoRetrievePolicyResponse =
SudoRetrievePolicyResponse.of(policyId, policy, DittoHeaders.empty());
policiesProbe.setAutoPilot(new TestActor.AutoPilot() {
@Override
public TestActor.AutoPilot run(final ActorRef sender, final Object msg) {
sender.tell(sudoRetrievePolicyResponse, policiesProbe.ref());
return keepRunning();
}
});

thingsProbe.setAutoPilot(new TestActor.AutoPilot() {
@Override
public TestActor.AutoPilot run(final ActorRef sender, final Object msg) {
if (msg instanceof final SudoRetrieveThing command) {
final var thingId = (ThingId) command.getEntityId();
final int i = Integer.parseInt(thingId.getName());
final var response = SudoRetrieveThingResponse.of(
thing.toBuilder()
.setId(thingId)
.setRevision(i)
.setAttribute(JsonPointer.of("x"), JsonValue.of(i))
.build()
.toJson(FieldType.all()),
command.getDittoHeaders()
);
sender.tell(response, getRef());
}
return keepRunning();
}
});

final var list = sinkProbe.expectNext(FiniteDuration.apply(60, "s"));
assertThat(list.size()).isEqualTo(changeMaps.get(0).size());
sinkProbe.expectComplete();
}};
}

private void materializeTestProbes(final EnforcementFlow enforcementFlow) {
materializeTestProbes(enforcementFlow, 16, 1, 1);
}


private void materializeTestProbes(final EnforcementFlow enforcementFlow, final int parallelism,
final int bulkShardCount, final int bulkSize) {
final var source = TestSource.<Collection<Metadata>>probe(system);
final var sink = TestSink.<List<AbstractWriteModel>>probe(system);
final var runnableGraph = enforcementFlow.create(source, 16, 1, 1, system)
.mapConcat(x -> x)
.map(List::of)
final var runnableGraph = enforcementFlow.create(source, parallelism, bulkShardCount, bulkSize, system)
.mergeSubstreams()
.viaMat(KillSwitches.single(), Keep.both())
.toMat(sink, Keep.both());
final var materializedValue = runnableGraph.run(() -> system);
sourceProbe = materializedValue.first().first();
sinkProbe = materializedValue.second();
}

}

0 comments on commit 4a98d57

Please sign in to comment.