From 3bebe3b329390c8678b850890d85adff8b61f627 Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Wed, 16 Mar 2016 20:57:39 -0700 Subject: [PATCH] Using CheckpointNotificationListener and beforeCheckpoint callback to do IO in a more optimized fashion --- .../hbase/AbstractHBasePutOutputOperator.java | 57 ++++++------------- .../kinesis/AbstractKinesisInputOperator.java | 8 ++- .../AbstractKinesisOutputOperator.java | 18 +++++- .../AbstractRabbitMQInputOperator.java | 7 ++- .../redis/AbstractRedisInputOperator.java | 9 ++- .../splunk/SplunkTcpOutputOperator.java | 17 +++++- .../hive/AbstractFSRollingOutputOperator.java | 5 +- .../kafka/AbstractKafkaInputOperator.java | 5 ++ .../lib/io/fs/AbstractFileInputOperator.java | 10 +++- .../lib/io/fs/AbstractFileOutputOperator.java | 49 +++++++++------- .../lib/io/fs/AbstractReconciler.java | 12 ++-- .../datatorrent/lib/io/fs/FileSplitter.java | 7 ++- .../lib/io/fs/FileSplitterInput.java | 7 ++- .../lib/io/jms/AbstractJMSInputOperator.java | 7 ++- .../lib/join/AbstractJoinOperator.java | 7 ++- .../io/fs/AbstractFileOutputOperatorTest.java | 47 +++++++++------ .../AbstractWindowFileOutputOperatorTest.java | 4 +- 17 files changed, 176 insertions(+), 100 deletions(-) diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java index 397300841b..7f9339450b 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java @@ -22,6 +22,7 @@ import javax.validation.constraints.Min; +import com.datatorrent.api.Operator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,27 +53,21 @@ * The tuple type * @since 1.0.2 */ -public abstract class AbstractHBasePutOutputOperator extends AbstractStoreOutputOperator { +public abstract class AbstractHBasePutOutputOperator extends AbstractStoreOutputOperator implements Operator.CheckpointNotificationListener { private static final transient Logger logger = LoggerFactory.getLogger(AbstractHBasePutOutputOperator.class); - public static final int DEFAULT_BATCH_SIZE = 1000; - private int batchSize = DEFAULT_BATCH_SIZE; - protected int unCommittedSize = 0; - public AbstractHBasePutOutputOperator() { + public AbstractHBasePutOutputOperator() + { store = new HBaseStore(); } @Override - public void processTuple(T tuple) { + public void processTuple(T tuple) + { HTable table = store.getTable(); Put put = operationPut(tuple); try { table.put(put); - if( ++unCommittedSize >= batchSize ) - { - table.flushCommits(); - unCommittedSize = 0; - } } catch (RetriesExhaustedWithDetailsException e) { logger.error("Could not output tuple", e); DTThrowable.rethrow(e); @@ -80,46 +75,30 @@ public void processTuple(T tuple) { logger.error("Could not output tuple", e); DTThrowable.rethrow(e); } - } @Override - public void endWindow() + public void beforeCheckpoint(long windowId) { - try - { - if( unCommittedSize > 0 ) { - store.getTable().flushCommits(); - unCommittedSize = 0; - } - } - catch (RetriesExhaustedWithDetailsException e) { - logger.error("Could not output tuple", e); - DTThrowable.rethrow(e); + try { + store.getTable().flushCommits(); } catch (InterruptedIOException e) { - logger.error("Could not output tuple", e); + DTThrowable.rethrow(e); + } catch (RetriesExhaustedWithDetailsException e) { DTThrowable.rethrow(e); } } - public abstract Put operationPut(T t); + @Override + public void checkpointed(long l) { - /** - * the batch size save flush data - */ - @Min(1) - public int getBatchSize() - { - return batchSize; } - /** - * the batch size save flush data - */ - public void setBatchSize(int batchSize) - { - this.batchSize = batchSize; - } + @Override + public void committed(long l) { + } + public abstract Put operationPut(T t); + } diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index c03df21db5..fc10bead53 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -86,7 +86,7 @@ public KinesisPair(F first, S second) * @since 2.0.0 */ @SuppressWarnings("rawtypes") -public abstract class AbstractKinesisInputOperator implements InputOperator, ActivationListener, Partitioner, StatsListener,Operator.CheckpointListener +public abstract class AbstractKinesisInputOperator implements InputOperator, ActivationListener, Partitioner, StatsListener,Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisInputOperator.class); @@ -530,6 +530,12 @@ public void committed(long windowId) public void checkpointed(long windowId) { } + + @Override + public void beforeCheckpoint(long windowId) + { + } + /** * Implement ActivationListener Interface. */ diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java index 43fc62af77..d6f9d361fc 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java @@ -49,7 +49,7 @@ * @param * @since 2.0.0 */ -public abstract class AbstractKinesisOutputOperator implements Operator +public abstract class AbstractKinesisOutputOperator implements Operator, Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger( AbstractKinesisOutputOperator.class ); protected String streamName; @@ -91,6 +91,10 @@ public void beginWindow(long windowId) { } + @Override + public void endWindow() { + } + /** * Implement Component Interface. */ @@ -103,7 +107,7 @@ public void teardown() * Implement Operator Interface. */ @Override - public void endWindow() + public void beforeCheckpoint(long windowId) { if (isBatchProcessing && putRecordsRequestEntryList.size() != 0) { try { @@ -114,6 +118,16 @@ public void endWindow() } } + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + } + /** * Implement Component Interface. * diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java index 08157bc0fb..672122c52f 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java @@ -77,7 +77,7 @@ */ public abstract class AbstractRabbitMQInputOperator implements InputOperator, Operator.ActivationListener, - Operator.CheckpointListener + Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class); @NotNull @@ -311,6 +311,11 @@ public void deactivate() } } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index 0b12574e45..092a3c6290 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -31,7 +31,7 @@ import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; -import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; @@ -47,7 +47,7 @@ * The tuple type. * @since 0.9.3 */ -public abstract class AbstractRedisInputOperator extends AbstractKeyValueStoreInputOperator implements CheckpointListener +public abstract class AbstractRedisInputOperator extends AbstractKeyValueStoreInputOperator implements CheckpointNotificationListener { protected transient List keys = new ArrayList(); protected transient Integer scanOffset; @@ -224,6 +224,11 @@ public void emitTuples() abstract public void processTuples(); + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java index cb813754b9..f7e98b6a4f 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java @@ -19,6 +19,7 @@ package com.datatorrent.contrib.splunk; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Operator; import com.datatorrent.lib.db.AbstractStoreOutputOperator; import com.splunk.TcpInput; @@ -34,7 +35,7 @@ * @tags splunk * @since 1.0.4 */ -public class SplunkTcpOutputOperator extends AbstractStoreOutputOperator { +public class SplunkTcpOutputOperator extends AbstractStoreOutputOperator implements Operator.CheckpointNotificationListener { private String tcpPort; private transient Socket socket; @@ -75,8 +76,8 @@ public void processTuple(T tuple) { } @Override - public void endWindow() { - + public void beforeCheckpoint(long windowId) + { try { stream.flush(); } catch (IOException e) { @@ -84,6 +85,16 @@ public void endWindow() { } } + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + } + @Override public void teardown() { diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java index 3c9c4da29a..37b5f2e6ac 100755 --- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java +++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java @@ -37,7 +37,7 @@ import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; import com.datatorrent.netlet.util.DTThrowable; @@ -52,8 +52,7 @@ * * @since 2.1.0 */ -public abstract class AbstractFSRollingOutputOperator extends AbstractFileOutputOperator - implements CheckpointListener +public abstract class AbstractFSRollingOutputOperator extends AbstractFileOutputOperator implements CheckpointNotificationListener { private transient String outputFilePath; protected MutableInt partNumber; diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index 4cf28885bf..1416ebeb7b 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -193,6 +193,11 @@ public void deactivate() consumerWrapper.stop(); } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index f0e3fbb4d5..c7aefe267b 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -37,6 +37,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.validation.OverridesAttribute; import javax.validation.constraints.NotNull; import org.slf4j.Logger; @@ -96,8 +97,8 @@ * @param The type of the object that this input operator reads. * @since 1.0.2 */ -public abstract class AbstractFileInputOperator - implements InputOperator, Partitioner>, StatsListener, Operator.CheckpointListener +public abstract class AbstractFileInputOperator implements InputOperator, Partitioner>, StatsListener, + Operator.CheckpointListener, Operator.CheckpointNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class); @@ -907,6 +908,11 @@ public void partitioned(Map>> pa currentPartitions = partitions.size(); } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index 0195f7f432..ee1c412f1e 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -110,7 +110,7 @@ * @since 2.0.0 */ @OperatorAnnotation(checkpointableWithinAppWindow = false) -public abstract class AbstractFileOutputOperator extends BaseOperator implements Operator.CheckpointListener +public abstract class AbstractFileOutputOperator extends BaseOperator implements Operator.CheckpointListener, Operator.CheckpointNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class); @@ -260,6 +260,8 @@ public abstract class AbstractFileOutputOperator extends BaseOperator imp private Long expireStreamAfterAccessMillis; private final Set filesWithOpenStreams; + private boolean initializeContext; + /** * This input port receives incoming tuples. */ @@ -923,13 +925,16 @@ protected String getPartFileName(String fileName, int part) @Override public void beginWindow(long windowId) { - try { - Map openStreams = streamsCache.asMap(); - for (FSFilterStreamContext streamContext : openStreams.values()) { - streamContext.initializeContext(); + if (initializeContext) { + try { + Map openStreams = streamsCache.asMap(); + for (FSFilterStreamContext streamContext : openStreams.values()) { + streamContext.initializeContext(); + } + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - throw new RuntimeException(e); + initializeContext = false; } currentWindow = windowId; } @@ -937,18 +942,6 @@ public void beginWindow(long windowId) @Override public void endWindow() { - try { - Map openStreams = streamsCache.asMap(); - for (FSFilterStreamContext streamContext: openStreams.values()) { - long start = System.currentTimeMillis(); - streamContext.finalizeContext(); - totalWritingTime += System.currentTimeMillis() - start; - //streamContext.resetFilter(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - if (rotationWindows > 0) { if (++rotationCount == rotationWindows) { rotationCount = 0; @@ -1195,6 +1188,24 @@ public void close() throws IOException } } + @Override + public void beforeCheckpoint(long l) + { + try { + Map openStreams = streamsCache.asMap(); + for (FSFilterStreamContext streamContext: openStreams.values()) { + long start = System.currentTimeMillis(); + streamContext.finalizeContext(); + totalWritingTime += System.currentTimeMillis() - start; + //streamContext.resetFilter(); + // Re-initialize context when next window starts after checkpoint + initializeContext = true; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java index 945c00037c..c12becd120 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java @@ -25,17 +25,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.collect.Maps; import com.google.common.collect.Queues; - import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Operator.CheckpointListener; import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.NameableThreadFactory; import com.datatorrent.netlet.util.DTThrowable; @@ -55,7 +52,7 @@ * @since 2.0.0 */ @org.apache.hadoop.classification.InterfaceStability.Evolving -public abstract class AbstractReconciler extends BaseOperator implements CheckpointListener, IdleTimeHandler +public abstract class AbstractReconciler extends BaseOperator implements CheckpointNotificationListener, IdleTimeHandler { private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class); public transient DefaultInputPort input = new DefaultInputPort() @@ -124,6 +121,11 @@ public void handleIdleTime() } + @Override + public void beforeCheckpoint(long l) + { + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java index 4bb53e5ee7..b9594b324c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java @@ -79,7 +79,7 @@ */ @OperatorAnnotation(checkpointableWithinAppWindow = false) @Deprecated -public class FileSplitter implements InputOperator, Operator.CheckpointListener +public class FileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener { protected Long blockSize; private int sequenceNo; @@ -379,6 +379,11 @@ public WindowDataManager getWindowDataManager() return this.windowDataManager; } + @Override + public void beforeCheckpoint(long l) + { + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index 3763ef03a4..9f88243658 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -77,7 +77,7 @@ * @since 2.0.0 */ @OperatorAnnotation(checkpointableWithinAppWindow = false) -public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener +public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener { @NotNull private WindowDataManager windowDataManager; @@ -229,6 +229,11 @@ protected FileStatus getFileStatus(Path path) throws IOException return scanner.fs.getFileStatus(path); } + @Override + public void beforeCheckpoint(long l) + { + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java index 2b8b58daff..72bf63c1dc 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java @@ -80,7 +80,7 @@ @OperatorAnnotation(checkpointableWithinAppWindow = false) public abstract class AbstractJMSInputOperator extends JMSBase implements InputOperator, ActivationListener, MessageListener, ExceptionListener, - Operator.IdleTimeHandler, Operator.CheckpointListener + Operator.IdleTimeHandler, Operator.CheckpointListener, Operator.CheckpointNotificationListener { protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k @@ -394,6 +394,11 @@ protected void acknowledge() throws JMSException } } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java index 89df25e565..d0f722d865 100644 --- a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java +++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java @@ -64,7 +64,7 @@ * @since 3.4.0 */ @InterfaceStability.Unstable -public abstract class AbstractJoinOperator extends BaseOperator implements Operator.CheckpointListener +public abstract class AbstractJoinOperator extends BaseOperator implements Operator.CheckpointNotificationListener { @AutoMetric private long tuplesJoinedPerSec; @@ -225,6 +225,11 @@ public void beginWindow(long windowId) tuplesCount = 0; } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java index 03f3bf69d8..5b58121242 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java @@ -243,8 +243,11 @@ private void populateFile(String fileName, String contents) throws IOException * @param writer The writer to checkpoint. * @return new writer. */ - public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer) + public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId) { + if (windowId >= 0) { + writer.beforeCheckpoint(windowId); + } Kryo kryo = new Kryo(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); Output loutput = new Output(bos); @@ -418,7 +421,7 @@ private void testSingleFileFailedWriteHelper(SingleHDFSExactlyOnceWriter writer) writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -577,6 +580,7 @@ private void testMultiFileCompletedWriteHelperCache1(EvenOddHDFSExactlyOnceWrite writer.requestFinalize(EVEN_FILE); writer.requestFinalize(ODD_FILE); + writer.beforeCheckpoint(1); writer.committed(1); } @@ -603,6 +607,7 @@ private void testMultiFileCompletedWriteHelper(EvenOddHDFSExactlyOnceWriter writ writer.requestFinalize(ODD_FILE); writer.requestFinalize(EVEN_FILE); + writer.beforeCheckpoint(1); writer.committed(1); } @@ -672,7 +677,7 @@ private void testMultiFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer) writer.requestFinalize(EVEN_FILE); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(4); @@ -690,6 +695,7 @@ private void testMultiFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer) writer.input.put(8); writer.input.put(9); writer.endWindow(); + writer.beforeCheckpoint(2); writer.committed(2); } @@ -815,7 +821,7 @@ private void testSingleRollingFileFailedWriteHelper(SingleHDFSExactlyOnceWriter writer.input.put(2); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(3); @@ -862,8 +868,8 @@ public void testSingleRollingFileFailedWrite1() writer.input.put(4); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); - AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1); LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets); @@ -1128,7 +1134,7 @@ private void testMultiRollingFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -1225,7 +1231,7 @@ private void testMultiRollingFileFailedWriteOverwriteHelperCache1(EvenOddHDFSExa writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -1272,7 +1278,7 @@ private void testMultiRollingFileFailedWriteOverwriteHelper(EvenOddHDFSExactlyOn writer.input.process(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.process(2); @@ -1363,7 +1369,7 @@ private void singleFileMultiRollingFailureHelper(SingleHDFSExactlyOnceWriter wri writer.input.put(3); writer.input.put(4); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1); writer.input.put(3); writer.input.put(4); @@ -1554,8 +1560,11 @@ public void testCompression() throws IOException writer.input.put(i); } writer.endWindow(); - evenOffsets.add(evenFile.length()); - oddOffsets.add(oddFile.length()); + if ((i % 2) == 1) { + writer.beforeCheckpoint(i); + evenOffsets.add(evenFile.length()); + oddOffsets.add(oddFile.length()); + } } writer.teardown(); @@ -1580,6 +1589,7 @@ public void testRecoveryOfOpenFiles() writer.input.put(2); writer.input.put(3); writer.endWindow(); + writer.beforeCheckpoint(0); //failure and restored writer.setup(testMeta.testOperatorContext); @@ -1756,11 +1766,14 @@ protected FilterStreamContext createFilterStreamContext(Outp for (int j = 0; j < 1000; ++j) { writer.input.put(i); } - writer.endWindow(); - evenOffsets.add(evenCounterContext.getCounter()); - oddOffsets.add(oddCounterContext.getCounter()); - //evenOffsets.add(evenFile.length()); - //oddOffsets.add(oddFile.length()); + //writer.endWindow(); + if ((i % 2) == 1) { + writer.beforeCheckpoint(i); + evenOffsets.add(evenCounterContext.getCounter()); + oddOffsets.add(oddCounterContext.getCounter()); + //evenOffsets.add(evenFile.length()); + //oddOffsets.add(oddFile.length()); + } } writer.teardown(); diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java index 32c32f7bd5..bcb1bc3827 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java @@ -70,7 +70,7 @@ public void testOperator() oper.input.process("window 0"); oper.endWindow(); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0); oper.beginWindow(1); oper.input.process("window 1"); @@ -110,7 +110,7 @@ public void testOperatorMidWindowRestore() oper.beginWindow(1); oper.input.process("1"); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1); oper.input.process("1"); oper.teardown();