Skip to content

Commit

Permalink
More test
Browse files Browse the repository at this point in the history
  • Loading branch information
Zdenek Tison committed Jun 3, 2020
1 parent 696bd13 commit 9e19949
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
Expand Down Expand Up @@ -198,10 +199,12 @@ public static Builder newBuilder() {
return new Builder();
}

private final long stepMs;
@Getter private final long stepMs;
@Getter private final long durationMs;
@Getter private final long allowedTimestampSkew;

private final TimestampSupplier timestampSupplier;
private final long[] stepDiffs;
private final long allowedTimestampSkew;
private final AtomicLong lastRotate;
private final AtomicInteger rotatesToInitialize;
private final AtomicLong watermark;
Expand All @@ -216,7 +219,7 @@ public static Builder newBuilder() {
TimestampSupplier supplier,
WatermarkIdlePolicy idlePolicy) {
super(idlePolicy);

this.durationMs = durationMs;
this.stepMs = stepMs;
this.allowedTimestampSkew = allowedTimestampSkew;
this.timestampSupplier = Objects.requireNonNull(supplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import lombok.Getter;

/** Watermark configuration */
public abstract class WatermarkConfiguration implements Serializable {
private static final long serialVersionUID = 1L;

Expand All @@ -32,33 +33,35 @@ public abstract class WatermarkConfiguration implements Serializable {
@Getter private WatermarkIdlePolicyFactory watermarkIdlePolicyFactory;
@Getter private WatermarkEstimatorFactory watermarkEstimatorFactory;

public WatermarkConfiguration(Map<String, Object> cfg) {
protected WatermarkConfiguration(Map<String, Object> cfg) {
this.cfg = cfg;
configure();
}

private void configure() {
this.watermarkIdlePolicyFactory =
/** Returns configuration key with added watermark config prefix */
public static String cfgKey(String cfgName) {
if (cfgName.startsWith(CFG_PREFIX)) {
return cfgName;
}
return CFG_PREFIX + cfgName;
}

/** Returns default idle policy factory when none user's factory is provided */
protected abstract WatermarkIdlePolicyFactory getDefaultIdlePolicyFactory();

/** Returns default estimator factory when none user's factory is provided */
protected abstract WatermarkEstimatorFactory getDefaultEstimatorFactory();

public void configure() {
watermarkIdlePolicyFactory =
Optional.ofNullable(cfg.get(cfgKey("idle-policy-factory")))
.map(Object::toString)
.map(cls -> Classpath.newInstance(cls, WatermarkIdlePolicyFactory.class))
.orElse(getDefaultIdlePolicyFactory());

this.watermarkEstimatorFactory =
watermarkEstimatorFactory =
Optional.ofNullable(cfg.get(cfgKey("estimator-factory")))
.map(Object::toString)
.map(cls -> Classpath.newInstance(cls, WatermarkEstimatorFactory.class))
.orElse(getDefaultEstimatorFactory());
}

protected abstract WatermarkIdlePolicyFactory getDefaultIdlePolicyFactory();

protected abstract WatermarkEstimatorFactory getDefaultEstimatorFactory();

public static String cfgKey(String configuration) {
if (configuration.startsWith(CFG_PREFIX)) {
return configuration;
}
return CFG_PREFIX + configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ static class CustomWatermarkConfiguration extends WatermarkConfiguration {

public CustomWatermarkConfiguration(Map<String, Object> cfg) {
super(cfg);
configure();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class KafkaWatermarkConfiguration extends WatermarkConfiguration {

public KafkaWatermarkConfiguration(Map<String, Object> cfg) {
super(cfg);
configure();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,14 @@ class PubSubAccessor extends AbstractStorage implements DataAccessor {
.map(Integer::valueOf)
.orElse(storage.getDefaultSubscriptionAckDeadlineSeconds());

long defaultEstimateDuration =
storage.getDefaultWatermarkEstimateDuration() == null
? subscriptionAckDeadline * 1000
: storage.getDefaultWatermarkEstimateDuration();

watermarkConfiguration =
new PubSubWatermarkConfiguration(cfg, storage, subscriptionAckDeadline);
new PubSubWatermarkConfiguration(
cfg, defaultEstimateDuration, storage.getDefaultAllowedTimestampSkew());

Preconditions.checkArgument(!Strings.isNullOrEmpty(project), "Authority cannot be empty");
Preconditions.checkArgument(!Strings.isNullOrEmpty(topic), "Path has to represent topic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,24 @@
import cz.o2.proxima.direct.time.NotProgressingWatermarkIdlePolicy;
import cz.o2.proxima.direct.time.UnboundedOutOfOrdernessWatermarkEstimator;
import cz.o2.proxima.direct.time.WatermarkConfiguration;
import cz.o2.proxima.time.WatermarkEstimator;
import cz.o2.proxima.time.WatermarkEstimatorFactory;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import java.util.HashMap;
import java.util.Map;

/** Watermark configuration for PubSub */
public class PubSubWatermarkConfiguration extends WatermarkConfiguration {
private final PubSubStorage storage;
private final long subscriptionAckDeadline;

private final long defaultEstimateDuration;
private final long defaultAllowedTimestampSkew;

public PubSubWatermarkConfiguration(
Map<String, Object> cfg, PubSubStorage storage, long subscriptionAckDeadline) {
Map<String, Object> cfg, long defaultEstimateDuration, long defaultAllowedTimestampSkew) {
super(cfg);
this.storage = storage;
this.subscriptionAckDeadline = subscriptionAckDeadline;
this.defaultEstimateDuration = defaultEstimateDuration;
this.defaultAllowedTimestampSkew = defaultAllowedTimestampSkew;
configure();
}

@Override
Expand All @@ -43,20 +48,32 @@ protected WatermarkIdlePolicyFactory getDefaultIdlePolicyFactory() {

@Override
protected WatermarkEstimatorFactory getDefaultEstimatorFactory() {
// Preserves backward compatible behaviour by adding default PubSub storage configuration to
// config.
if (!cfg.containsKey(cfgKey(ALLOWED_TIMESTAMP_SKEW)) && storage != null) {
cfg.put(cfgKey(ALLOWED_TIMESTAMP_SKEW), storage.getDefaultAllowedTimestampSkew());
}
return new PubSubWatermarkEstimatorFactory(
defaultEstimateDuration, defaultAllowedTimestampSkew);
}

static class PubSubWatermarkEstimatorFactory implements WatermarkEstimatorFactory {

if (!cfg.containsKey(cfgKey(ESTIMATE_DURATION_MS)) && storage != null) {
cfg.put(
cfgKey(ESTIMATE_DURATION_MS),
storage.getDefaultWatermarkEstimateDuration() == null
? subscriptionAckDeadline * 1000
: storage.getDefaultWatermarkEstimateDuration());
private final long defaultEstimateDuration;
private final long defaultAllowedTimestampSkew;

PubSubWatermarkEstimatorFactory(
long defaultEstimateDuration, long defaultAllowedTimestampSkew) {
this.defaultEstimateDuration = defaultEstimateDuration;
this.defaultAllowedTimestampSkew = defaultAllowedTimestampSkew;
}

return new UnboundedOutOfOrdernessWatermarkEstimator.Factory();
@Override
public WatermarkEstimator create(
Map<String, Object> cfg, WatermarkIdlePolicyFactory idlePolicyFactory) {
// Preserves backward compatible behaviour by adding default values to config.
HashMap<String, Object> newConfig = new HashMap<>(cfg);

newConfig.putIfAbsent(cfgKey(ESTIMATE_DURATION_MS), defaultEstimateDuration);
newConfig.putIfAbsent(cfgKey(ALLOWED_TIMESTAMP_SKEW), defaultAllowedTimestampSkew);

return new UnboundedOutOfOrdernessWatermarkEstimator.Factory()
.create(newConfig, idlePolicyFactory);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright 2017-2020 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.direct.pubsub;

import static java.util.Collections.emptyMap;
import static org.junit.Assert.*;

import cz.o2.proxima.direct.time.NotProgressingWatermarkIdlePolicy;
import cz.o2.proxima.direct.time.UnboundedOutOfOrdernessWatermarkEstimator;
import cz.o2.proxima.time.WatermarkIdlePolicyFactory;
import java.util.Map;
import org.junit.Test;

public class PubSubWatermarkConfigurationTest {

private static final long ESTIMATE_DURATION = 20_000L;
private static final long TIMESTAMP_SKEW = 10_000L;

@Test
public void testConfigureDefault() {
// Check backward compatibility with legacy behaviour
Map<String, Object> cfg = emptyMap();
PubSubWatermarkConfiguration configuration =
new PubSubWatermarkConfiguration(cfg, ESTIMATE_DURATION, TIMESTAMP_SKEW);
WatermarkIdlePolicyFactory policyFactory = configuration.getWatermarkIdlePolicyFactory();
UnboundedOutOfOrdernessWatermarkEstimator estimator =
(UnboundedOutOfOrdernessWatermarkEstimator)
configuration.getWatermarkEstimatorFactory().create(cfg, policyFactory);
NotProgressingWatermarkIdlePolicy policy =
(NotProgressingWatermarkIdlePolicy) policyFactory.create(cfg);

assertNotNull(estimator);
assertNotNull(policy);
assertEquals(ESTIMATE_DURATION, estimator.getDurationMs());
assertEquals(TIMESTAMP_SKEW, estimator.getAllowedTimestampSkew());
}

@Test
public void testConfigureEstimator() {}
}

0 comments on commit 9e19949

Please sign in to comment.