id | title |
---|---|
rocketmq-quick-start |
RockerMQ 快速开始 |
:::tip 本文将介绍Spring Boot整合RocketMQ,完成消息生产与消费。 :::
示例代码托管在Gitee:spring-boot-rocketmq-demo 项目地址
-
创建消息生产者项目
mq-producer
,并添加依赖。<!-- rocketmq-spring-boot-starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> </dependency>
-
写配置,
application.yml
。server: port: 8001 spring: application: name: mq-producer rocketmq: # namesrv地址 name-server: 121.196.189.156:9876 # 生产者 producer: # 组 group: mq-producer-group # 发送超时 send-message-timeout: 30000 producer: demo: topic: rmq-test-topic
-
写代码。
RocketMQTemplate
提供了操作RocketMQ的大多数方法。/** * 单独封装一层发送mq的工具类,如实现发送同步消息、异步消息、延迟消息等 */ @Slf4j @Component public class RocketMqProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public boolean sendMessage(String topic, String msg) { log.info("mq发送开始,msg:{}", msg); SendResult sendResult = rocketMQTemplate.syncSend(topic, msg); log.info("mq发送结束:{}", sendResult); return "SEND_OK".equals(sendResult.getSendStatus().name()) ? true : false; } }
调用工具类发送消息,发送消息必须指定
topic
(消息主题)。/** * 业务上要发送的mq都定义到这里 */ @Slf4j @Component public class MqProducer { @Autowired private RocketMqProducer rocketMqProducer; @Value("${producer.demo.topic}") private String testTopic; public boolean test(String msg) { return rocketMqProducer.sendMessage(testTopic, msg); } }
创建TestController,用于后面测试将请求参数中的字符串作为消息发送给消费者。
@RestController @RequestMapping("test") public class TestController { @Autowired private MqProducer mqProducer; @GetMapping("msg") public String test(String msg) { return mqProducer.test(msg) ? "Send Success" : "Send Fail"; } }
-
创建消息消费者项目
mq-consumer
,并添加如上依赖。 -
写配置,
application.yml
。server: port: 8002 spring: application: name: mq-consumer rocketmq: name-server: 121.196.189.156:9876 consumer: group: mq-consumer-group consumer: demo: topic: rmq-test-topic group: rmq-test-group
-
写代码。
创建
RocketMQListener
接口的实现类,并添加@RocketMQMessageListener
注解,用于监听某一topic
消息。@Slf4j @Component @RocketMQMessageListener(consumerGroup = "${consumer.demo.group}", topic = "${consumer.demo.topic}", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.CLUSTERING, consumeTimeout = 60000L) public class TestMessage implements RocketMQListener<String> { @Override public void onMessage(String message) { log.info("消息消费成功,msg:{}", message); } }
-
测试。
使用Postman调用
mq-producer
中定义的测试接口,观察消费者是否能监听到生产者发送的消息。消费者收到消息并打印日志。