-
Notifications
You must be signed in to change notification settings - Fork 12k
Description
测试环境:
MQ和测试程序都在本地
生产者:
Message msg = new Message(topic, tag, keys, content);
SendResult result = producer.send(msg, new MessageQueueSelector()
{
@OverRide
public MessageQueue select(List mqs, Message msg, Object arg)
{
Integer id = Integer.parseInt(arg.toString());
int index = id % mqs.size();
return mqs.get(index);
}
}, serverId);
消费者:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
// Specify name server addresses.
consumer.setNamesrvAddr(MQFactory.LOCAL_NAMESRV_ADDR);
// Subscribe one more more topics to consume.
consumer.subscribe(topic, tag);
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.registerMessageListener(listener);
//Launch the consumer instance.
consumer.start();
listener里面的内容:
@OverRide
public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context)
{
MessageExt message = msgs.get(0);
LogUtils.info("收到消息 -> " + message.getMsgId() + ",内容 -> " + new String(message.getBody()) + ",key -> " + message.getKeys());
return ConsumeOrderlyStatus.SUCCESS;
}
问题就是,每次消费者第一次启动后,生产者发来的消息总是要很久才能收到,最长时间超过1分钟,最短也是30秒以上 ,有什么办法可以缩短这个时间吗