Skip to content

devweyes/swoft-queue

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

20 Commits
 
 
 
 
 
 
 
 

Repository files navigation

swoft queue

1. 介绍

用户进程/进程池与queue的完美结合,注解切面版queue,支持 redis,rabbitmq(等官方发基础包 开发中)

2. 使用

composer

composer require devweyes/queue

主要配置

<?php
        return [
            Queue::MANAGER => [
                'class' => QueueManager::class,
                'driver' => bean(Queue::DRIVER),
                'serverIdPrefix' => 'swoft_ws_server_cluster_'
            ],
            Queue::DRIVER => [
                'class' => RedisQueue::class,
                'redis' => bean('redis.pool'),
                'serializer' => bean(Queue::SERIALIZER),
                'prefix' => 'swoft_queue_',
                'default' => 'default',
                'waite' => 10,
                'retry' => 3
            ],
            Queue::SERIALIZER => [
                'class' => PhpSerializer::class
            ]
        ];

用户进程更多消费者

新增进程数量配置

<?php

use Jcsp\Queue\Helper\Tool;
...
'wsServer' => [
    'class' => \Swoft\WebSocket\Server\WebSocketServer::class,
    ...
    //可配置多个消息消费,视业务量而定
    'process' => array_merge(
                Tool::moreProcess('recvMessageProcess', bean(\Jcsp\WsCluster\Process\RecvMessageProcess::class), 3),
                [
                  //自定义进程
                ]
            )
]

数据手动生产

<?php
use Jcsp\Queue\Queue;

Queue::bind('queue')->push('this is message');

redis驱动queue消费

  • redis内存数据库,并不可靠但性能及高,数据量小于10K出队入队速度显著,非常适用于实时异步短消息传输。否则,请使用rabbitmq或其他作为驱动
  • 用户进程需继承Jcsp\Queue\Contract\UserProcess
  • 进程池需实现Jcsp\Queue\Contract\ProcessInterface
  • 进程需包含三个方法 run(), receive(), fallback(),分别实现 入口消费错误处理逻辑
  • run方法内无需再实现while(true) 的业务,甚至无需任何代码
  • run()方法内严禁使用 exit() return等,$this->queue用于自定义队列名以覆盖注解
  • receivereturn Result::ACK为正确消费,其他方法或发生异常均视为消费失败
<?php declare(strict_types=1);

namespace App\Process;

use Jcsp\Queue\Annotation\Mapping\Pull;
use Jcsp\Queue\Result;
use Swoft\Bean\Annotation\Mapping\Bean;
use Swoft\Bean\Annotation\Mapping\Inject;
use Swoft\Bean\BeanFactory;
use Swoft\Log\Helper\CLog;
use Swoft\Process\Process;
use Jcsp\Queue\Contract\UserProcess;

/**
 * Class MonitorProcess
 *
 * @since 2.0
 *
 * @Bean()
 */
class RecvMessageProcess extends UserProcess
{
    /**
     * @param Process $process
     * @Pull("queue")
     */
    public function run(Process $process): void
    {
        //add queue
        $this->queue = 'new_queue';
        //waite
    }
    /**
     * customer
     * @param $message
     * @return string
     */
    public function receive($message): string
    {
        return Result::ACK;
    }

    /**
     * when error callback
     * @param $message
     * @return string
     */
    public function fallback(\Throwable $throwable, $message): void
    {
        //
        vdump('error', $throwable->getMessage(), 'message',$message);
    }
}

rabbitmq驱动queue消费

In development

About

queue for swoft, redis,rabbitmq and so on

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages