Skip to content

Commit

Permalink
Merge 6fb986b into 66b3ce0
Browse files Browse the repository at this point in the history
  • Loading branch information
tgroh committed May 11, 2017
2 parents 66b3ce0 + 6fb986b commit 6bef461
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.beam.sdk.transforms;

import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
Expand All @@ -27,7 +26,6 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Duration;

/**
* <b>For internal use only; no backwards compatibility guarantees.</b>
Expand Down Expand Up @@ -69,7 +67,7 @@ public PCollection<KV<K, V>> expand(PCollection<KV<K, V>> input) {
.triggering(new ReshuffleTrigger<>())
.discardingFiredPanes()
.withTimestampCombiner(TimestampCombiner.EARLIEST)
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
.withAllowedLateness(input.getWindowingStrategy().getAllowedLateness());

return input.apply(rewindow)
.apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
Expand Down Expand Up @@ -263,4 +266,28 @@ public void testReshuffleAfterSlidingWindows() {

pipeline.run();
}

@Test
@Category({ValidatesRunner.class, UsesTestStream.class})
public void testReshuffleWithTimestampsStreaming() {
TestStream<Long> stream =
TestStream.create(VarLongCoder.of())
.advanceWatermarkTo(new Instant(0L).plus(Duration.standardDays(48L)))
.addElements(
TimestampedValue.of(0L, new Instant(0L)),
TimestampedValue.of(1L, new Instant(0L).plus(Duration.standardDays(48L))),
TimestampedValue.of(
2L, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(48L))))
.advanceWatermarkToInfinity();
PCollection<KV<String, Long>> input =
pipeline
.apply(stream).apply(WithKeys.<String, Long>of(""))
.apply(
Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardMinutes(10L))));

PCollection<KV<String, Long>> reshuffled = input.apply(Reshuffle.<String, Long>of());
PAssert.that(reshuffled.apply(Values.<Long>create())).containsInAnyOrder(0L, 1L, 2L);

pipeline.run();
}
}

0 comments on commit 6bef461

Please sign in to comment.