Skip to content

Commit

Permalink
This closes #2725: Revert "Replace OutputTimeFn UDF with TimestampCombiner enum"
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Apr 27, 2017
2 parents 3bcbba1 + 83d41fc commit b82cd24
Show file tree
Hide file tree
Showing 53 changed files with 1,130 additions and 740 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
Expand Down Expand Up @@ -313,7 +313,7 @@ public void processElement(ProcessContext c) {
userEvents
.apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
.withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
.withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
// For this use, we care only about the existence of the session, not any particular
// information aggregated over it, so the following is an efficient way to do that.
.apply(Combine.perKey(x -> 0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.CombineFnUtil;
import org.apache.beam.sdk.util.state.BagState;
Expand Down Expand Up @@ -150,10 +150,10 @@ public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
}

@Override
public <W extends BoundedWindow> WatermarkHoldState bindWatermark(
StateTag<? super K, WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
return new ApexWatermarkHoldState<>(namespace, address, timestampCombiner);
public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
StateTag<? super K, WatermarkHoldState<W>> address,
OutputTimeFn<? super W> outputTimeFn) {
return new ApexWatermarkHoldState<>(namespace, address, outputTimeFn);
}

@Override
Expand Down Expand Up @@ -269,16 +269,16 @@ public void write(T input) {
}

private final class ApexWatermarkHoldState<W extends BoundedWindow>
extends AbstractState<Instant> implements WatermarkHoldState {
extends AbstractState<Instant> implements WatermarkHoldState<W> {

private final TimestampCombiner timestampCombiner;
private final OutputTimeFn<? super W> outputTimeFn;

public ApexWatermarkHoldState(
StateNamespace namespace,
StateTag<?, WatermarkHoldState> address,
TimestampCombiner timestampCombiner) {
StateTag<?, WatermarkHoldState<W>> address,
OutputTimeFn<? super W> outputTimeFn) {
super(namespace, address, InstantCoder.of());
this.timestampCombiner = timestampCombiner;
this.outputTimeFn = outputTimeFn;
}

@Override
Expand All @@ -294,7 +294,7 @@ public Instant read() {
@Override
public void add(Instant outputTime) {
Instant combined = read();
combined = (combined == null) ? outputTime : timestampCombiner.combine(combined, outputTime);
combined = (combined == null) ? outputTime : outputTimeFn.combine(combined, outputTime);
writeValue(combined);
}

Expand All @@ -313,8 +313,8 @@ public Boolean read() {
}

@Override
public TimestampCombiner getTimestampCombiner() {
return timestampCombiner;
public OutputTimeFn<? super W> getOutputTimeFn() {
return outputTimeFn;
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
Expand Down Expand Up @@ -83,12 +83,12 @@ public void test() throws Exception {
);

p.apply(Read.from(new TestSource(data, new Instant(5000))))
.apply(
Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.withTimestampCombiner(TimestampCombiner.LATEST))
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1)))
.withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
.apply(Count.<String>perElement())
.apply(ParDo.of(new KeyedByTimestamp<KV<String, Long>>()))
.apply(ParDo.of(new EmbeddedCollector()));
.apply(ParDo.of(new EmbeddedCollector()))
;

ApexRunnerResult result = (ApexRunnerResult) p.run();
result.getApexDAG();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.util.state.BagState;
import org.apache.beam.sdk.util.state.CombiningState;
import org.apache.beam.sdk.util.state.GroupingState;
Expand Down Expand Up @@ -65,13 +65,14 @@ public class ApexStateInternalsTest {
"sumInteger", VarIntCoder.of(), Sum.ofIntegers());
private static final StateTag<Object, BagState<String>> STRING_BAG_ADDR =
StateTags.bag("stringBag", StringUtf8Coder.of());
private static final StateTag<Object, WatermarkHoldState>
private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
WATERMARK_EARLIEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
private static final StateTag<Object, WatermarkHoldState> WATERMARK_LATEST_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.LATEST);
private static final StateTag<Object, WatermarkHoldState> WATERMARK_EOW_ADDR =
StateTags.watermarkStateInternal("watermark", TimestampCombiner.END_OF_WINDOW);
StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEarliestInputTimestamp());
private static final StateTag<Object, WatermarkHoldState<BoundedWindow>>
WATERMARK_LATEST_ADDR =
StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtLatestInputTimestamp());
private static final StateTag<Object, WatermarkHoldState<BoundedWindow>> WATERMARK_EOW_ADDR =
StateTags.watermarkStateInternal("watermark", OutputTimeFns.outputAtEndOfWindow());

private ApexStateInternals<String> underTest;

Expand Down Expand Up @@ -226,7 +227,7 @@ public void testMergeCombiningValueIntoNewNamespace() throws Exception {

@Test
public void testWatermarkEarliestState() throws Exception {
WatermarkHoldState value =
WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);

// State instances are cached, but depend on the namespace.
Expand All @@ -250,7 +251,7 @@ public void testWatermarkEarliestState() throws Exception {

@Test
public void testWatermarkLatestState() throws Exception {
WatermarkHoldState value =
WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);

// State instances are cached, but depend on the namespace.
Expand All @@ -274,7 +275,7 @@ public void testWatermarkLatestState() throws Exception {

@Test
public void testWatermarkEndOfWindowState() throws Exception {
WatermarkHoldState value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);
WatermarkHoldState<BoundedWindow> value = underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR);

// State instances are cached, but depend on the namespace.
assertEquals(value, underTest.state(NAMESPACE_1, WATERMARK_EOW_ADDR));
Expand All @@ -291,7 +292,7 @@ public void testWatermarkEndOfWindowState() throws Exception {

@Test
public void testWatermarkStateIsEmpty() throws Exception {
WatermarkHoldState value =
WatermarkHoldState<BoundedWindow> value =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);

assertThat(value.isEmpty().read(), Matchers.is(true));
Expand All @@ -305,9 +306,9 @@ public void testWatermarkStateIsEmpty() throws Exception {

@Test
public void testMergeEarliestWatermarkIntoSource() throws Exception {
WatermarkHoldState value1 =
WatermarkHoldState<BoundedWindow> value1 =
underTest.state(NAMESPACE_1, WATERMARK_EARLIEST_ADDR);
WatermarkHoldState value2 =
WatermarkHoldState<BoundedWindow> value2 =
underTest.state(NAMESPACE_2, WATERMARK_EARLIEST_ADDR);

value1.add(new Instant(3000));
Expand All @@ -324,11 +325,11 @@ public void testMergeEarliestWatermarkIntoSource() throws Exception {

@Test
public void testMergeLatestWatermarkIntoSource() throws Exception {
WatermarkHoldState value1 =
WatermarkHoldState<BoundedWindow> value1 =
underTest.state(NAMESPACE_1, WATERMARK_LATEST_ADDR);
WatermarkHoldState value2 =
WatermarkHoldState<BoundedWindow> value2 =
underTest.state(NAMESPACE_2, WATERMARK_LATEST_ADDR);
WatermarkHoldState value3 =
WatermarkHoldState<BoundedWindow> value3 =
underTest.state(NAMESPACE_3, WATERMARK_LATEST_ADDR);

value1.add(new Instant(3000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,16 @@
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
import org.joda.time.Duration;

/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
Expand Down Expand Up @@ -114,42 +115,11 @@ public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
}
}

/**
 * Converts a {@link TimestampCombiner} enum value to its {@link RunnerApi.OutputTime} proto
 * counterpart.
 *
 * <p>Mapping: {@code EARLIEST} to {@code EARLIEST_IN_PANE}, {@code END_OF_WINDOW} to
 * {@code END_OF_WINDOW}, and {@code LATEST} to {@code LATEST_IN_PANE}.
 *
 * @param timestampCombiner the combiner to convert
 * @return the corresponding proto enum value
 * @throws IllegalArgumentException if {@code timestampCombiner} is not one of the three values
 *     handled by the switch below
 */
public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
switch(timestampCombiner) {
case EARLIEST:
return OutputTime.EARLIEST_IN_PANE;
case END_OF_WINDOW:
return OutputTime.END_OF_WINDOW;
case LATEST:
return OutputTime.LATEST_IN_PANE;
default:
// Reached only for a value this switch does not list; fail loudly rather than guess.
throw new IllegalArgumentException(
String.format(
"Unknown %s: %s",
TimestampCombiner.class.getSimpleName(),
timestampCombiner));
}
}

public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
switch (proto) {
case EARLIEST_IN_PANE:
return TimestampCombiner.EARLIEST;
case END_OF_WINDOW:
return TimestampCombiner.END_OF_WINDOW;
case LATEST_IN_PANE:
return TimestampCombiner.LATEST;
case UNRECOGNIZED:
default:
// Whether or not it is proto that cannot recognize it (due to the version of the
// generated code we link to) or the switch hasn't been updated to handle it,
// the situation is the same: we don't know what this OutputTime means
throw new IllegalArgumentException(
String.format(
"Cannot convert unknown %s to %s: %s",
RunnerApi.OutputTime.class.getCanonicalName(),
OutputTime.class.getCanonicalName(),
proto));
/**
 * Converts an {@link OutputTimeFn} to its {@link RunnerApi.OutputTime} proto representation.
 *
 * <p>If the function is a {@code WindowingStrategy.CombineWindowFnOutputTimes}, the function
 * returned by its {@code getOutputTimeFn()} is converted instead (by recursing); any other
 * function is handed to {@code OutputTimeFns.toProto}.
 *
 * @param outputTimeFn the function to convert
 * @return the proto enum value produced for the (possibly unwrapped) function
 */
public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
// Unwrap and recurse so the inner function is what gets converted.
return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
} else {
return OutputTimeFns.toProto(outputTimeFn);
}
}

Expand Down Expand Up @@ -207,7 +177,7 @@ public static RunnerApi.WindowingStrategy toProto(

RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
RunnerApi.WindowingStrategy.newBuilder()
.setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
.setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
.setAccumulationMode(toProto(windowingStrategy.getMode()))
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
Expand Down Expand Up @@ -259,7 +229,7 @@ public static RunnerApi.WindowingStrategy toProto(
"WindowFn");

WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
Trigger trigger = Triggers.fromProto(proto.getTrigger());
ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
Expand All @@ -269,7 +239,7 @@ public static RunnerApi.WindowingStrategy toProto(
.withAllowedLateness(allowedLateness)
.withMode(accumulationMode)
.withTrigger(trigger)
.withTimestampCombiner(timestampCombiner)
.withOutputTimeFn(outputTimeFn)
.withClosingBehavior(closingBehavior);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
Expand Down Expand Up @@ -68,14 +68,14 @@ public static Iterable<ToProtoAndBackSpec> data() {
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(71))
.withTimestampCombiner(TimestampCombiner.EARLIEST)),
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
toProtoAndBackSpec(
WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withTrigger(REPRESENTATIVE_TRIGGER)
.withAllowedLateness(Duration.millis(93))
.withTimestampCombiner(TimestampCombiner.LATEST)));
.withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
}

@Parameter(0)
Expand Down
Loading

0 comments on commit b82cd24

Please sign in to comment.