From 28a7e347691dc5dbe364888c4520f5c31139bc39 Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Wed, 16 Mar 2016 20:57:39 -0700 Subject: [PATCH 1/2] Using CheckpointNotificationListener and beforeCheckpoint callback to do IO in a more optimized fashion --- .../hbase/AbstractHBasePutOutputOperator.java | 57 ++++++------------- .../kinesis/AbstractKinesisInputOperator.java | 8 ++- .../AbstractKinesisOutputOperator.java | 18 +++++- .../AbstractRabbitMQInputOperator.java | 7 ++- .../redis/AbstractRedisInputOperator.java | 9 ++- .../splunk/SplunkTcpOutputOperator.java | 17 +++++- .../hive/AbstractFSRollingOutputOperator.java | 5 +- .../lib/io/fs/AbstractFileInputOperator.java | 10 +++- .../lib/io/fs/AbstractFileOutputOperator.java | 49 +++++++++------- .../lib/io/fs/AbstractReconciler.java | 12 ++-- .../datatorrent/lib/io/fs/FileSplitter.java | 7 ++- .../lib/io/fs/FileSplitterInput.java | 7 ++- .../lib/io/jms/AbstractJMSInputOperator.java | 7 ++- .../lib/join/AbstractJoinOperator.java | 7 ++- .../io/fs/AbstractFileOutputOperatorTest.java | 47 +++++++++------ .../AbstractWindowFileOutputOperatorTest.java | 4 +- 16 files changed, 171 insertions(+), 100 deletions(-) diff --git a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java index 397300841b..7f9339450b 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/hbase/AbstractHBasePutOutputOperator.java @@ -22,6 +22,7 @@ import javax.validation.constraints.Min; +import com.datatorrent.api.Operator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,27 +53,21 @@ * The tuple type * @since 1.0.2 */ -public abstract class AbstractHBasePutOutputOperator extends AbstractStoreOutputOperator { +public abstract class AbstractHBasePutOutputOperator extends AbstractStoreOutputOperator implements 
Operator.CheckpointNotificationListener { private static final transient Logger logger = LoggerFactory.getLogger(AbstractHBasePutOutputOperator.class); - public static final int DEFAULT_BATCH_SIZE = 1000; - private int batchSize = DEFAULT_BATCH_SIZE; - protected int unCommittedSize = 0; - public AbstractHBasePutOutputOperator() { + public AbstractHBasePutOutputOperator() + { store = new HBaseStore(); } @Override - public void processTuple(T tuple) { + public void processTuple(T tuple) + { HTable table = store.getTable(); Put put = operationPut(tuple); try { table.put(put); - if( ++unCommittedSize >= batchSize ) - { - table.flushCommits(); - unCommittedSize = 0; - } } catch (RetriesExhaustedWithDetailsException e) { logger.error("Could not output tuple", e); DTThrowable.rethrow(e); @@ -80,46 +75,30 @@ public void processTuple(T tuple) { logger.error("Could not output tuple", e); DTThrowable.rethrow(e); } - } @Override - public void endWindow() + public void beforeCheckpoint(long windowId) { - try - { - if( unCommittedSize > 0 ) { - store.getTable().flushCommits(); - unCommittedSize = 0; - } - } - catch (RetriesExhaustedWithDetailsException e) { - logger.error("Could not output tuple", e); - DTThrowable.rethrow(e); + try { + store.getTable().flushCommits(); } catch (InterruptedIOException e) { - logger.error("Could not output tuple", e); + DTThrowable.rethrow(e); + } catch (RetriesExhaustedWithDetailsException e) { DTThrowable.rethrow(e); } } - public abstract Put operationPut(T t); + @Override + public void checkpointed(long l) { - /** - * the batch size save flush data - */ - @Min(1) - public int getBatchSize() - { - return batchSize; } - /** - * the batch size save flush data - */ - public void setBatchSize(int batchSize) - { - this.batchSize = batchSize; - } + @Override + public void committed(long l) { + } + public abstract Put operationPut(T t); + } diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java 
b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index c03df21db5..fc10bead53 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -86,7 +86,7 @@ public KinesisPair(F first, S second) * @since 2.0.0 */ @SuppressWarnings("rawtypes") -public abstract class AbstractKinesisInputOperator implements InputOperator, ActivationListener, Partitioner, StatsListener,Operator.CheckpointListener +public abstract class AbstractKinesisInputOperator implements InputOperator, ActivationListener, Partitioner, StatsListener,Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger(AbstractKinesisInputOperator.class); @@ -530,6 +530,12 @@ public void committed(long windowId) public void checkpointed(long windowId) { } + + @Override + public void beforeCheckpoint(long windowId) + { + } + /** * Implement ActivationListener Interface. */ diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java index 43fc62af77..d6f9d361fc 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisOutputOperator.java @@ -49,7 +49,7 @@ * @param * @since 2.0.0 */ -public abstract class AbstractKinesisOutputOperator implements Operator +public abstract class AbstractKinesisOutputOperator implements Operator, Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger( AbstractKinesisOutputOperator.class ); protected String streamName; @@ -91,6 +91,10 @@ public void beginWindow(long windowId) { } + @Override + public void endWindow() { + } + /** * Implement Component Interface. 
*/ @@ -103,7 +107,7 @@ public void teardown() * Implement Operator Interface. */ @Override - public void endWindow() + public void beforeCheckpoint(long windowId) { if (isBatchProcessing && putRecordsRequestEntryList.size() != 0) { try { @@ -114,6 +118,16 @@ public void endWindow() } } + @Override + public void checkpointed(long windowId) + { + } + + @Override + public void committed(long windowId) + { + } + /** * Implement Component Interface. * diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java index 08157bc0fb..672122c52f 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java @@ -77,7 +77,7 @@ */ public abstract class AbstractRabbitMQInputOperator implements InputOperator, Operator.ActivationListener, - Operator.CheckpointListener + Operator.CheckpointNotificationListener { private static final Logger logger = LoggerFactory.getLogger(AbstractRabbitMQInputOperator.class); @NotNull @@ -311,6 +311,11 @@ public void deactivate() } } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index 0b12574e45..092a3c6290 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -31,7 +31,7 @@ import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; -import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import 
com.datatorrent.api.Context.OperatorContext; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; @@ -47,7 +47,7 @@ * The tuple type. * @since 0.9.3 */ -public abstract class AbstractRedisInputOperator extends AbstractKeyValueStoreInputOperator implements CheckpointListener +public abstract class AbstractRedisInputOperator extends AbstractKeyValueStoreInputOperator implements CheckpointNotificationListener { protected transient List keys = new ArrayList(); protected transient Integer scanOffset; @@ -224,6 +224,11 @@ public void emitTuples() abstract public void processTuples(); + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java index cb813754b9..f7e98b6a4f 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/splunk/SplunkTcpOutputOperator.java @@ -19,6 +19,7 @@ package com.datatorrent.contrib.splunk; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.Operator; import com.datatorrent.lib.db.AbstractStoreOutputOperator; import com.splunk.TcpInput; @@ -34,7 +35,7 @@ * @tags splunk * @since 1.0.4 */ -public class SplunkTcpOutputOperator extends AbstractStoreOutputOperator { +public class SplunkTcpOutputOperator extends AbstractStoreOutputOperator implements Operator.CheckpointNotificationListener { private String tcpPort; private transient Socket socket; @@ -75,8 +76,8 @@ public void processTuple(T tuple) { } @Override - public void endWindow() { - + public void beforeCheckpoint(long windowId) + { try { stream.flush(); } catch (IOException e) { @@ -84,6 +85,16 @@ public void endWindow() { } } + @Override + public void checkpointed(long windowId) 
+ { + } + + @Override + public void committed(long windowId) + { + } + @Override public void teardown() { diff --git a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java index 3c9c4da29a..37b5f2e6ac 100755 --- a/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java +++ b/hive/src/main/java/com/datatorrent/contrib/hive/AbstractFSRollingOutputOperator.java @@ -37,7 +37,7 @@ import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DAG; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.io.fs.AbstractFileOutputOperator; import com.datatorrent.netlet.util.DTThrowable; @@ -52,8 +52,7 @@ * * @since 2.1.0 */ -public abstract class AbstractFSRollingOutputOperator extends AbstractFileOutputOperator - implements CheckpointListener +public abstract class AbstractFSRollingOutputOperator extends AbstractFileOutputOperator implements CheckpointNotificationListener { private transient String outputFilePath; protected MutableInt partNumber; diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index 9e80b4ee6f..14cabfced9 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -37,6 +37,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import javax.validation.OverridesAttribute; import javax.validation.constraints.NotNull; import org.slf4j.Logger; @@ -97,8 +98,8 @@ * @since 1.0.2 */ @org.apache.hadoop.classification.InterfaceStability.Evolving -public abstract 
class AbstractFileInputOperator - implements InputOperator, Partitioner>, StatsListener, Operator.CheckpointListener +public abstract class AbstractFileInputOperator implements InputOperator, Partitioner>, StatsListener, + Operator.CheckpointListener, Operator.CheckpointNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class); @@ -908,6 +909,11 @@ public void partitioned(Map>> pa currentPartitions = partitions.size(); } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index ab8bedc3db..f703a19123 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -110,7 +110,7 @@ * @since 2.0.0 */ @OperatorAnnotation(checkpointableWithinAppWindow = false) -public abstract class AbstractFileOutputOperator extends BaseOperator implements Operator.CheckpointListener +public abstract class AbstractFileOutputOperator extends BaseOperator implements Operator.CheckpointListener, Operator.CheckpointNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileOutputOperator.class); @@ -262,6 +262,8 @@ public abstract class AbstractFileOutputOperator extends BaseOperator imp private Long expireStreamAfterAccessMillis; private final Set filesWithOpenStreams; + private boolean initializeContext; + /** * This input port receives incoming tuples. 
*/ @@ -966,13 +968,16 @@ protected String getPartFileName(String fileName, int part) @Override public void beginWindow(long windowId) { - try { - Map openStreams = streamsCache.asMap(); - for (FSFilterStreamContext streamContext : openStreams.values()) { - streamContext.initializeContext(); + if (initializeContext) { + try { + Map openStreams = streamsCache.asMap(); + for (FSFilterStreamContext streamContext : openStreams.values()) { + streamContext.initializeContext(); + } + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - throw new RuntimeException(e); + initializeContext = false; } currentWindow = windowId; } @@ -980,18 +985,6 @@ public void beginWindow(long windowId) @Override public void endWindow() { - try { - Map openStreams = streamsCache.asMap(); - for (FSFilterStreamContext streamContext: openStreams.values()) { - long start = System.currentTimeMillis(); - streamContext.finalizeContext(); - totalWritingTime += System.currentTimeMillis() - start; - //streamContext.resetFilter(); - } - } catch (IOException e) { - throw new RuntimeException(e); - } - if (rotationWindows > 0) { if (++rotationCount == rotationWindows) { rotationCount = 0; @@ -1238,6 +1231,24 @@ public void close() throws IOException } } + @Override + public void beforeCheckpoint(long l) + { + try { + Map openStreams = streamsCache.asMap(); + for (FSFilterStreamContext streamContext: openStreams.values()) { + long start = System.currentTimeMillis(); + streamContext.finalizeContext(); + totalWritingTime += System.currentTimeMillis() - start; + //streamContext.resetFilter(); + // Re-initialize context when next window starts after checkpoint + initializeContext = true; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java index 
945c00037c..c12becd120 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java @@ -25,17 +25,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicReference; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.google.common.collect.Maps; import com.google.common.collect.Queues; - import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Operator.CheckpointListener; import com.datatorrent.api.Operator.IdleTimeHandler; +import com.datatorrent.api.Operator.CheckpointNotificationListener; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.NameableThreadFactory; import com.datatorrent.netlet.util.DTThrowable; @@ -55,7 +52,7 @@ * @since 2.0.0 */ @org.apache.hadoop.classification.InterfaceStability.Evolving -public abstract class AbstractReconciler extends BaseOperator implements CheckpointListener, IdleTimeHandler +public abstract class AbstractReconciler extends BaseOperator implements CheckpointNotificationListener, IdleTimeHandler { private static final Logger logger = LoggerFactory.getLogger(AbstractReconciler.class); public transient DefaultInputPort input = new DefaultInputPort() @@ -124,6 +121,11 @@ public void handleIdleTime() } + @Override + public void beforeCheckpoint(long l) + { + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java index 4bb53e5ee7..b9594b324c 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java @@ -79,7 +79,7 @@ */ @OperatorAnnotation(checkpointableWithinAppWindow = false) @Deprecated -public class FileSplitter implements 
InputOperator, Operator.CheckpointListener +public class FileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener { protected Long blockSize; private int sequenceNo; @@ -379,6 +379,11 @@ public WindowDataManager getWindowDataManager() return this.windowDataManager; } + @Override + public void beforeCheckpoint(long l) + { + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index 2d290cd8bc..745f953b50 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -76,7 +76,7 @@ * @since 2.0.0 */ @OperatorAnnotation(checkpointableWithinAppWindow = false) -public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener +public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener, Operator.CheckpointNotificationListener { @NotNull private WindowDataManager windowDataManager; @@ -218,6 +218,11 @@ protected FileStatus getFileStatus(Path path) throws IOException return scanner.fs.getFileStatus(path); } + @Override + public void beforeCheckpoint(long l) + { + } + @Override public void checkpointed(long l) { diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java index 2b8b58daff..72bf63c1dc 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java @@ -80,7 +80,7 @@ @OperatorAnnotation(checkpointableWithinAppWindow = false) public abstract class AbstractJMSInputOperator extends JMSBase implements InputOperator, ActivationListener, 
MessageListener, ExceptionListener, - Operator.IdleTimeHandler, Operator.CheckpointListener + Operator.IdleTimeHandler, Operator.CheckpointListener, Operator.CheckpointNotificationListener { protected static final int DEFAULT_BUFFER_SIZE = 10 * 1024; // 10k @@ -394,6 +394,11 @@ protected void acknowledge() throws JMSException } } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java index 89df25e565..d0f722d865 100644 --- a/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java +++ b/library/src/main/java/com/datatorrent/lib/join/AbstractJoinOperator.java @@ -64,7 +64,7 @@ * @since 3.4.0 */ @InterfaceStability.Unstable -public abstract class AbstractJoinOperator extends BaseOperator implements Operator.CheckpointListener +public abstract class AbstractJoinOperator extends BaseOperator implements Operator.CheckpointNotificationListener { @AutoMetric private long tuplesJoinedPerSec; @@ -225,6 +225,11 @@ public void beginWindow(long windowId) tuplesCount = 0; } + @Override + public void beforeCheckpoint(long windowId) + { + } + @Override public void checkpointed(long windowId) { diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java index 03f3bf69d8..5b58121242 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java @@ -243,8 +243,11 @@ private void populateFile(String fileName, String contents) throws IOException * @param writer The writer to checkpoint. * @return new writer. 
*/ - public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer) + public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId) { + if (windowId >= 0) { + writer.beforeCheckpoint(windowId); + } Kryo kryo = new Kryo(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); Output loutput = new Output(bos); @@ -418,7 +421,7 @@ private void testSingleFileFailedWriteHelper(SingleHDFSExactlyOnceWriter writer) writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -577,6 +580,7 @@ private void testMultiFileCompletedWriteHelperCache1(EvenOddHDFSExactlyOnceWrite writer.requestFinalize(EVEN_FILE); writer.requestFinalize(ODD_FILE); + writer.beforeCheckpoint(1); writer.committed(1); } @@ -603,6 +607,7 @@ private void testMultiFileCompletedWriteHelper(EvenOddHDFSExactlyOnceWriter writ writer.requestFinalize(ODD_FILE); writer.requestFinalize(EVEN_FILE); + writer.beforeCheckpoint(1); writer.committed(1); } @@ -672,7 +677,7 @@ private void testMultiFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer) writer.requestFinalize(EVEN_FILE); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(4); @@ -690,6 +695,7 @@ private void testMultiFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer) writer.input.put(8); writer.input.put(9); writer.endWindow(); + writer.beforeCheckpoint(2); writer.committed(2); } @@ -815,7 +821,7 @@ private void testSingleRollingFileFailedWriteHelper(SingleHDFSExactlyOnceWriter writer.input.put(2); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); 
writer.beginWindow(1); writer.input.put(3); @@ -862,8 +868,8 @@ public void testSingleRollingFileFailedWrite1() writer.input.put(4); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); - AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1); LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets); @@ -1128,7 +1134,7 @@ private void testMultiRollingFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -1225,7 +1231,7 @@ private void testMultiRollingFileFailedWriteOverwriteHelperCache1(EvenOddHDFSExa writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.put(2); @@ -1272,7 +1278,7 @@ private void testMultiRollingFileFailedWriteOverwriteHelper(EvenOddHDFSExactlyOn writer.input.process(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); writer.beginWindow(1); writer.input.process(2); @@ -1363,7 +1369,7 @@ private void singleFileMultiRollingFailureHelper(SingleHDFSExactlyOnceWriter wri writer.input.put(3); writer.input.put(4); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1); writer.input.put(3); writer.input.put(4); @@ -1554,8 +1560,11 @@ public void testCompression() throws IOException writer.input.put(i); } writer.endWindow(); - evenOffsets.add(evenFile.length()); - 
oddOffsets.add(oddFile.length()); + if ((i % 2) == 1) { + writer.beforeCheckpoint(i); + evenOffsets.add(evenFile.length()); + oddOffsets.add(oddFile.length()); + } } writer.teardown(); @@ -1580,6 +1589,7 @@ public void testRecoveryOfOpenFiles() writer.input.put(2); writer.input.put(3); writer.endWindow(); + writer.beforeCheckpoint(0); //failure and restored writer.setup(testMeta.testOperatorContext); @@ -1756,11 +1766,14 @@ protected FilterStreamContext createFilterStreamContext(Outp for (int j = 0; j < 1000; ++j) { writer.input.put(i); } - writer.endWindow(); - evenOffsets.add(evenCounterContext.getCounter()); - oddOffsets.add(oddCounterContext.getCounter()); - //evenOffsets.add(evenFile.length()); - //oddOffsets.add(oddFile.length()); + //writer.endWindow(); + if ((i % 2) == 1) { + writer.beforeCheckpoint(i); + evenOffsets.add(evenCounterContext.getCounter()); + oddOffsets.add(oddCounterContext.getCounter()); + //evenOffsets.add(evenFile.length()); + //oddOffsets.add(oddFile.length()); + } } writer.teardown(); diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java index 32c32f7bd5..bcb1bc3827 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java @@ -70,7 +70,7 @@ public void testOperator() oper.input.process("window 0"); oper.endWindow(); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0); oper.beginWindow(1); oper.input.process("window 1"); @@ -110,7 +110,7 @@ public void testOperatorMidWindowRestore() oper.beginWindow(1); oper.input.process("1"); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper); + 
AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1); oper.input.process("1"); oper.teardown(); From 0f84424720268e1eebb06992bcda00d2e7d591d4 Mon Sep 17 00:00:00 2001 From: Lakshmi Prasanna Velineni Date: Mon, 3 Oct 2016 09:01:30 -0700 Subject: [PATCH 2/2] APEXMALHAR-2017 Fixed failing tests. --- .../contrib/hive/HiveMockTest.java | 16 +++++++++++- .../lib/io/fs/AbstractFileInputOperator.java | 3 +-- .../lib/io/fs/AbstractFileOutputOperator.java | 6 +++-- .../lib/io/fs/AbstractReconciler.java | 2 +- .../io/fs/AbstractFileOutputOperatorTest.java | 25 +++++++++---------- .../AbstractWindowFileOutputOperatorTest.java | 5 ++-- .../lib/io/fs/FSInputModuleAppTest.java | 1 + .../lib/fs/GenericFileOutputOperatorTest.java | 6 +++++ 8 files changed, 43 insertions(+), 21 deletions(-) diff --git a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java index 3c64bdf006..4ec92c977c 100755 --- a/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java +++ b/hive/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java @@ -50,7 +50,6 @@ import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE; import com.datatorrent.lib.helper.OperatorContextTestHelper; - public class HiveMockTest extends HiveTestService { public static final String APP_ID = "HiveOperatorTest"; @@ -250,6 +249,11 @@ public void testInsertString() throws Exception } fsRolling.endWindow(); + + if (wid == 6) { + fsRolling.beforeCheckpoint(wid); + fsRolling.checkpointed(wid); + } } fsRolling.teardown(); @@ -353,6 +357,11 @@ public void testInsertPOJO() throws Exception } fsRolling.endWindow(); + + if (wid == 6) { + fsRolling.beforeCheckpoint(wid); + fsRolling.checkpointed(wid); + } } fsRolling.teardown(); @@ -521,6 +530,11 @@ public void testHDFSHiveCheckpoint() throws SQLException, TException fsRolling.endWindow(); + if ((wid == 6) || (wid == 9)) { + 
fsRolling.beforeCheckpoint(wid); + fsRolling.checkpointed(wid); + } + if (wid == 9) { Kryo kryo = new Kryo(); FieldSerializer f1 = (FieldSerializer)kryo.getSerializer(HiveOperator.class); diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index 14cabfced9..d4ee03ba3f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -37,7 +37,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.validation.OverridesAttribute; import javax.validation.constraints.NotNull; import org.slf4j.Logger; @@ -99,7 +98,7 @@ */ @org.apache.hadoop.classification.InterfaceStability.Evolving public abstract class AbstractFileInputOperator implements InputOperator, Partitioner>, StatsListener, - Operator.CheckpointListener, Operator.CheckpointNotificationListener + Operator.CheckpointListener, Operator.CheckpointNotificationListener { private static final Logger LOG = LoggerFactory.getLogger(AbstractFileInputOperator.class); diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java index f703a19123..27a56cdc1e 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperator.java @@ -262,7 +262,7 @@ public abstract class AbstractFileOutputOperator extends BaseOperator imp private Long expireStreamAfterAccessMillis; private final Set filesWithOpenStreams; - private boolean initializeContext; + private transient boolean initializeContext; /** * This input port receives incoming tuples. 
@@ -968,6 +968,9 @@ protected String getPartFileName(String fileName, int part) @Override public void beginWindow(long windowId) { + // All the filter state needs to be flushed to the disk. Not all filters allow a flush option, so the filters have + // to be closed and reopened. If no filter is being used then it is essentially a noop as the underlying + // FSDataOutputStream is not being closed in this operation. if (initializeContext) { try { Map openStreams = streamsCache.asMap(); @@ -1240,7 +1243,6 @@ public void beforeCheckpoint(long l) long start = System.currentTimeMillis(); streamContext.finalizeContext(); totalWritingTime += System.currentTimeMillis() - start; - //streamContext.resetFilter(); // Re-initialize context when next window starts after checkpoint initializeContext = true; } diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java index c12becd120..0d67c317c7 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractReconciler.java @@ -31,8 +31,8 @@ import com.google.common.collect.Queues; import com.datatorrent.api.Context; import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Operator.IdleTimeHandler; import com.datatorrent.api.Operator.CheckpointNotificationListener; +import com.datatorrent.api.Operator.IdleTimeHandler; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.NameableThreadFactory; import com.datatorrent.netlet.util.DTThrowable; diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java index 5b58121242..8f0fbb050a 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java +++ 
b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileOutputOperatorTest.java @@ -66,6 +66,7 @@ import com.datatorrent.api.DAG; import com.datatorrent.api.LocalMode; import com.datatorrent.api.StreamingApplication; +import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.testbench.RandomWordGenerator; import com.datatorrent.lib.util.TestUtils; @@ -245,7 +246,7 @@ private void populateFile(String fileName, String contents) throws IOException */ public static AbstractFileOutputOperator checkpoint(AbstractFileOutputOperator writer, long windowId) { - if (windowId >= 0) { + if (windowId >= Stateless.WINDOW_ID) { writer.beforeCheckpoint(windowId); } Kryo kryo = new Kryo(); @@ -421,7 +422,7 @@ private void testSingleFileFailedWriteHelper(SingleHDFSExactlyOnceWriter writer) writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(2); @@ -677,7 +678,7 @@ private void testMultiFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer) writer.requestFinalize(EVEN_FILE); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(4); @@ -821,7 +822,7 @@ private void testSingleRollingFileFailedWriteHelper(SingleHDFSExactlyOnceWriter writer.input.put(2); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(3); @@ -868,8 +869,8 @@ public void testSingleRollingFileFailedWrite1() writer.input.put(4); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); - 
AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, -1); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); + AbstractFileOutputOperator checkPointWriter1 = checkpoint(writer, Stateless.WINDOW_ID); LOG.debug("Checkpoint endOffsets={}", checkPointWriter.endOffsets); @@ -1134,7 +1135,7 @@ private void testMultiRollingFileFailedWriteHelper(EvenOddHDFSExactlyOnceWriter writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(2); @@ -1231,7 +1232,7 @@ private void testMultiRollingFileFailedWriteOverwriteHelperCache1(EvenOddHDFSExa writer.input.put(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.put(2); @@ -1278,7 +1279,7 @@ private void testMultiRollingFileFailedWriteOverwriteHelper(EvenOddHDFSExactlyOn writer.input.process(1); writer.endWindow(); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, 0); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.beginWindow(1); writer.input.process(2); @@ -1369,7 +1370,7 @@ private void singleFileMultiRollingFailureHelper(SingleHDFSExactlyOnceWriter wri writer.input.put(3); writer.input.put(4); - AbstractFileOutputOperator checkPointWriter = checkpoint(writer, -1); + AbstractFileOutputOperator checkPointWriter = checkpoint(writer, Stateless.WINDOW_ID); writer.input.put(3); writer.input.put(4); @@ -1766,13 +1767,11 @@ protected FilterStreamContext createFilterStreamContext(Outp for (int j = 0; j < 1000; ++j) { writer.input.put(i); } - //writer.endWindow(); + writer.endWindow(); if ((i % 2) == 1) { writer.beforeCheckpoint(i); 
evenOffsets.add(evenCounterContext.getCounter()); oddOffsets.add(oddCounterContext.getCounter()); - //evenOffsets.add(evenFile.length()); - //oddOffsets.add(oddFile.length()); } } diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java index bcb1bc3827..daebecbb60 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractWindowFileOutputOperatorTest.java @@ -22,6 +22,7 @@ import org.junit.Test; import org.junit.runner.Description; +import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.helper.OperatorContextTestHelper; import com.datatorrent.lib.io.fs.AbstractFileOutputOperatorTest.FSTestWatcher; import com.datatorrent.lib.util.TestUtils.TestInfo; @@ -70,7 +71,7 @@ public void testOperator() oper.input.process("window 0"); oper.endWindow(); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, 0); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID); oper.beginWindow(1); oper.input.process("window 1"); @@ -110,7 +111,7 @@ public void testOperatorMidWindowRestore() oper.beginWindow(1); oper.input.process("1"); - AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, -1); + AbstractFileOutputOperator checkPoint = AbstractFileOutputOperatorTest.checkpoint(oper, Stateless.WINDOW_ID); oper.input.process("1"); oper.teardown(); diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java index 4213a00b27..4042d3c7ca 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FSInputModuleAppTest.java @@ 
-99,6 +99,7 @@ public void testApplication() throws Exception conf.set("dt.operator.hdfsInputModule.prop.blockSize", "10"); conf.set("dt.operator.hdfsInputModule.prop.blocksThreshold", "4"); conf.set("dt.operator.hdfsInputModule.prop.scanIntervalMillis", "10000"); + conf.set("dt.attr.CHECKPOINT_WINDOW_COUNT","10"); LocalMode lma = LocalMode.newInstance(); lma.prepareDAG(app, conf); diff --git a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java index 6082f57772..8b8ed01e62 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/fs/GenericFileOutputOperatorTest.java @@ -62,6 +62,7 @@ public void testIdleWindowsFinalize() throws IOException } writer.endWindow(); } + checkpoint(writer, 10); writer.committed(10); for (int i = 13; i <= 26; i++) { @@ -71,7 +72,10 @@ public void testIdleWindowsFinalize() throws IOException } writer.endWindow(); } + checkpoint(writer, 20); writer.committed(20); + + checkpoint(writer, 26); writer.committed(26); String[] expected = {"0a\n0b\n1a\n1b\n6a\n6b\n7a\n7b\n", "13a\n13b\n14a\n14b\n18a\n18b\n19a\n19b\n", @@ -108,8 +112,10 @@ public void testTupleCountFinalize() throws IOException } writer.endWindow(); if (i % 10 == 0) { + checkpoint(writer, 10); writer.committed(10); } + checkpoint(writer, 24); } writer.committed(tuples.length);