Skip to content

Commit

Permalink
Merge 85dcfbd into 4c445dd
Browse files Browse the repository at this point in the history
  • Loading branch information
manuzhang committed Jan 20, 2017
2 parents 4c445dd + 85dcfbd commit d6ef12c
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 54 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ before_install:
install:
# Removing this here protects from inadvertent caching
- rm -rf "$HOME/.m2/repository/org/apache/beam"
- rm -rf "$HOME/.m2/repository/org/apache/gearpump"

script:
- travis_retry mvn --batch-mode --update-snapshots --no-snapshot-updates $MAVEN_OVERRIDE install && travis_retry bash -ex .travis/test_wordcount.sh
Expand Down
9 changes: 8 additions & 1 deletion runners/gearpump/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
<profiles>
<profile>
<id>local-runnable-on-service-tests</id>
<activation><activeByDefault>false</activeByDefault></activation>
<activation><activeByDefault>true</activeByDefault></activation>
<build>
<plugins>
<plugin>
Expand All @@ -64,11 +64,18 @@
</goals>
<configuration>
<groups>org.apache.beam.sdk.testing.RunnableOnService</groups>
<excludedGroups>
org.apache.beam.sdk.testing.UsesStatefulParDo,
org.apache.beam.sdk.testing.UsesTimersInParDo,
org.apache.beam.sdk.testing.UsesSplittableParDo,
org.apache.beam.sdk.testing.UsesMetrics
</excludedGroups>
<parallel>none</parallel>
<failIfNoTests>true</failIfNoTests>
<dependenciesToScan>
<dependency>org.apache.beam:beam-sdks-java-core</dependency>
</dependenciesToScan>
<argLine>-noverify</argLine>
<excludes>
<!-- side input is not supported in Gearpump -->
<exclude>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.gearpump;

import java.io.IOException;
import java.util.List;

import org.apache.beam.sdk.AggregatorRetrievalException;
import org.apache.beam.sdk.AggregatorValues;
Expand All @@ -26,31 +27,62 @@
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.transforms.Aggregator;

import org.apache.gearpump.cluster.MasterToAppMaster;
import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData;
import org.apache.gearpump.cluster.client.ClientContext;
import org.joda.time.Duration;

