From 9b1e336ed03a320b25e5799655eb64571ce21c48 Mon Sep 17 00:00:00 2001 From: Ernestas Vaiciukevicius Date: Thu, 12 Jan 2017 16:54:59 +0200 Subject: [PATCH 1/6] Move state from old PartitionManager when recreating manager for same partition --- .../apache/storm/kafka/PartitionManager.java | 116 +++++++++++------- .../org/apache/storm/kafka/ZkCoordinator.java | 16 ++- 2 files changed, 87 insertions(+), 45 deletions(-) diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index 79e7c3d223b..e8cc31b7a18 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -66,7 +66,29 @@ public class PartitionManager { ZkState _state; Map _stormConf; long numberFailed, numberAcked; - public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) { + + public PartitionManager( + DynamicPartitionConnections connections, + String topologyInstanceId, + ZkState state, + Map stormConf, + SpoutConfig spoutConfig, + Partition id) + { + this(connections, topologyInstanceId, state, stormConf, spoutConfig, id, null); + } + + /** + * @param previousManager previous partition manager if manager for partition is being recreated + */ + public PartitionManager( + DynamicPartitionConnections connections, + String topologyInstanceId, + ZkState state, + Map stormConf, + SpoutConfig spoutConfig, + Partition id, + PartitionManager previousManager) { _partition = id; _connections = connections; _spoutConfig = spoutConfig; @@ -76,53 +98,63 @@ public PartitionManager(DynamicPartitionConnections connections, String topology _stormConf = stormConf; numberAcked = numberFailed = 0; - try { - _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance(); - _failedMsgRetryManager.prepare(spoutConfig, _stormConf); - } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>", - FailedMsgRetryManager.class, - spoutConfig.failedMsgRetryManagerClass), e); - } + if (previousManager != null) { + _failedMsgRetryManager = previousManager._failedMsgRetryManager; + _committedTo = previousManager._committedTo; + _waitingToEmit = previousManager._waitingToEmit; + _pending = previousManager._pending; + LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}", + _waitingToEmit, + _pending); + } else { + try { + _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance(); + _failedMsgRetryManager.prepare(spoutConfig, _stormConf); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>", + FailedMsgRetryManager.class, + spoutConfig.failedMsgRetryManagerClass), e); + } - String jsonTopologyId = null; - Long jsonOffset = null; - String path = committedPath(); - try { - Map json = _state.readJSON(path); - LOG.info("Read partition information from: " + path + " --> " + json ); - if (json != null) { - jsonTopologyId = (String) ((Map) json.get("topology")).get("id"); - jsonOffset = (Long) json.get("offset"); + String jsonTopologyId = null; + Long jsonOffset = null; + String path = committedPath(); + try { + Map json = _state.readJSON(path); + LOG.info("Read partition information from: " + path + " --> " + json); + if (json != null) { + jsonTopologyId = (String) ((Map) json.get("topology")).get("id"); + jsonOffset = (Long) json.get("offset"); + } + } catch (Throwable e) { + LOG.warn("Error reading and/or parsing at ZkNode: " + path, e); } - } catch (Throwable e) { - LOG.warn("Error reading and/or parsing at ZkNode: " + path, e); - } - String topic = _partition.topic; - Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig); + String topic = _partition.topic; + Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig); - if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON? - _committedTo = currentOffset; - LOG.info("No partition information found, using configuration to determine offset"); - } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) { - _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime); - LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset"); - } else { - _committedTo = jsonOffset; - LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId ); - } + if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON? + _committedTo = currentOffset; + LOG.info("No partition information found, using configuration to determine offset"); + } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) { + _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime); + LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset"); + } else { + _committedTo = jsonOffset; + LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId); + } - if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) { - LOG.info("Last commit offset from zookeeper: " + _committedTo); - Long lastCommittedOffset = _committedTo; - _committedTo = currentOffset; - LOG.info("Commit offset " + lastCommittedOffset + " is more than " + - spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime); - } + if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) { + LOG.info("Last commit offset from zookeeper: " + _committedTo); + Long lastCommittedOffset = _committedTo; + _committedTo = currentOffset; + LOG.info("Commit offset " + lastCommittedOffset + " is more than " + + spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" + spoutConfig.startOffsetTime); + } - LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo); - _emittedToOffset = _committedTo; + LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo); + _emittedToOffset = _committedTo; + } _fetchAPILatencyMax = new CombinedMetric(new MaxMetric()); _fetchAPILatencyMean = new ReducedMetric(new MeanReducer()); diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index 98bf8a0fabc..14be5845007 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -88,14 +88,24 @@ public void refresh() { LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); + Map deletedManagers = new HashMap<>(); for (Partition id : deletedPartitions) { - PartitionManager man = _managers.remove(id); - man.close(); + deletedManagers.put(id.partition, _managers.remove(id)); + } + for (PartitionManager manager : deletedManagers.values()) { + if (manager != null) manager.close(); } LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { - PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); + PartitionManager man = new PartitionManager( + _connections, + _topologyInstanceId, + _state, + _stormConf, + _spoutConfig, + id, + deletedManagers.get(id.partition)); _managers.put(id, man); } From a1a7cef9c84941ef8a1909fd4db10c85fe509e0e Mon Sep 17 00:00:00 2001 From: Ernestas Vaiciukevicius Date: Thu, 12 Jan 2017 17:39:51 +0200 Subject: [PATCH 2/6] Test to check if old PartitionManager's state is moved to new manager during manager recreate --- .../apache/storm/kafka/ZkCoordinatorTest.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index 65bf0b45ee1..b5fe1f2c2f9 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -28,8 +28,7 @@ import java.util.*; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.*; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.when; @@ -114,6 +113,33 @@ public void testPartitionsChange() throws Exception { } } + @Test + public void testPartitionManagerRecreate() throws Exception { + final int totalTasks = 2; + int partitionsPerTask = 2; + List coordinatorList = buildCoordinators(totalTasks / partitionsPerTask); + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9092))); + List> partitionManagersBeforeRefresh = getPartitionManagers(coordinatorList); + waitForRefresh(); + when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093))); + List> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); + assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size()); + Iterator> iterator = partitionManagersAfterRefresh.iterator(); + for (List partitionManagersBefore : partitionManagersBeforeRefresh) { + List partitionManagersAfter = iterator.next(); + for (PartitionManager before : partitionManagersBefore) + for (PartitionManager after: partitionManagersAfter) + if (before.getPartition().partition == after.getPartition().partition) + assertStateIsTheSame(before, after); + } + } + + private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) { + // check if state was actually moved from old PartitionManager + assertNotSame(managerBefore, managerAfter); + assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit); + } + private void assertPartitionsAreDifferent(List partitionManagersBefore, List partitionManagersAfter, int partitionsPerTask) { assertEquals(partitionsPerTask, partitionManagersBefore.size()); assertEquals(partitionManagersBefore.size(), partitionManagersAfter.size()); From c8c6ee83d69cf76d8aaeb9d5ccaedbd5946d4c9b Mon Sep 17 00:00:00 2001 From: Ernestas Vaiciukevicius Date: Thu, 12 Jan 2017 17:57:46 +0200 Subject: [PATCH 3/6] Include _emittedToOffset when copying state during PartitionManager recreate --- .../org/apache/storm/kafka/PartitionManager.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index e8cc31b7a18..bc355ba76e1 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -101,11 +101,12 @@ public PartitionManager( if (previousManager != null) { _failedMsgRetryManager = previousManager._failedMsgRetryManager; _committedTo = previousManager._committedTo; + _emittedToOffset = previousManager._emittedToOffset; _waitingToEmit = previousManager._waitingToEmit; _pending = previousManager._pending; LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}", - _waitingToEmit, - _pending); + _waitingToEmit.size(), + _pending.size()); } else { try { _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance(); @@ -192,7 +193,7 @@ public EmitState next(SpoutOutputCollector collector) { } else { tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic); } - + if ((tups != null) && tups.iterator().hasNext()) { if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) { for (List tup : tups) { @@ -233,7 +234,7 @@ private void fill() { } catch (TopicOffsetOutOfRangeException e) { offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime()); // fetch failed, so don't update the fetch metrics - + //fix bug [STORM-643] : remove outdated failed offsets if (!processingNewTuples) { // For the case of EarliestTime it would be better to discard @@ -246,7 +247,7 @@ private void fill() { if (null != omitted) { _lostMessageCount.incrBy(omitted.size()); } - + LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted); } @@ -255,7 +256,7 @@ private void fill() { _emittedToOffset = offset; LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset); } - + return; } long end = System.nanoTime(); From e16cd49e0f401a2f984ae2b9f30872ba4d54f831 Mon Sep 17 00:00:00 2001 From: Ernestas Vaiciukevicius Date: Fri, 3 Feb 2017 10:08:40 +0200 Subject: [PATCH 4/6] Fix assert in ZkCoordinator tests --- .../src/test/org/apache/storm/kafka/ZkCoordinatorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index b5fe1f2c2f9..3dac1303051 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -105,7 +105,7 @@ public void testPartitionsChange() throws Exception { waitForRefresh(); when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093))); List> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); - assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size()); + assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size()); Iterator> iterator = partitionManagersAfterRefresh.iterator(); for (List partitionManagersBefore : partitionManagersBeforeRefresh) { List partitionManagersAfter = iterator.next(); @@ -123,7 +123,7 @@ public void testPartitionManagerRecreate() throws Exception { waitForRefresh(); when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093))); List> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); - assertEquals(partitionManagersAfterRefresh.size(), partitionManagersAfterRefresh.size()); + assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size()); Iterator> iterator = partitionManagersAfterRefresh.iterator(); for (List partitionManagersBefore : partitionManagersBeforeRefresh) { List partitionManagersAfter = iterator.next(); From 4451e91a17df0cbcd70d98355185e434da597069 Mon Sep 17 00:00:00 2001 From: Ernestas Vaiciukevicius Date: Fri, 3 Feb 2017 10:33:40 +0200 Subject: [PATCH 5/6] Simplify ZkCoordinator test --- .../apache/storm/kafka/ZkCoordinatorTest.java | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index 3dac1303051..ed84191fcb9 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -124,18 +124,26 @@ public void testPartitionManagerRecreate() throws Exception { when(reader.getBrokerInfo()).thenReturn(TestUtils.buildPartitionInfoList(TestUtils.buildPartitionInfo(totalTasks, 9093))); List> partitionManagersAfterRefresh = getPartitionManagers(coordinatorList); assertEquals(partitionManagersBeforeRefresh.size(), partitionManagersAfterRefresh.size()); - Iterator> iterator = partitionManagersAfterRefresh.iterator(); + + HashMap managersAfterRefresh = new HashMap(); + for (List partitionManagersAfter : partitionManagersAfterRefresh) { + for (PartitionManager manager : partitionManagersAfter) { + assertFalse("Multiple PartitionManagers for same partition", managersAfterRefresh.containsKey(manager.getPartition().partition)); + managersAfterRefresh.put(manager.getPartition().partition, manager); + } + } + for (List partitionManagersBefore : partitionManagersBeforeRefresh) { - List partitionManagersAfter = iterator.next(); - for (PartitionManager before : partitionManagersBefore) - for (PartitionManager after: partitionManagersAfter) - if (before.getPartition().partition == after.getPartition().partition) - assertStateIsTheSame(before, after); + for (PartitionManager manager : partitionManagersBefore) { + assertStateIsTheSame(manager, managersAfterRefresh.get(manager.getPartition().partition)); + } } } private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManager managerAfter) { // check if state was actually moved from old PartitionManager + assertNotNull(managerBefore); + assertNotNull(managerAfter); assertNotSame(managerBefore, managerAfter); assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit); } From b2c09d49bbd397a1f164f2813e8eb554933c6e20 Mon Sep 17 00:00:00 2001 From: Ernestas Vaiciukevicius Date: Fri, 3 Feb 2017 10:44:50 +0200 Subject: [PATCH 6/6] Added more asserts to ZkCoordinatorTest.testPartitionManagerRecreate --- .../src/test/org/apache/storm/kafka/ZkCoordinatorTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index ed84191fcb9..adef7408773 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -146,6 +146,8 @@ private void assertStateIsTheSame(PartitionManager managerBefore, PartitionManag assertNotNull(managerAfter); assertNotSame(managerBefore, managerAfter); assertSame(managerBefore._waitingToEmit, managerAfter._waitingToEmit); + assertSame(managerBefore._emittedToOffset, managerAfter._emittedToOffset); + assertSame(managerBefore._committedTo, managerAfter._committedTo); } private void assertPartitionsAreDifferent(List partitionManagersBefore, List partitionManagersAfter, int partitionsPerTask) {