From 5305cd3a008d8dc337a7ef0ed030290ec9b4bea5 Mon Sep 17 00:00:00 2001 From: Chandni Singh Date: Fri, 13 May 2016 17:19:37 -0700 Subject: [PATCH] APEXMALHAR-2093 Removed usages of Idempotent Storage Manager --- .../contrib/avro/AvroFileInputOperator.java | 4 +- .../kafka/AbstractKafkaInputOperator.java | 51 +-- .../kinesis/AbstractKinesisInputOperator.java | 44 +-- .../AbstractRabbitMQInputOperator.java | 27 +- .../AbstractRabbitMQOutputOperator.java | 21 +- .../redis/AbstractRedisInputOperator.java | 34 +- .../contrib/kafka/KafkaInputOperatorTest.java | 10 +- .../kinesis/KinesisInputOperatorTest.java | 4 +- .../nifi/NiFiSinglePortInputOperatorTest.java | 3 +- .../NiFiSinglePortOutputOperatorTest.java | 3 +- .../rabbitmq/RabbitMQInputOperatorTest.java | 11 +- .../rabbitmq/RabbitMQOutputOperatorTest.java | 5 +- .../contrib/redis/RedisInputOperatorTest.java | 11 +- .../malhar/kafka/KafkaInputOperatorTest.java | 4 +- .../lib/io/fs/AbstractFileInputOperator.java | 38 +- .../datatorrent/lib/io/fs/FileSplitter.java | 33 +- .../lib/io/fs/FileSplitterInput.java | 34 +- .../lib/io/jms/AbstractJMSInputOperator.java | 39 +- .../managed/IncrementalCheckpointManager.java | 4 +- .../malhar/lib/wal/FSWindowDataManager.java | 339 ++++++++++++++++++ .../malhar/lib/wal/WindowDataManager.java | 312 ---------------- .../io/fs/AbstractFileInputOperatorTest.java | 24 +- .../lib/io/fs/FileSplitterInputTest.java | 23 +- .../lib/io/fs/FileSplitterTest.java | 20 +- .../io/jms/JMSStringInputOperatorTest.java | 4 +- .../lib/wal/FSWindowDataManagerTest.java | 4 +- pom.xml | 3 + 27 files changed, 574 insertions(+), 535 deletions(-) create mode 100644 library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java diff --git a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java index 14dfdf2bbd..989f859d41 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/avro/AvroFileInputOperator.java @@ -24,6 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.avro.AvroRuntimeException; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; @@ -36,7 +37,6 @@ import com.datatorrent.api.AutoMetric; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.fs.AbstractFileInputOperator; /** @@ -48,7 +48,7 @@ * No need to provide schema,its inferred from the file
* This operator emits a GenericRecord based on the schema derived from the * input file
- * Users can add the {@link IdempotentStorageManager.FSIdempotentStorageManager} + * Users can add the {@link FSWindowDataManager} * to ensure exactly once semantics with a HDFS backed WAL. * * @displayName AvroFileInputOperator diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java index b166b9e9c8..b026d1647c 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java @@ -28,7 +28,6 @@ import com.datatorrent.api.StatsListener; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KryoCloneUtils; import com.google.common.base.Joiner; @@ -44,6 +43,8 @@ import kafka.javaapi.consumer.SimpleConsumer; import kafka.message.Message; import kafka.message.MessageAndOffset; + +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -137,7 +138,7 @@ public abstract class AbstractKafkaInputOperator implem private long maxTotalMsgSizePerWindow = Long.MAX_VALUE; private transient int emitCount = 0; private transient long emitTotalMsgSize = 0; - protected IdempotentStorageManager idempotentStorageManager; + protected WindowDataManager windowDataManager; protected transient long currentWindowId; protected transient int operatorId; protected final transient Map> currentWindowRecoveryState; @@ -192,7 +193,7 @@ public abstract class AbstractKafkaInputOperator implem public AbstractKafkaInputOperator() { - idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); + windowDataManager = new WindowDataManager.NoopWindowDataManager(); currentWindowRecoveryState = new HashMap>(); } @@ -244,16 +245,16 @@ public void setup(OperatorContext context) } this.context = context; operatorId = context.getId(); - if(consumer instanceof HighlevelKafkaConsumer && !(idempotentStorageManager instanceof IdempotentStorageManager.NoopIdempotentStorageManager)) { + if(consumer instanceof HighlevelKafkaConsumer && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager)) { throw new RuntimeException("Idempotency is not supported for High Level Kafka Consumer"); } - idempotentStorageManager.setup(context); + windowDataManager.setup(context); } @Override public void teardown() { - idempotentStorageManager.teardown(); + windowDataManager.teardown(); consumer.teardown(); } @@ -261,7 +262,7 @@ public void teardown() public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } emitCount = 0; @@ -272,7 +273,8 @@ protected void replay(long windowId) { try { @SuppressWarnings("unchecked") - Map> recoveredData = (Map>) idempotentStorageManager.load(operatorId, windowId); + Map> recoveredData = (Map>) + windowDataManager.load(operatorId, windowId); if (recoveredData != null) { Map> pms = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().topic); if (pms != null) { @@ -311,7 +313,7 @@ protected void replay(long windowId) } } } - if(windowId == idempotentStorageManager.getLargestRecoveryWindow()) { + if(windowId == windowDataManager.getLargestRecoveryWindow()) { // Start the consumer at the largest recovery window SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer(); // Set the offset positions to the consumer @@ -337,9 +339,9 @@ public void endWindow() Map carryOn = new HashMap<>(offsetStats); offsetTrackHistory.add(Pair.of(currentWindowId, carryOn)); } - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); @@ -376,7 +378,7 @@ public void committed(long windowId) } try { - idempotentStorageManager.deleteUpTo(operatorId, windowId); + windowDataManager.deleteUpTo(operatorId, windowId); } catch (IOException e) { throw new RuntimeException("deleting state", e); @@ -386,7 +388,8 @@ public void committed(long windowId) @Override public void activate(OperatorContext ctx) { - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) { + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && + context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { // If it is a replay state, don't start the consumer return; } @@ -405,7 +408,7 @@ public void deactivate() @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } int count = consumer.messageSize() + ((pendingMessage != null) ? 1 : 0); @@ -505,7 +508,7 @@ public Collection>> definePa logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }"); } - Collection newManagers = Sets.newHashSet(); + Collection newManagers = Sets.newHashSet(); Set deletedOperators = Sets.newHashSet(); switch (strategy) { @@ -537,7 +540,7 @@ else if (newWaitingPartition.size() != 0) { partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers)); } newWaitingPartition.clear(); - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return partitions; } @@ -580,7 +583,7 @@ else if (newWaitingPartition.size() != 0) { logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): {} ", StringUtils.join(newWaitingPartition, ", ")); partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), null, newManagers)); - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return partitions; } else { @@ -631,12 +634,12 @@ else if (newWaitingPartition.size() != 0) { break; } - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return newPartitions; } // Create a new partition with the partition Ids and initial offset positions - protected Partitioner.Partition> createPartition(Set pIds, Map initOffsets, Collection newManagers) + protected Partitioner.Partition> createPartition(Set pIds, Map initOffsets, Collection newManagers) { Partitioner.Partition> p = new DefaultPartition<>(KryoCloneUtils.cloneObject(this)); @@ -648,7 +651,7 @@ protected Partitioner.Partition> createPartition(S p.getPartitionedInstance().offsetStats.putAll(p.getPartitionedInstance().getConsumer().getCurrentOffsets()); } } - newManagers.add(p.getPartitionedInstance().idempotentStorageManager); + newManagers.add(p.getPartitionedInstance().windowDataManager); PartitionInfo pif = new PartitionInfo(); pif.kpids = pIds; @@ -929,14 +932,14 @@ static class PartitionInfo long byteRateLeft; } - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return idempotentStorageManager; + return windowDataManager; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } public void setInitialPartitionCount(int partitionCount) diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java index 6306b04a14..2352236300 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java @@ -25,13 +25,14 @@ import com.datatorrent.api.*; import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.common.util.Pair; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KryoCloneUtils; import com.esotericsoftware.kryo.DefaultSerializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.collect.Sets; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -85,7 +86,7 @@ public abstract class AbstractKinesisInputOperator implements InputOperator, private String endPoint; - protected IdempotentStorageManager idempotentStorageManager; + protected WindowDataManager windowDataManager; protected transient long currentWindowId; protected transient int operatorId; protected final transient Map> currentWindowRecoveryState; @@ -132,7 +133,7 @@ public abstract class AbstractKinesisInputOperator implements InputOperator, public AbstractKinesisInputOperator() { - idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + windowDataManager = new FSWindowDataManager(); currentWindowRecoveryState = new HashMap>(); } /** @@ -167,7 +168,7 @@ public Collection> definePartitions(Coll // Operator partitions List> newPartitions = null; - Collection newManagers = Sets.newHashSet(); + Collection newManagers = Sets.newHashSet(); Set deletedOperators = Sets.newHashSet(); // initialize the shard positions @@ -198,7 +199,7 @@ public Collection> definePartitions(Coll partitions.add(createPartition(Sets.newHashSet(pid), null, newManagers)); } newWaitingPartition.clear(); - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return partitions; } break; @@ -250,7 +251,7 @@ public Collection> definePartitions(Coll default: break; } - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); return newPartitions; } @@ -370,10 +371,10 @@ private List getOpenShards(Collection createPartition(Set shardIds, Map initShardPos, Collection newManagers) + Partition createPartition(Set shardIds, Map initShardPos, Collection newManagers) { Partition p = new DefaultPartition(KryoCloneUtils.cloneObject(this)); - newManagers.add(p.getPartitionedInstance().idempotentStorageManager); + newManagers.add(p.getPartitionedInstance().windowDataManager); p.getPartitionedInstance().getConsumer().setShardIds(shardIds); p.getPartitionedInstance().getConsumer().resetShardPositions(initShardPos); @@ -400,9 +401,9 @@ public void setup(OperatorContext context) } consumer.create(); operatorId = context.getId(); - idempotentStorageManager.setup(context); + windowDataManager.setup(context); shardPosition.clear(); - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) { + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { isReplayState = true; } } @@ -413,7 +414,7 @@ public void setup(OperatorContext context) @Override public void teardown() { - idempotentStorageManager.teardown(); + windowDataManager.teardown(); consumer.teardown(); } @@ -425,7 +426,7 @@ public void beginWindow(long windowId) { emitCount = 0; currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -434,7 +435,8 @@ protected void replay(long windowId) { try { @SuppressWarnings("unchecked") - Map> recoveredData = (Map>) idempotentStorageManager.load(operatorId, windowId); + Map> recoveredData = + (Map>)windowDataManager.load(operatorId, windowId); if (recoveredData == null) { return; } @@ -464,10 +466,10 @@ protected void replay(long windowId) @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { context.setCounters(getConsumer().getConsumerStats(shardPosition)); try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); @@ -495,7 +497,7 @@ public void activate(OperatorContext ctx) public void committed(long windowId) { try { - idempotentStorageManager.deleteUpTo(operatorId, windowId); + windowDataManager.deleteUpTo(operatorId, windowId); } catch (IOException e) { throw new RuntimeException("deleting state", e); @@ -521,7 +523,7 @@ public void deactivate() @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } int count = consumer.getQueueSize(); @@ -707,13 +709,13 @@ public void setEndPoint(String endPoint) this.endPoint = endPoint; } - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return idempotentStorageManager; + return windowDataManager; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } } diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java index bcd91953fd..b84269845d 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQInputOperator.java @@ -20,7 +20,6 @@ import com.datatorrent.api.*; import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KeyValPair; import com.datatorrent.netlet.util.DTThrowable; import com.rabbitmq.client.*; @@ -38,6 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.WindowDataManager; + /** * This is the base implementation of a RabbitMQ input operator.  * Subclasses should implement the methods which convert RabbitMQ messages to tuples. @@ -102,7 +103,7 @@ public abstract class AbstractRabbitMQInputOperator implements protected transient String cTag; protected transient ArrayBlockingQueue> holdingBuffer; - private IdempotentStorageManager idempotentStorageManager; + private WindowDataManager windowDataManager; protected final transient Map currentWindowRecoveryState; private transient final Set pendingAck; private transient final Set recoveredTags; @@ -114,7 +115,7 @@ public AbstractRabbitMQInputOperator() currentWindowRecoveryState = new HashMap(); pendingAck = new HashSet(); recoveredTags = new HashSet(); - idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); + windowDataManager = new WindowDataManager.NoopWindowDataManager(); } @@ -189,7 +190,7 @@ public void emitTuples() public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= this.idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= this.windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -198,7 +199,7 @@ public void beginWindow(long windowId) private void replay(long windowId) { Map recoveredData; try { - recoveredData = (Map) this.idempotentStorageManager.load(operatorContextId, windowId); + recoveredData = (Map) this.windowDataManager.load(operatorContextId, windowId); if (recoveredData == null) { return; } @@ -224,7 +225,7 @@ public void endWindow() } try { - this.idempotentStorageManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId); + this.windowDataManager.save(currentWindowRecoveryState, operatorContextId, currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -247,13 +248,13 @@ public void setup(OperatorContext context) { this.operatorContextId = context.getId(); holdingBuffer = new ArrayBlockingQueue>(bufferSize); - this.idempotentStorageManager.setup(context); + this.windowDataManager.setup(context); } @Override public void teardown() { - this.idempotentStorageManager.teardown(); + this.windowDataManager.teardown(); } @Override @@ -319,7 +320,7 @@ public void checkpointed(long windowId) public void committed(long windowId) { try { - idempotentStorageManager.deleteUpTo(operatorContextId, windowId); + windowDataManager.deleteUpTo(operatorContextId, windowId); } catch (IOException e) { throw new RuntimeException("committing", e); @@ -391,12 +392,12 @@ public void setRoutingKey(String routingKey) this.routingKey = routingKey; } - public IdempotentStorageManager getIdempotentStorageManager() { - return idempotentStorageManager; + public WindowDataManager getWindowDataManager() { + return windowDataManager; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) { - this.idempotentStorageManager = idempotentStorageManager; + public void setWindowDataManager(WindowDataManager windowDataManager) { + this.windowDataManager = windowDataManager; } diff --git a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java index 8d04154829..6043c5b9a3 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/rabbitmq/AbstractRabbitMQOutputOperator.java @@ -19,7 +19,6 @@ package com.datatorrent.contrib.rabbitmq; import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.api.Context.OperatorContext; import com.rabbitmq.client.Channel; @@ -32,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.WindowDataManager; + /** * This is the base implementation of a RabbitMQ output operator.  * A concrete operator should be created from this skeleton implementation. @@ -74,7 +75,7 @@ public class AbstractRabbitMQOutputOperator extends BaseOperator transient String exchange = "testEx"; transient String queueName="testQ"; - private IdempotentStorageManager idempotentStorageManager; + private WindowDataManager windowDataManager; private transient long currentWindowId; private transient long largestRecoveryWindowId; private transient int operatorContextId; @@ -95,7 +96,7 @@ public void setup(OperatorContext context) channel = connection.createChannel(); channel.exchangeDeclare(exchange, "fanout"); - this.idempotentStorageManager.setup(context); + this.windowDataManager.setup(context); } catch (IOException ex) { @@ -108,7 +109,7 @@ public void setup(OperatorContext context) public void beginWindow(long windowId) { currentWindowId = windowId; - largestRecoveryWindowId = idempotentStorageManager.getLargestRecoveryWindow(); + largestRecoveryWindowId = windowDataManager.getLargestRecoveryWindow(); if (windowId <= largestRecoveryWindowId) { // Do not resend already sent tuples skipProcessingTuple = true; @@ -131,7 +132,7 @@ public void endWindow() return; } try { - idempotentStorageManager.save("processedWindow", operatorContextId, currentWindowId); + windowDataManager.save("processedWindow", operatorContextId, currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -151,19 +152,19 @@ public void teardown() try { channel.close(); connection.close(); - this.idempotentStorageManager.teardown(); + this.windowDataManager.teardown(); } catch (IOException ex) { logger.debug(ex.toString()); } } - public IdempotentStorageManager getIdempotentStorageManager() { - return idempotentStorageManager; + public WindowDataManager getWindowDataManager() { + return windowDataManager; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) { - this.idempotentStorageManager = idempotentStorageManager; + public void setWindowDataManager(WindowDataManager windowDataManager) { + this.windowDataManager = windowDataManager; } } diff --git a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java index daca1fced2..fd0a8852f0 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/redis/AbstractRedisInputOperator.java @@ -25,6 +25,9 @@ import javax.validation.constraints.NotNull; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; + import redis.clients.jedis.ScanParams; import redis.clients.jedis.ScanResult; @@ -32,7 +35,6 @@ import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.netlet.util.DTThrowable; import com.datatorrent.lib.db.AbstractKeyValueStoreInputOperator; -import com.datatorrent.lib.io.IdempotentStorageManager; /** * This is the base implementation of a Redis input operator. @@ -57,7 +59,7 @@ public abstract class AbstractRedisInputOperator extends AbstractKeyValueStor private transient boolean skipOffsetRecovery = true; @NotNull - private IdempotentStorageManager idempotentStorageManager; + private WindowDataManager windowDataManager; private transient OperatorContext context; private transient long currentWindowId; @@ -83,7 +85,7 @@ public AbstractRedisInputOperator() recoveryState = new RecoveryState(); recoveryState.scanOffsetAtBeginWindow = 0; recoveryState.numberOfScanCallsInWindow = 0; - setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager()); + setWindowDataManager(new FSWindowDataManager()); } @Override @@ -92,7 +94,7 @@ public void beginWindow(long windowId) currentWindowId = windowId; scanCallsInCurrentWindow = 0; replay = false; - if (currentWindowId <= getIdempotentStorageManager().getLargestRecoveryWindow()) { + if (currentWindowId <= getWindowDataManager().getLargestRecoveryWindow()) { replay(windowId); } } @@ -105,11 +107,11 @@ private void replay(long windowId) if (!skipOffsetRecovery) { // Begin offset for this window is recovery offset stored for the last // window - RecoveryState recoveryStateForLastWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId - 1); + RecoveryState recoveryStateForLastWindow = (RecoveryState) getWindowDataManager().load(context.getId(), windowId - 1); recoveryState.scanOffsetAtBeginWindow = recoveryStateForLastWindow.scanOffsetAtBeginWindow; } skipOffsetRecovery = false; - RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getIdempotentStorageManager().load(context.getId(), windowId); + RecoveryState recoveryStateForCurrentWindow = (RecoveryState) getWindowDataManager().load(context.getId(), windowId); recoveryState.numberOfScanCallsInWindow = recoveryStateForCurrentWindow.numberOfScanCallsInWindow; if (recoveryState.scanOffsetAtBeginWindow != null) { scanOffset = recoveryState.scanOffsetAtBeginWindow; @@ -153,7 +155,7 @@ public void setup(OperatorContext context) { super.setup(context); sleepTimeMillis = context.getValue(context.SPIN_MILLIS); - getIdempotentStorageManager().setup(context); + getWindowDataManager().setup(context); this.context = context; scanOffset = 0; scanComplete = false; @@ -161,7 +163,7 @@ public void setup(OperatorContext context) scanParameters.count(scanCount); // For the 1st window after checkpoint, windowID - 1 would not have recovery - // offset stored in idempotentStorageManager + // offset stored in windowDataManager // But recoveryOffset is non-transient, so will be recovered with // checkPointing // Offset recovery from idempotency storage can be skipped in this case @@ -181,9 +183,9 @@ public void endWindow() recoveryState.scanOffsetAtBeginWindow = scanOffset; recoveryState.numberOfScanCallsInWindow = scanCallsInCurrentWindow; - if (currentWindowId > getIdempotentStorageManager().getLargestRecoveryWindow()) { + if (currentWindowId > getWindowDataManager().getLargestRecoveryWindow()) { try { - getIdempotentStorageManager().save(recoveryState, context.getId(), currentWindowId); + getWindowDataManager().save(recoveryState, context.getId(), currentWindowId); } catch (IOException e) { DTThrowable.rethrow(e); } @@ -194,7 +196,7 @@ public void endWindow() public void teardown() { super.teardown(); - getIdempotentStorageManager().teardown(); + getWindowDataManager().teardown(); } /* @@ -231,7 +233,7 @@ public void checkpointed(long windowId) public void committed(long windowId) { try { - getIdempotentStorageManager().deleteUpTo(context.getId(), windowId); + getWindowDataManager().deleteUpTo(context.getId(), windowId); } catch (IOException e) { throw new RuntimeException("committing", e); } @@ -240,16 +242,16 @@ public void committed(long windowId) /* * get Idempotent Storage manager instance */ - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return idempotentStorageManager; + return windowDataManager; } /* * set Idempotent storage manager instance */ - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } } diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java index 9db135531a..27235f5bf7 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java @@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.junit.Assert; @@ -49,7 +50,6 @@ import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.partitioner.StatelessPartitionerTest; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.stram.StramLocalCluster; @@ -141,7 +141,7 @@ public void testKafkaInputOperator(int sleepTime, final int totalCount, KafkaCon } if(idempotent) { - node.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + node.setWindowDataManager(new FSWindowDataManager()); } consumer.setTopic(TEST_TOPIC); @@ -304,7 +304,7 @@ public void testRecoveryAndIdempotency() throws Exception operator.deactivate(); operator = createAndDeployOperator(); - Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestRecoveryWindow()); operator.beginWindow(1); operator.emitTuples(); @@ -339,9 +339,9 @@ private KafkaSinglePortStringInputOperator createAndDeployOperator() consumer.setTopic(TEST_TOPIC); consumer.setInitialOffset("earliest"); - IdempotentStorageManager.FSIdempotentStorageManager storageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + FSWindowDataManager storageManager = new FSWindowDataManager(); storageManager.setRecoveryPath(testMeta.recoveryDir); - testMeta.operator.setIdempotentStorageManager(storageManager); + testMeta.operator.setWindowDataManager(storageManager); testMeta.operator.setConsumer(consumer); testMeta.operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]); testMeta.operator.setMaxTuplesPerWindow(500); diff --git a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java index 8051b96a43..e8eff5d65c 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kinesis/KinesisInputOperatorTest.java @@ -232,11 +232,11 @@ public void testRecoveryAndIdempotency() throws Exception testMeta.operator.setup(testMeta.context); testMeta.operator.activate(testMeta.context); - Assert.assertEquals("largest recovery window", 1, testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 1, testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); testMeta.operator.beginWindow(1); testMeta.operator.endWindow(); Assert.assertEquals("num of messages in window 1", 10, testMeta.sink.collectedTuples.size()); testMeta.sink.collectedTuples.clear(); } -} \ No newline at end of file +} diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java index f36f1f2e24..dc56e1c533 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortInputOperatorTest.java @@ -33,6 +33,7 @@ import org.junit.Before; import org.junit.Test; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.stream.io.ByteArrayInputStream; import org.apache.nifi.util.file.FileUtils; @@ -71,7 +72,7 @@ public void setup() throws IOException sink = new CollectorTestSink<>(); builder = new MockSiteToSiteClient.Builder(); - windowDataManager = new WindowDataManager.FSWindowDataManager(); + windowDataManager = new FSWindowDataManager(); operator = new NiFiSinglePortInputOperator(builder, windowDataManager); operator.outputPort.setSink(sink); diff --git a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java index e8aa982eb1..14b14930c7 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/nifi/NiFiSinglePortOutputOperatorTest.java @@ -30,6 +30,7 @@ import org.junit.Before; import org.junit.Test; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.io.IOUtils; import org.apache.nifi.remote.protocol.DataPacket; @@ -65,7 +66,7 @@ public void setup() throws IOException context = new OperatorContextTestHelper.TestIdOperatorContext(12345, attributeMap); - windowDataManager = new WindowDataManager.FSWindowDataManager(); + windowDataManager = new FSWindowDataManager(); stsBuilder = new MockSiteToSiteClient.Builder(); dpBuilder = new StringNiFiDataPacketBuilder(); diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java index 315160b9ee..4fccffa431 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQInputOperatorTest.java @@ -33,6 +33,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; + import com.datatorrent.contrib.helper.CollectorModule; import com.datatorrent.contrib.helper.MessageQueueTestHelper; import com.datatorrent.api.Context.OperatorContext; @@ -41,7 +43,6 @@ import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.LocalMode; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.netlet.util.DTThrowable; @@ -127,7 +128,7 @@ protected void runTest(final int testNum) throws IOException LocalMode lma = LocalMode.newInstance(); DAG dag = lma.getDAG(); RabbitMQInputOperator consumer = dag.addOperator("Consumer", RabbitMQInputOperator.class); - consumer.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + consumer.setWindowDataManager(new FSWindowDataManager()); final CollectorModule collector = dag.addOperator("Collector", new CollectorModule()); @@ -188,7 +189,7 @@ public void run() public void testRecoveryAndIdempotency() throws Exception { RabbitMQInputOperator operator = new RabbitMQInputOperator(); - operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + operator.setWindowDataManager(new FSWindowDataManager()); operator.setHost("localhost"); operator.setExchange("testEx"); operator.setExchangeType("fanout"); @@ -220,7 +221,7 @@ public void testRecoveryAndIdempotency() throws Exception operator.setup(context); operator.activate(context); - Assert.assertEquals("largest recovery window", 1, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 1, operator.getWindowDataManager().getLargestRecoveryWindow()); operator.beginWindow(1); operator.endWindow(); Assert.assertEquals("num of messages in window 1", 15, sink.collectedTuples.size()); @@ -228,7 +229,7 @@ public void testRecoveryAndIdempotency() throws Exception operator.deactivate(); operator.teardown(); - operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 1); + operator.getWindowDataManager().deleteUpTo(context.getId(), 1); publisher.teardown(); } } diff --git a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java index 596a33a46b..3fa36abf71 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/rabbitmq/RabbitMQOutputOperatorTest.java @@ -29,8 +29,9 @@ import org.junit.Test; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; + import com.datatorrent.contrib.helper.SourceModule; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.api.DAG; import com.datatorrent.api.DAG.Locality; import com.datatorrent.api.LocalMode; @@ -144,7 +145,7 @@ protected void runTest(int testNum) throws IOException SourceModule source = dag.addOperator("source", new SourceModule()); source.setTestNum(testNum); RabbitMQOutputOperator collector = dag.addOperator("generator", new RabbitMQOutputOperator()); - collector.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + collector.setWindowDataManager(new FSWindowDataManager()); collector.setExchange("testEx"); dag.addStream("Stream", source.outPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL); diff --git a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java index 6f3a177c7d..bee170d797 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/redis/RedisInputOperatorTest.java @@ -25,6 +25,8 @@ import org.junit.Assert; import org.junit.Test; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; + import redis.clients.jedis.ScanParams; import com.datatorrent.api.Attribute; @@ -34,7 +36,6 @@ import com.datatorrent.api.LocalMode; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.KeyValPair; @@ -134,7 +135,7 @@ public void testRecoveryAndIdempotency() throws Exception testStore.put("test_ghi", "123"); RedisKeyValueInputOperator operator = new RedisKeyValueInputOperator(); - operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + operator.setWindowDataManager(new FSWindowDataManager()); operator.setStore(operatorStore); operator.setScanCount(1); @@ -162,13 +163,13 @@ public void testRecoveryAndIdempotency() throws Exception // failure and then re-deployment of operator // Re-instantiating to reset values operator = new RedisKeyValueInputOperator(); - operator.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager()); + operator.setWindowDataManager(new FSWindowDataManager()); operator.setStore(operatorStore); operator.setScanCount(1); operator.outputPort.setSink(sink); operator.setup(context); - Assert.assertEquals("largest recovery window", 2, operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + Assert.assertEquals("largest recovery window", 2, operator.getWindowDataManager().getLargestRecoveryWindow()); operator.beginWindow(1); operator.emitTuples(); @@ -188,7 +189,7 @@ public void testRecoveryAndIdempotency() throws Exception testStore.remove(entry.getKey()); } sink.collectedTuples.clear(); - operator.getIdempotentStorageManager().deleteUpTo(context.getId(), 5); + operator.getWindowDataManager().deleteUpTo(context.getId(), 5); operator.teardown(); } } diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index ede7f38bb6..72ecd579cf 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -40,7 +40,7 @@ import org.junit.runners.Parameterized; import org.slf4j.LoggerFactory; -import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.commons.io.FileUtils; import com.datatorrent.api.Context; @@ -255,7 +255,7 @@ public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exc node.setClusters(getClusterConfig()); node.setStrategy(partition); if(idempotent) { - node.setWindowDataManager(new WindowDataManager.FSWindowDataManager()); + node.setWindowDataManager(new FSWindowDataManager()); } diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java index bf1605d6b1..dae5c7f895 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java @@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory; import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -63,7 +64,6 @@ import com.datatorrent.api.StatsListener; import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.util.KryoCloneUtils; /** @@ -127,7 +127,7 @@ public abstract class AbstractFileInputOperator protected transient MutableLong pendingFileCount = new MutableLong(); @NotNull - protected IdempotentStorageManager idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); + private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager(); protected transient long currentWindowId; protected final transient LinkedList currentWindowRecoveryState = Lists.newLinkedList(); protected int operatorId; //needed in partitioning @@ -388,11 +388,11 @@ public void setEmitBatchSize(int emitBatchSize) /** * Sets the idempotent storage manager on the operator. - * @param idempotentStorageManager an {@link IdempotentStorageManager} + * @param windowDataManager an {@link WindowDataManager} */ - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } /** @@ -400,9 +400,9 @@ public void setIdempotentStorageManager(IdempotentStorageManager idempotentStora * * @return the idempotent storage manager. */ - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return idempotentStorageManager; + return windowDataManager; } /** @@ -456,8 +456,8 @@ public void setup(OperatorContext context) fileCounters.setCounter(FileCounters.LOCAL_NUMBER_OF_RETRIES, localNumberOfRetries); fileCounters.setCounter(FileCounters.PENDING_FILES, pendingFileCount); - idempotentStorageManager.setup(context); - if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) { + windowDataManager.setup(context); + if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { //reset current file and offset in case of replay currentFile = null; offset = 0; @@ -512,14 +512,14 @@ public void teardown() throw new RuntimeException(errorMessage, savedException); } - idempotentStorageManager.teardown(); + windowDataManager.teardown(); } @Override public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -527,9 +527,9 @@ public void beginWindow(long windowId) @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -553,7 +553,7 @@ protected void replay(long windowId) //all the recovery data for a window and then processes only those files which would be hashed //to it in the current run. try { - Map recoveryDataPerOperator = idempotentStorageManager.load(windowId); + Map recoveryDataPerOperator = windowDataManager.load(windowId); for (Object recovery : recoveryDataPerOperator.values()) { @SuppressWarnings("unchecked") @@ -615,7 +615,7 @@ protected void replay(long windowId) @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } @@ -836,7 +836,7 @@ public Collection>> definePartitions(Coll List scanners = scanner.partition(totalCount, oldscanners); Collection>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount); - Collection newManagers = Lists.newArrayListWithExpectedSize(totalCount); + Collection newManagers = Lists.newArrayListWithExpectedSize(totalCount); KryoCloneUtils> cloneUtils = KryoCloneUtils.createCloneUtils(this); for (int i = 0; i < scanners.size(); i++) { @@ -889,10 +889,10 @@ public Collection>> definePartitions(Coll } } newPartitions.add(new DefaultPartition>(oper)); - newManagers.add(oper.idempotentStorageManager); + newManagers.add(oper.windowDataManager); } - idempotentStorageManager.partitioned(newManagers, deletedOperators); + windowDataManager.partitioned(newManagers, deletedOperators); LOG.info("definePartitions called returning {} partitions", newPartitions.size()); return newPartitions; } @@ -917,7 +917,7 @@ public void checkpointed(long windowId) public void committed(long windowId) { try { - idempotentStorageManager.deleteUpTo(operatorId, windowId); + windowDataManager.deleteUpTo(operatorId, windowId); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java index 69e44a50a5..cd79a2bb2f 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitter.java @@ -40,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang.mutable.MutableLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -61,7 +62,6 @@ import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.block.BlockMetadata.FileBlockMetadata; import com.datatorrent.netlet.util.DTThrowable; @@ -99,7 +99,7 @@ public class FileSplitter implements InputOperator, Operator.CheckpointListener protected TimeBasedDirectoryScanner scanner; @NotNull - protected IdempotentStorageManager idempotentStorageManager; + protected WindowDataManager windowDataManager; @NotNull protected final transient LinkedList currentWindowRecoveryState; @@ -118,7 +118,7 @@ public FileSplitter() { currentWindowRecoveryState = Lists.newLinkedList(); fileCounters = new BasicCounters(MutableLong.class); - idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + windowDataManager = new WindowDataManager.NoopWindowDataManager(); scanner = new TimeBasedDirectoryScanner(); blocksThreshold = Integer.MAX_VALUE; } @@ -133,7 +133,7 @@ public void setup(Context.OperatorContext context) this.context = context; fileCounters.setCounter(Counters.PROCESSED_FILES, new MutableLong()); - idempotentStorageManager.setup(context); + windowDataManager.setup(context); try { fs = scanner.getFSInstance(); @@ -145,8 +145,7 @@ public void setup(Context.OperatorContext context) blockSize = fs.getDefaultBlockSize(new Path(scanner.files.iterator().next())); } - if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < - idempotentStorageManager.getLargestRecoveryWindow()) { + if (context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) < windowDataManager.getLargestRecoveryWindow()) { blockMetadataIterator = null; } else { //don't setup scanner while recovery @@ -176,7 +175,7 @@ public void beginWindow(long windowId) { blockCount = 0; currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -185,7 +184,7 @@ protected void replay(long windowId) { try { @SuppressWarnings("unchecked") - LinkedList recoveredData = (LinkedList)idempotentStorageManager.load(operatorId, windowId); + LinkedList recoveredData = (LinkedList)windowDataManager.load(operatorId, windowId); if (recoveredData == null) { //This could happen when there are multiple physical instances and one of them is ahead in processing windows. return; @@ -210,7 +209,7 @@ protected void replay(long windowId) } } - if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId == windowDataManager.getLargestRecoveryWindow()) { scanner.setup(context); } } catch (IOException e) { @@ -221,7 +220,7 @@ protected void replay(long windowId) @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } @@ -260,9 +259,9 @@ public void emitTuples() @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -370,14 +369,14 @@ public TimeBasedDirectoryScanner getScanner() return this.scanner; } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return this.idempotentStorageManager; + return this.windowDataManager; } @Override @@ -389,7 +388,7 @@ public void checkpointed(long l) public void committed(long l) { try { - idempotentStorageManager.deleteUpTo(operatorId, l); + windowDataManager.deleteUpTo(operatorId, l); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java index bd680163b9..a58bee7378 100644 --- a/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java +++ b/library/src/main/java/com/datatorrent/lib/io/fs/FileSplitterInput.java @@ -42,6 +42,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -60,7 +62,6 @@ import com.datatorrent.api.Operator; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.netlet.util.DTThrowable; /** @@ -79,7 +80,7 @@ public class FileSplitterInput extends AbstractFileSplitter implements InputOperator, Operator.CheckpointListener { @NotNull - private IdempotentStorageManager idempotentStorageManager; + private WindowDataManager windowDataManager; @NotNull protected final transient LinkedList currentWindowRecoveryState; @@ -95,7 +96,7 @@ public FileSplitterInput() { super(); currentWindowRecoveryState = Lists.newLinkedList(); - idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager(); + windowDataManager = new WindowDataManager.NoopWindowDataManager(); referenceTimes = Maps.newHashMap(); scanner = new TimeBasedDirectoryScanner(); } @@ -105,10 +106,10 @@ public void setup(Context.OperatorContext context) { sleepMillis = context.getValue(Context.OperatorContext.SPIN_MILLIS); scanner.setup(context); - idempotentStorageManager.setup(context); + windowDataManager.setup(context); super.setup(context); - long largestRecoveryWindow = idempotentStorageManager.getLargestRecoveryWindow(); + long largestRecoveryWindow = windowDataManager.getLargestRecoveryWindow(); if (largestRecoveryWindow == Stateless.WINDOW_ID || context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID) > largestRecoveryWindow) { scanner.startScanning(Collections.unmodifiableMap(referenceTimes)); @@ -119,7 +120,7 @@ public void setup(Context.OperatorContext context) public void beginWindow(long windowId) { super.beginWindow(windowId); - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -128,8 +129,7 @@ protected void replay(long windowId) { try { @SuppressWarnings("unchecked") - LinkedList recoveredData = (LinkedList)idempotentStorageManager - .load(operatorId, windowId); + LinkedList recoveredData = (LinkedList)windowDataManager.load(operatorId, windowId); if (recoveredData == null) { //This could happen when there are multiple physical instances and one of them is ahead in processing windows. return; @@ -150,7 +150,7 @@ protected void replay(long windowId) } catch (IOException e) { throw new RuntimeException("replay", e); } - if (windowId == idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId == windowDataManager.getLargestRecoveryWindow()) { scanner.startScanning(Collections.unmodifiableMap(referenceTimes)); } } @@ -158,7 +158,7 @@ protected void replay(long windowId) @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } @@ -204,9 +204,9 @@ protected void updateReferenceTimes(ScannedFileInfo fileInfo) @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { try { - idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId); + windowDataManager.save(currentWindowRecoveryState, operatorId, currentWindowId); } catch (IOException e) { throw new RuntimeException("saving recovery", e); } @@ -235,7 +235,7 @@ public void checkpointed(long l) public void committed(long l) { try { - idempotentStorageManager.deleteUpTo(operatorId, l); + windowDataManager.deleteUpTo(operatorId, l); } catch (IOException e) { throw new RuntimeException(e); } @@ -247,14 +247,14 @@ public void teardown() scanner.teardown(); } - public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager) + public void setWindowDataManager(WindowDataManager windowDataManager) { - this.idempotentStorageManager = idempotentStorageManager; + this.windowDataManager = windowDataManager; } - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return this.idempotentStorageManager; + return this.windowDataManager; } public void setScanner(TimeBasedDirectoryScanner scanner) diff --git a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java index 43445a791c..cc27c88550 100644 --- a/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/io/jms/AbstractJMSInputOperator.java @@ -39,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; +import org.apache.apex.malhar.lib.wal.WindowDataManager; import org.apache.commons.lang.mutable.MutableLong; import com.google.common.collect.Maps; @@ -52,7 +54,6 @@ import com.datatorrent.api.Operator.ActivationListener; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.lib.counters.BasicCounters; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.netlet.util.DTThrowable; /** @@ -63,8 +64,8 @@ * {@link #onMessage(Message)} is called which buffers the message into a holding buffer. This is asynchronous.
* {@link #emitTuples()} retrieves messages from holding buffer and processes them. *

- * Important: The {@link IdempotentStorageManager.FSIdempotentStorageManager} makes the operator fault tolerant as - * well as idempotent. If {@link IdempotentStorageManager.NoopIdempotentStorageManager} is set on the operator then + * Important: The {@link FSWindowDataManager} makes the operator fault tolerant as + * well as idempotent. If {@link WindowDataManager.NoopWindowDataManager} is set on the operator then * it will not be fault-tolerant as well. *

* Configurations:
@@ -105,7 +106,7 @@ public abstract class AbstractJMSInputOperator extends JMSBase private final transient AtomicReference throwable; @NotNull - protected IdempotentStorageManager idempotentStorageManager; + protected WindowDataManager windowDataManager; private transient long[] operatorRecoveredWindows; protected transient long currentWindowId; protected transient int emitCount; @@ -120,7 +121,7 @@ public AbstractJMSInputOperator() counters = new BasicCounters(MutableLong.class); throwable = new AtomicReference(); pendingAck = Sets.newHashSet(); - idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager(); + windowDataManager = new FSWindowDataManager(); lock = new Lock(); @@ -200,9 +201,9 @@ public void setup(OperatorContext context) spinMillis = context.getValue(OperatorContext.SPIN_MILLIS); counters.setCounter(CounterKeys.RECEIVED, new MutableLong()); counters.setCounter(CounterKeys.REDELIVERED, new MutableLong()); - idempotentStorageManager.setup(context); + windowDataManager.setup(context); try { - operatorRecoveredWindows = idempotentStorageManager.getWindowIds(context.getId()); + operatorRecoveredWindows = windowDataManager.getWindowIds(context.getId()); if (operatorRecoveredWindows != null) { Arrays.sort(operatorRecoveredWindows); } @@ -261,7 +262,7 @@ public void activate(OperatorContext ctx) public void beginWindow(long windowId) { currentWindowId = windowId; - if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (windowId <= windowDataManager.getLargestRecoveryWindow()) { replay(windowId); } } @@ -270,7 +271,7 @@ protected void replay(long windowId) { try { @SuppressWarnings("unchecked") - Map recoveredData = (Map)idempotentStorageManager.load(context.getId(), windowId); + Map recoveredData = (Map)windowDataManager.load(context.getId(), windowId); if (recoveredData == null) { return; } @@ -286,7 +287,7 @@ protected void replay(long windowId) @Override public void emitTuples() { - if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId <= windowDataManager.getLargestRecoveryWindow()) { return; } @@ -346,7 +347,7 @@ public void handleIdleTime() @Override public void endWindow() { - if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) { + if (currentWindowId > windowDataManager.getLargestRecoveryWindow()) { synchronized (lock) { boolean stateSaved = false; boolean ackCompleted = false; @@ -359,7 +360,7 @@ public void endWindow() emitCount++; lastMsg = msg; } - idempotentStorageManager.save(currentWindowRecoveryState, context.getId(), currentWindowId); + windowDataManager.save(currentWindowRecoveryState, context.getId(), currentWindowId); stateSaved = true; currentWindowRecoveryState.clear(); @@ -376,7 +377,7 @@ public void endWindow() } finally { if (stateSaved && !ackCompleted) { try { - idempotentStorageManager.delete(context.getId(), currentWindowId); + windowDataManager.delete(context.getId(), currentWindowId); } catch (IOException e) { LOG.error("unable to delete corrupted state", e); } @@ -416,7 +417,7 @@ public void checkpointed(long windowId) public void committed(long windowId) { try { - idempotentStorageManager.deleteUpTo(context.getId(), windowId); + windowDataManager.deleteUpTo(context.getId(), windowId); } catch (IOException e) { throw new RuntimeException("committing", e); } @@ -447,7 +448,7 @@ protected void cleanup() @Override public void teardown() { - idempotentStorageManager.teardown(); + windowDataManager.teardown(); } /** @@ -500,17 +501,17 @@ public void setConsumerName(String consumerName) * * @param storageManager */ - public void setIdempotentStorageManager(IdempotentStorageManager storageManager) + public void setWindowDataManager(WindowDataManager storageManager) { - this.idempotentStorageManager = storageManager; + this.windowDataManager = storageManager; } /** * @return the idempotent storage manager. */ - public IdempotentStorageManager getIdempotentStorageManager() + public WindowDataManager getWindowDataManager() { - return this.idempotentStorageManager; + return this.windowDataManager; } protected abstract void emit(T payload); diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java index 536702d466..7858e8913f 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java @@ -32,7 +32,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.apex.malhar.lib.wal.WindowDataManager; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; @@ -49,7 +49,7 @@ * * This component is also responsible for purging old time buckets. */ -public class IncrementalCheckpointManager extends WindowDataManager.FSWindowDataManager +public class IncrementalCheckpointManager extends FSWindowDataManager implements ManagedStateComponent { private static final String WAL_RELATIVE_PATH = "managed_state"; diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java new file mode 100644 index 0000000000..b8ab2ce522 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java @@ -0,0 +1,339 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.wal; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; + +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.collect.TreeMultimap; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.FSStorageAgent; + +/** + * An {@link WindowDataManager} that uses FS to persist state. + */ +public class FSWindowDataManager implements WindowDataManager +{ + private static final String DEF_RECOVERY_PATH = "idempotentState"; + + protected transient FSStorageAgent storageAgent; + + /** + * Recovery path relative to app path where state is saved. + */ + @NotNull + private String recoveryPath; + + private boolean isRecoveryPathRelativeToAppPath = true; + + /** + * largest window for which there is recovery data across all physical operator instances. + */ + protected transient long largestRecoveryWindow; + + /** + * This is not null only for one physical instance.
+ * It consists of operator ids which have been deleted but have some state that can be replayed. + * Only one of the instances would be handling (modifying) the files that belong to this state. + */ + protected Set deletedOperators; + + /** + * Sorted mapping from window id to all the operators that have state to replay for that window. + */ + protected final transient TreeMultimap replayState; + + protected transient FileSystem fs; + protected transient Path appPath; + + public FSWindowDataManager() + { + replayState = TreeMultimap.create(); + largestRecoveryWindow = Stateless.WINDOW_ID; + recoveryPath = DEF_RECOVERY_PATH; + } + + @Override + public void setup(Context.OperatorContext context) + { + Configuration configuration = new Configuration(); + if (isRecoveryPathRelativeToAppPath) { + appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath); + } else { + appPath = new Path(recoveryPath); + } + + try { + storageAgent = new FSStorageAgent(appPath.toString(), configuration); + + fs = FileSystem.newInstance(appPath.toUri(), configuration); + + if (fs.exists(appPath)) { + FileStatus[] fileStatuses = fs.listStatus(appPath); + + for (FileStatus operatorDirStatus : fileStatuses) { + int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName()); + + for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) { + String fileName = status.getPath().getName(); + if (fileName.endsWith(FSStorageAgent.TMP_FILE)) { + continue; + } + long windowId = Long.parseLong(fileName, 16); + replayState.put(windowId, operatorId); + if (windowId > largestRecoveryWindow) { + largestRecoveryWindow = windowId; + } + } + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void save(Object object, int operatorId, long windowId) throws IOException + { + storageAgent.save(object, operatorId, windowId); + } + + @Override + public Object load(int operatorId, long windowId) throws IOException + { + Set operators = replayState.get(windowId); + if (operators == null || !operators.contains(operatorId)) { + return null; + } + return storageAgent.load(operatorId, windowId); + } + + @Override + public void delete(int operatorId, long windowId) throws IOException + { + storageAgent.delete(operatorId, windowId); + } + + @Override + public Map load(long windowId) throws IOException + { + Set operators = replayState.get(windowId); + if (operators == null) { + return null; + } + Map data = Maps.newHashMap(); + for (int operatorId : operators) { + data.put(operatorId, load(operatorId, windowId)); + } + return data; + } + + @Override + public long[] getWindowIds(int operatorId) throws IOException + { + Path operatorPath = new Path(appPath, String.valueOf(operatorId)); + if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) { + return null; + } + return storageAgent.getWindowIds(operatorId); + } + + @Override + public long[] getWindowIds() throws IOException + { + SortedSet windowIds = replayState.keySet(); + long[] windowIdsArray = new long[windowIds.size()]; + + int index = 0; + + for (Long windowId: windowIds) { + windowIdsArray[index] = windowId; + index++; + } + + return windowIdsArray; + } + + /** + * This deletes all the recovery files of window ids <= windowId. + * + * @param operatorId operator id. + * @param windowId the largest window id for which the states will be deleted. + * @throws IOException + */ + @Override + public void deleteUpTo(int operatorId, long windowId) throws IOException + { + //deleting the replay state + if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) { + Iterator>> iterator = replayState.asMap().entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry> windowEntry = iterator.next(); + long lwindow = windowEntry.getKey(); + if (lwindow > windowId) { + break; + } + for (Integer loperator : windowEntry.getValue()) { + + if (deletedOperators.contains(loperator)) { + storageAgent.delete(loperator, lwindow); + + Path loperatorPath = new Path(appPath, Integer.toString(loperator)); + if (fs.listStatus(loperatorPath).length == 0) { + //The operator was deleted and it has nothing to replay. + deletedOperators.remove(loperator); + fs.delete(loperatorPath, true); + } + } else if (loperator == operatorId) { + storageAgent.delete(loperator, lwindow); + } + } + iterator.remove(); + } + } + + if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) { + long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId); + Arrays.sort(windowsAfterReplay); + for (long lwindow : windowsAfterReplay) { + if (lwindow <= windowId) { + storageAgent.delete(operatorId, lwindow); + } + } + } + } + + @Override + public long getLargestRecoveryWindow() + { + return largestRecoveryWindow; + } + + @Override + public void partitioned(Collection newManagers, Set removedOperatorIds) + { + Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(), + "there has to be one idempotent storage manager"); + org.apache.apex.malhar.lib.wal.FSWindowDataManager deletedOperatorsManager = null; + + if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) { + if (this.deletedOperators == null) { + this.deletedOperators = Sets.newHashSet(); + } + this.deletedOperators.addAll(removedOperatorIds); + } + + for (WindowDataManager storageManager : newManagers) { + + org.apache.apex.malhar.lib.wal.FSWindowDataManager lmanager = (org.apache.apex.malhar.lib.wal.FSWindowDataManager)storageManager; + lmanager.recoveryPath = this.recoveryPath; + lmanager.storageAgent = this.storageAgent; + + if (lmanager.deletedOperators != null) { + deletedOperatorsManager = lmanager; + } + //only one physical instance can manage deleted operators so clearing this field for rest of the instances. + if (lmanager != deletedOperatorsManager) { + lmanager.deletedOperators = null; + } + } + + if (removedOperatorIds == null || removedOperatorIds.isEmpty()) { + //Nothing to do + return; + } + if (this.deletedOperators != null) { + + /*If some operators were removed then there needs to be a manager which can clean there state when it is not + needed.*/ + if (deletedOperatorsManager == null) { + //None of the managers were handling deleted operators data. + deletedOperatorsManager = (org.apache.apex.malhar.lib.wal.FSWindowDataManager)newManagers.iterator().next(); + deletedOperatorsManager.deletedOperators = Sets.newHashSet(); + } + + deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds); + } + } + + @Override + public void teardown() + { + try { + fs.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * @return recovery path + */ + public String getRecoveryPath() + { + return recoveryPath; + } + + /** + * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative + * to the application path; otherwise it is handled as an absolute path. + * + * @param recoveryPath recovery path + */ + public void setRecoveryPath(String recoveryPath) + { + this.recoveryPath = recoveryPath; + } + + /** + * @return true if recovery path is relative to app path; false otherwise. + */ + public boolean isRecoveryPathRelativeToAppPath() + { + return isRecoveryPathRelativeToAppPath; + } + + /** + * Specifies whether the recovery path is relative to application path. + * + * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise. + */ + public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath) + { + isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath; + } +} diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java index 296238b6d4..a1917a6c28 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java @@ -19,31 +19,14 @@ package org.apache.apex.malhar.lib.wal; import java.io.IOException; -import java.util.Arrays; import java.util.Collection; -import java.util.Iterator; import java.util.Map; import java.util.Set; -import java.util.SortedSet; - -import javax.validation.constraints.NotNull; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.collect.TreeMultimap; import com.datatorrent.api.Component; import com.datatorrent.api.Context; -import com.datatorrent.api.DAG; import com.datatorrent.api.StorageAgent; import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.common.util.FSStorageAgent; import com.datatorrent.lib.io.fs.AbstractFileInputOperator; /** @@ -109,301 +92,6 @@ public interface WindowDataManager extends StorageAgent, Component - * It consists of operator ids which have been deleted but have some state that can be replayed. - * Only one of the instances would be handling (modifying) the files that belong to this state. - */ - protected Set deletedOperators; - - /** - * Sorted mapping from window id to all the operators that have state to replay for that window. - */ - protected final transient TreeMultimap replayState; - - protected transient FileSystem fs; - protected transient Path appPath; - - public FSWindowDataManager() - { - replayState = TreeMultimap.create(); - largestRecoveryWindow = Stateless.WINDOW_ID; - recoveryPath = DEF_RECOVERY_PATH; - } - - @Override - public void setup(Context.OperatorContext context) - { - Configuration configuration = new Configuration(); - if (isRecoveryPathRelativeToAppPath) { - appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath); - } else { - appPath = new Path(recoveryPath); - } - - try { - storageAgent = new FSStorageAgent(appPath.toString(), configuration); - - fs = FileSystem.newInstance(appPath.toUri(), configuration); - - if (fs.exists(appPath)) { - FileStatus[] fileStatuses = fs.listStatus(appPath); - - for (FileStatus operatorDirStatus : fileStatuses) { - int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName()); - - for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) { - String fileName = status.getPath().getName(); - if (fileName.endsWith(FSStorageAgent.TMP_FILE)) { - continue; - } - long windowId = Long.parseLong(fileName, 16); - replayState.put(windowId, operatorId); - if (windowId > largestRecoveryWindow) { - largestRecoveryWindow = windowId; - } - } - } - } - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void save(Object object, int operatorId, long windowId) throws IOException - { - storageAgent.save(object, operatorId, windowId); - } - - @Override - public Object load(int operatorId, long windowId) throws IOException - { - Set operators = replayState.get(windowId); - if (operators == null || !operators.contains(operatorId)) { - return null; - } - return storageAgent.load(operatorId, windowId); - } - - @Override - public void delete(int operatorId, long windowId) throws IOException - { - storageAgent.delete(operatorId, windowId); - } - - @Override - public Map load(long windowId) throws IOException - { - Set operators = replayState.get(windowId); - if (operators == null) { - return null; - } - Map data = Maps.newHashMap(); - for (int operatorId : operators) { - data.put(operatorId, load(operatorId, windowId)); - } - return data; - } - - @Override - public long[] getWindowIds(int operatorId) throws IOException - { - Path operatorPath = new Path(appPath, String.valueOf(operatorId)); - if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) { - return null; - } - return storageAgent.getWindowIds(operatorId); - } - - @Override - public long[] getWindowIds() throws IOException - { - SortedSet windowIds = replayState.keySet(); - long[] windowIdsArray = new long[windowIds.size()]; - - int index = 0; - - for (Long windowId: windowIds) { - windowIdsArray[index] = windowId; - index++; - } - - return windowIdsArray; - } - - /** - * This deletes all the recovery files of window ids <= windowId. - * - * @param operatorId operator id. - * @param windowId the largest window id for which the states will be deleted. - * @throws IOException - */ - @Override - public void deleteUpTo(int operatorId, long windowId) throws IOException - { - //deleting the replay state - if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) { - Iterator>> iterator = replayState.asMap().entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry> windowEntry = iterator.next(); - long lwindow = windowEntry.getKey(); - if (lwindow > windowId) { - break; - } - for (Integer loperator : windowEntry.getValue()) { - - if (deletedOperators.contains(loperator)) { - storageAgent.delete(loperator, lwindow); - - Path loperatorPath = new Path(appPath, Integer.toString(loperator)); - if (fs.listStatus(loperatorPath).length == 0) { - //The operator was deleted and it has nothing to replay. - deletedOperators.remove(loperator); - fs.delete(loperatorPath, true); - } - } else if (loperator == operatorId) { - storageAgent.delete(loperator, lwindow); - } - } - iterator.remove(); - } - } - - if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) { - long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId); - Arrays.sort(windowsAfterReplay); - for (long lwindow : windowsAfterReplay) { - if (lwindow <= windowId) { - storageAgent.delete(operatorId, lwindow); - } - } - } - } - - @Override - public long getLargestRecoveryWindow() - { - return largestRecoveryWindow; - } - - @Override - public void partitioned(Collection newManagers, Set removedOperatorIds) - { - Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(), - "there has to be one idempotent storage manager"); - FSWindowDataManager deletedOperatorsManager = null; - - if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) { - if (this.deletedOperators == null) { - this.deletedOperators = Sets.newHashSet(); - } - this.deletedOperators.addAll(removedOperatorIds); - } - - for (WindowDataManager storageManager : newManagers) { - - FSWindowDataManager lmanager = (FSWindowDataManager)storageManager; - lmanager.recoveryPath = this.recoveryPath; - lmanager.storageAgent = this.storageAgent; - - if (lmanager.deletedOperators != null) { - deletedOperatorsManager = lmanager; - } - //only one physical instance can manage deleted operators so clearing this field for rest of the instances. - if (lmanager != deletedOperatorsManager) { - lmanager.deletedOperators = null; - } - } - - if (removedOperatorIds == null || removedOperatorIds.isEmpty()) { - //Nothing to do - return; - } - if (this.deletedOperators != null) { - - /*If some operators were removed then there needs to be a manager which can clean there state when it is not - needed.*/ - if (deletedOperatorsManager == null) { - //None of the managers were handling deleted operators data. - deletedOperatorsManager = (FSWindowDataManager)newManagers.iterator().next(); - deletedOperatorsManager.deletedOperators = Sets.newHashSet(); - } - - deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds); - } - } - - @Override - public void teardown() - { - try { - fs.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** - * @return recovery path - */ - public String getRecoveryPath() - { - return recoveryPath; - } - - /** - * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative - * to the application path; otherwise it is handled as an absolute path. - * - * @param recoveryPath recovery path - */ - public void setRecoveryPath(String recoveryPath) - { - this.recoveryPath = recoveryPath; - } - - /** - * @return true if recovery path is relative to app path; false otherwise. - */ - public boolean isRecoveryPathRelativeToAppPath() - { - return isRecoveryPathRelativeToAppPath; - } - - /** - * Specifies whether the recovery path is relative to application path. - * - * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise. - */ - public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath) - { - isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath; - } - } - /** * This {@link WindowDataManager} will never do recovery. This is a convenience class so that operators * can use the same logic for maintaining idempotency and avoiding idempotency. diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index ea16185788..aa295b143c 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -34,6 +34,7 @@ import org.junit.runner.Description; import org.apache.apex.malhar.lib.fs.LineByLineFileInputOperator; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -54,7 +55,6 @@ import com.datatorrent.api.StatsListener; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.fs.AbstractFileInputOperator.DirectoryScanner; import com.datatorrent.lib.partitioner.StatelessPartitionerTest.PartitioningContextImpl; import com.datatorrent.lib.testbench.CollectorTestSink; @@ -615,10 +615,10 @@ public void testIdempotency() throws Exception } LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); - IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); + FSWindowDataManager manager = new FSWindowDataManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); - oper.setIdempotentStorageManager(manager); + oper.setWindowDataManager(manager); CollectorTestSink queryResults = new CollectorTestSink(); TestUtils.setSink(oper.output, queryResults); @@ -664,10 +664,10 @@ public void testIdempotencyWithMultipleEmitTuples() throws Exception } LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); - IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); + FSWindowDataManager manager = new FSWindowDataManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); - oper.setIdempotentStorageManager(manager); + oper.setWindowDataManager(manager); CollectorTestSink queryResults = new CollectorTestSink(); TestUtils.setSink(oper.output, queryResults); @@ -707,11 +707,11 @@ public void testIdempotencyWhenFileContinued() throws Exception FileUtils.write(new File(testMeta.dir, "file0"), StringUtils.join(lines, '\n')); LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); - IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); + FSWindowDataManager manager = new FSWindowDataManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); oper.setEmitBatchSize(5); - oper.setIdempotentStorageManager(manager); + oper.setWindowDataManager(manager); CollectorTestSink queryResults = new CollectorTestSink(); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -770,10 +770,10 @@ public void testStateWithIdempotency() throws Exception LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); - IdempotentStorageManager.FSIdempotentStorageManager manager = new IdempotentStorageManager.FSIdempotentStorageManager(); + FSWindowDataManager manager = new FSWindowDataManager(); manager.setRecoveryPath(testMeta.dir + "/recovery"); - oper.setIdempotentStorageManager(manager); + oper.setWindowDataManager(manager); CollectorTestSink queryResults = new CollectorTestSink(); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -821,7 +821,7 @@ public void testIdempotentStorageManagerPartitioning() throws Exception LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); - oper.setIdempotentStorageManager(new TestStorageManager()); + oper.setWindowDataManager(new TestStorageManager()); oper.operatorId = 7; Path path = new Path(new File(testMeta.dir).getAbsolutePath()); @@ -839,7 +839,7 @@ public void testIdempotentStorageManagerPartitioning() throws Exception List storageManagers = Lists.newLinkedList(); for (Partition> p : newPartitions) { - storageManagers.add((TestStorageManager)p.getPartitionedInstance().idempotentStorageManager); + storageManagers.add((TestStorageManager)p.getPartitionedInstance().getWindowDataManager()); } Assert.assertEquals("count of storage managers", 2, storageManagers.size()); @@ -857,7 +857,7 @@ public void testIdempotentStorageManagerPartitioning() throws Exception Assert.assertEquals("deleted operators", Sets.newHashSet(7), deleteManager.getDeletedOperators()); } - private static class TestStorageManager extends IdempotentStorageManager.FSIdempotentStorageManager + private static class TestStorageManager extends FSWindowDataManager { Set getDeletedOperators() { diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java index cf11a25fa2..faa1d45157 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterInputTest.java @@ -37,6 +37,8 @@ import org.junit.runner.Description; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileContext; @@ -49,7 +51,6 @@ import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.block.BlockMetadata; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.KryoCloneUtils; @@ -189,9 +190,8 @@ public void testBlockMetadataWithSplit() throws InterruptedException @Test public void testIdempotency() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager - .FSIdempotentStorageManager(); - testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager); + FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager(); + testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager); testMeta.fileSplitterInput.setup(testMeta.context); //will emit window 1 from data directory @@ -294,9 +294,8 @@ public void testBlocksThreshold() throws InterruptedException @Test public void testIdempotencyWithBlocksThreshold() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager - .FSIdempotentStorageManager(); - testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager); + FSWindowDataManager fsWindowDataManager = new FSWindowDataManager(); + testMeta.fileSplitterInput.setWindowDataManager(fsWindowDataManager); testMeta.fileSplitterInput.setBlocksThreshold(10); testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); testMeta.fileSplitterInput.setup(testMeta.context); @@ -341,9 +340,8 @@ public void testFirstWindowAfterRecovery() throws IOException, InterruptedExcept @Ignore public void testRecoveryOfPartialFile() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager - .FSIdempotentStorageManager(); - testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager); + FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager(); + testMeta.fileSplitterInput.setWindowDataManager(fsIdempotentStorageManager); testMeta.fileSplitterInput.setBlockSize(2L); testMeta.fileSplitterInput.setBlocksThreshold(2); testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); @@ -460,10 +458,9 @@ public void testSingleFile() throws InterruptedException, IOException @Test public void testRecoveryOfBlockMetadataIterator() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = - new IdempotentStorageManager.FSIdempotentStorageManager(); + FSWindowDataManager fsWindowDataManager = new FSWindowDataManager(); - testMeta.fileSplitterInput.setIdempotentStorageManager(fsIdempotentStorageManager); + testMeta.fileSplitterInput.setWindowDataManager(fsWindowDataManager); testMeta.fileSplitterInput.setBlockSize(2L); testMeta.fileSplitterInput.setBlocksThreshold(2); testMeta.fileSplitterInput.getScanner().setScanIntervalMillis(500); diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java index e62f643ac7..01febe3327 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/FileSplitterTest.java @@ -35,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.apex.malhar.lib.wal.FSWindowDataManager; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileContext; @@ -45,7 +46,6 @@ import com.datatorrent.api.Attribute; import com.datatorrent.api.Context; import com.datatorrent.lib.helper.OperatorContextTestHelper; -import com.datatorrent.lib.io.IdempotentStorageManager; import com.datatorrent.lib.io.block.BlockMetadata; import com.datatorrent.lib.testbench.CollectorTestSink; import com.datatorrent.lib.util.TestUtils; @@ -109,7 +109,7 @@ protected void starting(org.junit.runner.Description description) fileSplitter.scanner.setScanIntervalMillis(500); fileSplitter.scanner.setFilePatternRegularExp(".*[.]txt"); fileSplitter.scanner.setFiles(dataDirectory); - fileSplitter.setIdempotentStorageManager(new IdempotentStorageManager.NoopIdempotentStorageManager()); + fileSplitter.setWindowDataManager(new FSWindowDataManager()); Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); attributes.put(Context.DAGContext.APPLICATION_PATH, dataDirectory); @@ -191,9 +191,9 @@ public void testBlockMetadataWithSplit() throws InterruptedException @Test public void testIdempotency() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = - new IdempotentStorageManager.FSIdempotentStorageManager(); - testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager); + FSWindowDataManager fsWindowDataManager = + new FSWindowDataManager(); + testMeta.fileSplitter.setWindowDataManager(fsWindowDataManager); testMeta.fileSplitter.setup(testMeta.context); //will emit window 1 from data directory @@ -296,9 +296,8 @@ public void testBlocksThreshold() throws InterruptedException @Test public void testIdempotencyWithBlocksThreshold() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager - .FSIdempotentStorageManager(); - testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager); + FSWindowDataManager fsWindowDataManager = new FSWindowDataManager(); + testMeta.fileSplitter.setWindowDataManager(fsWindowDataManager); testMeta.fileSplitter.setBlocksThreshold(10); testMeta.fileSplitter.scanner.setScanIntervalMillis(500); testMeta.fileSplitter.setup(testMeta.context); @@ -346,10 +345,9 @@ public void testFirstWindowAfterRecovery() throws IOException, InterruptedExcept @Ignore public void testRecoveryOfPartialFile() throws InterruptedException { - IdempotentStorageManager.FSIdempotentStorageManager fsIdempotentStorageManager = new IdempotentStorageManager - .FSIdempotentStorageManager(); + FSWindowDataManager fsIdempotentStorageManager = new FSWindowDataManager(); fsIdempotentStorageManager.setRecoveryPath(testMeta.dataDirectory + '/' + "recovery"); - testMeta.fileSplitter.setIdempotentStorageManager(fsIdempotentStorageManager); + testMeta.fileSplitter.setWindowDataManager(fsIdempotentStorageManager); testMeta.fileSplitter.setBlockSize(2L); testMeta.fileSplitter.setBlocksThreshold(2); testMeta.fileSplitter.scanner.setScanIntervalMillis(500); diff --git a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java index b8c916d93c..42f730cda7 100644 --- a/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/jms/JMSStringInputOperatorTest.java @@ -126,7 +126,7 @@ public void testRecoveryAndIdempotency() throws Exception testMeta.operator.activate(testMeta.context); Assert.assertEquals("largest recovery window", 1, - testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); testMeta.operator.beginWindow(1); testMeta.operator.endWindow(); @@ -163,7 +163,7 @@ protected void acknowledge() throws JMSException testMeta.operator.activate(testMeta.context); Assert.assertEquals("window 1 should not exist", Stateless.WINDOW_ID, - testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow()); + testMeta.operator.getWindowDataManager().getLargestRecoveryWindow()); } private void produceMsg(int numMessages) throws Exception diff --git a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java index 7f3adc9cf4..21d5b763ad 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/wal/FSWindowDataManagerTest.java @@ -56,7 +56,7 @@ private static class TestMeta extends TestWatcher { String applicationPath; - WindowDataManager.FSWindowDataManager storageManager; + FSWindowDataManager storageManager; Context.OperatorContext context; @Override @@ -64,7 +64,7 @@ protected void starting(Description description) { TestUtils.deleteTargetTestClassFolder(description); super.starting(description); - storageManager = new WindowDataManager.FSWindowDataManager(); + storageManager = new FSWindowDataManager(); applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName(); Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap(); diff --git a/pom.xml b/pom.xml index c0e98a3799..59e30bb141 100644 --- a/pom.xml +++ b/pom.xml @@ -136,6 +136,9 @@ @org.apache.hadoop.classification.InterfaceStability$Unstable com.datatorrent.lib.io.WebSocketInputOperator com.datatorrent.lib.io.WebSocketOutputOperator + *#idempotentStorageManager + *#setIdempotentStorageManager(com.datatorrent.lib.io.IdempotentStorageManager) + *#getIdempotentStorageManager() ${semver.plugin.skip}