import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
* Result of executing a {@link Pipeline} with Gearpump.
*/
public class GearpumpPipelineResult implements PipelineResult {

private final ClientContext client;
private final int appId;
private final Duration defaultWaitDuration = Duration.standardSeconds(30);
private final Duration defaultWaitInterval = Duration.standardSeconds(5);

public GearpumpPipelineResult(ClientContext client, int appId) {
this.client = client;
this.appId = appId;
}

@Override
public State getState() {
return null;
return getGearpumpState();
}

@Override
public State cancel() throws IOException {
return null;
client.shutdown(appId);
return State.CANCELLED;
}

@Override
public State waitUntilFinish(Duration duration) {
return null;
long start = System.currentTimeMillis();
do {
try {
Thread.sleep(defaultWaitInterval.getMillis());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (State.RUNNING == getGearpumpState()
&& (System.currentTimeMillis() - start) < duration.getMillis());

if (State.RUNNING == getGearpumpState()) {
return State.DONE;
} else {
return State.FAILED;
}
}

@Override
public State waitUntilFinish() {
return null;
return waitUntilFinish(defaultWaitDuration);
}

@Override
Expand All @@ -66,4 +98,23 @@ public MetricResults metrics() {
return null;
}

private State getGearpumpState() {
String status = null;
List<AppMasterData> apps =
JavaConverters.<AppMasterData>seqAsJavaListConverter(
(Seq<AppMasterData>) client.listApps().appMasters()).asJava();
for (AppMasterData app: apps) {
if (app.appId() == appId) {
status = app.status();
}
}
if (null == status || status.equals(MasterToAppMaster.AppMasterNonExist())) {
return State.UNKNOWN;
} else if (status.equals(MasterToAppMaster.AppMasterActive())) {
return State.RUNNING;
} else {
return State.STOPPED;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,15 @@ public GearpumpPipelineResult run(Pipeline pipeline) {
options.getSerializers());
ClientContext clientContext = getClientContext(options, config);
options.setClientContext(clientContext);
UserConfig userConfig = UserConfig.empty();
JavaStreamApp streamApp = new JavaStreamApp(
appName, clientContext, UserConfig.empty());
appName, clientContext, userConfig);
TranslationContext translationContext = new TranslationContext(streamApp, options);
GearpumpPipelineTranslator translator = new GearpumpPipelineTranslator(translationContext);
translator.translate(pipeline);
streamApp.submit();
int appId = streamApp.submit();

return null;
return new GearpumpPipelineResult(clientContext, appId);
}

private ClientContext getClientContext(GearpumpPipelineOptions options, Config config) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.beam.runners.gearpump;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.runners.PipelineRunner;
Expand Down Expand Up @@ -52,7 +53,10 @@ public static TestGearpumpRunner fromOptions(PipelineOptions options) {
@Override
public GearpumpPipelineResult run(Pipeline pipeline) {
GearpumpPipelineResult result = delegate.run(pipeline);
PipelineResult.State state = result.waitUntilFinish();
cluster.stop();
assert(state == PipelineResult.State.DONE);

return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,35 @@
import com.google.common.collect.Lists;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

import org.apache.beam.runners.gearpump.translators.utils.TranslatorUtils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;

import org.apache.beam.sdk.values.PCollection;
import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.window.api.Accumulating$;
import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
import org.apache.gearpump.streaming.dsl.window.api.Discarding$;
import org.apache.gearpump.streaming.dsl.window.api.EventTimeTrigger$;
import org.apache.gearpump.streaming.dsl.window.api.Window;
import org.apache.gearpump.streaming.dsl.window.api.WindowFn;
import org.apache.gearpump.streaming.dsl.window.impl.Bucket;
import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction;
import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction;
import scala.collection.JavaConversions;


Expand All @@ -55,15 +61,20 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
PCollection<KV<K, V>> input = context.getInput(transform);
Coder<K> inputKeyCoder = ((KvCoder<K, V>) input.getCoder()).getKeyCoder();
JavaStream<WindowedValue<KV<K, V>>> inputStream =
context.getInputStream(input);
int parallelism = context.getPipelineOptions().getParallelism();
OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>)
input.getWindowingStrategy().getOutputTimeFn();
JavaStream<WindowedValue<KV<K, Iterable<V>>>> outputStream = inputStream
.window(Window.apply(new GearpumpWindowFn(input.getWindowingStrategy().getWindowFn()),
EventTimeTrigger$.MODULE$, Accumulating$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(), parallelism, "group_by_Key_and_Window")
EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
.map(new ValueToIterable<K, V>(), "map_value_to_iterable")
.reduce(new MergeValue<K, V>(), "merge_value");
.map(new KeyedByTimestamp<K, V>(), "keyed_by_timestamp")
.reduce(new Merge<K, V>(outputTimeFn), "merge")
.map(new Values<K, V>(), "values");

context.setOutputStream(context.getOutput(transform), outputStream);
}
Expand Down Expand Up @@ -122,18 +133,27 @@ private Bucket getBucket(BoundedWindow window) {
}
}

private static class GroupByFn<K, V> implements
GroupByFunction<WindowedValue<KV<K, V>>, K> {
private static class GroupByFn<K, V> extends
GroupByFunction<WindowedValue<KV<K, V>>, ByteBuffer> {

private final Coder<K> keyCoder;

GroupByFn(Coder<K> keyCoder) {
this.keyCoder = keyCoder;
}

@Override
public K apply(WindowedValue<KV<K, V>> wv) {
return wv.getValue().getKey();
public ByteBuffer apply(WindowedValue<KV<K, V>> wv) {
try {
return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, wv.getValue().getKey()));
} catch (CoderException e) {
throw new RuntimeException(e);
}
}
}

private static class ValueToIterable<K, V>
implements MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {

extends MapFunction<WindowedValue<KV<K, V>>, WindowedValue<KV<K, Iterable<V>>>> {

@Override
public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
Expand All @@ -142,15 +162,53 @@ public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, V>> wv) {
}
}

private static class MergeValue<K, V> implements
ReduceFunction<WindowedValue<KV<K, Iterable<V>>>> {
private static class KeyedByTimestamp<K, V>
extends MapFunction<WindowedValue<KV<K, Iterable<V>>>,
KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {

@Override
public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
WindowedValue<KV<K, Iterable<V>>> wv) {
return KV.of(wv.getTimestamp(), wv);
}
}

private static class Merge<K, V> extends
ReduceFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>> {

private final OutputTimeFn<? super BoundedWindow> outputTimeFn;

Merge(OutputTimeFn<? super BoundedWindow> outputTimeFn) {
this.outputTimeFn = outputTimeFn;
}

@Override
public WindowedValue<KV<K, Iterable<V>>> apply(WindowedValue<KV<K, Iterable<V>>> wv1,
WindowedValue<KV<K, Iterable<V>>> wv2) {
return WindowedValue.of(KV.of(wv1.getValue().getKey(),
public KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> apply(
KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv1,
KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>> kv2) {
org.joda.time.Instant t1 = kv1.getKey();
org.joda.time.Instant t2 = kv2.getKey();

WindowedValue<KV<K, Iterable<V>>> wv1 = kv1.getValue();
WindowedValue<KV<K, Iterable<V>>> wv2 = kv2.getValue();

return KV.of(outputTimeFn.combine(t1, t2),
WindowedValue.of(KV.of(wv1.getValue().getKey(),
Iterables.concat(wv1.getValue().getValue(), wv2.getValue().getValue())),
wv1.getTimestamp(), wv1.getWindows(), wv1.getPane());
wv1.getTimestamp(), wv1.getWindows(), wv1.getPane()));
}
}

private static class Values<K, V> extends
MapFunction<KV<org.joda.time.Instant, WindowedValue<KV<K, Iterable<V>>>>,
WindowedValue<KV<K, Iterable<V>>>> {

@Override
public WindowedValue<KV<K, Iterable<V>>> apply(KV<org.joda.time.Instant,
WindowedValue<KV<K, Iterable<V>>>> kv) {
org.joda.time.Instant timestamp = kv.getKey();
WindowedValue<KV<K, Iterable<V>>> wv = kv.getValue();
return WindowedValue.of(wv.getValue(), timestamp, wv.getWindows(), wv.getPane());
}
}
}

0 comments on commit d6ef12c

Please sign in to comment.