From 4e8aa78144b360eacdc7b586718112205b3aa53d Mon Sep 17 00:00:00 2001 From: "Tushar R. Gosavi" Date: Mon, 20 Mar 2017 23:51:12 +0530 Subject: [PATCH] APEXCORE-619 Checkpoint leaf stateless operator for correct recovery after application restart. --- .../common/util/FSStorageAgent.java | 9 +- .../util/LeafStatelessStorageAgent.java | 95 ++++++++++ .../common/util/FSStorageAgentTest.java | 2 +- .../util/LeafStatelessStorageAgentTest.java | 69 +++++++ .../com/datatorrent/stram/StramClient.java | 13 +- .../stram/StreamingContainerAgent.java | 6 +- .../stram/plan/logical/LogicalPlan.java | 14 ++ .../stram/plan/physical/PTOperator.java | 13 +- .../com/datatorrent/stram/CheckpointTest.java | 13 +- .../stram/LeafStatelessOperatorTests.java | 174 ++++++++++++++++++ .../stram/StreamingContainerManagerTest.java | 2 +- 11 files changed, 397 insertions(+), 13 deletions(-) create mode 100644 common/src/main/java/com/datatorrent/common/util/LeafStatelessStorageAgent.java create mode 100644 common/src/test/java/com/datatorrent/common/util/LeafStatelessStorageAgentTest.java create mode 100644 engine/src/test/java/com/datatorrent/stram/LeafStatelessOperatorTests.java diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java index fe90b86d02..1b7ddb86d1 100644 --- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java +++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java @@ -119,13 +119,20 @@ public void save(Object object, int operatorId, long windowId) throws IOExceptio } finally { if (stateSaved) { logger.debug("Saving {}: {}", operatorId, window); - fileContext.rename(lPath, new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window), + fileContext.rename(lPath, getCheckpointPath(operatorId, windowId), Options.Rename.OVERWRITE); } } } } + public Path getCheckpointPath(int operatorId, long windowId) + { + String operatorIdStr = String.valueOf(operatorId); + String window = Long.toHexString(windowId); + return new Path(path + Path.SEPARATOR + operatorIdStr + Path.SEPARATOR + window); + } + @Override public Object load(int operatorId, long windowId) throws IOException { diff --git a/common/src/main/java/com/datatorrent/common/util/LeafStatelessStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/LeafStatelessStorageAgent.java new file mode 100644 index 0000000000..b7da057741 --- /dev/null +++ b/common/src/main/java/com/datatorrent/common/util/LeafStatelessStorageAgent.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.common.util; + +import java.io.IOException; +import java.io.ObjectStreamException; +import java.util.EnumSet; + +import org.slf4j.Logger; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; + +import com.datatorrent.api.annotation.Stateless; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * A StorageAgent which write empty checkpoint files for an operator. This StorageAgent is used + * by stateless leaf operators to make sure that recovery works correctly after application + * restart. see APEXCORE-619. + */ +public class LeafStatelessStorageAgent extends FSStorageAgent +{ + private static final Logger LOG = getLogger(LeafStatelessStorageAgent.class); + + public LeafStatelessStorageAgent(String path, Configuration conf) + { + super(path, conf); + } + + /** + * Allow writing initial checkpoint, after that for any other windowId this agent creates a + * zero length file. + * + * @param object + * @param operatorId + * @param windowId + * @throws IOException + */ + @Override + public void save(Object object, int operatorId, long windowId) throws IOException + { + if (windowId == Stateless.WINDOW_ID) { + super.save(object, operatorId, windowId); + } else { + FSDataOutputStream fout = null; + try { + fout = fileContext.create(getCheckpointPath(operatorId, windowId), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)); + } finally { + if (fout != null) { + fout.close(); + } + } + } + } + + /** + * Always load initial checkpoint. + * + * @param operatorId + * @param windowId + * @return + * @throws IOException + */ + @Override + public Object load(int operatorId, long windowId) throws IOException + { + return super.load(operatorId, Stateless.WINDOW_ID); + } + + @Override + public Object readResolve() throws ObjectStreamException + { + LeafStatelessStorageAgent emptyFSStorageAgent = new LeafStatelessStorageAgent(this.path, null); + return emptyFSStorageAgent; + } +} diff --git a/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java index 0d6e38bf46..81325ff243 100644 --- a/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java +++ b/common/src/test/java/com/datatorrent/common/util/FSStorageAgentTest.java @@ -39,7 +39,7 @@ public class FSStorageAgentTest { - private static class TestMeta extends TestWatcher + protected static class TestMeta extends TestWatcher { String applicationPath; FSStorageAgent storageAgent; diff --git a/common/src/test/java/com/datatorrent/common/util/LeafStatelessStorageAgentTest.java b/common/src/test/java/com/datatorrent/common/util/LeafStatelessStorageAgentTest.java new file mode 100644 index 0000000000..c73701a7df --- /dev/null +++ b/common/src/test/java/com/datatorrent/common/util/LeafStatelessStorageAgentTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.common.util; + +import java.io.IOException; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.Maps; + +import com.datatorrent.api.annotation.Stateless; + +public class LeafStatelessStorageAgentTest extends FSStorageAgentTest +{ + @Test + public void testActivationCheckpointWrite() throws IOException + { + Map activationData = Maps.newHashMap(); + activationData.put(1, "one"); + activationData.put(2, "two"); + + // only saving initial state is allowed. + LeafStatelessStorageAgent agent = new LeafStatelessStorageAgent(testMeta.applicationPath, null); + agent.save(activationData, 1, Stateless.WINDOW_ID); + FileContext fileContext = FileContext.getFileContext(); + Path ckPath = agent.getCheckpointPath(1, Stateless.WINDOW_ID); + Assert.assertTrue("operator 1 window -1", fileContext.util().exists(ckPath)); + Assert.assertTrue("File contain some data", fileContext.getFileStatus(ckPath).getLen() > 0); + + // save at any other window, writes an empty file. + Map data = Maps.newHashMap(); + data.put(3, "three"); + data.put(4, "four"); + agent.save(data, 1, 1); + ckPath = agent.getCheckpointPath(1, 1); + Assert.assertTrue("operator 1 window 1", fileContext.util().exists(ckPath)); + Assert.assertEquals("file is empty", 0, fileContext.getFileStatus(ckPath).getLen()); + + // load from any window returns the same state as activation window. + Map loaded = (Map)agent.load(1, 1); + Assert.assertEquals("data matches with activation checkpoint", activationData, loaded); + + // Test purge + agent.delete(1, 1); + ckPath = agent.getCheckpointPath(1, 1); + Assert.assertFalse("operator 1 window 1 file does not exists", fileContext.util().exists(ckPath)); + } +} diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index dad42e37a5..efbe4e707b 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -81,6 +81,7 @@ import com.datatorrent.api.StorageAgent; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BasicContainerOptConfigurator; +import com.datatorrent.common.util.LeafStatelessStorageAgent; import com.datatorrent.stram.client.StramClientUtils; import com.datatorrent.stram.client.StramClientUtils.ClientRMHelper; import com.datatorrent.stram.engine.StreamingContainer; @@ -472,12 +473,22 @@ public void startApplication() throws YarnException, IOException ((StorageAgent.ApplicationAwareStorageAgent)agent).setApplicationAttributes(dag.getAttributes()); } + Path checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS); if (dag.getAttributes().get(OperatorContext.STORAGE_AGENT) == null) { /* which would be the most likely case */ - Path checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS); + checkpointPath = new Path(appPath, LogicalPlan.SUBDIR_CHECKPOINTS); // use conf client side to pickup any proxy settings from dt-site.xml dag.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(checkpointPath.toString(), conf)); } + StorageAgent emptyStorageAgent = new LeafStatelessStorageAgent(checkpointPath.toString(), conf); + List leafOperators = dag.getLeafOperators(); + for (LogicalPlan.OperatorMeta ometa : leafOperators) { + if (ometa.getAttributes().get(OperatorContext.STORAGE_AGENT) == null && ometa.isStateless()) { + LOG.info("Setting LeafStatelessStorageAgent for operator {}", ometa.getName()); + ometa.getAttributes().put(OperatorContext.STORAGE_AGENT, emptyStorageAgent); + } + } + if (dag.getAttributes().get(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR) == null) { dag.setAttribute(LogicalPlan.CONTAINER_OPTS_CONFIGURATOR, new BasicContainerOptConfigurator()); } diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java index 0aa0b83e55..a2420bafdd 100644 --- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerAgent.java @@ -436,7 +436,11 @@ private OperatorDeployInfo createOperatorDeployInfo(PTOperator oper) throw new RuntimeException("Cannot clone operator attributes", ex); } if (oper.isOperatorStateLess()) { - ndi.contextAttributes.put(OperatorContext.STATELESS, true); + if (oper.isLeafOperator()) { + ndi.contextAttributes.put(OperatorContext.STATELESS, false); + } else { + ndi.contextAttributes.put(OperatorContext.STATELESS, true); + } } return ndi; } diff --git a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java index 401eea94cb..e931343f9c 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/logical/LogicalPlan.java @@ -93,6 +93,7 @@ import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.api.annotation.Stateless; import com.datatorrent.common.experimental.AppData; import com.datatorrent.common.metric.MetricsAggregator; import com.datatorrent.common.metric.SingleMetricAggregator; @@ -1058,6 +1059,10 @@ private void copyAttributesFrom(OperatorMeta operatorMeta) } } + public boolean isLeafOperator() + { + return leafOperators.contains(this); + } private class PortMapping implements Operators.OperatorDescriptor { @@ -1192,6 +1197,15 @@ public LogicalPlan getDAG() return LogicalPlan.this; } + public boolean isStateless() + { + if (getDAG().getValue(OperatorContext.STATELESS) || getValue(OperatorContext.STATELESS)) { + return true; + } + + return getOperator().getClass().isAnnotationPresent(Stateless.class); + } + @Override public String toString() { diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java index 471dca2a39..7a712f237b 100644 --- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java +++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java @@ -42,8 +42,6 @@ import com.datatorrent.api.Operator.InputPort; import com.datatorrent.api.Partitioner.PartitionKeys; import com.datatorrent.api.StatsListener; -import com.datatorrent.api.annotation.Stateless; - import com.datatorrent.stram.Journal.Recoverable; import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; @@ -66,6 +64,11 @@ public class PTOperator implements java.io.Serializable public static final Recoverable SET_OPERATOR_STATE = new SetOperatorState(); + public boolean isLeafOperator() + { + return operatorMeta.isLeafOperator(); + } + public enum State { PENDING_DEPLOY, @@ -428,11 +431,7 @@ public Class getUnifierClass() public boolean isOperatorStateLess() { - if (operatorMeta.getDAG().getValue(OperatorContext.STATELESS) || operatorMeta.getValue(OperatorContext.STATELESS)) { - return true; - } - - return operatorMeta.getOperator().getClass().isAnnotationPresent(Stateless.class); + return operatorMeta.isStateless(); } public Checkpoint addCheckpoint(long windowId, long startTime) diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java index d7f96d4c17..c359436cbc 100644 --- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java +++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java @@ -141,8 +141,11 @@ public void testBackup() throws Exception GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); dag.setOperatorAttribute(o2, OperatorContext.STATELESS, true); - dag.addStream("o1.outport", o1.outport, o2.inport1).setLocality(Locality.CONTAINER_LOCAL); + GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); + dag.setOperatorAttribute(o3, OperatorContext.STATELESS, true); + dag.addStream("o1.outport", o1.outport, o2.inport1).setLocality(Locality.CONTAINER_LOCAL); + dag.addStream("o2.outport", o2.outport1, o3.inport1).setLocality(Locality.CONTAINER_LOCAL); StramLocalCluster sc = new StramLocalCluster(dag); sc.setHeartbeatMonitoringEnabled(false); sc.run(); @@ -167,6 +170,14 @@ public void testBackup() throws Exception Assert.assertEquals("number checkpoints " + checkpoints, 1, checkpoints.size()); Assert.assertEquals("checkpoints " + o2p1, Sets.newHashSet(Stateless.WINDOW_ID), checkpoints); + PTOperator o3p1 = plan.getOperators(dag.getMeta(o3)).get(0); + checkpoints = Sets.newHashSet(); + for (long windowId : storageAgent.getWindowIds(o3p1.getId())) { + checkpoints.add(windowId); + } + Assert.assertEquals("number checkpoints " + checkpoints, 3, checkpoints.size()); + Assert.assertTrue("contains " + checkpoints + " " + Stateless.WINDOW_ID, checkpoints.contains(Stateless.WINDOW_ID)); + Assert.assertEquals("checkpoints " + o1p1 + " " + o1p1.checkpoints, 2, o1p1.checkpoints.size()); Assert.assertNotNull("checkpoint not null for statefull operator " + o1p1, o1p1.stats.checkpointStats); diff --git a/engine/src/test/java/com/datatorrent/stram/LeafStatelessOperatorTests.java b/engine/src/test/java/com/datatorrent/stram/LeafStatelessOperatorTests.java new file mode 100644 index 0000000000..56338e9bd2 --- /dev/null +++ b/engine/src/test/java/com/datatorrent/stram/LeafStatelessOperatorTests.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.datatorrent.stram; + +import java.net.InetSocketAddress; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.collect.Lists; + +import com.datatorrent.api.Context; +import com.datatorrent.api.DAG; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.stram.api.OperatorDeployInfo; +import com.datatorrent.stram.engine.GenericTestOperator; +import com.datatorrent.stram.engine.TestGeneratorInputOperator; +import com.datatorrent.stram.plan.logical.LogicalPlan; +import com.datatorrent.stram.support.StramTestSupport; + +import static com.datatorrent.stram.StreamingContainerManagerTest.getNodeDeployInfo; + +public class LeafStatelessOperatorTests +{ + + @Rule + public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta(); + + private LogicalPlan dag; + + @Before + public void setup() + { + dag = StramTestSupport.createDAG(testMeta); + } + + @Stateless + public static class StatelessOperator extends GenericTestOperator + { + } + + @Stateless + public static class StatelessInputOperator extends TestGeneratorInputOperator + { + + } + + /** + * Test operator with explicit stateless attribute set. + */ + @Test + public void testGenerateDeployInfo() + { + TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); + dag.setOperatorAttribute(o3, Context.OperatorContext.STATELESS, true); + GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class); + dag.setOperatorAttribute(o4, Context.OperatorContext.STATELESS, true); + + dag.addStream("o1.outport", o1.outport, o2.inport1); + dag.addStream("o2.outport1", o2.outport1, o3.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL); + dag.addStream("o3.outport1", o3.outport1, o4.inport1).setLocality(DAG.Locality.THREAD_LOCAL); + dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); + + Assert.assertEquals("number of leaf operators ", 1, dag.getLeafOperators().size()); + + verifyStatelessAttribute(Lists.newArrayList(null, null, true, false)); + } + + /** + * Test operator with stateless annotation. + */ + @Test + public void testGenerateDeployInfo1() + { + TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + StatelessOperator o3 = dag.addOperator("o3", StatelessOperator.class); + StatelessOperator o4 = dag.addOperator("o4", StatelessOperator.class); + + dag.addStream("o1.outport", o1.outport, o2.inport1); + dag.addStream("o2.outport1", o2.outport1, o3.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL); + dag.addStream("o3.outport1", o3.outport1, o4.inport1).setLocality(DAG.Locality.THREAD_LOCAL); + dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); + + Assert.assertEquals("number of leaf operators ", 1, dag.getLeafOperators().size()); + + verifyStatelessAttribute(Lists.newArrayList(null, null, true, false)); + } + + @Test + public void testAllStatelessOperator() + { + StatelessInputOperator o1 = dag.addOperator("o1", StatelessInputOperator.class); + StatelessOperator o2 = dag.addOperator("o2", StatelessOperator.class); + StatelessOperator o3 = dag.addOperator("o3", StatelessOperator.class); + StatelessOperator o4 = dag.addOperator("o4", StatelessOperator.class); + + dag.addStream("o1.outport", o1.outport, o2.inport1); + dag.addStream("o2.outport1", o2.outport1, o3.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL); + dag.addStream("o3.outport1", o3.outport1, o4.inport1).setLocality(DAG.Locality.THREAD_LOCAL); + dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); + + Assert.assertEquals("number of leaf operators ", 1, dag.getLeafOperators().size()); + + // all except last operator is stateless. + verifyStatelessAttribute(Lists.newArrayList(true, true, true, false)); + } + + @Test + public void testWithDAGStatelessAttribute() + { + dag.setAttribute(Context.OperatorContext.STATELESS, true); + TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class); + GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class); + GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class); + GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class); + + dag.addStream("o1.outport", o1.outport, o2.inport1); + dag.addStream("o2.outport1", o2.outport1, o3.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL); + dag.addStream("o3.outport1", o3.outport1, o4.inport1).setLocality(DAG.Locality.THREAD_LOCAL); + dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1); + dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent()); + + Assert.assertEquals("number of leaf operators ", 1, dag.getLeafOperators().size()); + + verifyStatelessAttribute(Lists.newArrayList(true, true, true, false)); + } + + + private void verifyStatelessAttribute(List values) + { + StreamingContainerManager dnm = new StreamingContainerManager(dag); + dnm.assignContainer(new StreamingContainerManager.ContainerResource(0, "container1Id", "host1", 1024, 0, null), InetSocketAddress.createUnresolved("host1", 9001)); + StreamingContainerAgent sca1 = dnm.getContainerAgent(dnm.getPhysicalPlan().getContainers().get(0).getExternalId()); + List c1 = sca1.getDeployInfoList(sca1.container.getOperators()); + + OperatorDeployInfo odi = getNodeDeployInfo(c1, dag.getOperatorMeta("o1")); + Assert.assertEquals("o1 is stateful operator ", odi.contextAttributes.get(Context.OperatorContext.STATELESS), values.get(0)); + + odi = getNodeDeployInfo(c1, dag.getOperatorMeta("o2")); + Assert.assertEquals("o2 is stateful operator ", odi.contextAttributes.get(Context.OperatorContext.STATELESS), values.get(1)); + + odi = getNodeDeployInfo(c1, dag.getOperatorMeta("o3")); + Assert.assertEquals("o3 is stateful operator ", odi.contextAttributes.get(Context.OperatorContext.STATELESS), values.get(2)); + + odi = getNodeDeployInfo(c1, dag.getOperatorMeta("o4")); + Assert.assertEquals("o4 is stateful operator ", odi.contextAttributes.get(Context.OperatorContext.STATELESS), values.get(3)); + } + +} diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java index 84622c4e14..92363c5ab9 100644 --- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java @@ -631,7 +631,7 @@ public static List getDeployInfo(StreamingContainerAgent sca return sca.getDeployInfoList(sca.container.getOperators()); } - private static OperatorDeployInfo getNodeDeployInfo(List di, OperatorMeta nodeConf) + static OperatorDeployInfo getNodeDeployInfo(List di, OperatorMeta nodeConf) { for (OperatorDeployInfo ndi : di) { if (nodeConf.getName().equals(ndi.name)) {