From d8f91ad826ba703b27090d93da0f58a31bd23d40 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Fri, 19 May 2017 09:19:42 +0800 Subject: [PATCH 1/4] [BEAM-79] Respect WindowFn#getOutputTime in gearpump-runner --- .../gearpump/translators/GroupByKeyTranslator.java | 12 ++++++++---- .../translators/GroupByKeyTranslatorTest.java | 8 ++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 521f665fca5b..7d944a41580c 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -74,7 +74,7 @@ public void translate(GroupByKey transform, TranslationContext context) { new GearpumpWindowFn(windowFn.isNonMerging()), EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window") .groupBy(new GroupByFn(inputKeyCoder), parallelism, "group_by_Key_and_Window") - .map(new KeyedByTimestamp(timestampCombiner), "keyed_by_timestamp") + .map(new KeyedByTimestamp(windowFn, timestampCombiner), "keyed_by_timestamp") .fold(new Merge<>(windowFn, timestampCombiner), "merge") .map(new Values(), "values"); @@ -146,17 +146,21 @@ protected static class KeyedByTimestamp extends MapFunction>, KV>>> { + private final WindowFn, BoundedWindow> windowFn; private final TimestampCombiner timestampCombiner; - public KeyedByTimestamp(TimestampCombiner timestampCombiner) { + public KeyedByTimestamp(WindowFn, BoundedWindow> windowFn, + TimestampCombiner timestampCombiner) { + this.windowFn = windowFn; this.timestampCombiner = timestampCombiner; } @Override public KV>> map( WindowedValue> wv) { - Instant timestamp = timestampCombiner.assign( - Iterables.getOnlyElement(wv.getWindows()), wv.getTimestamp()); + BoundedWindow window = Iterables.getOnlyElement(wv.getWindows()); + Instant timestamp = timestampCombiner.assign(window + , windowFn.getOutputTime(wv.getTimestamp(), window)); return KV.of(timestamp, wv); } } diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java index 86b60aa38515..d5b931b78868 100644 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.time.Instant; @@ -95,18 +94,19 @@ public static Iterable data() { @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void testKeyedByTimestamp() { + WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10)); BoundedWindow window = new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10)); GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp = - new GroupByKeyTranslator.KeyedByTimestamp(timestampCombiner); + new GroupByKeyTranslator.KeyedByTimestamp(slidingWindows, timestampCombiner); WindowedValue> value = WindowedValue.of( KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING); KV>> result = keyedByTimestamp.map(value); org.joda.time.Instant time = - timestampCombiner.assign(Iterables.getOnlyElement(value.getWindows()), - value.getTimestamp()); + timestampCombiner.assign(window, + slidingWindows.getOutputTime(value.getTimestamp(), window)); assertThat(result, equalTo(KV.of(time, value))); } From 2677fbb6491d583c7f9e909226971357f76a8230 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 18 May 2017 13:09:36 -0700 Subject: [PATCH 2/4] Activate Gearpump local-validates-runner-tests in precommit --- runners/gearpump/pom.xml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index d4dade1fbd56..be384903cefd 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -49,7 +49,8 @@ local-validates-runner-tests - false + + true From 84d3120bd5749171d02b71c9547b6e932f436abd Mon Sep 17 00:00:00 2001 From: manuzhang Date: Mon, 22 May 2017 11:39:19 +0800 Subject: [PATCH 3/4] Remove unneeded translators --- runners/gearpump/pom.xml | 1 + .../gearpump/GearpumpPipelineTranslator.java | 5 -- .../beam/runners/gearpump/GearpumpRunner.java | 7 +- .../CreatePCollectionViewTranslator.java | 45 ----------- .../ParDoSingleOutputTranslator.java | 75 ------------------- .../translators/utils/TranslatorUtils.java | 1 - .../CreatePCollectionViewTranslatorTest.java | 55 -------------- 7 files changed, 6 insertions(+), 183 deletions(-) delete mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java delete mode 100644 runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java delete mode 100644 runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java diff --git a/runners/gearpump/pom.xml b/runners/gearpump/pom.xml index be384903cefd..7e39a484db4c 100644 --- a/runners/gearpump/pom.xml +++ b/runners/gearpump/pom.xml @@ -87,6 +87,7 @@ ] + 4 diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java index dc4592c67be0..b2939c241512 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpPipelineTranslator.java @@ -30,11 +30,9 @@ import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; import org.apache.beam.runners.gearpump.translators.CreateGearpumpPCollectionViewTranslator; -import org.apache.beam.runners.gearpump.translators.CreatePCollectionViewTranslator; import org.apache.beam.runners.gearpump.translators.FlattenPCollectionsTranslator; import org.apache.beam.runners.gearpump.translators.GroupByKeyTranslator; import org.apache.beam.runners.gearpump.translators.ParDoMultiOutputTranslator; -import org.apache.beam.runners.gearpump.translators.ParDoSingleOutputTranslator; import org.apache.beam.runners.gearpump.translators.ReadBoundedTranslator; import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator; import org.apache.beam.runners.gearpump.translators.TransformTranslator; @@ -89,7 +87,6 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { static { // register TransformTranslators - registerTransformTranslator(ParDo.SingleOutput.class, new ParDoSingleOutputTranslator()); registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); @@ -97,8 +94,6 @@ public class GearpumpPipelineTranslator implements Pipeline.PipelineVisitor { new FlattenPCollectionsTranslator()); registerTransformTranslator(ParDo.MultiOutput.class, new ParDoMultiOutputTranslator()); registerTransformTranslator(Window.Assign.class, new WindowAssignTranslator()); - registerTransformTranslator(View.CreatePCollectionView.class, - new CreatePCollectionViewTranslator()); registerTransformTranslator(CreateGearpumpPCollectionView.class, new CreateGearpumpPCollectionViewTranslator<>()); } diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java index 6df3f2da5edb..30b19353dc93 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/GearpumpRunner.java @@ -95,19 +95,22 @@ private ClientContext getClientContext(GearpumpPipelineOptions options, Config c */ private Config registerSerializers(Config config, Map userSerializers) { Map serializers = new HashMap<>(); + serializers.put("org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow", ""); serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInSingleWindow", ""); + serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInGlobalWindow", ""); + serializers.put("org.apache.beam.sdk.util.WindowedValue$TimestampedValueInMultipleWindows", ""); serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo", ""); serializers.put("org.apache.beam.sdk.transforms.windowing.PaneInfo$Timing", ""); serializers.put("org.joda.time.Instant", ""); serializers.put("org.apache.beam.sdk.values.KV", ""); serializers.put("org.apache.beam.sdk.transforms.windowing.IntervalWindow", ""); serializers.put("org.apache.beam.sdk.values.TimestampedValue", ""); + if (userSerializers != null && !userSerializers.isEmpty()) { serializers.putAll(userSerializers); } + return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers)); } - - } diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java deleted file mode 100644 index da55d705a18f..000000000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslator.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump.translators; - -import java.util.List; - -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; - -/** - * View.CreatePCollectionView bridges input stream to down stream - * transforms. - */ -public class CreatePCollectionViewTranslator implements - TransformTranslator> { - - private static final long serialVersionUID = -2394386873317515748L; - - @Override - public void translate(View.CreatePCollectionView transform, - TranslationContext context) { - JavaStream>> inputStream = - context.getInputStream(context.getInput()); - PCollectionView view = (PCollectionView) context.getOutput(); - context.setOutputStream(view, inputStream); - } -} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java deleted file mode 100644 index 6b0e610e23bf..000000000000 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/ParDoSingleOutputTranslator.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump.translators; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -import org.apache.beam.runners.gearpump.translators.functions.DoFnFunction; -import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollection; - -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.TupleTagList; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; - -/** - * {@link ParDo.SingleOutput} is translated to Gearpump flatMap function - * with {@link DoFn} wrapped in {@link DoFnFunction}. - */ -public class ParDoSingleOutputTranslator implements - TransformTranslator> { - - private static final long serialVersionUID = -3413205558160983784L; - private final TupleTag mainOutput = new TupleTag<>(); - private final List> sideOutputs = TupleTagList.empty().getAll(); - - @Override - public void translate(ParDo.SingleOutput transform, TranslationContext context) { - DoFn doFn = transform.getFn(); - PCollection output = (PCollection) context.getOutput(); - WindowingStrategy windowingStrategy = output.getWindowingStrategy(); - - Collection> sideInputs = transform.getSideInputs(); - Map> tagsToSideInputs = - TranslatorUtils.getTagsToSideInputs(sideInputs); - JavaStream> inputStream = context.getInputStream( - context.getInput()); - JavaStream unionStream = - TranslatorUtils.withSideInputStream(context, - inputStream, tagsToSideInputs); - - DoFnFunction doFnFunction = new DoFnFunction<>(context.getPipelineOptions(), - doFn, windowingStrategy, sideInputs, tagsToSideInputs, - mainOutput, sideOutputs); - - JavaStream> outputStream = - TranslatorUtils.toList(unionStream) - .flatMap(doFnFunction, transform.getName()) - .map(new TranslatorUtils.FromRawUnionValue(), "from_RawUnionValue"); - - context.setOutputStream(context.getOutput(), outputStream); - } -} diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java index b8f0ccb41e7e..999afae8a7f8 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -74,7 +74,6 @@ public static JavaStream withSideInputStream( for (Map.Entry> tagToSideInput: tagsToSideInputs.entrySet()) { // actually JavaStream>> - // check CreatePCollectionViewTranslator JavaStream> sideInputStream = context.getInputStream( tagToSideInput.getValue()); mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>( diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java deleted file mode 100644 index 42ff14e60adc..000000000000 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/CreatePCollectionViewTranslatorTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.gearpump.translators; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PValue; -import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; -import org.junit.Test; - -/** Tests for {@link CreatePCollectionViewTranslator}. */ -public class CreatePCollectionViewTranslatorTest { - - @Test - @SuppressWarnings({"rawtypes", "unchecked"}) - public void testTranslate() { - CreatePCollectionViewTranslator translator = new CreatePCollectionViewTranslator(); - View.CreatePCollectionView> createView = - mock(View.CreatePCollectionView.class); - - JavaStream javaStream = mock(JavaStream.class); - TranslationContext translationContext = mock(TranslationContext.class); - - PValue mockInput = mock(PValue.class); - when(translationContext.getInput()).thenReturn(mockInput); - when(translationContext.getInputStream(mockInput)).thenReturn(javaStream); - - PCollectionView view = mock(PCollectionView.class); - when(translationContext.getOutput()).thenReturn(view); - - translator.translate(createView, translationContext); - verify(translationContext, times(1)).setOutputStream(view, javaStream); - } -} From 033dd4d61de860112a991a1e9069448856acffe1 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Sat, 27 May 2017 06:24:54 +0800 Subject: [PATCH 4/4] Minor to retrigger to tests --- .../beam/runners/gearpump/translators/utils/TranslatorUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java index 999afae8a7f8..7e748c13c6a0 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/utils/TranslatorUtils.java @@ -74,6 +74,7 @@ public static JavaStream withSideInputStream( for (Map.Entry> tagToSideInput: tagsToSideInputs.entrySet()) { // actually JavaStream>> + // check CreateGearpumpPCollectionViewTranslator JavaStream> sideInputStream = context.getInputStream( tagToSideInput.getValue()); mainStream = mainStream.merge(sideInputStream.map(new ToRawUnionValue<>(