From eb3d84a0ee929709607478be466e732eb0c68646 Mon Sep 17 00:00:00 2001
From: Aviem Zur
Date: Sat, 4 Mar 2017 23:16:24 +0200
Subject: [PATCH 1/2] [BEAM-797] A PipelineVisitor that creates a Spark-native pipeline.

---
 runners/spark/pom.xml                          |   5 +
 .../spark/SparkNativePipelineVisitor.java      | 202 ++++++++++++++++++
 .../beam/runners/spark/SparkRunner.java        |  16 +-
 .../runners/spark/SparkRunnerDebugger.java     | 121 +++++++++++
 .../spark/translation/TransformEvaluator.java  |   1 +
 .../translation/TransformTranslator.java       | 105 +++++++++
 .../StreamingTransformTranslator.java          |  53 ++++-
 .../spark/SparkRunnerDebuggerTest.java         | 196 +++++++++++++++++
 8 files changed, 689 insertions(+), 10 deletions(-)
 create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
 create mode 100644 runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
 create mode 100644 runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java

diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ebd987d37a94..a33082087a28 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -195,6 +195,11 @@
       <version>1.9</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+      <version>2.6</version>
+    </dependency>
     <dependency>
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
new file mode 100644
index 000000000000..056da97bbfa1
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkNativePipelineVisitor.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.TransformEvaluator;
+import org.apache.beam.runners.spark.translation.streaming.Checkpoint;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.commons.lang.WordUtils;
+
+
+/**
+ * Pipeline visitor for translating a Beam pipeline into equivalent Spark operations.
+ * Used for debugging purposes using {@link SparkRunnerDebugger}.
+ */
+public class SparkNativePipelineVisitor extends SparkRunner.Evaluator {
+  private final List<NativeTransform> transforms;
+  private final List<String> knownCompositesPackages =
+      Lists.newArrayList(
+          "org.apache.beam.sdk.transforms",
+          "org.apache.beam.runners.spark.examples");
+
+  SparkNativePipelineVisitor(SparkPipelineTranslator translator, EvaluationContext ctxt) {
+    super(translator, ctxt);
+    this.transforms = new ArrayList<>();
+    MetricsAccumulator.init(ctxt.getSparkContext(), Optional.<Checkpoint.CheckpointDir>absent());
+  }
+
+  @Override
+  public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
+    CompositeBehavior compositeBehavior = super.enterCompositeTransform(node);
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null) {
+      @SuppressWarnings("unchecked")
+      final Class<PTransform<?, ?>> transformClass =
+          (Class<PTransform<?, ?>>) transform.getClass();
+      if (compositeBehavior == CompositeBehavior.ENTER_TRANSFORM
+          && !knownComposite(transformClass)
+          && shouldDebug(node)) {
+        transforms.add(new NativeTransform(node, null, transform, true));
+      }
+    }
+    return compositeBehavior;
+  }
+
+  private boolean knownComposite(Class<PTransform<?, ?>> transform) {
+    String transformPackage = transform.getPackage().getName();
+    for (String knownCompositePackage : knownCompositesPackages) {
+      if (transformPackage.startsWith(knownCompositePackage)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean shouldDebug(final TransformHierarchy.Node node) {
+    return node == null || !Iterables.any(transforms, new Predicate<NativeTransform>() {
+      @Override
+      public boolean apply(NativeTransform debugTransform) {
+        return debugTransform.getNode().equals(node) && debugTransform.isComposite();
+      }
+    }) && shouldDebug(node.getEnclosingNode());
+  }
+
+  @Override
+  <TransformT extends PTransform<? super PInput, POutput>> void
+      doVisitTransform(TransformHierarchy.Node node) {
+    super.doVisitTransform(node);
+    @SuppressWarnings("unchecked")
+    TransformT transform = (TransformT) node.getTransform();
+    @SuppressWarnings("unchecked")
+    Class<TransformT> transformClass = (Class<TransformT>) transform.getClass();
+    @SuppressWarnings("unchecked")
+    TransformEvaluator<TransformT> evaluator = translate(node, transform, transformClass);
+    if (shouldDebug(node)) {
+      transforms.add(new NativeTransform(node, evaluator, transform, false));
+    }
+  }
+
+  String getDebugString() {
+    return Joiner.on("\n").join(transforms);
+  }
+
+  private static class NativeTransform {
+    private final TransformHierarchy.Node node;
+    private final TransformEvaluator<?> transformEvaluator;
+    private final PTransform<?, ?> transform;
+    private final boolean composite;
+
+    NativeTransform(
+        TransformHierarchy.Node node,
+        TransformEvaluator<?> transformEvaluator,
+        PTransform<?, ?> transform,
+        boolean composite) {
+      this.node = node;
+      this.transformEvaluator = transformEvaluator;
+      this.transform = transform;
+      this.composite = composite;
+    }
+
+    TransformHierarchy.Node getNode() {
+      return node;
+    }
+
+    boolean isComposite() {
+      return composite;
+    }
+
+    @Override
+    public String toString() {
+      try {
+        Class<? extends PTransform> transformClass = transform.getClass();
+        if (node.getFullName().equals("KafkaIO.Read")) {
+          return "KafkaUtils.createDirectStream(...)";
+        }
+        if (composite) {
+          return "_.<" + transformClass.getName() + ">";
+        }
+        String transformString = transformEvaluator.toNativeString();
+        if (transformString.contains("<fn>")) {
+          transformString = replaceFnString(transformClass, transformString, "fn");
+        } else if (transformString.contains("<windowFn>")) {
+          transformString = replaceFnString(transformClass, transformString, "windowFn");
+        } else if (transformString.contains("<source>")) {
+          String sourceName = "...";
+          if (transform instanceof Read.Bounded) {
+            sourceName = ((Read.Bounded) transform).getSource().getClass().getName();
+          } else if (transform instanceof Read.Unbounded) {
+            sourceName = ((Read.Unbounded) transform).getSource().getClass().getName();
+          }
+          transformString = transformString.replace("<source>", sourceName);
+        }
+        if (transformString.startsWith("sparkContext")
+            || transformString.startsWith("streamingContext")) {
+          return transformString;
+        }
+        return "_." + transformString;
+      } catch (
+          NoSuchMethodException
+          | InvocationTargetException
+          | IllegalAccessException
+          | NoSuchFieldException e) {
+        return "<FailedTranslation>";
+      }
+    }
+
+    private String replaceFnString(
+        Class<? extends PTransform> transformClass,
+        String transformString,
+        String fnFieldName)
+        throws IllegalAccessException, InvocationTargetException, NoSuchMethodException,
+        NoSuchFieldException {
+      Object fn =
+          transformClass.getMethod("get" + WordUtils.capitalize(fnFieldName)).invoke(transform);
+      Class<?> fnClass = fn.getClass();
+      String doFnName;
+      Class<?> enclosingClass = fnClass.getEnclosingClass();
+      if (enclosingClass != null && enclosingClass.equals(MapElements.class)) {
+        Field parent = fnClass.getDeclaredField("this$0");
+        parent.setAccessible(true);
+        Field fnField = enclosingClass.getDeclaredField(fnFieldName);
+        fnField.setAccessible(true);
+        doFnName = fnField.get(parent.get(fn)).getClass().getName();
+      } else {
+        doFnName = fnClass.getName();
+      }
+      transformString = transformString.replace("<" + fnFieldName + ">", doFnName);
+      return transformString;
+    }
+  }
+}
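A note on the reflection in replaceFnString above: it leans on the synthetic this$0 field that javac adds to every non-static inner class in order to walk from the wrapper function MapElements holds back to the user's fn and report its class name. A minimal, self-contained sketch of that mechanism (Wrapper/Inner here are hypothetical stand-ins for MapElements and its internal wrapper, not Beam types):

    import java.lang.reflect.Field;

    public class EnclosingInstanceDemo {
      static class Wrapper {
        final Runnable fn = () -> { };  // stands in for the user fn held by MapElements
        class Inner { }                 // non-static inner class, like MapElements' wrapper
        Inner newInner() { return new Inner(); }
      }

      public static void main(String[] args) throws Exception {
        Wrapper.Inner inner = new Wrapper().newInner();
        // javac gives every non-static inner class a synthetic this$0 field
        // pointing at its enclosing instance.
        Field parent = inner.getClass().getDeclaredField("this$0");
        parent.setAccessible(true);
        Object enclosing = parent.get(inner);  // the Wrapper instance
        Field fnField = enclosing.getClass().getDeclaredField("fn");
        fnField.setAccessible(true);
        // prints the class name of the wrapped fn, just like replaceFnString does
        System.out.println(fnField.get(enclosing).getClass().getName());
      }
    }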
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 3f002da74497..a706f005ddaf 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -297,11 +297,12 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
   /**
    * Evaluator on the pipeline.
    */
+  @SuppressWarnings("WeakerAccess")
   public static class Evaluator extends Pipeline.PipelineVisitor.Defaults {
     private static final Logger LOG = LoggerFactory.getLogger(Evaluator.class);
 
-    private final EvaluationContext ctxt;
-    private final SparkPipelineTranslator translator;
+    protected final EvaluationContext ctxt;
+    protected final SparkPipelineTranslator translator;
 
     public Evaluator(SparkPipelineTranslator translator, EvaluationContext ctxt) {
       this.translator = translator;
@@ -324,7 +325,7 @@ public CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
       return CompositeBehavior.ENTER_TRANSFORM;
     }
 
-    private boolean shouldDefer(TransformHierarchy.Node node) {
+    protected boolean shouldDefer(TransformHierarchy.Node node) {
       // if the input is not a PCollection, or it is but with non merging windows, don't defer.
       if (node.getInputs().size() != 1) {
         return false;
@@ -361,7 +362,7 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
     }
 
     <TransformT extends PTransform<? super PInput, POutput>> void
-        doVisitTransform(TransformHierarchy.Node node) {
+        doVisitTransform(TransformHierarchy.Node node) {
       @SuppressWarnings("unchecked")
       TransformT transform = (TransformT) node.getTransform();
       @SuppressWarnings("unchecked")
@@ -379,8 +380,8 @@ public void visitPrimitiveTransform(TransformHierarchy.Node node) {
      * Determine if this Node belongs to a Bounded branch of the pipeline, or Unbounded, and
      * translate with the proper translator.
      */
-    private <TransformT extends PTransform<? super PInput, POutput>>
-        TransformEvaluator<TransformT> translate(
+    protected <TransformT extends PTransform<? super PInput, POutput>>
+        TransformEvaluator<TransformT> translate(
             TransformHierarchy.Node node, TransformT transform, Class<TransformT> transformClass) {
       //--- determine if node is bounded/unbounded.
       // usually, the input determines if the PCollection to apply the next transformation to
@@ -400,7 +401,7 @@ TransformEvaluator translate(
           : translator.translateUnbounded(transformClass);
     }
 
-    private PCollection.IsBounded isBoundedCollection(Collection<PValue> pValues) {
+    protected PCollection.IsBounded isBoundedCollection(Collection<PValue> pValues) {
       // anything that is not a PCollection, is BOUNDED.
       // For PCollections:
       // BOUNDED behaves as the Identity Element, BOUNDED + BOUNDED = BOUNDED
@@ -417,4 +418,3 @@ private PCollection.IsBounded isBoundedCollection(Collection pValu
     }
   }
 }
-
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
new file mode 100644
index 000000000000..395acfffa479
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunnerDebugger.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
+import org.apache.beam.runners.spark.translation.TransformTranslator;
+import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Pipeline runner which translates a Beam pipeline into equivalent Spark operations, without
+ * running them. Used for debugging purposes.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
+ * options.setRunner(SparkRunnerDebugger.class);
+ * Pipeline pipeline = Pipeline.create(options);
+ * SparkRunnerDebugger.DebugSparkPipelineResult result =
+ *     (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
+ * String sparkPipeline = result.getDebugString();
+ * }</pre>
+ */
+public final class SparkRunnerDebugger extends PipelineRunner<SparkPipelineResult> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkRunnerDebugger.class);
+
+  private SparkRunnerDebugger() {}
+
+  @SuppressWarnings("unused")
+  public static SparkRunnerDebugger fromOptions(PipelineOptions options) {
+    return new SparkRunnerDebugger();
+  }
+
+  @Override
+  public SparkPipelineResult run(Pipeline pipeline) {
+    SparkPipelineResult result;
+
+    SparkPipelineOptions options = (SparkPipelineOptions) pipeline.getOptions();
+
+    JavaSparkContext jsc = new JavaSparkContext("local[1]", "Debug_Pipeline");
+    JavaStreamingContext jssc =
+        new JavaStreamingContext(jsc, new org.apache.spark.streaming.Duration(1000));
+    TransformTranslator.Translator translator = new TransformTranslator.Translator();
+    SparkNativePipelineVisitor visitor;
+    if (options.isStreaming()
+        || options instanceof TestSparkPipelineOptions
+        && ((TestSparkPipelineOptions) options).isForceStreaming()) {
+      SparkPipelineTranslator streamingTranslator =
+          new StreamingTransformTranslator.Translator(translator);
+      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+      visitor = new SparkNativePipelineVisitor(streamingTranslator, ctxt);
+    } else {
+      EvaluationContext ctxt = new EvaluationContext(jsc, pipeline, jssc);
+      visitor = new SparkNativePipelineVisitor(translator, ctxt);
+    }
+    pipeline.traverseTopologically(visitor);
+    jsc.stop();
+    String debugString = visitor.getDebugString();
+    LOG.info("Translated Native Spark pipeline:\n" + debugString);
+    return new DebugSparkPipelineResult(debugString);
+  }
+
+  /**
+   * PipelineResult of running a {@link Pipeline} using {@link SparkRunnerDebugger}.
+   * Use {@link #getDebugString} to get a {@link String} representation of the {@link Pipeline}
+   * translated into Spark native operations.
+   */
+  public static class DebugSparkPipelineResult extends SparkPipelineResult {
+    private final String debugString;
+
+    DebugSparkPipelineResult(String debugString) {
+      super(null, null);
+      this.debugString = debugString;
+    }
+
+    /**
+     * Returns Beam pipeline translated into Spark native operations.
+     */
+    String getDebugString() {
+      return debugString;
+    }
+
+    @Override protected void stop() {
+      // Empty implementation
+    }
+
+    @Override protected State awaitTermination(Duration duration)
+        throws TimeoutException, ExecutionException, InterruptedException {
+      return State.DONE;
+    }
+  }
+}
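As run() above shows, the debugger routes translation through the streaming translator whenever options.isStreaming() is set (or a test forces streaming). A minimal usage sketch for that streaming path, assuming the same SparkPipelineOptions/PipelineOptionsFactory usage as the Javadoc example above (not itself part of this patch):

    import org.apache.beam.runners.spark.SparkPipelineOptions;
    import org.apache.beam.runners.spark.SparkRunnerDebugger;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
    options.setRunner(SparkRunnerDebugger.class);
    options.setStreaming(true);  // translation goes through StreamingTransformTranslator
    Pipeline pipeline = Pipeline.create(options);
    // ... apply transforms ...
    SparkRunnerDebugger.DebugSparkPipelineResult result =
        (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
    System.out.println(result.getDebugString());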
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
index fbfa84d76b50..585b93304bde 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformEvaluator.java
@@ -26,4 +26,5 @@
  */
 public interface TransformEvaluator<TransformT extends PTransform<?, ?>> extends Serializable {
   void evaluate(TransformT transform, EvaluationContext context);
+  String toNativeString();
 }
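With this addition, every evaluator must also render itself as a Spark-native expression for the debugger. A minimal sketch of what an implementation looks like, modeled on the Flatten evaluator in the next file (a sketch, not part of this patch):

    import org.apache.beam.runners.spark.translation.EvaluationContext;
    import org.apache.beam.runners.spark.translation.TransformEvaluator;
    import org.apache.beam.sdk.transforms.Flatten;

    private static <T> TransformEvaluator<Flatten.PCollections<T>> flattenPCollections() {
      return new TransformEvaluator<Flatten.PCollections<T>>() {
        @Override
        public void evaluate(Flatten.PCollections<T> transform, EvaluationContext context) {
          // real translation logic lives here (see TransformTranslator below)
        }

        @Override
        public String toNativeString() {
          // the string SparkNativePipelineVisitor stitches into the debug output
          return "sparkContext.union(...)";
        }
      };
    }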
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 725d157b4659..44b403928ead 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -121,6 +121,11 @@ public void evaluate(Flatten.PCollections transform, EvaluationContext contex
       }
       context.putDataset(transform, new BoundedDataset<>(unionRDD));
     }
+
+    @Override
+    public String toNativeString() {
+      return "sparkContext.union(...)";
+    }
   };
 }
@@ -162,6 +167,11 @@ public void evaluate(GroupByKey transform, EvaluationContext context) {
 
       context.putDataset(transform, new BoundedDataset<>(groupedAlsoByWindow));
     }
+
+    @Override
+    public String toNativeString() {
+      return "groupByKey()";
+    }
   };
 }
@@ -201,6 +211,11 @@ public WindowedValue<KV<K, OutputT>> call(
           });
       context.putDataset(transform, new BoundedDataset<>(outRDD));
     }
+
+    @Override
+    public String toNativeString() {
+      return "map(new <fn>())";
+    }
   };
 }
@@ -267,6 +282,11 @@ public void evaluate(
       }
       context.putDataset(transform, new BoundedDataset<>(outRdd));
     }
+
+    @Override
+    public String toNativeString() {
+      return "aggregate(..., new <fn>(), ...)";
+    }
   };
 }
@@ -321,6 +341,11 @@ public Iterable<WindowedValue<KV<K, OutputT>>> call(
 
       context.putDataset(transform, new BoundedDataset<>(outRdd));
     }
+
+    @Override
+    public String toNativeString() {
+      return "combineByKey(..., new <fn>(), ...)";
+    }
   };
 }
@@ -348,6 +373,11 @@ public void evaluate(ParDo.Bound transform, EvaluationContext c
           new BoundedDataset<>(inRDD.mapPartitions(new DoFnFunction<>(aggAccum, metricsAccum,
               stepName, doFn, context.getRuntimeContext(), sideInputs, windowingStrategy))));
     }
+
+    @Override
+    public String toNativeString() {
+      return "mapPartitions(new <fn>())";
+    }
   };
 }
@@ -388,6 +418,11 @@ public void evaluate(ParDo.BoundMulti transform, EvaluationCont
         context.putDataset(e.getValue(), new BoundedDataset<>(values));
       }
     }
+
+    @Override
+    public String toNativeString() {
+      return "mapPartitions(new <fn>())";
+    }
   };
 }
@@ -401,6 +436,11 @@ public void evaluate(TextIO.Read.Bound transform, EvaluationContext context) {
           .map(WindowingHelpers.windowFunction());
       context.putDataset(transform, new BoundedDataset<>(rdd));
     }
+
+    @Override
+    public String toNativeString() {
+      return "sparkContext.textFile(...)";
+    }
   };
 }
@@ -426,6 +466,11 @@ public Tuple2 call(T t) throws Exception {
       writeHadoopFile(last, new Configuration(), shardTemplateInfo, Text.class,
          NullWritable.class, TemplatedTextOutputFormat.class);
     }
+
+    @Override
+    public String toNativeString() {
+      return "saveAsNewAPIHadoopFile(...)";
+    }
   };
 }
@@ -450,6 +495,11 @@ public T call(AvroKey key) {
           }).map(WindowingHelpers.windowFunction());
       context.putDataset(transform, new BoundedDataset<>(rdd));
     }
+
+    @Override
+    public String toNativeString() {
+      return "sparkContext.newAPIHadoopFile(...)";
+    }
   };
 }
@@ -481,6 +531,11 @@ public Tuple2<AvroKey<T>, NullWritable> call(T t) throws Exception {
       writeHadoopFile(last, job.getConfiguration(), shardTemplateInfo,
           AvroKey.class, NullWritable.class, TemplatedAvroKeyOutputFormat.class);
     }
+
+    @Override
+    public String toNativeString() {
+      return "mapToPair().saveAsNewAPIHadoopFile(...)";
+    }
   };
 }
@@ -496,6 +551,11 @@ public void evaluate(Read.Bounded transform, EvaluationContext context) {
       // cache to avoid re-evaluation of the source by Spark's lazy DAG evaluation.
       context.putDataset(transform, new BoundedDataset<>(input.cache()));
     }
+
+    @Override
+    public String toNativeString() {
+      return "sparkContext.<readFrom(<source>)>()";
+    }
   };
 }
@@ -519,6 +579,11 @@ public KV call(Tuple2 t2) throws Exception {
           }).map(WindowingHelpers.<KV<K, V>>windowFunction());
       context.putDataset(transform, new BoundedDataset<>(rdd));
     }
+
+    @Override
+    public String toNativeString() {
+      return "sparkContext.newAPIHadoopFile(...)";
+    }
   };
 }
@@ -547,6 +612,11 @@ public Tuple2 call(KV t) throws Exception {
       writeHadoopFile(last, conf, shardTemplateInfo,
           transform.getKeyClass(), transform.getValueClass(), transform.getFormatClass());
     }
+
+    @Override
+    public String toNativeString() {
+      return "saveAsNewAPIHadoopFile(...)";
+    }
   };
 }
@@ -619,6 +689,11 @@ public void evaluate(Window.Assign transform, EvaluationContext context) {
             inRDD.map(new SparkAssignWindowFn<>(transform.getWindowFn()))));
       }
     }
+
+    @Override
+    public String toNativeString() {
+      return "map(new <windowFn>())";
+    }
   };
 }
@@ -632,6 +707,11 @@ public void evaluate(Create.Values transform, EvaluationContext context) {
       Coder coder = context.getOutput(transform).getCoder();
       context.putBoundedDatasetFromValues(transform, elems, coder);
     }
+
+    @Override
+    public String toNativeString() {
+      return "sparkContext.parallelize(Arrays.asList(...))";
+    }
   };
 }
@@ -649,6 +729,11 @@ public void evaluate(View.AsSingleton transform, EvaluationContext context) {
 
       context.putPView(output, iterCast, coderInternal);
     }
+
+    @Override
+    public String toNativeString() {
+      return "collect()";
+    }
   };
 }
@@ -666,6 +751,11 @@ public void evaluate(View.AsIterable transform, EvaluationContext context) {
 
      context.putPView(output, iterCast, coderInternal);
    }
+
+    @Override
+    public String toNativeString() {
+      return "collect()";
+    }
   };
 }
@@ -685,6 +775,11 @@ public void evaluate(View.CreatePCollectionView transform,
 
       context.putPView(output, iterCast, coderInternal);
     }
+
+    @Override
+    public String toNativeString() {
+      return "<createPCollectionView>";
+    }
   };
 }
@@ -706,6 +801,11 @@ public void evaluate(StorageLevelPTransform transform, EvaluationContext context
 
       context.putDataset(transform, new BoundedDataset(output));
     }
+
+    @Override
+    public String toNativeString() {
+      return "sparkContext.parallelize(rdd.getStorageLevel().description())";
+    }
   };
 }
@@ -732,6 +832,11 @@ private static TransformEvaluator
           (reshuffled));
     }
+
+    @Override
+    public String toNativeString() {
+      return "repartition(...)";
+    }
   };
 }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
index ccf84b232c2e..8a05fbb07284 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java
@@ -96,7 +96,7 @@
 /**
  * Supports translation between a Beam transform, and Spark's operations on DStreams.
  */
-final class StreamingTransformTranslator {
+public final class StreamingTransformTranslator {
 
   private StreamingTransformTranslator() {
   }
@@ -110,6 +110,11 @@ public void evaluate(ConsoleIO.Write.Unbound transform, EvaluationContext con
           ((UnboundedDataset) (context).borrowDataset(transform)).getDStream();
       dstream.map(WindowingHelpers.unwindowFunction()).print(transform.getNum());
     }
+
+    @Override
+    public String toNativeString() {
+      return ".print(...)";
+    }
   };
 }
@@ -124,6 +129,11 @@ public void evaluate(Read.Unbounded transform, EvaluationContext context) {
           context.getRuntimeContext(),
           transform.getSource()));
     }
+
+    @Override
+    public String toNativeString() {
+      return "streamingContext.<readFrom(<source>)>()";
+    }
   };
 }
@@ -168,6 +178,11 @@ public WindowedValue apply(@Nonnull TimestampedValue timestampedValue) {
           ImmutableMap.of(unboundedDataset.getStreamSources().get(0), times));
       context.putDataset(transform, unboundedDataset);
     }
+
+    @Override
+    public String toNativeString() {
+      return "streamingContext.queueStream(...)";
+    }
   };
 }
@@ -208,6 +223,11 @@ public void evaluate(Flatten.PCollections transform, EvaluationContext contex
           context.getStreamingContext().union(dStreams.remove(0), dStreams);
       context.putDataset(transform, new UnboundedDataset<>(unifiedStreams, streamingSources));
     }
+
+    @Override
+    public String toNativeString() {
+      return "streamingContext.union(...)";
+    }
   };
 }
@@ -235,6 +255,11 @@ public JavaRDD<WindowedValue<T>> call(JavaRDD<WindowedValue<T>> rdd) throws Exce
       context.putDataset(transform,
          new UnboundedDataset<>(outputStream, unboundedDataset.getStreamSources()));
     }
+
+    @Override
+    public String toNativeString() {
+      return "map(new <windowFn>())";
+    }
   };
 }
@@ -283,6 +308,11 @@ public JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> call(
 
       context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources));
     }
+
+    @Override
+    public String toNativeString() {
+      return "groupByKey()";
+    }
   };
 }
@@ -329,6 +359,11 @@ public void evaluate(final Combine.GroupedValues transform,
       context.putDataset(transform,
           new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources()));
     }
+
+    @Override
+    public String toNativeString() {
+      return "map(new <fn>())";
+    }
   };
 }
@@ -375,6 +410,11 @@ public JavaRDD<WindowedValue<OutputT>> call(JavaRDD<WindowedValue<InputT>> rdd)
       context.putDataset(transform,
           new UnboundedDataset<>(outStream, unboundedDataset.getStreamSources()));
     }
+
+    @Override
+    public String toNativeString() {
+      return "mapPartitions(new <fn>())";
+    }
   };
 }
@@ -431,6 +471,11 @@ public JavaPairRDD<TupleTag<?>, WindowedValue<?>> call(
             new UnboundedDataset<>(values, unboundedDataset.getStreamSources()));
       }
    }
+
+    @Override
+    public String toNativeString() {
+      return "mapPartitions(new <fn>())";
+    }
   };
 }
@@ -465,6 +510,10 @@ public JavaRDD<WindowedValue<KV<K, V>>> call(
 
      context.putDataset(transform, new UnboundedDataset<>(reshuffledStream, streamSources));
    }
+
+    @Override public String toNativeString() {
+      return "repartition(...)";
+    }
   };
 }
@@ -491,7 +540,7 @@
 
   public static class Translator implements SparkPipelineTranslator {
 
     private final SparkPipelineTranslator batchTranslator;
 
-    Translator(SparkPipelineTranslator batchTranslator) {
+    public Translator(SparkPipelineTranslator batchTranslator) {
       this.batchTranslator = batchTranslator;
     }
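The Translator constructor above is made public so the debugger can wrap the batch translator for streaming pipelines. A short sketch of that composition, mirroring what SparkRunnerDebugger#run does (a sketch, not part of this patch):

    import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
    import org.apache.beam.runners.spark.translation.TransformTranslator;
    import org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator;

    TransformTranslator.Translator batchTranslator = new TransformTranslator.Translator();
    SparkPipelineTranslator streamingTranslator =
        new StreamingTransformTranslator.Translator(batchTranslator);
    // streaming-specific transforms are translated here; bounded transforms
    // are delegated to the wrapped batch translator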
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
new file mode 100644
index 000000000000..e4cf049cac42
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.beam.runners.spark.examples.WordCount;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.Distinct;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+
+/**
+ * Test {@link SparkRunnerDebugger} with different pipelines.
+ */
+public class SparkRunnerDebuggerTest {
+
+  @Rule
+  public final PipelineRule batchPipelineRule = PipelineRule.batch();
+
+  @Rule
+  public final PipelineRule streamingPipelineRule = PipelineRule.streaming();
+
+  private File outputDir;
+
+  private final TemporaryFolder tmpDir = new TemporaryFolder();
+
+  @SuppressWarnings("ResultOfMethodCallIgnored")
+  @Before
+  public void setUp() throws IOException {
+    tmpDir.create();
+    outputDir = tmpDir.newFolder("out");
+    outputDir.delete();
+  }
+
+  @Test
+  public void debugBatchPipeline() {
+    TestSparkPipelineOptions options = batchPipelineRule.getOptions();
+    options.setRunner(SparkRunnerDebugger.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    PCollection<String> lines = pipeline
+        .apply(Create.of(Collections.<String>emptyList()).withCoder(StringUtf8Coder.of()));
+
+    PCollection<KV<String, Long>> wordCounts = lines
+        .apply(new WordCount.CountWords());
+
+    wordCounts
+        .apply(GroupByKey.<String, Long>create())
+        .apply(Combine.<String, Long, Long>groupedValues(Sum.ofLongs()));
+
+    PCollection<KV<String, Long>> wordCountsPlusOne = wordCounts
+        .apply(MapElements.via(new PlusOne()));
+
+    PCollectionList.of(wordCounts).and(wordCountsPlusOne)
+        .apply(Flatten.<KV<String, Long>>pCollections());
+
+    wordCounts
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
+
+    final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Count$PerElement$1())\n"
+        + "_.combineByKey(..., new org.apache.beam.sdk.transforms"
+        + ".Combine$CombineFn$KeyIgnoringCombineFn(), ...)\n"
+        + "_.groupByKey()\n"
+        + "_.map(new org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark"
+        + ".SparkRunnerDebuggerTest$PlusOne())\n"
+        + "sparkContext.union(...)\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
+        + "_.<org.apache.beam.sdk.io.TextIO$Write$Bound>";
+
+    SparkRunnerDebugger.DebugSparkPipelineResult result =
+        (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
+
+    assertThat("Debug pipeline did not equal expected", result.getDebugString(),
+        Matchers.equalTo(expectedPipeline));
+  }
+
+  @Test
+  public void debugStreamingPipeline() {
+    TestSparkPipelineOptions options = streamingPipelineRule.getOptions();
+    options.setRunner(SparkRunnerDebugger.class);
+
+    Pipeline pipeline = Pipeline.create(options);
+
+    KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
+        .withBootstrapServers("mykafka:9092")
+        .withTopics(Collections.singletonList("my_input_topic"))
+        .withKeyCoder(StringUtf8Coder.of())
+        .withValueCoder(StringUtf8Coder.of());
+
+    KafkaIO.Write<String, String> write = KafkaIO.<String, String>write()
+        .withBootstrapServers("myotherkafka:9092")
+        .withTopic("my_output_topic")
+        .withKeyCoder(StringUtf8Coder.of())
+        .withValueCoder(StringUtf8Coder.of());
+
+    KvCoder<String, String> stringKvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
+
+    pipeline
+        .apply(read.withoutMetadata()).setCoder(stringKvCoder)
+        .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(5))))
+        .apply(ParDo.of(new SparkRunnerDebuggerTest.FormatKVFn()))
+        .apply(Distinct.<String>create())
+        .apply(WithKeys.of(new SparkRunnerDebuggerTest.ArbitraryKeyFunction()))
+        .apply(write);
+
+    final String expectedPipeline = "KafkaUtils.createDirectStream(...)\n"
+        + "_.map(new org.apache.beam.sdk.transforms.windowing.FixedWindows())\n"
+        + "_.mapPartitions(new org.apache.beam.runners.spark."
+        + "SparkRunnerDebuggerTest$FormatKVFn())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$2())\n"
+        + "_.groupByKey()\n"
+        + "_.map(new org.apache.beam.sdk.transforms.Combine$CombineFn$KeyIgnoringCombineFn())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.Keys$1())\n"
+        + "_.mapPartitions(new org.apache.beam.sdk.transforms.WithKeys$2())\n"
+        + "_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>";
+
+    SparkRunnerDebugger.DebugSparkPipelineResult result =
+        (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();
+
+    assertThat("Debug pipeline did not equal expected",
+        result.getDebugString(),
+        Matchers.equalTo(expectedPipeline));
+  }
+
+  private static class FormatKVFn extends DoFn<KV<String, String>, String> {
+    @SuppressWarnings("unused")
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      c.output(c.element().getKey() + "," + c.element().getValue());
+    }
+  }
+
+  private static class ArbitraryKeyFunction implements SerializableFunction<String, String> {
+    @Override
+    public String apply(String input) {
+      return "someKey";
+    }
+  }
+
+  private static class PlusOne extends SimpleFunction<KV<String, Long>, KV<String, Long>> {
+    @Override
+    public KV<String, Long> apply(KV<String, Long> input) {
+      return KV.of(input.getKey(), input.getValue() + 1);
+    }
+  }
+}

From 20992c960b3d7ba1e690733a3738e5eb2f9c98b8 Mon Sep 17 00:00:00 2001
From: Aviem Zur
Date: Fri, 10 Mar 2017 15:08:46 +0200
Subject: [PATCH 2/2] [BEAM-797] Remove unnecessary temp dir from test

---
 .../runners/spark/SparkRunnerDebuggerTest.java | 18 +-----------------
 1 file changed, 1 insertion(+), 17 deletions(-)

diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index e4cf049cac42..905b30eee940 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -20,8 +20,6 @@
 
 import static org.junit.Assert.assertThat;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.Collections;
 import org.apache.beam.runners.spark.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
@@ -48,10 +46,8 @@
 import org.apache.beam.sdk.values.PCollectionList;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
-import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
 
 /**
@@ -65,18 +61,6 @@ public class SparkRunnerDebuggerTest {
   @Rule
   public final PipelineRule streamingPipelineRule = PipelineRule.streaming();
 
-  private File outputDir;
-
-  private final TemporaryFolder tmpDir = new TemporaryFolder();
-
-  @SuppressWarnings("ResultOfMethodCallIgnored")
-  @Before
-  public void setUp() throws IOException {
-    tmpDir.create();
-    outputDir = tmpDir.newFolder("out");
-    outputDir.delete();
-  }
-
   @Test
   public void debugBatchPipeline() {
     TestSparkPipelineOptions options = batchPipelineRule.getOptions();
@@ -102,7 +86,7 @@ public void debugBatchPipeline() {
 
     wordCounts
         .apply(MapElements.via(new WordCount.FormatAsTextFn()))
-        .apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
+        .apply(TextIO.Write.to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
 
     final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
         + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"