Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Revert "fixed issue 2: #2"
This reverts commit 7c1fd3a.
  • Loading branch information
adyliu committed May 26, 2012
1 parent 7c1fd3a commit 10db9a2
Showing 1 changed file with 40 additions and 24 deletions.
64 changes: 40 additions & 24 deletions src/main/java/com/sohu/jafka/producer/ZKBrokerPartitionInfo.java
Expand Up @@ -17,7 +17,6 @@


package com.sohu.jafka.producer; package com.sohu.jafka.producer;


import static java.lang.String.format;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -121,9 +120,11 @@ public Map<Integer, Broker> getAllBrokerInfo() {
} }


/** /**
* Generate a sequence of (brokerId, numPartitions) for all topics registered in zookeeper * Generate a sequence of (brokerId, numPartitions) for all topics
* registered in zookeeper
* *
* @return a mapping from topic to sequence of (brokerId, numPartitions) * @return a mapping from topic to sequence of (brokerId,
* numPartitions)
*/ */
private Map<String, SortedSet<Partition>> getZKTopicPartitionInfo() { private Map<String, SortedSet<Partition>> getZKTopicPartitionInfo() {
final Map<String, SortedSet<Partition>> brokerPartitionsPerTopic = new HashMap<String, SortedSet<Partition>>(); final Map<String, SortedSet<Partition>> brokerPartitionsPerTopic = new HashMap<String, SortedSet<Partition>>();
Expand Down Expand Up @@ -153,7 +154,7 @@ private Map<Integer, Broker> getZKBrokerInfo() {
Map<Integer, Broker> brokers = new HashMap<Integer, Broker>(); Map<Integer, Broker> brokers = new HashMap<Integer, Broker>();
List<String> allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath); List<String> allBrokersIds = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerIdsPath);
if (allBrokersIds != null) { if (allBrokersIds != null) {
logger.info("read all brokers count: " + allBrokersIds.size()); logger.info("read all brokers count: "+allBrokersIds.size());
for (String brokerId : allBrokersIds) { for (String brokerId : allBrokersIds) {
String brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId); String brokerInfo = ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + brokerId);
Broker createBroker = Broker.createBroker(Integer.valueOf(brokerId), brokerInfo); Broker createBroker = Broker.createBroker(Integer.valueOf(brokerId), brokerInfo);
Expand Down Expand Up @@ -181,14 +182,14 @@ class BrokerTopicsListener implements IZkChildListener {


private Map<Integer, Broker> originBrokerIds; private Map<Integer, Broker> originBrokerIds;


public BrokerTopicsListener(Map<String, SortedSet<Partition>> originalBrokerTopicsParitions, public BrokerTopicsListener(Map<String, SortedSet<Partition>> originalBrokerTopicsParitions, Map<Integer, Broker> originBrokerIds) {
Map<Integer, Broker> originBrokerIds) {
super(); super();
this.originalBrokerTopicsParitions = new HashMap<String, SortedSet<Partition>>( this.originalBrokerTopicsParitions = originalBrokerTopicsParitions;
originalBrokerTopicsParitions); this.originBrokerIds = originBrokerIds;
this.originBrokerIds = new HashMap<Integer, Broker>(originBrokerIds); logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n"
logger.debug("[BrokerTopicsListener] Creating broker topics listener to watch the following paths - \n" + "/broker/topics, /broker/topics/<topic>, /broker/<ids>"); + "/broker/topics, /broker/topics/<topic>, /broker/<ids>");
logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to " + "partition id per topic with " + originalBrokerTopicsParitions); logger.debug("[BrokerTopicsListener] Initialized this broker topics listener with initial mapping of broker id to "
+ "partition id per topic with " + originalBrokerTopicsParitions);
} }


public void handleChildChange(final String parentPath, List<String> currentChilds) throws Exception { public void handleChildChange(final String parentPath, List<String> currentChilds) throws Exception {
Expand All @@ -206,16 +207,16 @@ public void handleChildChange(final String parentPath, List<String> currentChild
String path = ZkUtils.BrokerTopicsPath + "/" + addedTopic; String path = ZkUtils.BrokerTopicsPath + "/" + addedTopic;
List<String> brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, path); List<String> brokerList = ZkUtils.getChildrenParentMayNotExist(zkClient, path);
processNewBrokerInExistingTopic(addedTopic, brokerList); processNewBrokerInExistingTopic(addedTopic, brokerList);
zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + addedTopic, zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath + "/" + addedTopic, brokerTopicsListener);
brokerTopicsListener);
} }
} else if (ZkUtils.BrokerIdsPath.equals(parentPath)) { } else if (ZkUtils.BrokerIdsPath.equals(parentPath)) {
processBrokerChange(parentPath, curChilds); processBrokerChange(parentPath, curChilds);
} else { } else {
//check path: /brokers/topics/<topicname> //check path: /brokers/topics/<topicname>
String[] ps = parentPath.split("/"); String[] ps = parentPath.split("/");
if (ps.length == 4 && "topics".equals(ps[2])) { if (ps.length == 4 && "topics".equals(ps[2])) {
logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + " list of brokers -> " + curChilds + " for topic -> " + ps[3]); logger.debug("[BrokerTopicsListener] List of brokers changed at " + parentPath + "\t Currently registered " + " list of brokers -> "
+ curChilds + " for topic -> " + ps[3]);
processNewBrokerInExistingTopic(ps[3], curChilds); processNewBrokerInExistingTopic(ps[3], curChilds);
} }
} }
Expand All @@ -225,6 +226,10 @@ public void handleChildChange(final String parentPath, List<String> currentChild
} }
} }


