Skip to content

Latest commit

 

History

History

springboot-activemq-demo

Folders and files

NameName
Last commit message
Last commit date

parent directory

..
 
 
 
 
 
 
 
 

springboot整合ActiveMQ实现异步交易

前言

前段时间,我们分享了ActiveMQ的一些基本知识,介绍了ActiveMQ的简单部署和基本用法,演示了java环境使用ActiveMQ收发消息的简单操作,但当时只讲了ActiveMQdemo也不是web项目,距离我们实际应用确实也比较远,为了让各位小伙伴更够更直观地了解ActiveMQ的实际应用场景,体会到异步交易的魅力,今天我们通过一个小小的demo,来看下springbootActiveMQ的整合应用。

今天的核心知识点就两个:

  • Springboot异步交易
  • springboot整合ActiveMQ

好了,话不多说,我们直接开始。

正文

我们的内容,是以文件异步导出业务为例写的一些业务代码。我先简单说下业务处理过程,第一步,用户发起文件导出请求,后端接收到前端请求后,验证请求参数,并发起异步文件导出交易,交易发起成功后返回结果。

第二步,导出成功后,用户可以在文件下载中心进行下载。

为了演示方便,我把所有数据都存放在reids里面了,一般实际项目中会把文件信息存放在数据库中,处理成功后才会放进缓存。项目的完整源码附在文末,有兴趣的小伙伴自己去看。

启用JMS

创建项目,我们这里就不介绍,到今天还不会搭建springboot开发环境,确实该面壁思过了。项目创建完成后,在springboot入口加上如下配置启用jms(java message servic):

@EnableJms

引入依赖

除了spring-boot-starter-web,这里我们还需要引入如下依赖:

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.15.10</version>
</dependency>
<!-- 在2.X版本,spring.activemq.pool.enabled=true时,需依赖该jar -->
<dependency>
    <groupId>org.messaginghub</groupId>
    <artifactId>pooled-jms</artifactId>
    <version>1.0.3</version>
</dependency>

这里简单说明下,第一个依赖是activemqstarter,是activemq组件的核心依赖,所有的组件都是基于他展开的;

第二个依赖是activemq的连接池,类似于数据库连接池;

第三个依赖是activemq自动配置类依赖的包。

后面两个依赖是可选的,如果你启用了activeMQ连接池(spring.activemq.pool.enabled=true时),那你就必须依赖,没有依赖的话,sprinbgoot启动会报错:

主要原因是activemq的自动配置时依赖了这个包,没有这个包Jms的连接工厂是无法被初始化的:

有兴趣的小伙伴可以自己把这个依赖先拿掉试下。

添加配置

完成上面的工作,我们要启动本地的ActvieMQ服务,然后添加ActvieMQ配置信息:

spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

如果不需要连接池,后面两个配置可以直接拿掉。

消息发送接口

发送接口 就是消息的生产者,springboot提供了消息的模板类(JmsMessagingTemplate),我们可以通过Autowired注入使用:

@Service
public class JmsSendService {
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    public void sendMessage(String queueName, String message) {
        jmsTemplate.convertAndSend(queueName, message);
    }
}

ActiveMQ支持有返回值和无返回值两种会话形式,你可以根据自己的需要选择,JmsMessagingTemplate都是支持的,提供的模板方法也比较丰富:

这里我们只用到了convertAndSend,字面意思就是方法的意思,object是消息内容,destination是消息队列名称,看下源码你就知道,方法内部会把我们的消息内容转换成Message对象,当然如果你有特殊需求,你也可以自己组装Message,只是过程比较繁琐,简单业务的话,用我这种方式就比较简便了。

如果你需要接收返回值,那你可以调用sendAndReceive(Message<T> var)接口来实现,但是需要你自己定义自己的Message<T>,需要实现Message<T>接口。

class StringMessage implements Message<String> {

    private String payload;
    private MessageHeaders messageHeaders;

    public StringMessage(String payload) {
        this.payload = payload;
    }

    @Override
    public String getPayload() {
        return this.payload;
    }

    @Override
    public MessageHeaders getHeaders() {
        return this.messageHeaders;
    }
}

调用sendAndReceive

public String sendAndReceive(String queueName, String message) {
        Message<?> messageBack = jmsTemplate.sendAndReceive(queueName, new StringMessage(message));
        return (String)messageBack.getPayload();
    }

JmsMessagingTemplate其实就是springboot抽象出来的一个通用的消息发送模板,它理论上是可以支持所有mq的,只需要官方提供starter即可,对开发者来说,确实比较友好,只需要修改配置,剩下的就不用管了,很方便有木有。

这里是servicce层的实现过程:

/**
     * 文件导出
     * @param name
     * @param userId
     * @return
     */
public JSONObject export(String userId, String name) {

    JSONObject result = new JSONObject();
    result.put("userId", userId);
    result.put("type", 0);
    String uuId = UUIDUtil.getUUId();
    result.put("fileId", uuId);
    result.put("name", name);
    // 异步导出文件
    doExport(result);
    result.put("success", true);
    result.put("code", 0);
    result.put("message", "数据导出提交成功,请稍后到文件中心下载!");
    return result;
}

