Skip to content

Commit

Permalink
fix overzealous start of stream.
Browse files Browse the repository at this point in the history
DefaultStreamSupervisor started stream without waiting for
stream interval.

Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Jan 21, 2019
1 parent ca711fd commit 3b513ed
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.PatternsCS;
import akka.stream.DelayOverflowStrategy;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
Expand Down Expand Up @@ -179,7 +180,13 @@ public final class DefaultStreamSupervisor<E> extends AbstractActor {
activityCheck = scheduleActivityCheck(streamConsumerSettings);
lastStreamStartOrStop = Instant.now();

computeAndScheduleNextStreamTrigger(null);
// schedule first stream after delay
PatternsCS.pipe(
computeNextStreamTriggerSource(null)
.delay(streamConsumerSettings.getStreamInterval(), DelayOverflowStrategy.backpressure())
.runWith(Sink.head(), materializer),
getContext().dispatcher()
).to(getSelf());
}

/**
Expand Down Expand Up @@ -328,6 +335,11 @@ private void computeAndScheduleNextStreamTrigger(@Nullable final Instant lastSuc
}

private CompletionStage<StreamTrigger> computeNextStreamTrigger(@Nullable final Instant lastSuccessfulQueryEnd) {
return computeNextStreamTriggerSource(lastSuccessfulQueryEnd).runWith(Sink.head(), materializer);
}

private Source<StreamTrigger, NotUsed> computeNextStreamTriggerSource(
@Nullable final Instant lastSuccessfulQueryEnd) {
final Instant now = Instant.now();

final Source<Instant, NotUsed> queryStartSource;
Expand Down Expand Up @@ -359,7 +371,7 @@ private CompletionStage<StreamTrigger> computeNextStreamTrigger(@Nullable final
return StreamTrigger.calculateStreamTrigger(now, queryStart, startOffset, streamInterval);
});

return triggerSource.runWith(Sink.head(), materializer);
return triggerSource;
}

private void scheduleStream(final StreamTrigger streamTrigger) {
Expand Down

0 comments on commit 3b513ed

Please sign in to comment.