Skip to content

Commit

Permalink
Merge 7fdae1f into 88de0cb
Browse files Browse the repository at this point in the history
  • Loading branch information
manuzhang committed Dec 15, 2016
2 parents 88de0cb + 7fdae1f commit 6233d6c
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 135 deletions.
39 changes: 2 additions & 37 deletions runners/gearpump/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<gearpump.version>0.8.1</gearpump.version>
<gearpump.version>0.8.3-SNAPSHOT</gearpump.version>
</properties>

<profiles>
Expand Down Expand Up @@ -121,29 +121,12 @@
<artifactId>gearpump-core_2.11</artifactId>
<version>${gearpump.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-daemon_2.11</artifactId>
<version>${gearpump.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-experimental-cgroup_2.11</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
<scope>provided</scope>
<version>1.3.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
Expand All @@ -169,10 +152,6 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-core-java</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand All @@ -197,20 +176,6 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-shaded-metrics-graphite_2.11</artifactId>
<version>${gearpump.version}</version>
<classifier>assembly</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.gearpump</groupId>
<artifactId>gearpump-shaded-guava_2.11</artifactId>
<version>${gearpump.version}</version>
<classifier>assembly</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.runners.gearpump.translators.ReadUnboundedTranslator;
import org.apache.beam.runners.gearpump.translators.TransformTranslator;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
import org.apache.beam.runners.gearpump.translators.WindowBoundTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.TransformHierarchy;
Expand All @@ -37,6 +38,7 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PValue;

import org.apache.gearpump.util.Graph;
Expand Down Expand Up @@ -71,6 +73,7 @@ public class GearpumpPipelineTranslator extends Pipeline.PipelineVisitor.Default
registerTransformTranslator(Flatten.FlattenPCollectionList.class,
new FlattenPCollectionTranslator());
registerTransformTranslator(ParDo.BoundMulti.class, new ParDoBoundMultiTranslator());
registerTransformTranslator(Window.Bound.class, new WindowBoundTranslator());
registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,19 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.beam.runners.core.AssignWindows;
import org.apache.beam.runners.gearpump.translators.TranslationContext;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.IdentityWindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
Expand Down Expand Up @@ -75,9 +74,10 @@ public static GearpumpRunner fromOptions(PipelineOptions options) {

public <OutputT extends POutput, InputT extends PInput> OutputT apply(
PTransform<InputT, OutputT> transform, InputT input) {
if (Window.Bound.class.equals(transform.getClass())) {
if (Window.Bound.class.equals(transform.getClass())
&& isNullOrIdentityWindowFn(((Window.Bound) transform).getWindowFn())) {
return (OutputT) super.apply(
new AssignWindowsAndSetStrategy((Window.Bound) transform), input);
ParDo.of(new IdentityFn()), input);
} else if (Flatten.FlattenPCollectionList.class.equals(transform.getClass())
&& ((PCollectionList<?>) input).size() == 0) {
return (OutputT) Pipeline.applyTransform(input.getPipeline().begin(), Create.of());
Expand Down Expand Up @@ -139,53 +139,15 @@ private Config registerSerializers(Config config, Map<String, String> userSerial
return config.withValue(GEARPUMP_SERIALIZERS, ConfigValueFactory.fromMap(serializers));
}

private static class IdentityFn<T> extends DoFn<T, T> {

/**
* copied from DirectPipelineRunner.
* used to replace Window.Bound till window function is added to Gearpump Stream DSL
*/
private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
extends PTransform<PCollection<T>, PCollection<T>> {

private final Window.Bound<T> wrapped;

AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
this.wrapped = wrapped;
}

@Override
public PCollection<T> apply(PCollection<T> input) {
WindowingStrategy<?, ?> outputStrategy =
wrapped.getOutputStrategyInternal(input.getWindowingStrategy());

WindowFn<T, BoundedWindow> windowFn =
(WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();

if (!windowFn.isNonMerging()) {
throw new UnsupportedOperationException(
"merging window is not supported in Gearpump pipeline");
}

// If the Window.Bound transform only changed parts other than the WindowFn, then
// we skip AssignWindows even though it should be harmless in a perfect world.
// The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
// crash if another GBK is performed without explicitly setting the WindowFn. So we skip
// AssignWindows in this case.
if (wrapped.getWindowFn() == null) {
return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
.setWindowingStrategyInternal(outputStrategy);
} else {
return input
.apply("AssignWindows", new AssignWindows<>(windowFn))
.setWindowingStrategyInternal(outputStrategy);
}
@ProcessElement
public void process(ProcessContext c) {
c.output(c.element());
}
}

private static class IdentityFn<T> extends OldDoFn<T, T> {
@Override
public void processElement(ProcessContext c) {
c.output(c.element());
}
private boolean isNullOrIdentityWindowFn(WindowFn windowFn) {
return windowFn == null || windowFn.getClass().equals(IdentityWindowFn.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public class FlattenPCollectionTranslator<T> implements
@Override
public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
JavaStream<T> merged = null;
System.out.println("PCollectionList size " + context.getInput(transform).size());
for (PCollection<T> collection : context.getInput(transform).getAll()) {
JavaStream<T> inputStream = context.getInputStream(collection);
if (null == merged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,33 @@
package org.apache.beam.runners.gearpump.translators;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.io.Serializable;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;

import org.apache.beam.sdk.values.PCollection;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction;
import org.apache.gearpump.streaming.dsl.window.api.Accumulating$;
import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
import org.apache.gearpump.streaming.dsl.window.api.Window;
import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
import org.apache.gearpump.streaming.dsl.window.impl.Bucket;
import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;

import scala.collection.JavaConversions;


/**
Expand All @@ -44,56 +54,97 @@
public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKey<K, V>> {
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
PCollection<KV<K, V>> input = context.getInput(transform);
JavaStream<WindowedValue<KV<K, V>>> inputStream =
context.getInputStream(context.getInput(transform));
context.getInputStream(input);
int parallelism = context.getPipelineOptions().getParallelism();
JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
.flatMap(new KeyedByKeyAndWindow<K, V>(), "keyed_by_Key_and_Window")
.groupBy(new GroupByKeyAndWindow<K, V>(), parallelism, "group_by_Key_and_Window")
.map(new ExtractKeyValue<K, V>(), "extract_Key_and_Value")
.window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
.map(new ValueToIterable<K, V>(), "map_value_to_iterable")
.reduce(new MergeValue<K, V>(), "merge_value");

context.setOutputStream(context.getOutput(transform), outputStream);
}

private static class KeyedByKeyAndWindow<K, V> implements
FlatMapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<KV<K, BoundedWindow>, V>>> {
private static class GearpumpWindowFn<T, W extends BoundedWindow> implements WindowFn,
Serializable {

private org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> windowFn;

GearpumpWindowFn(org.apache.beam.sdk.transforms.windowing.WindowFn<T, W> windowFn) {
this.windowFn = windowFn;
}

@Override
public Iterator<WindowedValue<KV<KV<K, BoundedWindow>, V>>> apply(WindowedValue<KV<K, V>> wv) {
List<WindowedValue<KV<KV<K, BoundedWindow>, V>>> ret = new ArrayList<>(wv.getWindows().size
());
for (BoundedWindow window : wv.getWindows()) {
KV<K, BoundedWindow> keyWin = KV.of(wv.getValue().getKey(), window);
ret.add(WindowedValue.of(KV.of(keyWin, wv.getValue().getValue()),
wv.getTimestamp(), window, wv.getPane()));
public scala.collection.immutable.List<Bucket> apply(final Instant timestamp) {
try {
Collection<W> windows = windowFn.assignWindows(windowFn.new AssignContext() {
@Override
public T element() {
throw new UnsupportedOperationException();
}

@Override
public org.joda.time.Instant timestamp() {
return TranslatorUtils.java8TimeToJodaTime(timestamp);
}

@Override
public W window() {
throw new UnsupportedOperationException();
}
});

List<Bucket> buckets = new LinkedList<>();
for (BoundedWindow window : windows) {
buckets.add(getBucket(window));
}
return JavaConversions.asScalaBuffer(buckets).toList();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private Bucket getBucket(BoundedWindow window) {
if (window instanceof IntervalWindow) {
IntervalWindow intervalWindow = (IntervalWindow) window;
Instant start = TranslatorUtils.jodaTimeToJava8Time(intervalWindow.start());
Instant end = TranslatorUtils.jodaTimeToJava8Time(intervalWindow.end());
return new Bucket(start, end);
} else if (window instanceof GlobalWindow) {
Instant end = TranslatorUtils.jodaTimeToJava8Time(window.maxTimestamp());
return new Bucket(Instant.MIN, end);
} else {
throw new RuntimeException("unknown window " + window.getClass().getName());
}
return ret.iterator();
}
}

private static class GroupByKeyAndWindow<K, V> implements
GroupByFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>, KV<K, BoundedWindow>> {
private static class GroupByFn<K, V> implements
GroupByFunction<WindowedValue<KV<K, V>>, K> {

@Override
public KV<K, BoundedWindow> apply(WindowedValue<KV<KV<K, BoundedWindow>, V>> wv) {
public K apply(WindowedValue<KV<K, V>> wv) {
return wv.getValue().getKey();
}
}

private static class ExtractKeyValue<K, V> implements
MapFunction<WindowedValue<KV<KV<K, BoundedWindow>, V>>,
WindowedValue<KV<K, Iterable<V>>>> {
private static class ValueToIterable<K, V>
implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {


@Override
public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<KV<K, BoundedWindow>, V>> wv) {
return WindowedValue.of(KV.of(wv.getValue().getKey().getKey(),
(Iterable<V>) Collections.singletonList(wv.getValue().getValue())),
wv.getTimestamp(), wv.getWindows(), wv.getPane());
public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
Iterable<V> values = Lists.newArrayList(wv.getValue().getValue());
return wv.withValue(KV.of(wv.getValue().getKey(), values));
}
}

private static class MergeValue<K, V> implements
ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {

@Override
public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1,
WindowedValue<KV<K, Iterable<V>>> wv2) {
Expand Down
Loading

0 comments on commit 6233d6c

Please sign in to comment.