Skip to content

Commit

Permalink
Watermark unified
Browse files Browse the repository at this point in the history
  • Loading branch information
Zdenek Tison committed May 26, 2020
1 parent 59e5c33 commit adfb35b
Show file tree
Hide file tree
Showing 14 changed files with 487 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import cz.o2.proxima.beam.core.BeamDataOperator;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.storage.InMemStorage;
import cz.o2.proxima.direct.storage.InMemStorage.WatermarkEstimator;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.config.ConfigUtils;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.Watermarks;
import cz.o2.proxima.util.ExceptionUtils;
import java.net.URI;
import java.util.UUID;
Expand Down Expand Up @@ -125,7 +126,7 @@ private void testReadingFromCommitLog(boolean eventTime, boolean stopAtCurrent)
write("key1");
write("key2");
if (!stopAtCurrent) {
watermark.set(WatermarkEstimator.MAX_TIMESTAMP);
watermark.set(Watermarks.MAX_WATERMARK);
}
if (!err.take()) {
throw new AssertionError(caught.get());
Expand All @@ -151,7 +152,7 @@ private void write(String key) {
private WatermarkEstimator asWatermarkEstimator(AtomicLong watermark) {
return new WatermarkEstimator() {
@Override
public void accumulate(StreamElement element) {}
public void onElement(StreamElement element) {}

@Override
public long getWatermark() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/**
* Copyright 2017-2020 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.time;

import com.google.common.base.Preconditions;
import cz.o2.proxima.storage.StreamElement;

import java.util.Optional;

public abstract class AbstractWatermarkEstimator implements WatermarkEstimator {
private final IdlePolicy idlePolicy;
private boolean isIdle = false;

public AbstractWatermarkEstimator(IdlePolicy idlePolicy) {
Preconditions.checkNotNull(idlePolicy, "IdlePolicy must be provided");
this.idlePolicy = idlePolicy;
}

protected abstract long estimateWatermark();

protected abstract void updateWatermark(StreamElement element);

@Override
public void onIdle() {
isIdle = true;
idlePolicy.onIdle(getWatermark());
}

@Override
public void onElement(StreamElement element) {
isIdle = false;
updateWatermark(element);
}

@Override
public long getWatermark() {
if (isIdle) {
final Optional<Long> idleWatermark = idlePolicy.getIdleWatermark();
if (idleWatermark.isPresent()) {
return Math.max(estimateWatermark(), idleWatermark.get());
}
}
return estimateWatermark();
}
}
29 changes: 29 additions & 0 deletions core/src/main/java/cz/o2/proxima/time/IdlePolicy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2017-2020 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.time;

import java.io.Serializable;
import java.util.Optional;

/** Policy supplies watermark on idle source. */
public interface IdlePolicy extends Serializable {

default void onIdle(long currentWatermark) {}

default Optional<Long> getIdleWatermark() {
return Optional.empty();
}
}
174 changes: 11 additions & 163 deletions core/src/main/java/cz/o2/proxima/time/WatermarkEstimator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,178 +15,26 @@
*/
package cz.o2.proxima.time;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import cz.o2.proxima.annotations.Internal;
import cz.o2.proxima.storage.StreamElement;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.extern.slf4j.Slf4j;

/**
* Estimator of watermark based on timestamps of flowing elements. The estimator tries to estimate
* when it reaches near-realtime consumption of events and holds the watermark until then.
*/
@Slf4j
@Internal
public class WatermarkEstimator implements WatermarkSupplier {

private static final long serialVersionUID = 1L;

@VisibleForTesting static final long MIN_WATERMARK = Long.MIN_VALUE + 365 * 86400000L;

@Internal
@FunctionalInterface
public interface TimestampSupplier extends Serializable {
long get();
}

/** Builder of the {@link WatermarkEstimator}. */
public static class Builder {

private final long durationMs;
private final long stepMs;
private final long allowedTimestampSkew;
private final long minWatermark;
private final TimestampSupplier timestampSupplier;

Builder() {
this(10000, 200, 200, MIN_WATERMARK, System::currentTimeMillis);
}

private Builder(
long durationMs,
long stepMs,
long allowedTimestampSkew,
long minWatermark,
TimestampSupplier timestampSupplier) {

this.durationMs = durationMs;
this.stepMs = stepMs;
this.allowedTimestampSkew = allowedTimestampSkew;
this.minWatermark = minWatermark;
this.timestampSupplier = timestampSupplier;
}

public Builder withDurationMs(long durationMs) {
return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier);
}

public Builder withStepMs(long stepMs) {
return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier);
}

public Builder withAllowedTimestampSkew(long allowedTimestampSkew) {
return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier);
}

public Builder withMinWatermark(long minWatermark) {
return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier);
}

public Builder withTimestampSupplier(TimestampSupplier timestampSupplier) {
return new Builder(durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier);
}

public WatermarkEstimator build() {
return new WatermarkEstimator(
durationMs, stepMs, allowedTimestampSkew, minWatermark, timestampSupplier);
}
}

public static Builder newBuilder() {
return new Builder();
}

private final long stepMs;
private final TimestampSupplier timestampSupplier;
private final long[] stepDiffs;
private final long allowedTimestampSkew;
private final AtomicLong lastRotate;
private final AtomicInteger rotatesToInitialize;
private final AtomicLong watermark;
private final AtomicLong lastStatLogged = new AtomicLong();

@VisibleForTesting
WatermarkEstimator(
long durationMs,
long stepMs,
long allowedTimestampSkew,
long minWatermark,
TimestampSupplier supplier) {

this.stepMs = stepMs;
this.allowedTimestampSkew = allowedTimestampSkew;
this.timestampSupplier = Objects.requireNonNull(supplier);
this.watermark = new AtomicLong(minWatermark);
Preconditions.checkArgument(durationMs > 0, "durationMs must be positive");
Preconditions.checkArgument(stepMs > 0, "stepMs must be positive");
Preconditions.checkArgument(
durationMs / stepMs * stepMs == durationMs, "durationMs must be divisible by stepMs");
stepDiffs = new long[(int) (durationMs / stepMs) + 1];
for (int i = 0; i < stepDiffs.length; i++) {
stepDiffs[i] = 0;
}
rotatesToInitialize = new AtomicInteger(stepDiffs.length - 1);
lastRotate = new AtomicLong(supplier.get() - stepMs);
}
/** Estimates current watermark according to incoming stream elements. */
public interface WatermarkEstimator extends WatermarkSupplier, Serializable {

/**
* Accumulate given timestamp at current processing time.
* Estimates the current watermark.
*
* @param stamp the stamp to accumulate
* @return the watermark estimate.
*/
public void add(long stamp) {
rotateIfNeeded();
long diff = timestampSupplier.get() - stamp;
if (stepDiffs[0] < diff) {
stepDiffs[0] = diff;
}
}
long getWatermark();

/**
* Retrieve watermark estimate.
* Updates the watermark estimate according to the given stream element.
*
* @return the watermark estimate
* @param element a stream element.
*/
@Override
public long getWatermark() {
rotateIfNeeded();
if (rotatesToInitialize.get() <= 0) {
boolean isProcessingBacklog =
Arrays.stream(stepDiffs).anyMatch(diff -> diff > allowedTimestampSkew);
if (!isProcessingBacklog) {
watermark.accumulateAndGet(timestampSupplier.get() - allowedTimestampSkew, Math::max);
}
}
return watermark.get();
}

private void rotateIfNeeded() {
long now = timestampSupplier.get();
if (now > lastRotate.get() + stepMs) {
rotate(now, (int) ((now - lastRotate.get()) / stepMs));
}
if (log.isDebugEnabled() && now - lastStatLogged.get() > 10000) {
log.debug(
"Watermark delay stats: {} with allowedTimestampSkew {}",
Arrays.toString(stepDiffs),
allowedTimestampSkew);
lastStatLogged.set(now);
}
}
default void onElement(StreamElement element) {};

private void rotate(long now, int moveCount) {
moveCount = Math.min(stepDiffs.length - 1, moveCount);
System.arraycopy(stepDiffs, 0, stepDiffs, moveCount, stepDiffs.length - moveCount);
if (rotatesToInitialize.get() > 0) {
rotatesToInitialize.addAndGet(-moveCount);
}
for (int i = 0; i < moveCount; i++) {
stepDiffs[i] = 0;
}
lastRotate.set(now);
}
/** Idles watermark estimation. */
default void onIdle() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package cz.o2.proxima.time;

import java.util.Map;

public interface WatermarkEstimatorFactory {
WatermarkEstimator create(Map<String, Object> config);
}
4 changes: 2 additions & 2 deletions core/src/main/java/cz/o2/proxima/time/WatermarkSupplier.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
import cz.o2.proxima.annotations.Internal;
import java.io.Serializable;

/** */
/** Supplies the current watermark to clients. */
@Internal
@FunctionalInterface
public interface WatermarkSupplier extends Serializable {

/**
* Retrieve watermark.
*
* @return the watermark estimate
* @return the current watermark.
*/
long getWatermark();
}
21 changes: 21 additions & 0 deletions core/src/main/java/cz/o2/proxima/time/Watermarks.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/**
* Copyright 2017-2020 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.time;

public class Watermarks {
public static final long MIN_WATERMARK = Long.MIN_VALUE;
public static final long MAX_WATERMARK = Long.MAX_VALUE;
}
Loading

0 comments on commit adfb35b

Please sign in to comment.