该开发包主要解决rabbitmq相关代码的封装操作,实现多mq服务器的连接配置,易于扩展。 解决消息生产、消费等场景存在的消息丢失、消息堆积等异常问题,异常时记录明确的异常原因以方便深层次排查定位相关问题。
composer require burning/php-rabbitmq
新增配置文件:new_rabbitmq.php,配置如下:
<?php
/**
* 新 rabbitmq配置文件
*/
$config["new_rabbitmq"]['connections'] = [
// 默认 tms 配置
'default' => [
'driver' => 'amqp',
'host' => '192.168.71.210',
'port' => '5672',
'vhost' => '/tms', // 按业务系统进行区分,给每个业务系统一个独立的虚拟机
'login' => 'huangshi',
'password' => 'HUanGshi',
#=====================================================================
# 考虑场景通用化,默认启用Topic模式,可兼容Direct模式
#=====================================================================
'default_exchange' => 'tms.exchange', // 业务交换机名称
'default_queue' => 'tms.ordersys.queue', // 业务队列名称,各系统请采用自身的队列
#=====================================================================
# exchange - queues maps
# 支持exchange对应多个queue
# exchange与queue一对一,则route中的queue可以不填,自动填充default_queue
# eg: exchange => queue OR exchange => [queue1, queue2, queue3]
#=====================================================================
'route' => [
'tms.exchange' => [
'tms.ordersys.queue',
'tms.tracksys.queue',
],
],
#=====================================================================
# exchange - queue binding
# queue => binding_key
# 各个队列绑定值 设置为队列名(下划线连接)
# Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
# 因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
#=====================================================================
'binding' => [
'tms.ordersys.queue' => 'tms_ordersys.#',
'tms.tracksys.queue' => 'tms_tracksys.#',
],
],
// 发布/订阅 广播模式
'default_fanout' => [
'driver' => 'amqp',
'host' => '192.168.71.210',
'port' => '5672',
'vhost' => '/tms', // 按业务系统进行区分,给每个业务系统一个独立的虚拟机
'login' => 'huangshi',
'password' => 'HUanGshi',
#=====================================================================
# Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到该交换机上。发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
# 很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的
#=====================================================================
'exchange_params' => [
'type' => 'fanout',
'passive' => false,
'durable' => true,
'auto_delete' => false,
],
'default_exchange' => 'tms.fanout.exchange', // 业务交换机名称
'default_queue' => 'tms.fanout.queue', // 业务队列名称,各系统请采用自身的队列
#=====================================================================
# exchange - queues maps
# 支持exchange对应多个queue
# 发布订阅模式,只要有人订阅当前交换机,并把队列绑定到该交换机,则消息会自动转发到当前交换机绑定的所有队列上,供各订阅者使用
# eg: exchange => queue OR exchange => [queue1, queue2, queue3]
#=====================================================================
'route' => [
'tms.fanout.exchange' => [
'tms.fanout.queue',
'tms.fanout.queue2',
],
],
],
];
try {
// 加载mq配置
$config = $this->load->config("new_rabbitmq");
// mq配置
$queues_config = $config['connections'];
if (empty($queues_config)){
throw new Exception("new_rabbitmq 配置为空,请检查rabbitmq配置");
}
// 原mq消息
$mqData = [
"sku" => "20221229",
"country_code" => "CN",
"create_time" => date("Y-m-d H:i:s")
];
// 实例化mq对象
$mqClient = new \Burning\PhpRabbitmq\MQServiceProvider($queues_config);
$mq = $mqClient->getMqClient();
// 创建connection
$mq->connection("default");
$mq->setModel(\Burning\PhpRabbitmq\Objects\PublishModel::CONFIRM);
// 绑定消息到 tms_ordersys.# 匹配模式对应的 tms.ordersys.queue 队列上
$bindingKey = "tms_ordersys.skuSync";
// 生产消息
$correlationId = $mq->push($mqData, $bindingKey);
if (!$correlationId) {
// 发送消息失败,获取失败原因 $mq->getHandlerCallbackMessage()
throw new Exception("MQ生产消息失败:".$mq->getHandlerCallbackMessage());
}
// 有值则推送消息成功,为空则失败
var_dump($correlationId);
}catch (Exception $e){
echo "推送消息异常:".$e->getMessage();
exit();
}
try {
// 加载mq配置
$config = $this->load->config("new_rabbitmq");
// mq配置
$queues_config = $config['connections'];
if (empty($queues_config)){
throw new Exception("new_rabbitmq 配置为空,请检查rabbitmq配置");
}
// 实例化mq对象
$mqClient = new \Burning\PhpRabbitmq\MQServiceProvider($queues_config);
$mq = $mqClient->getMqClient();
// 创建connection
$channel = $mq->connection("default");
// 推模式持续订阅消费消息,$callback回调方法需返回true/false,队列才能明确是否删除消息
$queue = 'tms.ordersys.queue';
$channel->consume($queue, function(\PhpAmqpLib\Message\AMQPMessage $message){
// 消息消费失败的逻辑应该在业务层去做记录、预警等提醒开发跟进,不建议重新入队。避免消息产生堆积
echo $message->getBody() . myEOL();
// 消费消息,处理业务逻辑
return true; // 回调为true,队列删除消息
});
$channel->start();
}catch (Exception $e){
$msg = '消费消息异常exception queue[' . $queue . '] message : [ ' . $e->getMessage() . ']';
if ($e instanceof \PhpAmqpLib\Exception\AMQPRuntimeException) {
// 部分错误发生后直接重启
// Broken pipe or closed connection
// missed server heartbeat
}
echo $msg;
// 特殊异常时,退出进程,以便消费者守护进程自动重启;Unacked 的消息,会重新回到队列的头部,变为 Ready。
exit(1);
}
订阅者只要将自己的队列名绑定在该default_fanout配置对应的交换机 tms.fanout.exchange 下,即可自动接收到生产者发布的消息
$config = $this->load->config("new_rabbitmq");
// mq配置
$queues_config = $config['connections'];
if (empty($queues_config)){
throw new Exception("new_rabbitmq 配置为空,请检查rabbitmq配置");
}
$mqClient = new \Burning\PhpRabbitmq\MQServiceProvider($queues_config);
$mq = $mqClient->getMqClient();
$mq->connection("default_fanout");
$data = [
'test' => 'test value',
'msg' => '测试发布订阅消息',
'date_time'=>date('Y-m-d H:i:s')
];
$correlation_id = $mq->push($data);
var_dump($correlation_id);
处理逻辑同上面“消费消息”