From 7d7a0a88adde6ed604b7f4bb46fe32a774971ee8 Mon Sep 17 00:00:00 2001 From: Stas Levin Date: Wed, 29 Mar 2017 15:29:20 +0300 Subject: [PATCH] Extracted captures to static classes to prevent them from capturing the scope. --- .../spark/io/SparkUnboundedSource.java | 37 ++++++++++++------- .../spark/stateful/StateSpecFunctions.java | 17 ++++----- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java index a538907108fe..162bca47feec 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SparkUnboundedSource.java @@ -102,13 +102,7 @@ public static UnboundedDataset re // report the number of input elements for this InputDStream to the InputInfoTracker. int id = inputDStream.inputDStream().id(); - JavaDStream metadataDStream = mapWithStateDStream.map( - new Function, Metadata>, Metadata>() { - @Override - public Metadata call(Tuple2, Metadata> t2) throws Exception { - return t2._2(); - } - }); + JavaDStream metadataDStream = mapWithStateDStream.map(new Tuple2MetadataFunction()); // register ReadReportDStream to report information related to this read. new ReadReportDStream(metadataDStream.dstream(), id, getSourceName(source, id)).register(); @@ -118,13 +112,10 @@ public Metadata call(Tuple2, Metadata> t2) throws Exception { WindowedValue.FullWindowedValueCoder.of( source.getDefaultOutputCoder(), GlobalWindow.Coder.INSTANCE); - JavaDStream> readUnboundedStream = mapWithStateDStream.flatMap( - new FlatMapFunction, Metadata>, byte[]>() { - @Override - public Iterable call(Tuple2, Metadata> t2) throws Exception { - return t2._1(); - } - }).map(CoderHelpers.fromByteFunction(coder)); + JavaDStream> readUnboundedStream = + mapWithStateDStream + .flatMap(new Tuple2byteFlatMapFunction()) + .map(CoderHelpers.fromByteFunction(coder)); return new UnboundedDataset<>(readUnboundedStream, Collections.singletonList(id)); } @@ -274,4 +265,22 @@ SparkMetricsContainer getMetricsContainer() { return metricsContainer; } } + + private static class Tuple2MetadataFunction + implements Function, Metadata>, Metadata> { + + @Override + public Metadata call(Tuple2, Metadata> t2) throws Exception { + return t2._2(); + } + } + + private static class Tuple2byteFlatMapFunction + implements FlatMapFunction, Metadata>, byte[]> { + + @Override + public Iterable call(Tuple2, Metadata> t2) throws Exception { + return t2._1(); + } + } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index ec4fce324da1..803fe458d1e9 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -20,11 +20,11 @@ import com.google.common.base.Stopwatch; import com.google.common.collect.Iterators; +import com.google.common.collect.Lists; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -197,14 +197,13 @@ public Tuple2, Metadata> apply( throw new RuntimeException("Failed to read from reader.", e); } - Iterable iterable = new Iterable() { - @Override - public Iterator iterator() { - return Iterators.unmodifiableIterator(readValues.iterator()); - } - }; - return new Tuple2<>(iterable, - new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer)); + final ArrayList payload = + Lists.newArrayList(Iterators.unmodifiableIterator(readValues.iterator())); + + return new Tuple2<>( + (Iterable) payload, + new Metadata(readValues.size(), lowWatermark, highWatermark, sparkMetricsContainer)); + } catch (IOException e) { throw new RuntimeException(e); }