Skip to content

Commit

Permalink
Improve event scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
Marko Topolnik committed Jul 28, 2020
1 parent 905d7c5 commit ce2e40f
Showing 1 changed file with 30 additions and 31 deletions.
Expand Up @@ -37,52 +37,50 @@
*/
public class LongStreamSourceP extends AbstractProcessor {

private static final long SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS = 10;

private static final long REPORT_PERIOD_NANOS = SECONDS.toNanos(SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS);
private static final long SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS = 5;
private static final long HICCUP_REPORT_THRESHOLD_MILLIS = 10;
private static final long NANOS_PER_SECOND = SECONDS.toNanos(1);

private final long nanoTimeMillisToCurrentTimeMillis = determineTimeOffset();
private final long startTime;
private final long itemsPerSecond;
private final long eventsPerSecond;
private final ILogger logger = Logger.getLogger(LongStreamSourceP.class);
private final long wmGranularity;
private final long wmOffset;
private long startNanoTime;
private long globalProcessorIndex;
private long totalParallelism;
private long emitPeriod;

private final AppendableTraverser<Object> traverser = new AppendableTraverser<>(2);
private long emitSchedule;
private long lastReport;
private long counterAtLastReport;
private long lastReportNanos;
private long valueAtLastReport;
private long lastCallNanos;
private long emittedCount;
private long valueToEmit;
private long lastEmittedWm;
private long nowNanos;
private long nowNanoTime;

LongStreamSourceP(
long startTime,
long itemsPerSecond,
long eventsPerSecond,
EventTimePolicy<? super Long> eventTimePolicy
) {
this.wmGranularity = eventTimePolicy.watermarkThrottlingFrameSize();
this.wmOffset = eventTimePolicy.watermarkThrottlingFrameOffset();
this.startTime = MILLISECONDS.toNanos(startTime + nanoTimeMillisToCurrentTimeMillis);
this.itemsPerSecond = itemsPerSecond;
this.startNanoTime = MILLISECONDS.toNanos(startTime + nanoTimeMillisToCurrentTimeMillis);
this.eventsPerSecond = eventsPerSecond;
}

@Override
protected void init(@Nonnull Context context) {
totalParallelism = context.totalParallelism();
globalProcessorIndex = context.globalProcessorIndex();
emitPeriod = SECONDS.toNanos(1) * totalParallelism / itemsPerSecond;
lastCallNanos = lastReport = emitSchedule =
startTime + SECONDS.toNanos(1) * globalProcessorIndex / itemsPerSecond;
startNanoTime += NANOS_PER_SECOND * globalProcessorIndex / eventsPerSecond;
valueToEmit = globalProcessorIndex;
lastCallNanos = lastReportNanos = startNanoTime;
}

@Override
public boolean complete() {
nowNanos = System.nanoTime();
nowNanoTime = System.nanoTime();
emitEvents();
detectAndReportHiccup();
if (logger.isFineEnabled()) {
Expand All @@ -92,11 +90,12 @@ public boolean complete() {
}

private void emitEvents() {
while (emitFromTraverser(traverser) && emitSchedule <= nowNanos) {
long emitValuesUpTo = (nowNanoTime - startNanoTime) * eventsPerSecond / NANOS_PER_SECOND;
while (emitFromTraverser(traverser) && valueToEmit < emitValuesUpTo) {
long emitSchedule = startNanoTime + valueToEmit * NANOS_PER_SECOND / eventsPerSecond;
long timestamp = NANOSECONDS.toMillis(emitSchedule) - nanoTimeMillisToCurrentTimeMillis;
traverser.append(jetEvent(timestamp, emittedCount * totalParallelism + globalProcessorIndex));
emittedCount++;
emitSchedule += emitPeriod;
traverser.append(jetEvent(timestamp, valueToEmit));
valueToEmit += totalParallelism;
if (timestamp >= lastEmittedWm + wmGranularity) {
long wmToEmit = timestamp - (timestamp % wmGranularity) + wmOffset;
traverser.append(new Watermark(wmToEmit));
Expand All @@ -106,24 +105,24 @@ private void emitEvents() {
}

private void detectAndReportHiccup() {
long millisSinceLastCall = NANOSECONDS.toMillis(nowNanos - lastCallNanos);
long millisSinceLastCall = NANOSECONDS.toMillis(nowNanoTime - lastCallNanos);
if (millisSinceLastCall > HICCUP_REPORT_THRESHOLD_MILLIS) {
logger.info(String.format("*** Source #%d hiccup: %,d ms%n", globalProcessorIndex, millisSinceLastCall));
}
lastCallNanos = nowNanos;
lastCallNanos = nowNanoTime;
}

private void reportThroughput() {
long nanosSinceLastReport = nowNanos - lastReport;
if (nanosSinceLastReport < REPORT_PERIOD_NANOS) {
long nanosSinceLastReport = nowNanoTime - lastReportNanos;
if (nanosSinceLastReport < SECONDS.toNanos(SOURCE_THROUGHPUT_REPORTING_PERIOD_SECONDS)) {
return;
}
lastReport = nowNanos;
long itemCountSinceLastReport = emittedCount - counterAtLastReport;
counterAtLastReport = emittedCount;
logger.fine(String.format("p%d: %,.0f items/second%n",
lastReportNanos = nowNanoTime;
long localItemCountSinceLastReport = (valueToEmit - valueAtLastReport) / totalParallelism;
valueAtLastReport = valueToEmit;
logger.fine(String.format("p%d: %,.0f items/second",
globalProcessorIndex,
itemCountSinceLastReport / ((double) nanosSinceLastReport / SECONDS.toNanos(1))));
localItemCountSinceLastReport / ((double) nanosSinceLastReport / NANOS_PER_SECOND)));
}

private static long determineTimeOffset() {
Expand Down

0 comments on commit ce2e40f

Please sign in to comment.