
Receiving Messages

rongtong edited this page Feb 27, 2023 · 4 revisions

Push Mode

Modify application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876

Note:

Replace 127.0.0.1:9876 in the example configuration above with the actual address and port of your RocketMQ NameServer.

Write the code

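import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    // Nested listeners are declared static so that Spring's component scan can instantiate them.
    // Consumes String payloads from test-topic-1.
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
    public static class MyConsumer1 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            log.info("received message: {}", message);
        }
    }

    // Consumes messages from test-topic-2, converted to OrderPaidEvent
    // (an application-defined payload class that is not shown here).
    @Slf4j
    @Service
    @RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
    public static class MyConsumer2 implements RocketMQListener<OrderPaidEvent> {
        @Override
        public void onMessage(OrderPaidEvent orderPaidEvent) {
            log.info("received orderPaidEvent: {}", orderPaidEvent);
        }
    }
}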
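
The second listener above receives an OrderPaidEvent, a type this page does not define. The following is a minimal sketch of what such a payload class might look like, assuming messages are converted from JSON; the field names orderId and paidMoney are assumptions chosen for illustration, not something this page specifies.

```java
import java.io.Serializable;
import java.math.BigDecimal;

// Hypothetical payload type; the field names are assumptions for illustration.
class OrderPaidEvent implements Serializable {
    private static final long serialVersionUID = 1L;

    private String orderId;
    private BigDecimal paidMoney;

    // No-argument constructor so that a JSON deserializer can instantiate the class.
    public OrderPaidEvent() {
    }

    public OrderPaidEvent(String orderId, BigDecimal paidMoney) {
        this.orderId = orderId;
        this.paidMoney = paidMoney;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public BigDecimal getPaidMoney() {
        return paidMoney;
    }

    public void setPaidMoney(BigDecimal paidMoney) {
        this.paidMoney = paidMoney;
    }

    // A readable toString makes the log.info(...) line in the listener useful.
    @Override
    public String toString() {
        return "OrderPaidEvent{orderId='" + orderId + "', paidMoney=" + paidMoney + "}";
    }
}
```

In a real project the class would normally be public and live in its own file; it is package-private here only to keep the sketch self-contained.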

Pull Mode

Starting with version 2.2.0, RocketMQ Spring supports Pull mode consumption.

Modify application.properties

## application.properties
rocketmq.name-server=127.0.0.1:9876
# When both rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic are set, RocketMQTemplate starts a lite pull consumer.
# If you do not want to use the lite pull consumer, leave rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic unset.
rocketmq.pull-consumer.group=my-group1
rocketmq.pull-consumer.topic=test

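Note: the lite pull consumer was previously enabled by rocketmq.consumer.group and rocketmq.consumer.topic. Because these names were very easy to confuse with the push consumer's settings, they were changed to rocketmq.pull-consumer.group and rocketmq.pull-consumer.topic after version 2.2.3.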
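
When upgrading, it can help to check whether a configuration still relies on the old key names. The following is a rough sketch of such a check using only java.util.Properties; the class PullConsumerKeyCheck and its method are hypothetical helpers written for this page, not part of RocketMQ Spring. It flags a legacy key only when the renamed counterpart is missing, and the result is a hint to review rather than proof of an error, since the rocketmq.consumer.* keys may be set deliberately for other purposes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of RocketMQ Spring.
class PullConsumerKeyCheck {
    // Legacy key -> current key, following the rename described in the note.
    private static final Map<String, String> RENAMED = new LinkedHashMap<>();

    static {
        RENAMED.put("rocketmq.consumer.group", "rocketmq.pull-consumer.group");
        RENAMED.put("rocketmq.consumer.topic", "rocketmq.pull-consumer.topic");
    }

    // Returns one "old -> new" hint for each legacy key whose current counterpart is not set.
    static List<String> findLegacyKeys(Properties props) {
        List<String> hints = new ArrayList<>();
        for (Map.Entry<String, String> entry : RENAMED.entrySet()) {
            boolean legacySet = props.containsKey(entry.getKey());
            boolean currentSet = props.containsKey(entry.getValue());
            if (legacySet && !currentSet) {
                hints.add(entry.getKey() + " -> " + entry.getValue());
            }
        }
        return hints;
    }
}
```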

Write the code

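import java.util.List;
// javax.annotation.Resource applies to Spring Boot 2.x; on Spring Boot 3.x use jakarta.annotation.Resource.
import javax.annotation.Resource;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {

    @Resource
    private RocketMQTemplate rocketMQTemplate;

    // A second template bean named "extRocketMQTemplate"; it must be defined elsewhere in the application.
    @Resource(name = "extRocketMQTemplate")
    private RocketMQTemplate extRocketMQTemplate;

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // Pull a batch of messages using rocketMQTemplate.
        List<String> messages = rocketMQTemplate.receive(String.class);
        System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);

        // Pull a batch of messages using extRocketMQTemplate.
        messages = extRocketMQTemplate.receive(String.class);
        System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
    }
}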
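
The example above calls receive once per template, so it sees at most one batch. A consumer that should keep reading usually polls repeatedly. The following is a minimal sketch of such a loop, written against a plain Supplier so that it runs without a broker; the class PullLoop and its drain method are hypothetical names introduced here, and the poll limit is an arbitrary choice to keep the loop bounded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical helper, not part of RocketMQ Spring.
class PullLoop {
    // Calls the receiver up to maxPolls times and passes every message to the handler.
    // Returns the number of messages handled.
    //
    // Possible use with the template from the example above:
    //   PullLoop.<String>drain(() -> rocketMQTemplate.receive(String.class),
    //                          msg -> System.out.println(msg), 10);
    static <T> int drain(Supplier<List<T>> receiver, Consumer<T> handler, int maxPolls) {
        int handled = 0;
        for (int i = 0; i < maxPolls; i++) {
            List<T> batch = receiver.get();
            if (batch == null || batch.isEmpty()) {
                continue; // nothing available in this poll; try again
            }
            for (T message : batch) {
                handler.accept(message);
                handled++;
            }
        }
        return handled;
    }
}
```

Keeping the loop independent of RocketMQTemplate also makes it easy to unit test with an in-memory list of batches.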