Skip to content

Commit

Permalink
This closes #3286
Browse files Browse the repository at this point in the history
  • Loading branch information
aviemzur committed Jul 24, 2017
2 parents f54072a + dfa983c commit 0064fb3
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 1 deletion.
3 changes: 2 additions & 1 deletion runners/spark/pom.xml
Expand Up @@ -77,7 +77,8 @@
<excludedGroups>
org.apache.beam.sdk.testing.UsesSplittableParDo,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
org.apache.beam.sdk.testing.UsesTestStream
org.apache.beam.sdk.testing.UsesTestStream,
org.apache.beam.sdk.testing.UsesCustomWindowMerging
</excludedGroups>
<parallel>none</parallel>
<forkCount>1</forkCount>
Expand Down
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;

/**
 * Category tag for validation tests which utilize custom window merging.
 *
 * <p>This is a marker interface with no members; it is only meant to be referenced from a
 * JUnit {@code @Category} annotation so that a runner's build can include or exclude the
 * tagged tests as a group.
 */
public interface UsesCustomWindowMerging {}
Expand Up @@ -31,19 +31,30 @@
import static org.mockito.Mockito.when;

import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesCustomWindowMerging;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
Expand Down Expand Up @@ -570,4 +581,177 @@ public void testDisplayDataExcludesDefaults() {
assertThat(data, not(hasDisplayItem("trigger")));
assertThat(data, not(hasDisplayItem("allowedLateness")));
}
/**
 * Checks that a runner honours a custom merging {@code WindowFn} on a non-keyed collection:
 * the window of "small1" is merged into the window of "big", while "small2" stays alone.
 */
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindows() {
  Instant epoch = new Instant(0L);
  List<TimestampedValue<String>> timestampedElements = new ArrayList<>();
  timestampedElements.add(
      TimestampedValue.of("big", epoch.plus(Duration.standardSeconds(10))));
  timestampedElements.add(
      TimestampedValue.of("small1", epoch.plus(Duration.standardSeconds(20))));
  // The window of "small2" is not enclosed by the big window, so it must not be merged.
  timestampedElements.add(
      TimestampedValue.of("small2", epoch.plus(Duration.standardSeconds(39))));

  PCollection<Long> countPerWindow =
      pipeline
          .apply(Create.timestamped(timestampedElements))
          .apply(Window.into(new CustomWindowFn<String>()))
          .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults());

  // Expect one merged window holding "big" and "small1" (count 2) and one
  // unmerged window holding only "small2" (count 1).
  PAssert.that("Wrong number of elements in output collection", countPerWindow)
      .containsInAnyOrder(2L, 1L);
  pipeline.run();
}

// This test is useful because some runners have a special merge implementation
// for keyed collections.
/**
 * Same scenario as the non-keyed custom window merging test, but the elements are
 * {@code KV<Integer, String>} pairs so that the input is a keyed collection.
 */
@Test
@Category({ValidatesRunner.class, UsesCustomWindowMerging.class})
public void testMergingCustomWindowsKeyedCollection() {
  Instant epoch = new Instant(0L);
  List<TimestampedValue<KV<Integer, String>>> timestampedElements = new ArrayList<>();
  timestampedElements.add(
      TimestampedValue.of(KV.of(0, "big"), epoch.plus(Duration.standardSeconds(10))));
  timestampedElements.add(
      TimestampedValue.of(KV.of(1, "small1"), epoch.plus(Duration.standardSeconds(20))));
  // The window of "small2" is not enclosed by the big window, so it must not be merged.
  timestampedElements.add(
      TimestampedValue.of(KV.of(2, "small2"), epoch.plus(Duration.standardSeconds(39))));

  PCollection<Long> countPerWindow =
      pipeline
          .apply(Create.timestamped(timestampedElements))
          .apply(Window.into(new CustomWindowFn<KV<Integer, String>>()))
          .apply(Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults());

  // Expect one merged window holding "big" and "small1" (count 2) and one
  // unmerged window holding only "small2" (count 1).
  PAssert.that("Wrong number of elements in output collection", countPerWindow)
      .containsInAnyOrder(2L, 1L);
  pipeline.run();
}

