From 3ab11ea2945175713bf759024376b5072d94f0c8 Mon Sep 17 00:00:00 2001 From: Srishty Agrawal Date: Wed, 29 Mar 2017 12:35:57 -0700 Subject: [PATCH] STORM-2506: Print mapping between Task ID and Kafka Partitions --- .../apache/storm/kafka/spout/KafkaSpout.java | 4 ++-- .../org/apache/storm/kafka/KafkaSpout.java | 4 ++-- .../org/apache/storm/kafka/KafkaUtils.java | 17 +++++++------- .../apache/storm/kafka/StaticCoordinator.java | 5 +++-- .../org/apache/storm/kafka/ZkCoordinator.java | 22 +++++++++++-------- .../apache/storm/kafka/KafkaUtilsTest.java | 8 +++---- .../apache/storm/kafka/ZkCoordinatorTest.java | 2 +- .../src/jvm/org/apache/storm/daemon/Task.java | 2 +- 8 files changed, 35 insertions(+), 29 deletions(-) diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 09795ed7930..eaf3995827b 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -148,8 +148,8 @@ public void onPartitionsRevoked(Collection partitions) { @Override public void onPartitionsAssigned(Collection partitions) { - LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]", - kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); + LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]", + context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions); initialize(partitions); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java index ead605711ff..79cde569d1c 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java @@ -82,11 +82,11 @@ public void open(Map conf, final TopologyContext context, final if (_spoutConfig.hosts instanceof StaticHosts) { _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, topologyInstanceId); + totalTasks, context.getThisTaskId(), topologyInstanceId); } else { _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), - totalTasks, topologyInstanceId); + totalTasks, context.getThisTaskId(), topologyInstanceId); } context.registerMetric("kafkaOffset", new IMetric() { diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java index 604f1f38b6f..76bb8968458 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java @@ -258,7 +258,8 @@ public static Iterable> generateTuples(MessageMetadataSchemeAsMulti } - public static List calculatePartitionsForTask(List partitons, int totalTasks, int taskIndex) { + public static List calculatePartitionsForTask(List partitons, + int totalTasks, int taskIndex, int taskId) { Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks"); List taskPartitions = new ArrayList(); List partitions = new ArrayList(); @@ -273,20 +274,20 @@ public static List calculatePartitionsForTask(List taskPartitions) { - String taskPrefix = taskId(taskIndex, totalTasks); + private static void logPartitionMapping(int totalTasks, int taskIndex, List taskPartitions, int taskId) { + String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId); if (taskPartitions.isEmpty()) { - LOG.warn(taskPrefix + "no partitions assigned"); + LOG.warn(taskPrefix + " no partitions assigned"); } else { - LOG.info(taskPrefix + "assigned " + taskPartitions); + LOG.info(taskPrefix + " assigned " + taskPartitions); } } - public static String taskId(int taskIndex, int totalTasks) { - return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] "; + public static String taskPrefix(int taskIndex, int totalTasks, int taskId) { + return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId; } } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java index 46cb7d95a28..cd23ca6445c 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java @@ -26,11 +26,12 @@ public class StaticCoordinator implements PartitionCoordinator { Map _managers = new HashMap(); List _allManagers = new ArrayList<>(); - public StaticCoordinator(DynamicPartitionConnections connections, Map topoConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { + public StaticCoordinator(DynamicPartitionConnections connections, Map topoConf, SpoutConfig config, ZkState state, + int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { StaticHosts hosts = (StaticHosts) config.hosts; List partitions = new ArrayList(); partitions.add(hosts.getPartitionInformation()); - List myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex); + List myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId); for (Partition myPartition : myPartitions) { _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, topoConf, config, myPartition)); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java index e814157650b..136dc519546 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java @@ -23,7 +23,7 @@ import java.util.*; -import static org.apache.storm.kafka.KafkaUtils.taskId; +import static org.apache.storm.kafka.KafkaUtils.taskPrefix; public class ZkCoordinator implements PartitionCoordinator { private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class); @@ -31,6 +31,7 @@ public class ZkCoordinator implements PartitionCoordinator { SpoutConfig _spoutConfig; int _taskIndex; int _totalTasks; + int _taskId; String _topologyInstanceId; Map _managers = new HashMap(); List _cachedList = new ArrayList(); @@ -41,15 +42,18 @@ public class ZkCoordinator implements PartitionCoordinator { ZkState _state; Map _topoConf; - public ZkCoordinator(DynamicPartitionConnections connections, Map topoConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { - this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(topoConf, spoutConfig)); + public ZkCoordinator(DynamicPartitionConnections connections, Map topoConf, SpoutConfig spoutConfig, ZkState state, + int taskIndex, int totalTasks, int taskId, String topologyInstanceId) { + this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId, buildReader(topoConf, spoutConfig)); } - public ZkCoordinator(DynamicPartitionConnections connections, Map topoConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { + public ZkCoordinator(DynamicPartitionConnections connections, Map topoConf, SpoutConfig spoutConfig, ZkState state, + int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; + _taskId = taskId; _topologyInstanceId = topologyInstanceId; _topoConf = topoConf; _state = state; @@ -75,9 +79,9 @@ public List getMyManagedPartitions() { @Override public void refresh() { try { - LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections"); List brokerInfo = _reader.getBrokerInfo(); - List mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); + List mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId); Set curr = _managers.keySet(); Set newPartitions = new HashSet(mine); @@ -86,7 +90,7 @@ public void refresh() { Set deletedPartitions = new HashSet(curr); deletedPartitions.removeAll(mine); - LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString()); Map deletedManagers = new HashMap<>(); for (Partition id : deletedPartitions) { @@ -95,7 +99,7 @@ public void refresh() { for (PartitionManager manager : deletedManagers.values()) { if (manager != null) manager.close(); } - LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager( @@ -113,7 +117,7 @@ public void refresh() { throw new RuntimeException(e); } _cachedList = new ArrayList(_managers.values()); - LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); + LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing"); } @Override diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java index 7fd1d418e0f..b575bc912c8 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/KafkaUtilsTest.java @@ -271,7 +271,7 @@ public void runPartitionToTaskMappingTest(int numPartitions, int partitionsPerTa partitions.add(globalPartitionInformation); int numTasks = numPartitions / partitionsPerTask; for (int i = 0 ; i < numTasks ; i++) { - assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i).size()); + assertEquals(partitionsPerTask, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, i, i).size()); } } @@ -281,8 +281,8 @@ public void moreTasksThanPartitions() { List partitions = new ArrayList(); partitions.add(globalPartitionInformation); int numTasks = 2; - assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0).size()); - assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1).size()); + assertEquals(1, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 0, 0).size()); + assertEquals(0, KafkaUtils.calculatePartitionsForTask(partitions, numTasks, 1, 1).size()); } @Test (expected = IllegalArgumentException.class ) @@ -290,6 +290,6 @@ public void assignInvalidTask() { GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TEST_TOPIC); List partitions = new ArrayList(); partitions.add(globalPartitionInformation); - KafkaUtils.calculatePartitionsForTask(partitions, 1, 1); + KafkaUtils.calculatePartitionsForTask(partitions, 1, 1, 1); } } diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java index 9778d15d784..8d8746fb36e 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/ZkCoordinatorTest.java @@ -174,7 +174,7 @@ private void waitForRefresh() throws InterruptedException { private List buildCoordinators(int totalTasks) { List coordinatorList = new ArrayList(); for (int i = 0; i < totalTasks; i++) { - ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, "test-id", reader); + ZkCoordinator coordinator = new ZkCoordinator(dynamicPartitionConnections, topoConf, spoutConfig, state, i, totalTasks, i, "test-id", reader); coordinatorList.add(coordinator); } return coordinatorList; diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java index 5c28085a0f5..b79a259ead3 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java @@ -124,7 +124,7 @@ public List getOutgoingTasks(Integer outTaskId, String stream, List getOutgoingTasks(String stream, List values) { if (debug) { - LOG.info("Emitting: {} {} {}", componentId, stream, values); + LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values); } List outTasks = new ArrayList<>();