From e56d3a3b54b62130c7d33f00d01b56bae283f14a Mon Sep 17 00:00:00 2001 From: kl0u Date: Thu, 3 Nov 2016 10:50:04 +0100 Subject: [PATCH 1/4] [FLINK-5021] Remove the special EOS TimestampedFileInputSplit. Without this special split signaling that no more splits are to arrive, the ContinuousFileReaderOperator now closes by setting a flag that marks it as closed and exiting when the flag is set to true and the pending split queue is empty. --- .../source/ContinuousFileReaderOperator.java | 53 ++++++++++--------- .../source/TimestampedFileInputSplit.java | 25 ++++----- .../TimestampedFileInputSplitTest.java | 25 --------- 3 files changed, 37 insertions(+), 66 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index c8e9846601ab7..e4fb50d341e4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -42,12 +42,10 @@ import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; import java.util.Queue; -import static org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit.EOS; import static org.apache.flink.util.Preconditions.checkState; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -162,22 +160,18 @@ public void dispose() throws Exception { public void close() throws Exception { super.close(); - // signal that no more splits will come, wait for the reader to finish - // and close the collector. Further cleaning up is handled by the dispose(). + // close the reader to signal that no more splits will come. By doing this, + // the reader will exit as soon as it finishes processing the already pending splits. + // This method will wait until then. Further cleaning up is handled by the dispose(). if (reader != null && reader.isAlive() && reader.isRunning()) { - // add a dummy element to signal that no more splits will - // arrive and wait until the reader finishes - reader.addSplit(EOS); - - // we already have the checkpoint lock because close() is - // called by the StreamTask while having it. + reader.close(); checkpointLock.wait(); } - // finally if we are closed normally and we are operating on - // event or ingestion time, emit the max watermark indicating - // the end of the stream, like a normal source would do. + // finally if we are operating on event or ingestion time, + // emit the long-max watermark indicating the end of the stream, + // like a normal source would do. if (readerContext != null) { readerContext.emitWatermark(Watermark.MAX_WATERMARK); @@ -188,6 +182,8 @@ public void close() throws Exception { private class SplitReader extends Thread { + private volatile boolean isClosed; + private volatile boolean isRunning; private final FileInputFormat format; @@ -213,14 +209,10 @@ private SplitReader(FileInputFormat format, this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); + this.isClosed = false; this.isRunning = true; - this.pendingSplits = new PriorityQueue<>(10, new Comparator() { - @Override - public int compare(TimestampedFileInputSplit o1, TimestampedFileInputSplit o2) { - return o1.compareTo(o2); - } - }); + this.pendingSplits = new PriorityQueue<>(); // this is the case where a task recovers from a previous failed attempt if (restoredState != null) { @@ -252,17 +244,21 @@ public void run() { if (currentSplit == null) { currentSplit = this.pendingSplits.poll(); + + // if the list of pending splits is empty (currentSplit == null) then: + // 1) if close() was called on the operator then exit the while loop + // 2) if not wait 50 ms and try again to fetch a new split to read + if (currentSplit == null) { - checkpointLock.wait(50); + if (!this.isClosed) { + checkpointLock.wait(50); + } else { + isRunning = false; + } continue; } } - if (currentSplit.equals(EOS)) { - isRunning = false; - break; - } - if (this.format instanceof CheckpointableInputFormat && currentSplit.getSplitState() != null) { // recovering after a node failure with an input // format that supports resetting the offset @@ -332,7 +328,8 @@ private List getReaderState() throws IOException { List snapshot = new ArrayList<>(this.pendingSplits.size()); if (currentSplit != null ) { if (this.format instanceof CheckpointableInputFormat && this.isSplitOpen) { - Serializable formatState = ((CheckpointableInputFormat) this.format).getCurrentState(); + Serializable formatState = + ((CheckpointableInputFormat) this.format).getCurrentState(); this.currentSplit.setSplitState(formatState); } snapshot.add(this.currentSplit); @@ -344,6 +341,10 @@ private List getReaderState() throws IOException { public void cancel() { this.isRunning = false; } + + public void close() { + this.isClosed = true; + } } // --------------------- Checkpointing -------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java index 6a3ba0d15db1e..2a0be98bcb062 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/TimestampedFileInputSplit.java @@ -44,10 +44,6 @@ public class TimestampedFileInputSplit extends FileInputSplit implements Compara * */ private Serializable splitState; - /** A special {@link TimestampedFileInputSplit} signaling the end of the stream of splits.*/ - public static final TimestampedFileInputSplit EOS = - new TimestampedFileInputSplit(Long.MAX_VALUE, -1, null, -1, -1, null); - /** * Creates a {@link TimestampedFileInputSplit} based on the file modification time and * the rest of the information of the {@link FileInputSplit}, as returned by the @@ -101,24 +97,23 @@ public long getModificationTime() { @Override public int compareTo(TimestampedFileInputSplit o) { - long modTimeComp = this.modificationTime - o.modificationTime; + int modTimeComp = Long.compare(this.modificationTime, o.modificationTime); if (modTimeComp != 0L) { - // we cannot just cast the modTimeComp to int - // because it may overflow - return modTimeComp > 0 ? 1 : -1; + return modTimeComp; } - // the file input split allows for null paths - if (this.getPath() == o.getPath()) { - return 0; - } else if (this.getPath() == null) { + // the file input split does not prevent null paths. + if (this.getPath() == null && o.getPath() != null) { return 1; - } else if (o.getPath() == null) { + } else if (this.getPath() != null && o.getPath() == null) { return -1; } - int pathComp = this.getPath().compareTo(o.getPath()); - return pathComp != 0 ? pathComp : this.getSplitNumber() - o.getSplitNumber(); + int pathComp = this.getPath() == o.getPath() ? 0 : + this.getPath().compareTo(o.getPath()); + + return pathComp != 0 ? pathComp : + this.getSplitNumber() - o.getSplitNumber(); } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java index 9dc90d3e214a0..0a89ab9a98b0f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java @@ -23,7 +23,6 @@ import org.junit.Test; import java.util.ArrayList; -import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; import java.util.Queue; @@ -33,14 +32,8 @@ public class TimestampedFileInputSplitTest { @Test public void testSplitEquality() { - TimestampedFileInputSplit eos1 = TimestampedFileInputSplit.EOS; - TimestampedFileInputSplit eos2 = TimestampedFileInputSplit.EOS; - - Assert.assertEquals(eos1, eos2); - TimestampedFileInputSplit richFirstSplit = new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null); - Assert.assertNotEquals(eos1, richFirstSplit); TimestampedFileInputSplit richSecondSplit = new TimestampedFileInputSplit(10, 2, new Path("test"), 0, 100, null); @@ -88,18 +81,6 @@ public void testSplitComparison() { // smaller modification time first Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0); - - Assert.assertTrue(richFirstSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); - Assert.assertTrue(richSecondSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); - Assert.assertTrue(richThirdSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); - Assert.assertTrue(richForthSplit.compareTo(TimestampedFileInputSplit.EOS) < 0); - - Assert.assertEquals(0, TimestampedFileInputSplit.EOS.compareTo(TimestampedFileInputSplit.EOS)); - - Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richFirstSplit) > 0); - Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richSecondSplit) > 0); - Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richThirdSplit) > 0); - Assert.assertTrue(TimestampedFileInputSplit.EOS.compareTo(richForthSplit) > 0); } @Test @@ -130,14 +111,10 @@ public void testPriorityQ() { TimestampedFileInputSplit richFifthSplit = new TimestampedFileInputSplit(11, 1, new Path("test/test3"), 0, 100, null); - TimestampedFileInputSplit eos = TimestampedFileInputSplit.EOS; - Queue pendingSplits = new PriorityQueue<>(); - pendingSplits.add(eos); pendingSplits.add(richSecondSplit); pendingSplits.add(richForthSplit); - pendingSplits.add(eos); pendingSplits.add(richFirstSplit); pendingSplits.add(richFifthSplit); pendingSplits.add(richFifthSplit); @@ -158,8 +135,6 @@ public void testPriorityQ() { expectedSortedSplits.add(richForthSplit); expectedSortedSplits.add(richFifthSplit); expectedSortedSplits.add(richFifthSplit); - expectedSortedSplits.add(eos); - expectedSortedSplits.add(eos); Assert.assertArrayEquals(expectedSortedSplits.toArray(), actualSortedSplits.toArray()); } From c990594c7912e9553239514f04dd0e376985f9d8 Mon Sep 17 00:00:00 2001 From: kl0u Date: Thu, 3 Nov 2016 11:08:45 +0100 Subject: [PATCH 2/4] [FLINK-5021] Guarantee PROCESS_ONCE works correctly after recovering. --- .../source/ContinuousFileMonitoringFunction.java | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index a6c5e4945e56c..027623ee41f61 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -143,8 +143,15 @@ public void run(SourceFunction.SourceContext context) break; case PROCESS_ONCE: synchronized (checkpointLock) { - monitorDirAndForwardSplits(fileSystem, context); - globalModificationTime = Long.MAX_VALUE; + + // the following check guarantees that if we restart + // after a failure and we managed to have a successful + // checkpoint, we will not reprocess the directory. + + if (globalModificationTime == Long.MIN_VALUE) { + monitorDirAndForwardSplits(fileSystem, context); + globalModificationTime = Long.MAX_VALUE; + } isRunning = false; } break; From f6020f5102ec3e1c52d2b2195949ac3ee38093c8 Mon Sep 17 00:00:00 2001 From: kl0u Date: Thu, 3 Nov 2016 11:21:08 +0100 Subject: [PATCH 3/4] [FLINK-5021] Make the ContinuousFileReaderOperator rescalable. This is the last commit that completes the refactoring of the ContinuousFileReaderOperator so that it can be rescalable. With this, the reader can restart from a savepoint with a different parallelism without compromising the provided exactly-once guarantees. --- .../ContinuousFileProcessingTest.java | 182 +++++++++- .../source/ContinuousFileReaderOperator.java | 70 ++-- ...ContinuousFileProcessingRescalingTest.java | 316 ++++++++++++++++++ ...tinuousFileProcessingCheckpointITCase.java | 34 +- 4 files changed, 554 insertions(+), 48 deletions(-) create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java index 0283f68178013..1b272d28462f4 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java @@ -19,6 +19,7 @@ package org.apache.flink.hdfstests; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.FileInputFormat; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple2; @@ -36,7 +37,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.Preconditions; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.MiniDFSCluster; @@ -360,6 +363,149 @@ public int compare(String o1, String o2) { } } + @Test + public void testReaderSnapshotRestore() throws Exception { + + TimestampedFileInputSplit split1 = + new TimestampedFileInputSplit(0, 3, new Path("test/test1"), 0, 100, null); + + TimestampedFileInputSplit split2 = + new TimestampedFileInputSplit(10, 2, new Path("test/test2"), 101, 200, null); + + TimestampedFileInputSplit split3 = + new TimestampedFileInputSplit(10, 1, new Path("test/test2"), 0, 100, null); + + TimestampedFileInputSplit split4 = + new TimestampedFileInputSplit(11, 0, new Path("test/test3"), 0, 100, null); + + + final OneShotLatch latch = new OneShotLatch(); + + BlockingFileInputFormat format = new BlockingFileInputFormat(latch, new Path(hdfsURI)); + TypeInformation typeInfo = TypeExtractor.getInputFormatTypes(format); + + ContinuousFileReaderOperator initReader = new ContinuousFileReaderOperator<>(format); + initReader.setOutputType(typeInfo, new ExecutionConfig()); + + OneInputStreamOperatorTestHarness initTestInstance = + new OneInputStreamOperatorTestHarness<>(initReader); + initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime); + initTestInstance.open(); + + // create some state in the reader + initTestInstance.processElement(new StreamRecord<>(split1)); + initTestInstance.processElement(new StreamRecord<>(split2)); + initTestInstance.processElement(new StreamRecord<>(split3)); + initTestInstance.processElement(new StreamRecord<>(split4)); + + // take a snapshot of the operator's state. This will be used + // to initialize another reader and compare the results of the + // two operators. + + final OperatorStateHandles snapshot; + synchronized (initTestInstance.getCheckpointLock()) { + snapshot = initTestInstance.snapshot(0L, 0L); + } + + ContinuousFileReaderOperator restoredReader = new ContinuousFileReaderOperator<>( + new BlockingFileInputFormat(latch, new Path(hdfsURI))); + restoredReader.setOutputType(typeInfo, new ExecutionConfig()); + + OneInputStreamOperatorTestHarness restoredTestInstance = + new OneInputStreamOperatorTestHarness<>(restoredReader); + restoredTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime); + + restoredTestInstance.initializeState(snapshot); + restoredTestInstance.open(); + + // now let computation start + latch.trigger(); + + // ... and wait for the operators to close gracefully + + synchronized (initTestInstance.getCheckpointLock()) { + initTestInstance.close(); + } + + synchronized (restoredTestInstance.getCheckpointLock()) { + restoredTestInstance.close(); + } + + FileInputSplit fsSplit1 = createSplitFromTimestampedSplit(split1); + FileInputSplit fsSplit2 = createSplitFromTimestampedSplit(split2); + FileInputSplit fsSplit3 = createSplitFromTimestampedSplit(split3); + FileInputSplit fsSplit4 = createSplitFromTimestampedSplit(split4); + + // compare if the results contain what they should contain and also if + // they are the same, as they should. + + Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit1))); + Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit2))); + Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit3))); + Assert.assertTrue(initTestInstance.getOutput().contains(new StreamRecord<>(fsSplit4))); + + Assert.assertArrayEquals( + initTestInstance.getOutput().toArray(), + restoredTestInstance.getOutput().toArray() + ); + } + + private FileInputSplit createSplitFromTimestampedSplit(TimestampedFileInputSplit split) { + Preconditions.checkNotNull(split); + + return new FileInputSplit( + split.getSplitNumber(), + split.getPath(), + split.getStart(), + split.getLength(), + split.getHostnames() + ); + } + + private static class BlockingFileInputFormat extends FileInputFormat { + + private final OneShotLatch latch; + + private FileInputSplit split; + + private boolean reachedEnd; + + BlockingFileInputFormat(OneShotLatch latch, Path filePath) { + super(filePath); + this.latch = latch; + this.reachedEnd = false; + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + this.split = fileSplit; + this.reachedEnd = false; + } + + @Override + public boolean reachedEnd() throws IOException { + if (!latch.isTriggered()) { + try { + latch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + return reachedEnd; + } + + @Override + public FileInputSplit nextRecord(FileInputSplit reuse) throws IOException { + this.reachedEnd = true; + return split; + } + + @Override + public void close() { + + } + } + //// Monitoring Function Tests ////// @Test @@ -394,7 +540,7 @@ public boolean filterPath(Path filePath) { FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); final FileVerifyingSourceContext context = - new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction, 0, -1); + new FileVerifyingSourceContext(new OneShotLatch(), monitoringFunction); monitoringFunction.open(new Configuration()); monitoringFunction.run(context); @@ -463,8 +609,7 @@ public void testProcessOnce() throws Exception { new ContinuousFileMonitoringFunction<>(format, hdfsURI, FileProcessingMode.PROCESS_ONCE, 1, INTERVAL); - final FileVerifyingSourceContext context = - new FileVerifyingSourceContext(latch, monitoringFunction, 1, -1); + final FileVerifyingSourceContext context = new FileVerifyingSourceContext(latch, monitoringFunction); final Thread t = new Thread() { @Override @@ -472,6 +617,13 @@ public void run() { try { monitoringFunction.open(new Configuration()); monitoringFunction.run(context); + + // we would never arrive here if we were in + // PROCESS_CONTINUOUSLY mode. + + // this will trigger the latch + context.close(); + } catch (Exception e) { Assert.fail(e.getMessage()); } @@ -572,10 +724,15 @@ private static class FileVerifyingSourceContext extends DummySourceContext { private final ContinuousFileMonitoringFunction src; private final OneShotLatch latch; private final Set seenFiles; - private final int elementsBeforeNotifying; + private int elementsBeforeNotifying = -1; private int elementsBeforeCanceling = -1; + FileVerifyingSourceContext(OneShotLatch latch, + ContinuousFileMonitoringFunction src) { + this(latch, src, -1, -1); + } + FileVerifyingSourceContext(OneShotLatch latch, ContinuousFileMonitoringFunction src, int elementsBeforeNotifying, @@ -594,16 +751,27 @@ Set getSeenFiles() { @Override public void collect(TimestampedFileInputSplit element) { String seenFileName = element.getPath().getName(); - this.seenFiles.add(seenFileName); - if (seenFiles.size() == elementsBeforeNotifying) { + + if (seenFiles.size() == elementsBeforeNotifying && !latch.isTriggered()) { latch.trigger(); } - if (elementsBeforeCanceling != -1 && seenFiles.size() == elementsBeforeCanceling) { + if (seenFiles.size() == elementsBeforeCanceling) { src.cancel(); } } + + @Override + public void close() { + // the context was terminated so trigger so + // that all threads that were waiting for this + // are un-blocked. + if (!latch.isTriggered()) { + latch.trigger(); + } + src.cancel(); + } } private static class ModTimeVerifyingSourceContext extends DummySourceContext { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index e4fb50d341e4a..9d833abe65596 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -20,13 +20,13 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.io.CheckpointableInputFormat; import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.metrics.Counter; -import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; @@ -38,8 +38,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.List; @@ -61,7 +59,7 @@ */ @Internal public class ContinuousFileReaderOperator extends AbstractStreamOperator - implements OneInputStreamOperator, OutputTypeConfigurable, StreamCheckpointedOperator { + implements OneInputStreamOperator, OutputTypeConfigurable { private static final long serialVersionUID = 1L; @@ -74,6 +72,8 @@ public class ContinuousFileReaderOperator extends AbstractStreamOperator reader; private transient SourceFunction.SourceContext readerContext; + + private ListState checkpointedState; private List restoredReaderState; public ContinuousFileReaderOperator(FileInputFormat format) { @@ -85,6 +85,32 @@ public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig exec this.serializer = outTypeInfo.createSerializer(executionConfig); } + @Override + public void initializeState(StateInitializationContext context) throws Exception { + super.initializeState(context); + + checkState(this.checkpointedState == null && this.restoredReaderState == null, + "The reader state has already been initialized."); + + checkpointedState = context.getManagedOperatorStateStore().getSerializableListState("splits"); + + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + if (context.isRestored()) { + LOG.info("Restoring state for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx); + + this.restoredReaderState = new ArrayList<>(); + for (TimestampedFileInputSplit split : this.checkpointedState.get()) { + this.restoredReaderState.add(split); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("ContinuousFileReaderOperator idx {} restored {}.", subtaskIdx, this.restoredReaderState); + } + } else { + LOG.info("No state to restore for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx); + } + } + @Override public void open() throws Exception { super.open(); @@ -350,30 +376,24 @@ public void close() { // --------------------- Checkpointing -------------------------- @Override - public void snapshotState(FSDataOutputStream os, long checkpointId, long timestamp) throws Exception { - final ObjectOutputStream oos = new ObjectOutputStream(os); + public void snapshotState(StateSnapshotContext context) throws Exception { + super.snapshotState(context); + + checkState(this.checkpointedState != null, + "The operator state has not been properly initialized."); + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + LOG.info("Checkpointing state for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx); + + this.checkpointedState.clear(); List readerState = this.reader.getReaderState(); - oos.writeInt(readerState.size()); for (TimestampedFileInputSplit split : readerState) { - oos.writeObject(split); + // create a new partition for each entry. + this.checkpointedState.add(split); } - oos.flush(); - } - - @Override - public void restoreState(FSDataInputStream is) throws Exception { - - checkState(this.restoredReaderState == null, - "The reader state has already been initialized."); - - final ObjectInputStream ois = new ObjectInputStream(is); - int noOfSplits = ois.readInt(); - List pendingSplits = new ArrayList<>(noOfSplits); - for (int i = 0; i < noOfSplits; i++) { - pendingSplits.add((TimestampedFileInputSplit) ois.readObject()); + if (LOG.isDebugEnabled()) { + LOG.debug("ContinuousFileReaderOperator idx {} checkpointed {}.", subtaskIdx, readerState); } - this.restoredReaderState = pendingSplits; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java new file mode 100644 index 0000000000000..466ca65752707 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java @@ -0,0 +1,316 @@ +/* +c * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.java.typeutils.TypeExtractor; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; +import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; +import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.util.Preconditions; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.Queue; + +public class ContinuousFileProcessingRescalingTest { + + @Test + public void testReaderScalingDown() throws Exception { + // simulates the scenario of scaling down from 2 to 1 instances + + final OneShotLatch waitingLatch = new OneShotLatch(); + + // create the first instance and let it process the first split till element 5 + final OneShotLatch triggerLatch1 = new OneShotLatch(); + BlockingFileInputFormat format1 = new BlockingFileInputFormat( + triggerLatch1, waitingLatch, new Path("test"), 20, 5); + FileInputSplit[] splits = format1.createInputSplits(2); + + OneInputStreamOperatorTestHarness testHarness1 = getTestHarness(format1, 2, 0); + testHarness1.open(); + testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[0]))); + + // wait until its arrives to element 5 + if (!triggerLatch1.isTriggered()) { + triggerLatch1.await(); + } + + // create the second instance and let it process the second split till element 15 + final OneShotLatch triggerLatch2 = new OneShotLatch(); + BlockingFileInputFormat format2 = new BlockingFileInputFormat( + triggerLatch2, waitingLatch, new Path("test"), 20, 15); + + OneInputStreamOperatorTestHarness testHarness2 = getTestHarness(format2, 2, 1); + testHarness2.open(); + testHarness2.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[1]))); + + // wait until its arrives to element 15 + if (!triggerLatch2.isTriggered()) { + triggerLatch2.await(); + } + + // 1) clear the outputs of the two previous instances so that + // we can compare their newly produced outputs with the merged one + testHarness1.getOutput().clear(); + testHarness2.getOutput().clear(); + + + // 2) and take the snapshots from the previous instances and merge them + // into a new one which will be then used to initialize a third instance + OperatorStateHandles mergedState = AbstractStreamOperatorTestHarness. + repackageState( + testHarness2.snapshot(0, 0), + testHarness1.snapshot(0, 0) + ); + + // create the third instance + final OneShotLatch wLatch = new OneShotLatch(); + final OneShotLatch tLatch = new OneShotLatch(); + + BlockingFileInputFormat format = new BlockingFileInputFormat(wLatch, tLatch, new Path("test"), 20, 5); + OneInputStreamOperatorTestHarness testHarness = getTestHarness(format, 1, 0); + + // initialize the state of the new operator with the constructed by + // combining the partial states of the instances above. + testHarness.initializeState(mergedState); + testHarness.open(); + + // now restart the waiting operators + wLatch.trigger(); + tLatch.trigger(); + waitingLatch.trigger(); + + // and wait for the processing to finish + synchronized (testHarness1.getCheckpointLock()) { + testHarness1.close(); + } + synchronized (testHarness2.getCheckpointLock()) { + testHarness2.close(); + } + synchronized (testHarness.getCheckpointLock()) { + testHarness.close(); + } + + Queue expectedResult = new ArrayDeque<>(); + putElementsInQ(expectedResult, testHarness1.getOutput()); + putElementsInQ(expectedResult, testHarness2.getOutput()); + + Queue actualResult = new ArrayDeque<>(); + putElementsInQ(actualResult, testHarness.getOutput()); + + Assert.assertEquals(20, actualResult.size()); + Assert.assertArrayEquals(expectedResult.toArray(), actualResult.toArray()); + } + + @Test + public void testReaderScalingUp() throws Exception { + // simulates the scenario of scaling up from 1 to 2 instances + + final OneShotLatch waitingLatch1 = new OneShotLatch(); + final OneShotLatch triggerLatch1 = new OneShotLatch(); + + BlockingFileInputFormat format1 = new BlockingFileInputFormat( + triggerLatch1, waitingLatch1, new Path("test"), 20, 5); + FileInputSplit[] splits = format1.createInputSplits(2); + + OneInputStreamOperatorTestHarness testHarness1 = getTestHarness(format1, 1, 0); + testHarness1.open(); + + testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(0, splits[0]))); + testHarness1.processElement(new StreamRecord<>(getTimestampedSplit(1, splits[1]))); + + // wait until its arrives to element 5 + if (!triggerLatch1.isTriggered()) { + triggerLatch1.await(); + } + + // this will be the state shared by the 2 new instances. + OperatorStateHandles snapshot = testHarness1.snapshot(0, 0); + + // 1) clear the output of instance so that we can compare it with one created by the new instances, and + // 2) let the operator process the rest of its state + testHarness1.getOutput().clear(); + waitingLatch1.trigger(); + + // create the second instance and let it process the second split till element 15 + final OneShotLatch triggerLatch2 = new OneShotLatch(); + final OneShotLatch waitingLatch2 = new OneShotLatch(); + + BlockingFileInputFormat format2 = new BlockingFileInputFormat( + triggerLatch2, waitingLatch2, new Path("test"), 20, 15); + + OneInputStreamOperatorTestHarness testHarness2 = getTestHarness(format2, 2, 0); + testHarness2.setup(); + testHarness2.initializeState(snapshot); + testHarness2.open(); + + BlockingFileInputFormat format3 = new BlockingFileInputFormat( + triggerLatch2, waitingLatch2, new Path("test"), 20, 15); + + OneInputStreamOperatorTestHarness testHarness3 = getTestHarness(format3, 2, 1); + testHarness3.setup(); + testHarness3.initializeState(snapshot); + testHarness3.open(); + + triggerLatch2.trigger(); + waitingLatch2.trigger(); + + // and wait for the processing to finish + synchronized (testHarness1.getCheckpointLock()) { + testHarness1.close(); + } + synchronized (testHarness2.getCheckpointLock()) { + testHarness2.close(); + } + synchronized (testHarness3.getCheckpointLock()) { + testHarness3.close(); + } + + Queue expectedResult = new ArrayDeque<>(); + putElementsInQ(expectedResult, testHarness1.getOutput()); + + Queue actualResult = new ArrayDeque<>(); + putElementsInQ(actualResult, testHarness2.getOutput()); + putElementsInQ(actualResult, testHarness3.getOutput()); + + Assert.assertEquals(35, actualResult.size()); + Assert.assertArrayEquals(expectedResult.toArray(), actualResult.toArray()); + } + + private void putElementsInQ(Queue res, Queue partial) { + for (Object o : partial) { + if (o instanceof Watermark) { + continue; + } + res.add(o); + } + } + + private OneInputStreamOperatorTestHarness getTestHarness( + BlockingFileInputFormat format, int noOfTasks, int taksIdx) throws Exception { + + ContinuousFileReaderOperator reader = new ContinuousFileReaderOperator<>(format); + reader.setOutputType(TypeExtractor.getInputFormatTypes(format), new ExecutionConfig()); + + OneInputStreamOperatorTestHarness testHarness = + new OneInputStreamOperatorTestHarness<>(reader, 10, noOfTasks, taksIdx); + testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); + return testHarness; + } + + private TimestampedFileInputSplit getTimestampedSplit(long modTime, FileInputSplit split) { + Preconditions.checkNotNull(split); + return new TimestampedFileInputSplit( + modTime, + split.getSplitNumber(), + split.getPath(), + split.getStart(), + split.getLength(), + split.getHostnames()); + } + + private static class BlockingFileInputFormat + extends FileInputFormat + implements CheckpointableInputFormat { + + private final OneShotLatch triggerLatch; + private final OneShotLatch waitingLatch; + + private final int elementsBeforeCheckpoint; + private final int linesPerSplit; + + private FileInputSplit split; + + private int state; + + BlockingFileInputFormat(OneShotLatch triggerLatch, + OneShotLatch waitingLatch, + Path filePath, + int sizeOfSplit, + int elementsBeforeCheckpoint) { + super(filePath); + + this.triggerLatch = triggerLatch; + this.waitingLatch = waitingLatch; + this.elementsBeforeCheckpoint = elementsBeforeCheckpoint; + this.linesPerSplit = sizeOfSplit; + + this.state = 0; + } + + @Override + public FileInputSplit[] createInputSplits(int minNumSplits) throws IOException { + FileInputSplit[] splits = new FileInputSplit[minNumSplits]; + for (int i = 0; i < minNumSplits; i++) { + splits[i] = new FileInputSplit(i, getFilePath(), i * linesPerSplit + 1, linesPerSplit, null); + } + return splits; + } + + @Override + public void open(FileInputSplit fileSplit) throws IOException { + this.split = fileSplit; + this.state = 0; + } + + @Override + public void reopen(FileInputSplit split, Integer state) throws IOException { + this.split = split; + this.state = state; + } + + @Override + public Integer getCurrentState() throws IOException { + return state; + } + + @Override + public boolean reachedEnd() throws IOException { + if (state == elementsBeforeCheckpoint) { + triggerLatch.trigger(); + if (!waitingLatch.isTriggered()) { + try { + waitingLatch.await(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + return state == linesPerSplit; + } + + @Override + public String nextRecord(String reuse) throws IOException { + return reachedEnd() ? null : split.getSplitNumber() + ": test line " + state++; + } + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java index 0e9b054af6407..90d88617f5efd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ContinuousFileProcessingCheckpointITCase.java @@ -24,7 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.CheckpointListener; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; @@ -163,11 +163,11 @@ private int getLineNo(String line) { // -------------------------- Task Sink ------------------------------ private static class TestingSinkFunction extends RichSinkFunction - implements Checkpointed>>>, CheckpointListener { + implements ListCheckpointed>>>, CheckpointListener { - private boolean hasFailed; + private boolean hasRestoredAfterFailure; - private volatile boolean hasSuccessfulCheckpoints; + private volatile int successfulCheckpoints; private long elementsToFailure; @@ -176,9 +176,9 @@ private static class TestingSinkFunction extends RichSinkFunction private Map> actualContent = new HashMap<>(); TestingSinkFunction() { - hasFailed = false; + hasRestoredAfterFailure = false; elementCounter = 0; - hasSuccessfulCheckpoints = false; + successfulCheckpoints = 0; } @Override @@ -216,13 +216,13 @@ public void invoke(String value) throws Exception { throw new SuccessException(); } - // add some latency so that we have at least one checkpoint in - if (!hasFailed && !hasSuccessfulCheckpoints) { + // add some latency so that we have at least two checkpoint in + if (!hasRestoredAfterFailure && successfulCheckpoints < 2) { Thread.sleep(5); } // simulate a node failure - if (!hasFailed && hasSuccessfulCheckpoints && elementCounter >= elementsToFailure) { + if (!hasRestoredAfterFailure && successfulCheckpoints >= 2 && elementCounter >= elementsToFailure) { throw new Exception("Task Failure @ elem: " + elementCounter + " / " + elementsToFailure); } } @@ -237,20 +237,22 @@ public void close() { } @Override - public Tuple2>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { - return new Tuple2<>(elementCounter, actualContent); + public List>>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + Tuple2>> state = new Tuple2<>(elementCounter, actualContent); + return Collections.singletonList(state); } @Override - public void restoreState(Tuple2>> state) throws Exception { - this.hasFailed = true; - this.elementCounter = state.f0; - this.actualContent = state.f1; + public void restoreState(List>>> state) throws Exception { + Tuple2>> s = state.get(0); + this.elementCounter = s.f0; + this.actualContent = s.f1; + this.hasRestoredAfterFailure = this.elementCounter != 0; // because now restore is also called at initialization } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { - hasSuccessfulCheckpoints = true; + this.successfulCheckpoints++; } private int getFileIdx(String line) { From fd8d28679b18fb2b3d46b0b05d20017cfe49ab08 Mon Sep 17 00:00:00 2001 From: kl0u Date: Fri, 11 Nov 2016 13:45:04 +0100 Subject: [PATCH 4/4] Decreased logging verbosity --- .../source/ContinuousFileReaderOperator.java | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 9d833abe65596..74c58f9a1210d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -208,7 +208,7 @@ public void close() throws Exception { private class SplitReader extends Thread { - private volatile boolean isClosed; + private volatile boolean shouldClose; private volatile boolean isRunning; @@ -235,7 +235,7 @@ private SplitReader(FileInputFormat format, this.readerContext = checkNotNull(readerContext, "Unspecified Reader Context."); this.checkpointLock = checkNotNull(checkpointLock, "Unspecified checkpoint lock."); - this.isClosed = false; + this.shouldClose = false; this.isRunning = true; this.pendingSplits = new PriorityQueue<>(); @@ -276,10 +276,10 @@ public void run() { // 2) if not wait 50 ms and try again to fetch a new split to read if (currentSplit == null) { - if (!this.isClosed) { - checkpointLock.wait(50); - } else { + if (this.shouldClose) { isRunning = false; + } else { + checkpointLock.wait(50); } continue; } @@ -301,7 +301,7 @@ public void run() { this.isSplitOpen = true; } - LOG.info("Reading split: " + currentSplit); + LOG.debug("Reading split: " + currentSplit); try { OT nextElement = serializer.createInstance(); @@ -333,7 +333,7 @@ public void run() { } finally { synchronized (checkpointLock) { - LOG.info("Reader terminated, and exiting..."); + LOG.debug("Reader terminated, and exiting..."); try { this.format.closeInputFormat(); @@ -369,7 +369,7 @@ public void cancel() { } public void close() { - this.isClosed = true; + this.shouldClose = true; } } @@ -383,7 +383,6 @@ public void snapshotState(StateSnapshotContext context) throws Exception { "The operator state has not been properly initialized."); int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); - LOG.info("Checkpointing state for the ContinuousFileReaderOperator (taskIdx={}).", subtaskIdx); this.checkpointedState.clear(); List readerState = this.reader.getReaderState();