/**
 * An {@link IntervalWindow} that additionally carries an {@code isBig} flag. The flag takes
 * part in {@link #equals(Object)} and {@link #hashCode()}, so two windows covering the same
 * interval but with different flags are distinct.
 */
private static class CustomWindow extends IntervalWindow {

  private final boolean isBig;

  /** Creates a window over {@code [start, end)} that is not flagged as big. */
  CustomWindow(Instant start, Instant end) {
    this(start, end, false);
  }

  /** Creates a window over {@code [start, end)} with the given {@code isBig} flag. */
  CustomWindow(Instant start, Instant end, boolean isBig) {
    super(start, end);
    this.isBig = isBig;
  }

  @Override
  public boolean equals(Object o) {
    if (o == this) {
      return true;
    }
    // Only instances of exactly the same class can be equal.
    if (o == null || o.getClass() != getClass()) {
      return false;
    }
    // Delegate the interval comparison to the superclass, then compare the flag.
    return super.equals(o) && ((CustomWindow) o).isBig == isBig;
  }

  @Override
  public int hashCode() {
    return Objects.hash(super.hashCode(), isBig);
  }
}

/**
 * Coder for {@link CustomWindow}. The encoding is the {@link IntervalWindow} encoding of the
 * window followed by a var-int flag: {@code 1} when the window is big, {@code 0} otherwise.
 */
private static class CustomWindowCoder extends CustomCoder<CustomWindow> {

  private static final CustomWindowCoder INSTANCE = new CustomWindowCoder();
  private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder();
  private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of();

  /** Returns the shared coder instance. */
  public static CustomWindowCoder of() {
    return INSTANCE;
  }

  @Override
  public void encode(CustomWindow window, OutputStream outStream) throws IOException {
    // Interval bounds first, then the big/small flag.
    INTERVAL_WINDOW_CODER.encode(window, outStream);
    int bigFlag = 0;
    if (window.isBig) {
      bigFlag = 1;
    }
    VAR_INT_CODER.encode(bigFlag, outStream);
  }

  @Override
  public CustomWindow decode(InputStream inStream) throws IOException {
    // Must read in the same order as encode() wrote.
    IntervalWindow interval = INTERVAL_WINDOW_CODER.decode(inStream);
    int bigFlag = VAR_INT_CODER.decode(inStream);
    return new CustomWindow(interval.start(), interval.end(), bigFlag != 0);
  }

  @Override
  public void verifyDeterministic() throws NonDeterministicException {
    // Deterministic exactly when both component coders are.
    INTERVAL_WINDOW_CODER.verifyDeterministic();
    VAR_INT_CODER.verifyDeterministic();
  }
}

private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> {

@Override public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception {
String element;
// It loses genericity of type T but this is not a big deal for a test.
// And it allows to avoid duplicating CustomWindowFn to support PCollection<KV>
if (c.element() instanceof KV){
element = ((KV<Integer, String>) c.element()).getValue();
} else {
element = (String) c.element();
}
// put big elements in windows of 30s and small ones in windows of 5s
if ("big".equals(element)) {
return Collections.singletonList(
new CustomWindow(c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)),
true));
} else {
return Collections.singletonList(
new CustomWindow(c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)),
false));
}
}

@Override
public void mergeWindows(MergeContext c) throws Exception {
List<CustomWindow> toBeMerged = new ArrayList<>();
CustomWindow bigWindow = null;
for (CustomWindow customWindow : c.windows()) {
if (customWindow.isBig) {
bigWindow = customWindow;
toBeMerged.add(customWindow);
} else if (bigWindow != null
&& customWindow.start().isAfter(bigWindow.start())
&& customWindow.end().isBefore(bigWindow.end())) {
toBeMerged.add(customWindow);
}
}
// in case bigWindow has not been seen yet
if (bigWindow != null) {
// merge small windows into big windows
c.merge(toBeMerged, bigWindow);
}
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
return other instanceof CustomWindowFn;
}

@Override
public Coder<CustomWindow> windowCoder() {
return CustomWindowCoder.of();
}

@Override
public WindowMappingFn<CustomWindow> getDefaultWindowMappingFn() {
throw new UnsupportedOperationException("side inputs not supported");
}


}

}

0 comments on commit 0064fb3

Please sign in to comment.