From c8896e0fd50b179d6f1f49534765e1a7f23803e9 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Fri, 17 Feb 2017 12:35:49 +0200 Subject: [PATCH 1/4] [BEAM-1294] Long running UnboundedSource Readers --- .../runners/spark/io/MicrobatchSource.java | 101 ++++++++++++++---- .../beam/runners/spark/io/SourceDStream.java | 7 +- .../spark/stateful/StateSpecFunctions.java | 6 +- .../ResumeFromCheckpointStreamingTest.java | 14 ++- 4 files changed, 102 insertions(+), 26 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index ff818a10e147..1a1497efe037 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -19,11 +19,16 @@ package org.apache.beam.runners.spark.io; import com.google.api.client.util.BackOff; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; @@ -49,29 +54,34 @@ public class MicrobatchSource extends BoundedSource { private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class); + private static volatile Cache, BoundedReader> readerCache; private final UnboundedSource source; private final Duration maxReadTime; private final int numInitialSplits; private final long maxNumRecords; private final int sourceId; + private final double readerCacheInterval; // each split of the underlying UnboundedSource is associated with a (consistent) id // to match it's corresponding CheckpointMark state. private final int splitId; - MicrobatchSource(UnboundedSource source, - Duration maxReadTime, - int numInitialSplits, - long maxNumRecords, - int splitId, - int sourceId) { + MicrobatchSource( + UnboundedSource source, + Duration maxReadTime, + int numInitialSplits, + long maxNumRecords, + int splitId, + int sourceId, + double readerCacheInterval) { this.source = source; this.maxReadTime = maxReadTime; this.numInitialSplits = numInitialSplits; this.maxNumRecords = maxNumRecords; this.splitId = splitId; this.sourceId = sourceId; + this.readerCacheInterval = readerCacheInterval; } /** @@ -101,7 +111,8 @@ private static long[] splitNumRecords(long numRecords, int numSplits) { for (int i = 0; i < numSplits; i++) { // splits must be stable, and cannot change during consecutive executions // for example: Kafka should not add partitions if more then one topic is read. - result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId)); + result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId, + readerCacheInterval)); } return result; } @@ -113,12 +124,35 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { @Override public BoundedReader createReader(PipelineOptions options) throws IOException { - return createReader(options, null); + return getOrCreateReader(options, null); } - public BoundedReader createReader(PipelineOptions options, CheckpointMarkT checkpointMark) - throws IOException { - return new Reader(source.createReader(options, checkpointMark)); + @SuppressWarnings("unchecked") + public BoundedReader getOrCreateReader( + PipelineOptions options, + CheckpointMarkT checkpointMark) throws IOException { + try { + initReaderCache((long) readerCacheInterval); + return (BoundedReader) readerCache.get(this, new ReaderLoader(options, checkpointMark)); + } catch (ExecutionException e) { + throw new RuntimeException("Failed to get or create reader", e); + } + } + + private static final Object READER_CACHE_LOCK = new Object(); + + private void initReaderCache(long readerCacheInterval) { + if (readerCache == null) { + synchronized (READER_CACHE_LOCK) { + if (readerCache == null) { + LOG.info("Creating reader cache. Cache interval = " + readerCacheInterval + " ms."); + readerCache = + CacheBuilder.newBuilder() + .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS) + .build(); + } + } + } } @Override @@ -171,12 +205,12 @@ public int hashCode() { */ public class Reader extends BoundedSource.BoundedReader { private long recordsRead = 0L; - private final Instant endTime; + private Instant endTime; private final FluentBackoff backoffFactory; private final UnboundedSource.UnboundedReader reader; + private boolean started; private Reader(UnboundedSource.UnboundedReader reader) { - endTime = Instant.now().plus(maxReadTime); this.reader = reader; backoffFactory = FluentBackoff.DEFAULT @@ -190,12 +224,16 @@ public boolean start() throws IOException { LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a " + "max read time of {} msec, and max number of records {}.", splitId, maxReadTime, maxNumRecords); - if (reader.start()) { - recordsRead++; - return true; - } else { - return advanceWithBackoff(); + endTime = Instant.now().plus(maxReadTime); + // Since reader is reused in microbatches only start it if it has not already been started. + if (!started) { + started = true; + if (reader.start()) { + recordsRead++; + return true; + } } + return advanceWithBackoff(); } @Override @@ -262,4 +300,31 @@ public Instant getWatermark() { return reader.getWatermark(); } } + + /** + * {@link Callable} which creates a {@link Reader}. + */ + private class ReaderLoader implements Callable> { + private final PipelineOptions options; + private final CheckpointMarkT checkpointMark; + + ReaderLoader(PipelineOptions options, CheckpointMarkT checkpointMark) { + this.options = options; + this.checkpointMark = checkpointMark; + } + + @Override + public BoundedReader call() throws Exception { + LOG.info("No cached reader found for split: [" + source + + "]. Creating new reader at checkpoint mark " + checkpointMark); + return new Reader(source.createReader(options, checkpointMark)); + } + } + + @VisibleForTesting + public static void clearCache() { + synchronized (MicrobatchSource.class) { + readerCache.invalidateAll(); + } + } } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index b7bfeed20048..4d8f630a4775 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -60,6 +60,7 @@ class SourceDStream private final UnboundedSource unboundedSource; private final SparkRuntimeContext runtimeContext; private final Duration boundReadDuration; + private final double readerCacheInterval; // Number of partitions for the DStream is final and remains the same throughout the entire // lifetime of the pipeline, including when resuming from checkpoint. private final int numPartitions; @@ -84,6 +85,10 @@ class SourceDStream SparkPipelineOptions options = runtimeContext.getPipelineOptions().as( SparkPipelineOptions.class); + // Reader cache interval is set to expire readers if they have not been accessed in the last + // microbatch. 50% of batch interval is added to accommodate latency. + this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis(); + this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(), options.getMinReadTimeMillis()); // set initial parallelism once. @@ -116,7 +121,7 @@ public scala.Option, CheckpointMarkT>>> compute(Time validT private MicrobatchSource createMicrobatchSource() { return new MicrobatchSource<>(unboundedSource, boundReadDuration, initialParallelism, - boundMaxRecords, -1, id()); + boundMaxRecords, -1, id(), readerCacheInterval); } @Override diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java index 6d1b7c083134..c9de7faf8f0e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java @@ -151,8 +151,8 @@ public Tuple2, Metadata> apply( long readDurationMillis = 0; try { - reader = - microbatchSource.createReader(runtimeContext.getPipelineOptions(), checkpointMark); + reader = microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(), + checkpointMark); } catch (IOException e) { throw new RuntimeException(e); } @@ -177,8 +177,6 @@ public Tuple2, Metadata> apply( Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark(); highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark; - // close and checkpoint reader. - reader.close(); readDurationMillis = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS); LOG.info( diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java index 5c1963dc89c3..6cbf83a8d7cf 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java @@ -41,6 +41,7 @@ import org.apache.beam.runners.spark.UsesCheckpointRecovery; import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator; import org.apache.beam.runners.spark.coders.CoderHelpers; +import org.apache.beam.runners.spark.io.MicrobatchSource; import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.beam.runners.spark.util.GlobalWatermarkHolder; @@ -79,6 +80,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -173,9 +175,7 @@ public void testWithResume() throws Exception { //--- between executions: //- clear state. - AggregatorsAccumulator.clear(); - MetricsAccumulator.clear(); - GlobalWatermarkHolder.clear(); + clean(); //- write a bit more. produce(ImmutableMap.of( @@ -272,6 +272,14 @@ public Instant apply(KV kv) { return (SparkPipelineResult) p.run(); } + @After + public void clean() { + AggregatorsAccumulator.clear(); + MetricsAccumulator.clear(); + GlobalWatermarkHolder.clear(); + MicrobatchSource.clearCache(); + } + @AfterClass public static void tearDown() { EMBEDDED_KAFKA_CLUSTER.shutdown(); From 1867596be486c32f5ae837c57e305be719af861b Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 9 Apr 2017 20:00:10 +0300 Subject: [PATCH 2/4] [BEAM-1294] Synchronize MicroBatchSource#initReaderCache --- .../runners/spark/io/MicrobatchSource.java | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index 1a1497efe037..c3ff40b25233 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -139,19 +139,13 @@ public BoundedReader getOrCreateReader( } } - private static final Object READER_CACHE_LOCK = new Object(); - - private void initReaderCache(long readerCacheInterval) { + private synchronized void initReaderCache(long readerCacheInterval) { if (readerCache == null) { - synchronized (READER_CACHE_LOCK) { - if (readerCache == null) { - LOG.info("Creating reader cache. Cache interval = " + readerCacheInterval + " ms."); - readerCache = - CacheBuilder.newBuilder() - .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS) - .build(); - } - } + LOG.info("Creating reader cache. Cache interval = " + readerCacheInterval + " ms."); + readerCache = + CacheBuilder.newBuilder() + .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS) + .build(); } } From 815db4d3d8059ba09d81559ac5a2ed21a4739e12 Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 9 Apr 2017 20:09:04 +0300 Subject: [PATCH 3/4] [BEAM-1294] Better documentation for SourceDStream#readerCacheInterval --- .../org/apache/beam/runners/spark/io/SourceDStream.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java index 4d8f630a4775..fb6da9786c01 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java @@ -60,6 +60,11 @@ class SourceDStream private final UnboundedSource unboundedSource; private final SparkRuntimeContext runtimeContext; private final Duration boundReadDuration; + // Reader cache interval to expire readers if they haven't been accessed in the last microbatch. + // The reason we expire readers is that upon executor death/addition source split ownership can be + // reshuffled between executors. When this happens we want to close and expire unused readers + // in the executor in case it regains ownership of the source split in the future - to avoid + // resuming from an earlier checkpoint. private final double readerCacheInterval; // Number of partitions for the DStream is final and remains the same throughout the entire // lifetime of the pipeline, including when resuming from checkpoint. @@ -85,8 +90,7 @@ class SourceDStream SparkPipelineOptions options = runtimeContext.getPipelineOptions().as( SparkPipelineOptions.class); - // Reader cache interval is set to expire readers if they have not been accessed in the last - // microbatch. 50% of batch interval is added to accommodate latency. + // Reader cache expiration interval. 50% of batch interval is added to accommodate latency. this.readerCacheInterval = 1.5 * options.getBatchIntervalMillis(); this.boundReadDuration = boundReadDuration(options.getReadTimePercentage(), From 0b70a3f8b4bf08a9a6365de2fe651a14026181aa Mon Sep 17 00:00:00 2001 From: Aviem Zur Date: Sun, 9 Apr 2017 20:55:02 +0300 Subject: [PATCH 4/4] [BEAM-1294] Close expired readers. --- .../runners/spark/io/MicrobatchSource.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java index c3ff40b25233..002eb34e9199 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java @@ -22,6 +22,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import com.google.common.util.concurrent.Uninterruptibles; import java.io.IOException; import java.util.ArrayList; @@ -145,6 +147,7 @@ private synchronized void initReaderCache(long readerCacheInterval) { readerCache = CacheBuilder.newBuilder() .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS) + .removalListener(new ReaderCacheRemovalListener()) .build(); } } @@ -315,6 +318,21 @@ public BoundedReader call() throws Exception { } } + /** + * Listener to be called when a reader is removed from {@link MicrobatchSource#readerCache}. + */ + private static class ReaderCacheRemovalListener + implements RemovalListener, BoundedReader> { + @Override public void onRemoval( + RemovalNotification, BoundedReader> notification) { + try { + notification.getValue().close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + @VisibleForTesting public static void clearCache() { synchronized (MicrobatchSource.class) {