Skip to content

Commit

Permalink
Merge pull request #366: [proxima-direct-core] #156 Added support for…
Browse files Browse the repository at this point in the history
… configurable watermark estimator and idle policy for commit log readers
  • Loading branch information
je-ik authored Jun 5, 2020
2 parents dc3dbe8 + b6b3d5a commit e14db8b
Show file tree
Hide file tree
Showing 40 changed files with 2,468 additions and 567 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import cz.o2.proxima.beam.core.BeamDataOperator;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.storage.InMemStorage;
import cz.o2.proxima.direct.storage.InMemStorage.WatermarkEstimator;
import cz.o2.proxima.repository.AttributeDescriptor;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.repository.Repository;
import cz.o2.proxima.repository.config.ConfigUtils;
import cz.o2.proxima.storage.StreamElement;
import cz.o2.proxima.storage.commitlog.Position;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.Watermarks;
import cz.o2.proxima.util.ExceptionUtils;
import java.net.URI;
import java.util.UUID;
Expand Down Expand Up @@ -125,7 +126,7 @@ private void testReadingFromCommitLog(boolean eventTime, boolean stopAtCurrent)
write("key1");
write("key2");
if (!stopAtCurrent) {
watermark.set(WatermarkEstimator.MAX_TIMESTAMP);
watermark.set(Watermarks.MAX_WATERMARK);
}
if (!err.take()) {
throw new AssertionError(caught.get());
Expand All @@ -150,13 +151,13 @@ private void write(String key) {

private WatermarkEstimator asWatermarkEstimator(AtomicLong watermark) {
return new WatermarkEstimator() {
@Override
public void accumulate(StreamElement element) {}

@Override
public long getWatermark() {
return watermark.get();
}

@Override
public void setMinWatermark(long minWatermark) {}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright 2017-2020 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.time;

import com.google.common.base.Preconditions;
import cz.o2.proxima.storage.StreamElement;

/** The watermark estimator base class. */
public abstract class AbstractWatermarkEstimator implements WatermarkEstimator {
/** Used idle policy */
private final WatermarkIdlePolicy idlePolicy;

/** If estimator is in the idle state */
private boolean isIdle = false;

protected AbstractWatermarkEstimator(WatermarkIdlePolicy idlePolicy) {
Preconditions.checkNotNull(idlePolicy, "Idle policy must be provided");
this.idlePolicy = idlePolicy;
}

/**
* Estimates current watermark.
*
* @return the estimated watermark.
*/
protected abstract long estimateWatermark();

/**
* Updates the watermark estimate according to given stream element.
*
* @param element a stream element.
*/
protected abstract void updateWatermark(StreamElement element);

/** Signals that streaming source is idle. */
@Override
public void idle() {
isIdle = true;
idlePolicy.idle(getWatermark());
}

/**
* Updates the watermark estimate according to the given stream element.
*
* @param element a stream element.
*/
@Override
public final void update(StreamElement element) {
isIdle = false;
idlePolicy.update(element);
updateWatermark(element);
}

/**
* Returns monotonically increasing estimated watermark.
*
* @return the watermark estimate.
*/
@Override
public long getWatermark() {
if (isIdle) {
return Math.max(estimateWatermark(), idlePolicy.getIdleWatermark());
}
return estimateWatermark();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2017-2020 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.time;

import cz.o2.proxima.storage.StreamElement;

/** Watermark estimator wrapper for partitioned sources. */
public interface PartitionedWatermarkEstimator extends WatermarkSupplier {

/**
* Returns estimated watermark across all partitions.
*
* @return the watermark estimate.
*/
long getWatermark();

/**
* Updates the partition watermark estimate according to the given stream element.
*
* @param element a stream element.
*/
default void update(int partition, StreamElement element) {}

/** Signals that a given partition is idle. */
default void idle(int partition) {}
}
111 changes: 0 additions & 111 deletions core/src/main/java/cz/o2/proxima/time/VectorClock.java

This file was deleted.

Loading

0 comments on commit e14db8b

Please sign in to comment.