Skip to content

Latest commit

 

History

History
85 lines (79 loc) · 6.23 KB

RocketMQ DefaultMQPullConsumer.md

File metadata and controls

85 lines (79 loc) · 6.23 KB

RocketMQ DefaultMQPullConsumer

《RocketMQ实战与原理解析》

DefaultMQPullConsumer的使用

public class DefaultMQPullConsumerDemo {

    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        /* Specify where to start in case the specified Consumer group is a brand new one. */
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.start();
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest");
        for (MessageQueue mq : mqs) {
            long Offset = consumer.fetchConsumeOffset(mq, true);
            System.out.printf(" Consume from the Queue: " + mq + "%n");
            while (true) {
                try {
                    long startTime = System.currentTimeMillis();
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    long endTime = System.currentTimeMillis();
                    System.out.printf("pullResult = %s costTime = %s秒 %n", pullResult, (endTime - startTime) / 1000);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            System.out.printf("FOUND %s %n", pullResult);
                            List<String> messageContents = pullResult.getMsgFoundList().stream().map(item -> new String(item.getBody())).collect(Collectors.toList());
                            System.out.printf("messageContents = %s %n", messageContents);
                            break;
                        case NO_MATCHED_MSG:
                            System.out.printf("NO_MATCHED_MSG %s %n", pullResult);
                            break;
                        case NO_NEW_MSG:
                            System.out.printf("NO_NEW_MSG %s %n", pullResult);
                            break;
                        case OFFSET_ILLEGAL:
                            System.out.printf("OFFSET_ILLEGAL %s %n", pullResult);
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long Offset = OFFSE_TABLE.get(mq);
        if (Offset != null) {
            return Offset;
        }
        return 0;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long Offset) {
        OFFSE_TABLE.put(mq, Offset);
    }
}

示例代码的处理逻辑是逐个读取某 Topic 下所有 Message Queue 的内容,读完一遍后退出,主要处理额外的三件事情: (1)获取 Message Queue 并遍历一个 Topic 包括多个 Message Queue, 如果这个 Consumer 需要获取 Topic 下所有的消息, 就要遍历多有的 Message Queue。 如果有特殊情况, 也可以选择某些特定的 Message Queue 来读取消息。 (2) 维护 Offsetstore 从一个 Message Queue 里拉取消息的时候,要传入 Offset 参数( long 类型的值),随着不断读取消息, Offset 会不断增长。这个时候由用户负责把 Offset 存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。 (3) 根据不同的消息状态做不同的处理 拉取消息的请求发出后,会返回:FOUND、NO_MATCHED_MSG、NO_NEW_MSG、OFFSET_ILLEGAL 四种状态,需要根据每个状态做不同的处理。 比较重要的两个状态是 FOUNT 和 NO_NEW_MSG,分别表示获取到消息和没有新的消息。实际情况中可以把 while( true) 放到外层,达到无限循环的目的。 因为 PullConsumer 需要用户自己处理遍历Message Queue、保存 Offset,所以 PullConsumer 有更多的自主性和灵活性。

Consumer的启动与关闭

消息队列一般是提供一个不间断的持续性服务,Consumer 在使用过程中,如何才能优雅地启动和关闭,确保不漏掉或者重复消费消息呢?

Consumer 分为 Push 和 Pull 两种方式. (1) 对于 PullConsumer 来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程。需要注意的是 Offset 的保存,要在程序的异常处理部分增加把 Offset 写入磁盘方面的处理,记准了每个 Message Queue 的 Offset,才能保证消息消息消费的准确性。 (2) DefaultMQPushConsumer 的退出,要调用 shutdown() 函数,以便释放资源、保存 Offset 等。这个调用要加到 Consumer 所在应用的退出逻辑中。PushConsumer 在启动的时候,会做各种配置检查,然后连接 NameServer 获取 Topic 信息,启动时如果遇到异常,比如无法连接 NameServer,程序仍然可以正常启动不报错(日志里有 WARN 信息)。在单机环境下可以测试这种情况,启动 DefaultMQPushConsumer 时故意 把 NameServer 地址填错,程序仍然可以正常启动,但是不会收到消息。

为什么 DefaultMQPushConsumer 在无法连接 NameServer 时不直接报错退出呢?

这和分布式系统的设计有关,RocketMQ 集群可以有多个 NameServer、Broker,某个机器出异常后整体服务依然可用。所以 DefaultMQPushConsumer 被设计成当发现某个连接异常时不立刻退出,而是不断尝试重新连接。可以进行样一个测试, 在 DefaultMQPushConsumer 正常运行的时候,手动 kill 掉 Broker 或 NameServer,过一会儿再启动。会发现 DefaultMQPushConsumer 不会出错退出,在服务恢复后正常运行,在服务不可用的这段时间,仅仅会在日志里报异常 信息。

如果需要在 DefaultMQPushConsumer 启动的时候,及时暴露配置问题,该如何操作呢?

可以在 Consumer.start() 语句后调用:Consumer.fetchSubscribeMessageQueues("TopicName"),这时如果配置信息写得不准确,或者当前服务不可用,这个语句会报MQClientException 异常。