Skip to content

Latest commit

 

History

History
132 lines (105 loc) · 4.26 KB

广播模型.md

File metadata and controls

132 lines (105 loc) · 4.26 KB

广播模型

概念

20201030183457220

可以有多个消费者 每个消费者有自己的queue(队列) 每个队列都要绑定到Exchange(交换机) 生产者发送的消息,只能发送到交换机, 交换机来决定要发给哪个队列,生产者无法决定。 交换机把消息发送给绑定过的所有队列 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

使用场景:

下单操作 ---》 订单系统

​ ---》 库存系统

​ ---》 支付系统

​ ---》 物流系统

案例

生产者 下订单

//获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //将通道声明为交换机 交换机名称 类型  fanout表示广播
        channel.exchangeDeclare("logs","fanout");

        /**
         * 生产消息
         * 1. 交换机名称
         * 2. 队列名称
         * 3. 传递消息额外设置
         * 4. 具体消息
         */
        for (int i = 1; i <= 10; i++) {
            channel.basicPublish("orders","",null,("下订单 的消息" + i).getBytes());
        }

消费者1 订单系统

//获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //为通道绑定交换机 交换机名称 类型  fanout表示广播
        channel.exchangeDeclare("orders","fanout");

        //交换机绑定临时队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,"orders","");

        /**
         * 消费消息
         * 1. 消费的队列
         * 2. 开始消息的自动确认机制
         * 3. 回调接口 重写该接口的handleDelivery方法 处理消息
         */
        channel.basicConsume(queueName,false,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("订单模块消费的消息:"+ new String(body) );
                //手动确认 参数一 消息标志  参数2 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

消费者2 物流模块

//获取连接对象
        Connection connection = RabbitMQUtils.getConnection();

        //获取连接中通道
        Channel channel = connection.createChannel();

        //为通道绑定交换机 交换机名称 类型  fanout表示广播
        channel.exchangeDeclare("orders","fanout");

        //交换机绑定临时队列
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName,"orders","");

        /**
         * 消费消息
         * 1. 消费的队列
         * 2. 开始消息的自动确认机制
         * 3. 回调接口 重写该接口的handleDelivery方法 处理消息
         */
        channel.basicConsume(queueName,false,new DefaultConsumer(channel){

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("物流模块 消费的消息:"+ new String(body) );
                //手动确认 参数一 消息标志  参数2 是否开启多个消息同时确认
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });

结果

image-20211122151759968

image-20211122151810392