From f1b679e402734f20dcd9645babaec0a3f291e259 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Mon, 12 Jun 2017 17:04:00 +0300 Subject: [PATCH 1/5] [BEAM-2359] Fix watermark broadcasting to executors in Spark runner --- .../beam/runners/spark/TestSparkRunner.java | 2 +- .../SparkGroupAlsoByWindowViaWindowSet.java | 7 +- .../spark/stateful/SparkTimerInternals.java | 18 ++- .../StreamingTransformTranslator.java | 4 +- .../spark/util/GlobalWatermarkHolder.java | 129 ++++++++++++++---- .../spark/GlobalWatermarkHolderTest.java | 18 +-- 6 files changed, 130 insertions(+), 48 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index eccee574afaa..a13a3b141aa4 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -169,7 +169,7 @@ private static void awaitWatermarksOrTimeout( result.waitUntilFinish(Duration.millis(batchDurationMillis)); do { SparkTimerInternals sparkTimerInternals = - SparkTimerInternals.global(GlobalWatermarkHolder.get()); + SparkTimerInternals.global(GlobalWatermarkHolder.get(batchDurationMillis)); sparkTimerInternals.advanceWatermark(); globalWatermark = sparkTimerInternals.currentInputWatermarkTime(); // let another batch-interval period of execution, just to reason about WM propagation. diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index be4f3f65a3b7..ae1a5868d112 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -104,12 +104,13 @@ private abstract static class SerializableFunction1 public static JavaDStream>>> groupAlsoByWindow( - JavaDStream>>>> inputDStream, + final JavaDStream>>>> inputDStream, final Coder keyCoder, final Coder> wvCoder, final WindowingStrategy windowingStrategy, final SparkRuntimeContext runtimeContext, - final List sourceIds) { + final List sourceIds, + final Long batchDurationMillis) { final IterableCoder> itrWvCoder = IterableCoder.of(wvCoder); final Coder iCoder = ((FullWindowedValueCoder) wvCoder).getValueCoder(); @@ -239,7 +240,7 @@ public JavaPairRDD call( SparkStateInternals stateInternals; SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources( - sourceIds, GlobalWatermarkHolder.get()); + sourceIds, GlobalWatermarkHolder.get(batchDurationMillis)); // get state(internals) per key. if (prevStateAndTimersOpt.isEmpty()) { // no previous state. diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java index 107915f7f43d..a68da5516da7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkTimerInternals.java @@ -34,7 +34,6 @@ import org.apache.beam.runners.spark.util.GlobalWatermarkHolder.SparkWatermarks; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.spark.broadcast.Broadcast; import org.joda.time.Instant; @@ -58,10 +57,10 @@ private SparkTimerInternals( /** Build the {@link TimerInternals} according to the feeding streams. */ public static SparkTimerInternals forStreamFromSources( List sourceIds, - @Nullable Broadcast> broadcast) { - // if broadcast is invalid for the specific ids, use defaults. - if (broadcast == null || broadcast.getValue().isEmpty() - || Collections.disjoint(sourceIds, broadcast.getValue().keySet())) { + Map watermarks) { + // if watermarks are invalid for the specific ids, use defaults. + if (watermarks == null || watermarks.isEmpty() + || Collections.disjoint(sourceIds, watermarks.keySet())) { return new SparkTimerInternals( BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(0)); } @@ -71,7 +70,7 @@ public static SparkTimerInternals forStreamFromSources( // synchronized processing time should clearly be synchronized. Instant synchronizedProcessingTime = null; for (Integer sourceId: sourceIds) { - SparkWatermarks sparkWatermarks = broadcast.getValue().get(sourceId); + SparkWatermarks sparkWatermarks = watermarks.get(sourceId); if (sparkWatermarks != null) { // keep slowest WMs. slowestLowWatermark = slowestLowWatermark.isBefore(sparkWatermarks.getLowWatermark()) @@ -94,10 +93,9 @@ public static SparkTimerInternals forStreamFromSources( } /** Build a global {@link TimerInternals} for all feeding streams.*/ - public static SparkTimerInternals global( - @Nullable Broadcast> broadcast) { - return broadcast == null ? forStreamFromSources(Collections.emptyList(), null) - : forStreamFromSources(Lists.newArrayList(broadcast.getValue().keySet()), broadcast); + public static SparkTimerInternals global(Map watermarks) { + return watermarks == null ? forStreamFromSources(Collections.emptyList(), null) + : forStreamFromSources(Lists.newArrayList(watermarks.keySet()), watermarks); } Collection getTimers() { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index cd5bb3ee5df6..667f17183420 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -33,6 +33,7 @@ import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nonnull; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; +import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -304,7 +305,8 @@ public JavaRDD>>>> call( wvCoder, windowingStrategy, runtimeContext, - streamSources); + streamSources, + ((SparkPipelineOptions) context.getOptions()).getBatchIntervalMillis()); context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources)); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java index 8b384d8e2ab0..d5a43827d42c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java @@ -21,31 +21,43 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.Maps; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.SparkEnv; import org.apache.spark.broadcast.Broadcast; +import org.apache.spark.storage.BlockId; +import org.apache.spark.storage.BlockManager; +import org.apache.spark.storage.BlockResult; +import org.apache.spark.storage.BlockStore; +import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaStreamingListener; import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted; import org.joda.time.Instant; - +import scala.Option; /** - * A {@link Broadcast} variable to hold the global watermarks for a micro-batch. + * A {@link BlockStore} variable to hold the global watermarks for a micro-batch. * *

For each source, holds a queue for the watermarks of each micro-batch that was read, * and advances the watermarks according to the queue (first-in-first-out). */ public class GlobalWatermarkHolder { - // the broadcast is broadcasted to the workers. - private static volatile Broadcast> broadcast = null; - // this should only live in the driver so transient. - private static final transient Map> sourceTimes = new HashMap<>(); + private static final Map> sourceTimes = new HashMap<>(); + private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS"); + + private static volatile LoadingCache> watermarkCache = null; + private static boolean synchronizeAccess = false; public static void add(int sourceId, SparkWatermarks sparkWatermarks) { Queue timesQueue = sourceTimes.get(sourceId); @@ -71,22 +83,56 @@ public static void addAll(Map> sourceTimes) { * Returns the {@link Broadcast} containing the {@link SparkWatermarks} mapped * to their sources. */ - public static Broadcast> get() { - return broadcast; + @SuppressWarnings("unchecked") + public static Map get(Long cacheInterval) { + if (synchronizeAccess) { + // synchronize in local mode to avoid race condition with updates in advance() method. + synchronized (GlobalWatermarkHolder.class) { + return getWatermarks(cacheInterval); + } + } else { + // no need to synchronize when running on a remote executor. + return getWatermarks(cacheInterval); + } + } + + @SuppressWarnings("unchecked") + private static Map getWatermarks(Long cacheInterval) { + if (watermarkCache == null) { + initWatermarkCache(cacheInterval); + } + try { + return watermarkCache.get("SINGLETON"); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + private static synchronized void initWatermarkCache(Long batchDuration) { + if (watermarkCache == null) { + watermarkCache = + CacheBuilder.newBuilder() + // expire watermarks every half batch duration to ensure they update in every batch. + .expireAfterWrite(batchDuration / 2, TimeUnit.MILLISECONDS) + .build(new WatermarksLoader()); + } } /** * Advances the watermarks to the next-in-line watermarks. * SparkWatermarks are monotonically increasing. */ - public static void advance(JavaSparkContext jsc) { - synchronized (GlobalWatermarkHolder.class){ + @SuppressWarnings("unchecked") + public static void advance() { + synchronized (GlobalWatermarkHolder.class) { + BlockManager blockManager = SparkEnv.get().blockManager(); + if (sourceTimes.isEmpty()) { return; } // update all sources' watermarks into the new broadcast. - Map newBroadcast = new HashMap<>(); + Map newValues = new HashMap<>(); for (Map.Entry> en: sourceTimes.entrySet()) { if (en.getValue().isEmpty()) { @@ -99,8 +145,22 @@ public static void advance(JavaSparkContext jsc) { Instant currentLowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; Instant currentHighWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE; Instant currentSynchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - if (broadcast != null && broadcast.getValue().containsKey(sourceId)) { - SparkWatermarks currentTimes = broadcast.getValue().get(sourceId); + + Option currentOption = blockManager.getRemote(WATERMARKS_BLOCK_ID); + Map current; + if (currentOption.isDefined()) { + current = (Map) currentOption.get().data().next(); + } else { + current = Maps.newHashMap(); + blockManager.putSingle( + WATERMARKS_BLOCK_ID, + current, + StorageLevel.MEMORY_ONLY(), + true); + } + + if (current.containsKey(sourceId)) { + SparkWatermarks currentTimes = current.get(sourceId); currentLowWatermark = currentTimes.getLowWatermark(); currentHighWatermark = currentTimes.getHighWatermark(); currentSynchronizedProcessingTime = currentTimes.getSynchronizedProcessingTime(); @@ -119,20 +179,21 @@ public static void advance(JavaSparkContext jsc) { nextLowWatermark, nextHighWatermark)); checkState(nextSynchronizedProcessingTime.isAfter(currentSynchronizedProcessingTime), "Synchronized processing time must advance."); - newBroadcast.put( + newValues.put( sourceId, new SparkWatermarks( nextLowWatermark, nextHighWatermark, nextSynchronizedProcessingTime)); } // update the watermarks broadcast only if something has changed. - if (!newBroadcast.isEmpty()) { - if (broadcast != null) { - // for now this is blocking, we could make this asynchronous - // but it could slow down WM propagation. - broadcast.destroy(); - } - broadcast = jsc.broadcast(newBroadcast); + if (!newValues.isEmpty()) { + blockManager.removeBlock(WATERMARKS_BLOCK_ID, true); + blockManager.putSingle( + WATERMARKS_BLOCK_ID, + newValues, + StorageLevel.MEMORY_ONLY(), + true); + String breaK = "point"; } } } @@ -140,7 +201,12 @@ public static void advance(JavaSparkContext jsc) { @VisibleForTesting public static synchronized void clear() { sourceTimes.clear(); - broadcast = null; + synchronizeAccess = true; + SparkEnv sparkEnv = SparkEnv.get(); + if (sparkEnv != null) { + BlockManager blockManager = sparkEnv.blockManager(); + blockManager.removeBlock(WATERMARKS_BLOCK_ID, true); + } } /** @@ -193,7 +259,22 @@ public WatermarksListener(JavaStreamingContext jssc) { @Override public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) { - GlobalWatermarkHolder.advance(jssc.sparkContext()); + GlobalWatermarkHolder.advance(); + } + } + + private static class WatermarksLoader extends CacheLoader> { + + @SuppressWarnings("unchecked") + @Override + public Map load(String key) throws Exception { + Option blockResultOption = + SparkEnv.get().blockManager().getRemote(WATERMARKS_BLOCK_ID); + if (blockResultOption.isDefined()) { + return (Map) blockResultOption.get().data().next(); + } else { + return Maps.newHashMap(); + } } } } diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java index 47a6e3fe7499..17081236cf52 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/GlobalWatermarkHolderTest.java @@ -65,17 +65,17 @@ public void testLowHighWatermarksAdvance() { instant.plus(Duration.millis(5)), instant.plus(Duration.millis(5)), instant)); - GlobalWatermarkHolder.advance(jsc); + GlobalWatermarkHolder.advance(); // low < high. GlobalWatermarkHolder.add(1, new SparkWatermarks( instant.plus(Duration.millis(10)), instant.plus(Duration.millis(15)), instant.plus(Duration.millis(100)))); - GlobalWatermarkHolder.advance(jsc); + GlobalWatermarkHolder.advance(); // assert watermarks in Broadcast. - SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get().getValue().get(1); + SparkWatermarks currentWatermarks = GlobalWatermarkHolder.get(0L).get(1); assertThat(currentWatermarks.getLowWatermark(), equalTo(instant.plus(Duration.millis(10)))); assertThat(currentWatermarks.getHighWatermark(), equalTo(instant.plus(Duration.millis(15)))); assertThat(currentWatermarks.getSynchronizedProcessingTime(), @@ -93,7 +93,7 @@ public void testLowHighWatermarksAdvance() { instant.plus(Duration.millis(25)), instant.plus(Duration.millis(20)), instant.plus(Duration.millis(200)))); - GlobalWatermarkHolder.advance(jsc); + GlobalWatermarkHolder.advance(); } @Test @@ -106,7 +106,7 @@ public void testSynchronizedTimeMonotonic() { instant.plus(Duration.millis(5)), instant.plus(Duration.millis(10)), instant)); - GlobalWatermarkHolder.advance(jsc); + GlobalWatermarkHolder.advance(); thrown.expect(IllegalStateException.class); thrown.expectMessage("Synchronized processing time must advance."); @@ -117,7 +117,7 @@ public void testSynchronizedTimeMonotonic() { instant.plus(Duration.millis(5)), instant.plus(Duration.millis(10)), instant)); - GlobalWatermarkHolder.advance(jsc); + GlobalWatermarkHolder.advance(); } @Test @@ -136,15 +136,15 @@ public void testMultiSource() { instant.plus(Duration.millis(6)), instant)); - GlobalWatermarkHolder.advance(jsc); + GlobalWatermarkHolder.advance(); // assert watermarks for source 1. - SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get().getValue().get(1); + SparkWatermarks watermarksForSource1 = GlobalWatermarkHolder.get(0L).get(1); assertThat(watermarksForSource1.getLowWatermark(), equalTo(instant.plus(Duration.millis(5)))); assertThat(watermarksForSource1.getHighWatermark(), equalTo(instant.plus(Duration.millis(10)))); // assert watermarks for source 2. - SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get().getValue().get(2); + SparkWatermarks watermarksForSource2 = GlobalWatermarkHolder.get(0L).get(2); assertThat(watermarksForSource2.getLowWatermark(), equalTo(instant.plus(Duration.millis(3)))); assertThat(watermarksForSource2.getHighWatermark(), equalTo(instant.plus(Duration.millis(6)))); } From db5d9750d0965afd330488945c5183bc673a24f1 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Tue, 13 Jun 2017 12:15:59 +0300 Subject: [PATCH 2/5] Fix flaky tests in local mode and address PR review comments --- .../beam/runners/spark/SparkRunner.java | 2 +- .../spark/util/GlobalWatermarkHolder.java | 46 +++++++------------ 2 files changed, 17 insertions(+), 31 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java index 9e2426ef8381..2ea2c08c2f73 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java @@ -170,7 +170,7 @@ public SparkPipelineResult run(final Pipeline pipeline) { } // register Watermarks listener to broadcast the advanced WMs. - jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener(jssc))); + jssc.addStreamingListener(new JavaStreamingListenerWrapper(new WatermarksListener())); // The reason we call initAccumulators here even though it is called in // SparkRunnerStreamingContextFactory is because the factory is not called when resuming diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java index d5a43827d42c..29b2d5f38a07 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java @@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.spark.SparkEnv; import org.apache.spark.broadcast.Broadcast; @@ -40,7 +41,6 @@ import org.apache.spark.storage.BlockResult; import org.apache.spark.storage.BlockStore; import org.apache.spark.storage.StorageLevel; -import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.api.java.JavaStreamingListener; import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted; import org.joda.time.Instant; @@ -56,8 +56,8 @@ public class GlobalWatermarkHolder { private static final Map> sourceTimes = new HashMap<>(); private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS"); + private static Map driverWatermarks = null; private static volatile LoadingCache> watermarkCache = null; - private static boolean synchronizeAccess = false; public static void add(int sourceId, SparkWatermarks sparkWatermarks) { Queue timesQueue = sourceTimes.get(sourceId); @@ -85,26 +85,18 @@ public static void addAll(Map> sourceTimes) { */ @SuppressWarnings("unchecked") public static Map get(Long cacheInterval) { - if (synchronizeAccess) { - // synchronize in local mode to avoid race condition with updates in advance() method. - synchronized (GlobalWatermarkHolder.class) { - return getWatermarks(cacheInterval); - } + if (driverWatermarks != null) { + // if we are executing in local mode simply return the local values. + return driverWatermarks; } else { - // no need to synchronize when running on a remote executor. - return getWatermarks(cacheInterval); - } - } - - @SuppressWarnings("unchecked") - private static Map getWatermarks(Long cacheInterval) { - if (watermarkCache == null) { - initWatermarkCache(cacheInterval); - } - try { - return watermarkCache.get("SINGLETON"); - } catch (ExecutionException e) { - throw new RuntimeException(e); + if (watermarkCache == null) { + initWatermarkCache(cacheInterval); + } + try { + return watermarkCache.get("SINGLETON"); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } } @@ -187,13 +179,13 @@ public static void advance() { // update the watermarks broadcast only if something has changed. if (!newValues.isEmpty()) { + driverWatermarks = newValues; blockManager.removeBlock(WATERMARKS_BLOCK_ID, true); blockManager.putSingle( WATERMARKS_BLOCK_ID, newValues, StorageLevel.MEMORY_ONLY(), true); - String breaK = "point"; } } } @@ -201,7 +193,7 @@ public static void advance() { @VisibleForTesting public static synchronized void clear() { sourceTimes.clear(); - synchronizeAccess = true; + driverWatermarks = null; SparkEnv sparkEnv = SparkEnv.get(); if (sparkEnv != null) { BlockManager blockManager = sparkEnv.blockManager(); @@ -251,12 +243,6 @@ public String toString() { /** Advance the WMs onBatchCompleted event. */ public static class WatermarksListener extends JavaStreamingListener { - private final JavaStreamingContext jssc; - - public WatermarksListener(JavaStreamingContext jssc) { - this.jssc = jssc; - } - @Override public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) { GlobalWatermarkHolder.advance(); @@ -267,7 +253,7 @@ private static class WatermarksLoader extends CacheLoader load(String key) throws Exception { + public Map load(@Nonnull String key) throws Exception { Option blockResultOption = SparkEnv.get().blockManager().getRemote(WATERMARKS_BLOCK_ID); if (blockResultOption.isDefined()) { From 85289cb29d3c2df02038cb1e31547e370b6b78b5 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 18 Jun 2017 18:21:28 +0300 Subject: [PATCH 3/5] Remove unnecessary parameter --- .../spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java | 5 +++-- .../translation/streaming/StreamingTransformTranslator.java | 4 +--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index ae1a5868d112..1385e071978f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -109,9 +109,10 @@ JavaDStream>>> groupAlsoByWindow( final Coder> wvCoder, final WindowingStrategy windowingStrategy, final SparkRuntimeContext runtimeContext, - final List sourceIds, - final Long batchDurationMillis) { + final List sourceIds) { + final long batchDurationMillis = + runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class).getBatchIntervalMillis(); final IterableCoder> itrWvCoder = IterableCoder.of(wvCoder); final Coder iCoder = ((FullWindowedValueCoder) wvCoder).getValueCoder(); final Coder wCoder = diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 667f17183420..cd5bb3ee5df6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -33,7 +33,6 @@ import java.util.concurrent.LinkedBlockingQueue; import javax.annotation.Nonnull; import org.apache.beam.runners.core.metrics.MetricsContainerStepMap; -import org.apache.beam.runners.spark.SparkPipelineOptions; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.aggregators.NamedAggregators; import org.apache.beam.runners.spark.coders.CoderHelpers; @@ -305,8 +304,7 @@ public JavaRDD>>>> call( wvCoder, windowingStrategy, runtimeContext, - streamSources, - ((SparkPipelineOptions) context.getOptions()).getBatchIntervalMillis()); + streamSources); context.putDataset(transform, new UnboundedDataset<>(outStream, streamSources)); } From 48858514d073ac0b31fc49d646a59bffe11bf883 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Tue, 20 Jun 2017 13:34:13 +0300 Subject: [PATCH 4/5] Change static variable back to volatile --- .../apache/beam/runners/spark/util/GlobalWatermarkHolder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java index 29b2d5f38a07..2cb6f26f8a0f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java @@ -56,7 +56,7 @@ public class GlobalWatermarkHolder { private static final Map> sourceTimes = new HashMap<>(); private static final BlockId WATERMARKS_BLOCK_ID = BlockId.apply("broadcast_0WATERMARKS"); - private static Map driverWatermarks = null; + private static volatile Map driverWatermarks = null; private static volatile LoadingCache> watermarkCache = null; public static void add(int sourceId, SparkWatermarks sparkWatermarks) { From bf1743d93cc75b1d4c81a7059f26c7c24452b325 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Wed, 21 Jun 2017 17:53:21 +0300 Subject: [PATCH 5/5] Move Spark runner streaming tests to post commit. --- runners/spark/pom.xml | 42 +++++++++++++++++++++--------------------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index ddb4aca73327..60de70ee46a3 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -103,6 +103,27 @@ 4 + + streaming-tests + test + + test + + + + org.apache.beam.runners.spark.StreamingTest + + + + [ + "--runner=TestSparkRunner", + "--forceStreaming=true", + "--enableSparkMetricSinks=true" + ] + + + + @@ -360,27 +381,6 @@ - - streaming-tests - test - - test - - - - org.apache.beam.runners.spark.StreamingTest - - - - [ - "--runner=TestSparkRunner", - "--forceStreaming=true", - "--enableSparkMetricSinks=true" - ] - - - -