/**
* @param parentPath
* @param curChilds
*/
private void processBrokerChange(String parentPath, List<String> curChilds) { private void processBrokerChange(String parentPath, List<String> curChilds) {
final Map<Integer, Broker> oldBrokerIdMap = new HashMap<Integer, Broker>(originBrokerIds); final Map<Integer, Broker> oldBrokerIdMap = new HashMap<Integer, Broker>(originBrokerIds);
for (int i = curChilds.size() - 1; i >= 0; i--) { for (int i = curChilds.size() - 1; i >= 0; i--) {
Expand Down Expand Up @@ -265,6 +270,10 @@ private void processBrokerChange(String parentPath, List<String> curChilds) {
} }
} }


/**
* @param topic
* @param brokerList
*/
private void processNewBrokerInExistingTopic(String topic, List<String> brokerList) { private void processNewBrokerInExistingTopic(String topic, List<String> brokerList) {


SortedSet<Partition> updatedBrokerParts = getBrokerPartitions(zkClient, topic, brokerList); SortedSet<Partition> updatedBrokerParts = getBrokerPartitions(zkClient, topic, brokerList);
Expand All @@ -279,14 +288,19 @@ private void processNewBrokerInExistingTopic(String topic, List<String> brokerLi
// keep only brokers that are alive // keep only brokers that are alive
Iterator<Partition> iter = mergedBrokerParts.iterator(); Iterator<Partition> iter = mergedBrokerParts.iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
Partition p = iter.next(); if (!allBrokers.containsKey(iter.next().brokerId)) {
if (!allBrokers.containsKey(p.brokerId)) {
iter.remove(); iter.remove();
logger.warn(format("remove partition[%s] while broker[%s] is dead.", p.getName(), p.brokerId));
} }
} }
// mergedBrokerParts = Sets.filter(mergedBrokerParts, new Predicate<Partition>() {
//
// public boolean apply(Partition input) {
// return allBrokers.containsKey(input.brokerId);
// }
// });
topicBrokerPartitions.put(topic, mergedBrokerParts); topicBrokerPartitions.put(topic, mergedBrokerParts);
logger.info(format("topic[%s] available partitions are: %s", topic, mergedBrokerParts)); logger.debug("[BrokerTopicsListener] List of broker partitions for topic: " + topic + " are " +
mergedBrokerParts);
} }


private void resetState() { private void resetState() {
Expand All @@ -304,8 +318,8 @@ class ZKSessionExpirationListener implements IZkStateListener {


public void handleNewSession() throws Exception { public void handleNewSession() throws Exception {
/** /**
* When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has * When we get a SessionExpired event, we lost all ephemeral
* reestablished a connection for us. * nodes and zkclient has reestablished a connection for us.
*/ */
logger.info("ZK expired; release old list of broker partitions for topics "); logger.info("ZK expired; release old list of broker partitions for topics ");
topicBrokerPartitions = getZKTopicPartitionInfo(); topicBrokerPartitions = getZKTopicPartitionInfo();
Expand All @@ -327,12 +341,14 @@ public void handleStateChanged(KeeperState state) throws Exception {
} }


/** /**
* Generate a mapping from broker id to (brokerId, numPartitions) for the list of brokers * Generate a mapping from broker id to (brokerId, numPartitions) for
* specified * the list of brokers specified
* *
* @param topic the topic to which the brokers have registered * @param topic the topic to which the brokers have registered
* @param brokerList the list of brokers for which the partitions info is to be generated * @param brokerList the list of brokers for which the partitions info
* @return a sequence of (brokerId, numPartitions) for brokers in brokerList * is to be generated
* @return a sequence of (brokerId, numPartitions) for brokers in
* brokerList
*/ */
private static SortedSet<Partition> getBrokerPartitions(ZkClient zkClient, String topic, List<?> brokerList) { private static SortedSet<Partition> getBrokerPartitions(ZkClient zkClient, String topic, List<?> brokerList) {
final String brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic; final String brokerTopicPath = ZkUtils.BrokerTopicsPath + "/" + topic;
Expand Down

0 comments on commit 10db9a2

Please sign in to comment.