From 6eab5c9465bda3da4d8a1ea9f73a74e9c8faec85 Mon Sep 17 00:00:00 2001 From: chinmaykolhatkar Date: Wed, 1 Mar 2017 16:59:46 +0530 Subject: [PATCH 1/2] [BEAM-831] ParDo Fusion of Apex Runner --- .../runners/apex/ApexPipelineOptions.java | 5 + .../apex/translation/TranslationContext.java | 103 ++++++++++++++++-- 2 files changed, 97 insertions(+), 11 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java index f37e87456fbb..92f6e8f03eeb 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java @@ -62,4 +62,9 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab @Default.String("classpath:/beam-runners-apex.properties") String getConfigFile(); + @Description("configure whether to perform ParDo fusion") + void setParDoFusionEnabled(boolean enabled); + + @Default.Boolean(true) + boolean isParDoFusionEnabled(); } diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 9c20449d07e0..1224e254e801 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -37,6 +37,8 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -54,7 +56,8 @@ class TranslationContext { private final ApexPipelineOptions pipelineOptions; private AppliedPTransform currentTransform; - private final Map, List>>> streams = new HashMap<>(); + private final Map>> streams = + new HashMap<>(); private final Map operators = new HashMap<>(); private final Map, PInput> viewInputs = new HashMap<>(); private Map aliasCollections = new HashMap<>(); @@ -122,8 +125,10 @@ public void addOperator(Operator operator, Map, OutputPort> po addOperator(operator, portEntry.getValue(), portEntry.getKey()); first = false; } else { - this.streams.put(portEntry.getKey(), (Pair) new ImmutablePair<>(portEntry.getValue(), - new ArrayList<>())); + this.streams.put(portEntry.getKey(), + (Pair) new ImmutablePair<>(new OutputPortInfo(portEntry.getValue(), + getCurrentTransform()), + new ArrayList<>())); } } } @@ -142,16 +147,19 @@ public void addOperator(Operator operator, OutputPort port, PCollection output) name = getCurrentTransform().getFullName() + i; } this.operators.put(name, operator); - this.streams.put(output, (Pair) new ImmutablePair<>(port, new ArrayList<>())); + this.streams.put(output, (Pair) new ImmutablePair<>( + new OutputPortInfo(port, getCurrentTransform()), + new ArrayList<>())); } public void addStream(PInput input, InputPort inputPort) { while (aliasCollections.containsKey(input)) { input = aliasCollections.get(input); } - Pair, List>> stream = this.streams.get(input); - checkArgument(stream != null, "no upstream operator defined for %s", input); - stream.getRight().add(inputPort); + + Pair> stream = this.streams.get(input); + checkArgument(stream != null, "no upstream operator defined for " + input); + stream.getRight().add(new InputPortInfo(inputPort, getCurrentTransform())); } /** @@ -168,13 +176,23 @@ public void populateDAG(DAG dag) { for (Map.Entry nameAndOperator : this.operators.entrySet()) { dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue()); } + int streamIndex = 0; - for (Map.Entry, List>>> streamEntry : this. + for (Map.Entry>> streamEntry : this. streams.entrySet()) { - List> sinksList = streamEntry.getValue().getRight(); - InputPort[] sinks = sinksList.toArray(new InputPort[sinksList.size()]); + List destInfo = streamEntry.getValue().getRight(); + InputPort[] sinks = new InputPort[destInfo.size()]; + for (int i = 0; i < sinks.length; i++) { + sinks[i] = destInfo.get(i).port; + } + if (sinks.length > 0) { - dag.addStream("stream" + streamIndex++, streamEntry.getValue().getLeft(), sinks); + DAG.StreamMeta streamMeta = dag.addStream("stream" + streamIndex++, + streamEntry.getValue().getLeft().port, sinks); + if (pipelineOptions.isParDoFusionEnabled()) { + optimizeStreams(streamMeta, streamEntry); + } + for (InputPort port : sinks) { PCollection pc = streamEntry.getKey(); Coder coder = pc.getCoder(); @@ -191,6 +209,49 @@ public void populateDAG(DAG dag) { } } + private void optimizeStreams(DAG.StreamMeta streamMeta, + Map.Entry>> streamEntry) { + DAG.Locality loc = null; + + List sinks = streamEntry.getValue().getRight(); + OutputPortInfo source = streamEntry.getValue().getLeft(); + PTransform sourceTransform = source.transform.getTransform(); + if (sourceTransform instanceof ParDo.Bound + || sourceTransform instanceof ParDo.BoundMulti) { + // Source is ParDo.. Check sink(s) + for (InputPortInfo sink : sinks) { + PTransform transform = sink.transform.getTransform(); + if (transform instanceof ParDo.Bound) { + ParDo.Bound t = (ParDo.Bound) transform; + if (t.getSideInputs().size() > 0) { + loc = DAG.Locality.CONTAINER_LOCAL; + break; + } else { + loc = DAG.Locality.THREAD_LOCAL; + } + } else if (transform instanceof ParDo.BoundMulti) { + ParDo.BoundMulti t = (ParDo.BoundMulti) transform; + if (t.getSideInputs().size() > 0) { + loc = DAG.Locality.CONTAINER_LOCAL; + break; + } else { + loc = DAG.Locality.THREAD_LOCAL; + } + } else { + // Sink is not ParDo.. set null locality. + loc = null; + break; + } + } + } else { + // Source is not ParDo... set null locality + loc = null; + } + + streamMeta.setLocality(loc); + } + /** * Return the state backend for the pipeline translation. * @return @@ -198,4 +259,24 @@ public void populateDAG(DAG dag) { public ApexStateBackend getStateBackend() { return new ApexStateInternals.ApexStateBackend(); } + + static class InputPortInfo { + InputPort port; + AppliedPTransform transform; + + public InputPortInfo(InputPort port, AppliedPTransform transform) { + this.port = port; + this.transform = transform; + } + } + + static class OutputPortInfo { + OutputPort port; + AppliedPTransform transform; + + public OutputPortInfo(OutputPort port, AppliedPTransform transform) { + this.port = port; + this.transform = transform; + } + } } From 3f5282d515fa53516fda6d0376cc912560fd6d85 Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Fri, 5 May 2017 06:45:34 -0700 Subject: [PATCH 2/2] [BEAM-831] Fix chaining, add test. closes #2216 --- .../apache/beam/runners/apex/ApexRunner.java | 11 +++-- .../beam/runners/apex/TestApexRunner.java | 8 ++++ .../apex/translation/TranslationContext.java | 26 ++++------ .../beam/runners/apex/ApexRunnerTest.java | 47 +++++++++++++++---- .../FlattenPCollectionTranslatorTest.java | 13 ++--- 5 files changed, 67 insertions(+), 38 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index a50e10ef53d8..366308ec6642 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -70,11 +70,11 @@ * pipeline to an Apex DAG and executes it on an Apex cluster. * */ -@SuppressWarnings({"rawtypes", "unchecked"}) public class ApexRunner extends PipelineRunner { private final ApexPipelineOptions options; public static final String CLASSPATH_SCHEME = "classpath"; + protected boolean translateOnly = false; /** * TODO: this isn't thread safe and may cause issues when tests run in parallel @@ -93,6 +93,7 @@ public static ApexRunner fromOptions(PipelineOptions options) { return new ApexRunner(apexPipelineOptions); } + @SuppressWarnings({"rawtypes"}) private List getOverrides() { return ImmutableList.builder() .add( @@ -156,7 +157,7 @@ public void populateDAG(DAG dag, Configuration conf) { } if (options.isEmbeddedExecution()) { - Launcher launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); + EmbeddedAppLauncher launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); if (options.isEmbeddedExecutionDebugMode()) { @@ -166,11 +167,15 @@ public void populateDAG(DAG dag, Configuration conf) { Configuration conf = new Configuration(false); ApexYarnLauncher.addProperties(conf, configProperties); try { + if (translateOnly) { + launcher.prepareDAG(apexApp, conf); + return new ApexRunnerResult(launcher.getDAG(), null); + } ApexRunner.ASSERTION_ERROR.set(null); AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes); return new ApexRunnerResult(apexDAG.get(), apexAppResult); } catch (Exception e) { - Throwables.propagateIfPossible(e); + Throwables.throwIfUnchecked(e); throw new RuntimeException(e); } } else { diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java index e068db086442..b68d3da5891b 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.apex; +import com.datatorrent.api.DAG; import java.io.IOException; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; @@ -44,6 +45,13 @@ public static TestApexRunner fromOptions(PipelineOptions options) { return new TestApexRunner(apexOptions); } + public static DAG translate(Pipeline pipeline, ApexPipelineOptions options) { + ApexRunner delegate = new ApexRunner(options); + delegate.translateOnly = true; + DAG dag = delegate.run(pipeline).getApexDAG(); + return dag; + } + @Override public ApexRunnerResult run(Pipeline pipeline) { ApexRunnerResult result = delegate.run(pipeline); diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 1224e254e801..a5e30281a9e3 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -39,6 +39,7 @@ import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -217,36 +218,27 @@ private void optimizeStreams(DAG.StreamMeta streamMeta, List sinks = streamEntry.getValue().getRight(); OutputPortInfo source = streamEntry.getValue().getLeft(); PTransform sourceTransform = source.transform.getTransform(); - if (sourceTransform instanceof ParDo.Bound - || sourceTransform instanceof ParDo.BoundMulti) { - // Source is ParDo.. Check sink(s) + if (sourceTransform instanceof ParDo.MultiOutput + || sourceTransform instanceof Window.Assign) { + // source qualifies for chaining, check sink(s) for (InputPortInfo sink : sinks) { PTransform transform = sink.transform.getTransform(); - if (transform instanceof ParDo.Bound) { - ParDo.Bound t = (ParDo.Bound) transform; - if (t.getSideInputs().size() > 0) { - loc = DAG.Locality.CONTAINER_LOCAL; - break; - } else { - loc = DAG.Locality.THREAD_LOCAL; - } - } else if (transform instanceof ParDo.BoundMulti) { - ParDo.BoundMulti t = (ParDo.BoundMulti) transform; + if (transform instanceof ParDo.MultiOutput) { + ParDo.MultiOutput t = (ParDo.MultiOutput) transform; if (t.getSideInputs().size() > 0) { loc = DAG.Locality.CONTAINER_LOCAL; break; } else { loc = DAG.Locality.THREAD_LOCAL; } + } else if (transform instanceof Window.Assign) { + loc = DAG.Locality.THREAD_LOCAL; } else { - // Sink is not ParDo.. set null locality. + // cannot chain, if there is any other sink loc = null; break; } } - } else { - // Source is not ParDo... set null locality - loc = null; } streamMeta.setLocality(loc); diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java index e9e9a5b5c6a8..c5521d1c7001 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java @@ -18,15 +18,23 @@ package org.apache.beam.runners.apex; import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.DAG.OperatorMeta; import com.datatorrent.stram.engine.OperatorContext; +import com.google.common.collect.Sets; import java.io.File; import java.io.FileOutputStream; import java.util.Properties; +import java.util.Set; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; @@ -41,16 +49,13 @@ public void testConfigProperties() throws Exception { String operName = "testProperties"; ApexPipelineOptions options = PipelineOptionsFactory.create() .as(ApexPipelineOptions.class); - options.setRunner(ApexRunner.class); // default configuration from class path - Pipeline p = Pipeline.create(options); + Pipeline p = Pipeline.create(); Create.Values empty = Create.empty(VoidCoder.of()); p.apply(operName, empty); - ApexRunnerResult result = (ApexRunnerResult) p.run(); - result.cancel(); - DAG dag = result.getApexDAG(); + DAG dag = TestApexRunner.translate(p, options); OperatorMeta t1Meta = dag.getOperatorMeta(operName); Assert.assertNotNull(t1Meta); Assert.assertEquals(new Integer(32), t1Meta.getValue(OperatorContext.MEMORY_MB)); @@ -63,14 +68,40 @@ public void testConfigProperties() throws Exception { props.store(fos, ""); } options.setConfigFile(tmp.getAbsolutePath()); - result = (ApexRunnerResult) p.run(); - result.cancel(); + dag = TestApexRunner.translate(p, options); tmp.delete(); - dag = result.getApexDAG(); + t1Meta = dag.getOperatorMeta(operName); Assert.assertNotNull(t1Meta); Assert.assertEquals(new Integer(64), t1Meta.getValue(OperatorContext.MEMORY_MB)); } + @Test + public void testParDoChaining() throws Exception { + Pipeline p = Pipeline.create(); + long numElements = 1000; + PCollection input = p.apply(GenerateSequence.from(0).to(numElements)); + PAssert.thatSingleton(input.apply("Count", Count.globally())).isEqualTo(numElements); + + ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); + DAG dag = TestApexRunner.translate(p, options); + + String[] expectedThreadLocal = { "/CreateActual/FilterActuals/Window.Assign" }; + Set actualThreadLocal = Sets.newHashSet(); + for (DAG.StreamMeta sm : dag.getAllStreamsMeta()) { + DAG.OutputPortMeta opm = sm.getSource(); + if (sm.getLocality() == Locality.THREAD_LOCAL) { + String name = opm.getOperatorMeta().getName(); + String prefix = "PAssert$"; + if (name.startsWith(prefix)) { + // remove indeterministic prefix + name = name.substring(prefix.length() + 1); + } + actualThreadLocal.add(name); + } + } + Assert.assertThat(actualThreadLocal, Matchers.hasItems(expectedThreadLocal)); + } + } diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java index 64ca0ee4fd07..929778a529a6 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java @@ -26,12 +26,10 @@ import java.util.Collections; import java.util.List; import java.util.Set; -import org.apache.apex.api.EmbeddedAppLauncher; -import org.apache.apex.api.Launcher; -import org.apache.apex.api.Launcher.LaunchMode; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.TestApexRunner; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -100,16 +98,11 @@ public void processElement(ProcessContext c) throws Exception { @Test public void testFlattenSingleCollection() { ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); - options.setRunner(ApexRunner.class); - ApexPipelineTranslator translator = new ApexPipelineTranslator(options); - EmbeddedAppLauncher launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); - DAG dag = launcher.getDAG(); - - Pipeline p = Pipeline.create(options); + Pipeline p = Pipeline.create(); PCollection single = p.apply(Create.of(Collections.singletonList("1"))); PCollectionList.of(single).apply(Flatten.pCollections()) .apply(ParDo.of(new EmbeddedCollector())); - translator.translate(p, dag); + DAG dag = TestApexRunner.translate(p, options); Assert.assertNotNull( dag.getOperatorMeta("ParDo(EmbeddedCollector)/ParMultiDo(EmbeddedCollector)")); }