Skip to content

Commit

Permalink
Fix EnforcementFlowTest.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jul 14, 2022
1 parent 81d4ea3 commit f65a793
Showing 1 changed file with 8 additions and 11 deletions.
Expand Up @@ -596,9 +596,7 @@ public void onlyApplyRelevantEvents() {
}

@Test
@Ignore("TODO")
public void thereCanBeMultipleUpdatesPerBulk() {
Assume.assumeThat(System.getProperty("build.environment"), Matchers.not(Matchers.equalTo("Github")));
new TestKit(system) {{
final DittoHeaders headers = DittoHeaders.empty();
final PolicyId policyId = PolicyId.of("policy:id");
Expand All @@ -620,15 +618,6 @@ public void thereCanBeMultipleUpdatesPerBulk() {
final EnforcementFlow underTest = EnforcementFlow.of(system, streamConfig, thingsProbe.ref(),
policiesProbe.ref(), system.getScheduler());

materializeTestProbes(underTest, 16, 16);

sinkProbe.ensureSubscription();
sourceProbe.ensureSubscription();
sinkProbe.request(4);
assertThat(sourceProbe.expectRequest()).isEqualTo(16);
changeMaps.forEach(sourceProbe::sendNext);
sourceProbe.sendComplete();

final var sudoRetrievePolicyResponse =
SudoRetrievePolicyResponse.of(policyId, policy, DittoHeaders.empty());
policiesProbe.setAutoPilot(new TestActor.AutoPilot() {
Expand Down Expand Up @@ -660,6 +649,14 @@ public TestActor.AutoPilot run(final ActorRef sender, final Object msg) {
}
});

materializeTestProbes(underTest, 16, 16);
sinkProbe.ensureSubscription();
sourceProbe.ensureSubscription();
sinkProbe.request(4);
assertThat(sourceProbe.expectRequest()).isEqualTo(16);
changeMaps.forEach(sourceProbe::sendNext);
sourceProbe.sendComplete();

final var list = sinkProbe.expectNext(FiniteDuration.apply(60, "s"));
assertThat(list.size()).isEqualTo(changeMaps.get(0).size());
sinkProbe.expectComplete();
Expand Down

0 comments on commit f65a793

Please sign in to comment.