Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

when the brokers more than three. The problem of selectOneMessageQueue when send message #2223

Closed
542928492 opened this issue Aug 3, 2020 · 3 comments

Comments

@542928492
Copy link
Contributor

 When there are three or more borkers in one topic's routeinfo ---
TopicPublishInfo, for example broker-001,broker-002,broker-003,Some times the
selected messagequeue list may be broker-001 broker-002 broker-001,rather than
broker-001 broker-002 broker-003.

  When sending message to broker-001\broker-002 is timeout or other problem the send result will always be failed,and lose one chance to send broker-003. because it is
not set selected pos to sendWhichQueue's ThreadLocalIndex.

@542928492
Copy link
Contributor Author

Reproduce code

import org.apache.rocketmq.client.common.ThreadLocalIndex;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.ArrayList;
import java.util.List;

public class MQTEST {

public static List<MessageQueue> messageQueueList = new ArrayList<>();
private static volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();

public static void main(String[] args) {

    for (int i=0;i<8;i++){
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName("b1");
        mq.setQueueId(i);
        mq.setTopic("TT");
        messageQueueList.add(mq);
    }

    for (int i=0;i<8;i++){
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName("b2");
        mq.setQueueId(i);
        mq.setTopic("TT");
        messageQueueList.add(mq);
    }

    for (int i=0;i<8;i++){
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName("b3");
        mq.setQueueId(i);
        mq.setTopic("TT");
        messageQueueList.add(mq);
    }

    MessageQueue mq = new MessageQueue();
    mq.setBrokerName("c1");
    mq.setQueueId(0);
    mq.setTopic("TT");
    messageQueueList.add(mq);

    MessageQueue mq2 = new MessageQueue();
    mq2.setBrokerName("c2");
    mq2.setQueueId(0);
    mq2.setTopic("TT");
    messageQueueList.add(mq2);

    MessageQueue mqtmp = null;
    for ( int times = 0; times < 3; times++) {
        String lastBrokerName = null == mqtmp ? null : mqtmp.getBrokerName();
        mqtmp= selectOneMessageQueue(lastBrokerName);
        System.out.println(mqtmp.getBrokerName());
    }

}

public static MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        int index = sendWhichQueue.getAndIncrement();
        for (int i = 0; i < messageQueueList.size(); i++) {
            int pos = Math.abs(index++) % messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }
}
public static MessageQueue selectOneMessageQueue() {
    int index = sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return messageQueueList.get(pos);
}

}

@542928492
Copy link
Contributor Author

Repair plan

public static MessageQueue selectOneMessageQueue(final String lastBrokerName) {
if (lastBrokerName == null) {
return selectOneMessageQueue();
} else {

        for (int i = 0; i < messageQueueList.size(); i++) {
            //repair: move sendWhichQueue to here
            int index = sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = messageQueueList.get(pos);
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        return selectOneMessageQueue();
    }

@RongtongJin
Copy link
Contributor

IMO, you are right. Could you submit a pull request to fix this issue?

@RongtongJin RongtongJin reopened this Aug 6, 2020
542928492 added a commit to 542928492/rocketmq-1 that referenced this issue Aug 17, 2020
@RongtongJin RongtongJin changed the title when the brokers more than three。 The problem of selectOneMessageQueue when send message when the brokers more than three. The problem of selectOneMessageQueue when send message Aug 17, 2020
GenerousMan pushed a commit to GenerousMan/rocketmq that referenced this issue Aug 12, 2022
pulllock pushed a commit to pulllock/rocketmq that referenced this issue Oct 19, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

2 participants