springboot异步交易

导出文件方法doExport内部,我们使用了多线程异步交易,这样的好处是把业务逻辑都放进异步交易中处理,可以将响应结果更快地呈现给用户,让接口响应更快。这里我们插个楼,讲一些springboot异步线程池的用法。

启用异步交易

springboot启动异步交易很简单,只需要在项目入口加上@EnableAsync即可

添加异步线程池配置

配置线程池大小

@Configuration
public class ExcuterConfig {
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(150);
        executor.setQueueCapacity(500);
        return executor;
    }
}

使用线程池

这里的taskExecutor就是我们前面配置的方法名。这里的异步线程和mq的异步交易是不一样的。线程池大小是固定的,当所有线程被阻塞,线程池队列也被占满,有新的交易进来时,线程池会因为资源耗尽报错,这时候后续业务是无法正常处理的;但是mq基本上是不存在阻塞资源耗尽的情况的(除非资源耗尽),特别是对于不需要有返回指定的交易,它只是一个消息仓库,只要消息不被消费,消息是可以一直存在的,也不会超时。

@Async("taskExecutor")    void doExport(JSONObject jsonObject) {        try {            String name = jsonObject.getString("name");            if (StringUtils.hasLength(name)) {                String userId = jsonObject.getString("userId");                String uuId = jsonObject.getString("fileId");                // 其他数据校验,这里通过睡眠模拟                Thread.sleep(1000L);                // 组装保存文件信息                jsonObject.put("type", 0);                jsonObject.put("isDownload", false);                jsonObject.put("createTime", System.currentTimeMillis());                // 保存文件数据,实际业务中,这部分应该是存在数据库里的,这里为了演示方便,直接存在数据库里了                redisUtil.setString(String.format("fileExport.%s.%s", userId, uuId), jsonObject.toJSONString());                // 发送文件导出业务消息                jmsSendService.sendMessage("file-export-queue", jsonObject.toJSONString());            }        } catch (Exception e) {            logger.error("数据导出错误", e);        }    }

消息接收消费

这里主要是通过@JmsListener创建了一个消息监听器,监听ActiveMQ指定队列的状态,当有新的消息进来时,该方法会被执行。方法内部是我们要异步业务处理过程。针对不同的业务类别,你可以指定不同的队列名称,但是同一个业务的发送方和消费者必须是相同的队列名称,否则是无法被消费的。

 @JmsListener(destination = "file-export-queue",  containerFactory = "jmsListenerContainerFactory")    public void testMq(String message) {        logger.info("文件导出业务入参:{}", message);        JSONObject messageJsonObject = JSON.parseObject(message);        Integer type = messageJsonObject.getInteger("type");        if (type == 0) {            Object fileId = messageJsonObject.get("fileId");            Object userId = messageJsonObject.get("userId");            String filePath = String.format("./%s.txt", fileId);            messageJsonObject.put("path", filePath);            String fileKey = String.format("fileExport.%s.%s", userId, fileId);            // 查询数据            List<String> dataList = Lists.newArrayList("张三", "历史", "周三");            try(FileOutputStream fileOutputStream = new FileOutputStream(filePath)) {                for (String s : dataList) {                    fileOutputStream.write(s.getBytes(StandardCharsets.UTF_8));                    fileOutputStream.write("\n".getBytes());                }                redisUtil.setString(fileKey, messageJsonObject.toJSONString());            } catch (Exception e) {                logger.error("文件导出失败", e);            }        }

测试

到这里,我们就可以简单测试下了,我写了一个页面,两个接口。

接口

@GetMapping("/file/{user_id}/export")    public JSONObject fileExport(@PathVariable("user_id") String userId,                                 @RequestParam String name) {        return fileService.export(userId, name);    }    @GetMapping("/file/{user_id}/download/{file_id}")    public JSONObject download(@PathVariable("user_id") String userId,                               @PathVariable("file_id") String fileId,                               HttpServletResponse response) {        return fileService.download(userId, fileId, response);    }

页面

这里名称随便输,数据是写死的。导出请求提交成功后,会返回文件id,我把文件id展示在页面上,点击链接就可以下载

总结

今天的内容从整体上来看,还是比较简单的,主要是springboot已经把好多配置工作搞好了,我们只需要简单配置即可。但是过程还是有点艰辛的,官方没有提供相关文档,网上的教程我又不想参考,所以踩了好多坑,花的时间也有点长,但是结局还是比较完美的,所有需求都实现了,而且还让我积累了整合经验。但是在整合过程中,我发现对于ActiveMQ配置这块,我还是比较迷,大部分的配置都不清楚,所以未来这块还需要深入去研究下。

最后,希望有兴趣的小伙伴最好自己动手实践下,毕竟实践出真知,眼睛会了,手不见得会……

今天分享内容的源码:

https://github.com/Syske/learning-dome-code/tree/dev/springboot-activemq-demo

更多java优质内容,请关注我的公众号【云中志】和博客

https://www.cnblogs.com/caoleiCoding/