Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

开启5个子线程发送5条MQ消息,MqttConsumer 只能收到4条或者更少,必现。 #215

Open
lzkisok opened this issue Oct 25, 2023 · 18 comments

Comments

@lzkisok
Copy link

lzkisok commented Oct 25, 2023

No description provided.

@lzkisok
Copy link
Author

lzkisok commented Oct 25, 2023

或者异步发送5条消息,MqttConsumer 会丢消息,麻烦官方确认下

@lzkisok
Copy link
Author

lzkisok commented Oct 25, 2023

多试几次,必现

@lzkisok
Copy link
Author

lzkisok commented Oct 26, 2023

进过测试发现,parseLmqOffset 导致两条lmq消息offset相同,addSendingMessages 只add进一条,hashcode和equals是根据offset判断的

@DongyuanPan
Copy link
Contributor

DongyuanPan commented Oct 26, 2023

给一下rocketmq和rocketmq-mqtt的版本,以及复现的步骤

@lzkisok
Copy link
Author

lzkisok commented Oct 26, 2023

rocktmq 5.1.4版本,rocketmq-mqtt 1.0.1 版本
1,项目demo改RocketMQProducer 改成5个子线程同步发送5条mq消息
2,MqttConsumer可以观察到没有完整收到5条消息,消息丢失了
3,如果MqttConsumer完整收到5条消息,重复1,2步骤,继续观察,比较容易复现

@lzkisok
Copy link
Author

lzkisok commented Oct 26, 2023

目前观察到的原因:
从rocketmq一次pull到两条消息,两条消息的INNER_MULTI_QUEUE_OFFSET 相同, 导致了这个问题

@DongyuanPan
Copy link
Contributor

rocktmq 5.1.4版本,rocketmq-mqtt 1.0.1 版本 1,项目demo改RocketMQProducer 改成5个子线程同步发送5条mq消息 2,MqttConsumer可以观察到没有完整收到5条消息,消息丢失了 3,如果MqttConsumer完整收到5条消息,重复1,2步骤,继续观察,比较容易复现

我用的develop分支的最新代码,没有复现哈

@lzkisok
Copy link
Author

lzkisok commented Oct 26, 2023

企业微信截图_5905f1b2-4e79-4314-bb91-7e8b9a715acd
image

企业微信截图_a9c191a5-3817-462e-b794-ba66ce19a942
企业微信截图_0d522e3d-7945-4376-9498-d44b07b86651

如图,我也用了develop分支的最新代码部署cs,rmq5.1.4版本,比较容易复现,多试几次,很容易复现

@lzkisok
Copy link
Author

lzkisok commented Oct 26, 2023

Thread t1 = new Thread(new Runnable(){
@OverRide
public void run() {
try {
sendMessage(1);
} catch (MQBrokerException e) {
throw new RuntimeException(e);
} catch (RemotingException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (MQClientException e) {
throw new RuntimeException(e);
}
}
});

    Thread t2 = new Thread(new Runnable(){
        @Override
        public void run() {
            try {
                sendMessage(2);
            } catch (MQBrokerException e) {
                throw new RuntimeException(e);
            } catch (RemotingException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (MQClientException e) {
                throw new RuntimeException(e);
            }
        }
    });

    Thread t3 = new Thread(new Runnable(){
        @Override
        public void run() {
            try {
                sendMessage(3);
            } catch (MQBrokerException e) {
                throw new RuntimeException(e);
            } catch (RemotingException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (MQClientException e) {
                throw new RuntimeException(e);
            }
        }
    });

    Thread t4 = new Thread(new Runnable(){
        @Override
        public void run() {
            try {
                sendMessage(4);
            } catch (MQBrokerException e) {
                throw new RuntimeException(e);
            } catch (RemotingException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (MQClientException e) {
                throw new RuntimeException(e);
            }
        }
    });

    Thread t5 = new Thread(new Runnable(){
        @Override
        public void run() {
            try {
                sendMessage(5);
            } catch (MQBrokerException e) {
                throw new RuntimeException(e);
            } catch (RemotingException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (MQClientException e) {
                throw new RuntimeException(e);
            }
        }
    });
    t1.start();
    t2.start();
    t3.start();
    t4.start();
    t5.start();
    
    这是测试用的RocketMQProducer中main方法的发送mq的代码

@lzkisok
Copy link
Author

lzkisok commented Oct 26, 2023

企业微信截图_9d8aaffd-c167-4780-a4bf-cb9358fbacbb
这个是cs 一次拉了两条消息,两条消息的INNER_MULTI_QUEUE_OFFSET 相同,导致addSendingMessages只能成功一次

@lzkisok
Copy link
Author

lzkisok commented Oct 26, 2023

企业微信截图_3ae8bd29-1cbb-4f6b-809f-f72b9b4c9ea2
如上图,pull到两条消息,offset相同
企业微信截图_74373a44-2ae8-42ee-afe5-582587d30f1e
如上图,addSendingMessages 丢失一条消息

@lzkisok
Copy link
Author

lzkisok commented Oct 27, 2023

8a02f52e-cad0-4185-8727-134b47f241f0
如上图,rmq5.1.4 当消息并发时,这里取了相同的offset
img_v2_08eb737f-4333-453c-858e-8cc5bf71d7ag
如上图,测试rmq5.1.4 的测试结果可以看到确实是相同

@lzkisok
Copy link
Author

lzkisok commented Oct 27, 2023

企业微信截图_5869c60d-1567-4de2-bb0b-d38c9f19afa1
这是rockemq的代码,这里这个topicQueueKey 是topic+queneid,当一个消息被发送到两个quene的时候,defaultMessageStore.assignOffset(msg);这行代码没有被锁住

@lzkisok
Copy link
Author

lzkisok commented Oct 27, 2023

能私信下吗?给个私信方式,咋们交流交流

@DongyuanPan
Copy link
Contributor

rmq5.1.4版本,比较容易复现,多试几次,很容易复现

是的,这个版本的RMQ复现了,应该是一个BUG,这个到RMQ的ISSUE讨论吧

apache/rocketmq#7511

@lzkisok
Copy link
Author

lzkisok commented Oct 30, 2023

嗯嗯,好的,7511也是我提的,我们看代码找到了一个暂时的解决方式,将同一个LMQ的消息,始终发送到同一个MessageQuene中,让topic+queneid的锁能生效
image

@DongyuanPan
Copy link
Contributor

DongyuanPan commented Nov 1, 2023

嗯嗯,好的,7511也是我提的,我们看代码找到了一个暂时的解决方式,将同一个LMQ的消息,始终发送到同一个MessageQuene中,让topic+queneid的锁能生效 image

嗯嗯,这样可以简单暂时避免这个问题。
目前我提交了一个修改的版本,解决这个并发问题,可以CR一下 apache/rocketmq#7525

@DongyuanPan
Copy link
Contributor

能私信下吗?给个私信方式,咋们交流交流

#64 这里面有SIG的群,可以扫码进群。
也欢迎私戳我主页邮箱

@RongtongJin RongtongJin pinned this issue Nov 1, 2023
@RongtongJin RongtongJin unpinned this issue Nov 2, 2023
pingww added a commit to pingww/rocketmq-mqtt that referenced this issue Nov 13, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants