diff --git a/src/main/java/com/sohu/jafka/producer/ZKBrokerPartitionInfo.java b/src/main/java/com/sohu/jafka/producer/ZKBrokerPartitionInfo.java index 0c039e8..8a39aba 100644 --- a/src/main/java/com/sohu/jafka/producer/ZKBrokerPartitionInfo.java +++ b/src/main/java/com/sohu/jafka/producer/ZKBrokerPartitionInfo.java @@ -17,7 +17,6 @@ package com.sohu.jafka.producer; -import static java.lang.String.format; import java.util.ArrayList; import java.util.HashMap; @@ -121,9 +120,11 @@ public Map getAllBrokerInfo() { } /** - * Generate a sequence of (brokerId, numPartitions) for all topics registered in zookeeper + * Generate a sequence of (brokerId, numPartitions) for all topics + * registered in zookeeper * - * @return a mapping from topic to sequence of (brokerId, numPartitions) + * @return a mapping from topic to sequence of (brokerId, + * numPartitions) */ private Map> getZKTopicPartitionInfo() { final Map> brokerPartitionsPerTopic = new HashMap>(); @@ -153,7 +154,7 @@ private Map getZKBrokerInfo() { Map brokers = new HashMap(); List allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath); if (allBrokersIds != null) { - logger.info("read all brokers count: " + allBrokersIds.size()); + logger.info("read all brokers count: "+allBrokersIds.size()); for (String brokerId : allBrokersIds) { String brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId); Broker createBroker = Broker.createBroker(Integer.valueOf(brokerId), brokerInfo); @@ -181,14 +182,14 @@ class BrokerTopicsListener implements IZkChildListener { private Map originBrokerIds; - public BrokerTopicsListener(Map> originalBrokerTopicsParitions, - Map originBrokerIds) { + public BrokerTopicsListener(Map> originalBrokerTopicsParitions, Map originBrokerIds) { super(); - this.originalBrokerTopicsParitions = new HashMap>( - originalBrokerTopicsParitions); - this.originBrokerIds = new HashMap(originBrokerIds); - logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + "/broker/topics, /broker/topics/, /broker/"); - logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + "partition id per topic with " + originalBrokerTopicsParitions); + this.originalBrokerTopicsParitions = originalBrokerTopicsParitions; + this.originBrokerIds = originBrokerIds; + logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + + "/broker/topics, /broker/topics/, /broker/"); + logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + + "partition id per topic with " + originalBrokerTopicsParitions); } public void handleChildChange(final String parentPath, List currentChilds) throws Exception { @@ -206,8 +207,7 @@ public void handleChildChange(final String parentPath, List currentChild String path = ZkUtils.BrokerTopicsPath + "/" + addedTopic; List brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, path); processNewBrokerInExistingTopic(addedTopic, brokerList); - zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + addedTopic, - brokerTopicsListener); + zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + addedTopic, brokerTopicsListener); } } else if (ZkUtils.BrokerIdsPath.equals(parentPath)) { processBrokerChange(parentPath, curChilds); @@ -215,7 +215,8 @@ public void handleChildChange(final String parentPath, List currentChild //check path: /brokers/topics/ String[] ps = parentPath.split("/"); if (ps.length == 4 && "topics".equals(ps[2])) { - logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + " list of brokers -> " + curChilds + " for topic -> " + ps[3]); + logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + " list of brokers -> " + + curChilds + " for topic -> " + ps[3]); processNewBrokerInExistingTopic(ps[3], curChilds); } } @@ -225,6 +226,10 @@ public void handleChildChange(final String parentPath, List currentChild } } + /** + * @param parentPath + * @param curChilds + */ private void processBrokerChange(String parentPath, List curChilds) { final Map oldBrokerIdMap = new HashMap(originBrokerIds); for (int i = curChilds.size() - 1; i >= 0; i--) { @@ -265,6 +270,10 @@ private void processBrokerChange(String parentPath, List curChilds) { } } + /** + * @param topic + * @param brokerList + */ private void processNewBrokerInExistingTopic(String topic, List brokerList) { SortedSet updatedBrokerParts = getBrokerPartitions(zkClient, topic, brokerList); @@ -279,14 +288,19 @@ private void processNewBrokerInExistingTopic(String topic, List brokerLi // keep only brokers that are alive Iterator iter = mergedBrokerParts.iterator(); while (iter.hasNext()) { - Partition p = iter.next(); - if (!allBrokers.containsKey(p.brokerId)) { + if (!allBrokers.containsKey(iter.next().brokerId)) { iter.remove(); - logger.warn(format("remove partition[%s] while broker[%s] is dead.", p.getName(), p.brokerId)); } } +// mergedBrokerParts = Sets.filter(mergedBrokerParts, new Predicate() { +// +// public boolean apply(Partition input) { +// return allBrokers.containsKey(input.brokerId); +// } +// }); topicBrokerPartitions.put(topic, mergedBrokerParts); - logger.info(format("topic[%s] available partitions are: %s", topic, mergedBrokerParts)); + logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " + + mergedBrokerParts); } private void resetState() { @@ -304,8 +318,8 @@ class ZKSessionExpirationListener implements IZkStateListener { public void handleNewSession() throws Exception { /** - * When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has - * reestablished a connection for us. + * When we get a SessionExpired event, we lost all ephemeral + * nodes and zkclient has reestablished a connection for us. */ logger.info("ZK expired; release old list of broker partitions for topics "); topicBrokerPartitions = getZKTopicPartitionInfo(); @@ -327,12 +341,14 @@ public void handleStateChanged(KeeperState state) throws Exception { } /** - * Generate a mapping from broker id to (brokerId, numPartitions) for the list of brokers - * specified + * Generate a mapping from broker id to (brokerId, numPartitions) for + * the list of brokers specified * * @param topic the topic to which the brokers have registered - * @param brokerList the list of brokers for which the partitions info is to be generated - * @return a sequence of (brokerId, numPartitions) for brokers in brokerList + * @param brokerList the list of brokers for which the partitions info + * is to be generated + * @return a sequence of (brokerId, numPartitions) for brokers in + * brokerList */ private static SortedSet getBrokerPartitions(ZkClient zkClient, String topic, List brokerList) { final String brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic;