Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Develop #137

Merged
merged 85 commits into from Aug 1, 2017
Merged

Develop #137

merged 85 commits into from Aug 1, 2017

Conversation

lyy4j
Copy link

@lyy4j lyy4j commented Jul 27, 2017

producer client of ConsistentHash selector:
here the code:

public class SelectMessageQueueByConsistentHash implements MessageQueueSelector {

private volatile SortedMap<Integer, String> virtualNodes =
        new TreeMap<Integer, String>();

private static final int VIRTUAL_NODES = 100;

private volatile HashMap<String, MessageQueue> idToQueueMap = new HashMap<String, MessageQueue>();

private Object idToQueueMapMonitor = new Object();

@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
    if (queueChange(mqs)) {
        synchronized (this.idToQueueMapMonitor) {
            if (queueChange(mqs)) {
                reloadConsistentHash(mqs);
            }
        }
    }
    String uniqueQueueId = getMsgQueue(arg.toString());
    MessageQueue messageQueue = idToQueueMap.get(uniqueQueueId);
    return messageQueue;
}

private boolean queueChange(List<MessageQueue> mqs) {
    if (mqs.size() != this.idToQueueMap.size()) {
        return true;
    }

    for (MessageQueue queue : mqs) {
        String id = queue.getTopic() + "_" + queue.getBrokerName() + "_" + queue.getQueueId();
        if (!this.idToQueueMap.containsKey(id)) {
            return true;
        }
    }

    return false;
}

private String getMsgQueue(String node) {

    int hash = getHash(node);
    SortedMap<Integer, String> subMap =
            virtualNodes.tailMap(hash);

    Integer i;
    String virtualNode;

    if (subMap.size() == 0) { 
        i = virtualNodes.firstKey();
        virtualNode = virtualNodes.get(i);
    } else {
        i = subMap.firstKey();
        virtualNode = subMap.get(i);
    }


    String result = virtualNode.substring(0, virtualNode.indexOf("&&"));
    return result;
}

private int getHash(String str) {
    final int p = 16777619;
    int hash = (int) 2166136261L;
    for (int i = 0; i < str.length(); i++)
        hash = (hash ^ str.charAt(i)) * p;
    hash += hash << 13;
    hash ^= hash >> 7;
    hash += hash << 3;
    hash ^= hash >> 17;
    hash += hash << 5;

    if (hash < 0)
        hash = Math.abs(hash);
    return hash;
}

private void reloadConsistentHash(List<MessageQueue> mqs) {
    idToQueueMap.clear();
    for (MessageQueue messageQueue : mqs) {
        String id = messageQueue.getTopic() + "_" + messageQueue.getBrokerName() + "_" + messageQueue.getQueueId();
        idToQueueMap.put(id, messageQueue);
    }

    virtualNodes.clear();

    for (String id : idToQueueMap.keySet()) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            String virtualNodeName = id + "&&VN" + String.valueOf(i);
            int hash = getHash(virtualNodeName);
            virtualNodes.put(hash, virtualNodeName);
        }
    }
}

}

yilingfeng and others added 30 commits February 28, 2017 19:22
…ease-client profile similar to release-all profile, reason of performing this change is that latest version of maven-assembly-plugin has removed finalName property.
Signed-off-by: shtykh_roman <rshtykh@yahoo.com>
dongeforever and others added 20 commits June 6, 2017 10:44
…atch scenario

Author: evthoriz <evthoriz@gmail.com>

Closes #128 from evthoriz/bugfix.
… for BrokerData#selectBrokerAddr().

Signed-off-by: shroman <rshtykh@yahoo.com>
@vsair
Copy link
Contributor

vsair commented Jul 28, 2017

I think you may have a look at the contributor's guideline.

http://rocketmq.apache.org/docs/pull-request/

vongosling and others added 2 commits July 29, 2017 16:17
…se failsafe:integration-test run it but not run ut
# Conflicts:
#	broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
#	common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
#	common/src/main/java/org/apache/rocketmq/common/MixAll.java
#	common/src/main/java/org/apache/rocketmq/common/message/Message.java
#	common/src/main/java/org/apache/rocketmq/common/protocol/route/BrokerData.java
#	pom.xml
#	remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
#	store/src/main/java/org/apache/rocketmq/store/CommitLog.java
#	test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
#	test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
@asfgit asfgit merged commit 9bb6eae into master Aug 1, 2017
@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 39.09% when pulling 9bb6eae on develop into d414920 on master.

2 similar comments
@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 39.09% when pulling 9bb6eae on develop into d414920 on master.

@coveralls
Copy link

Coverage Status

Coverage decreased (-0.03%) to 39.09% when pulling 9bb6eae on develop into d414920 on master.

lizhanhui pushed a commit to lizhanhui/rocketmq that referenced this pull request Jun 25, 2019
lizhanhui pushed a commit to lizhanhui/rocketmq that referenced this pull request Jun 25, 2019
lollipopjin added a commit to lollipopjin/rocketmq that referenced this pull request Jun 16, 2022
…ature_1

* origin/develop_with_prehistory: (3800 commits)
  Issue apache#179 [BUG]check properties length
  Issue apache#161 printmsg bug 修复
  Issue apache#175 change version to 3.6.3-SNAPSHOT
  Issue apache#175 pub 3.6.2 server release
  Issue apache#174 顺序消息重试次数超过默认值,导致消息直接进入死信队列
  Issue apache#173 日志滚动异常,更新日志配置
  Issue apache#172 rebalance 抛 Error 异常,导致 rebalance 被中断
  Issue apache#171 指定消息 id 进行重新发送
  Issue apache#146 add trace hook switch and consume return type
  Issue apache#146 add trace hook switch and consume return type
  Issue apache#169
  Issue apache#158 1. Unify config manager to configuration in common. 2. Configuration is hold in controller. 3. Extend config object or property can register to configuration.
  Issue apache#163 1. command of get broker config
  Issue apache#163 1. Directly print msg after query by queue offset.
  Issue apache#163 1. format file header
  Issue apache#163 1. support for modifying the config of name server dynamically 2. add two request codes 3. add two commands 3. set config store path when start up using special property file
  Issue apache#137 【买卖家】队列热点数据分析
  Issue apache#162 ConsumerConnection 获取连接失败。
  Issue apache#161 printmsg 使用 tag 过滤时,拉消息不全
  Issue apache#160 重置位点优化
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet