Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Annotation AsyncQueueMessage. #402

Merged
merged 9 commits into from Aug 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,13 @@
# v1.1.0 - TBD

## Added

- [#402](https://github.com/hyperf-cloud/hyperf/pull/402) Added Annotation AsyncQueueMessage.

## Deleted

- [#402](https://github.com/hyperf-cloud/hyperf/pull/402) Deleted deprecated method `AsyncQueue::delay`.

# v1.0.11 - 2019-08-15

## Added
Expand Down
32 changes: 32 additions & 0 deletions src/async-queue/src/Annotation/AsyncQueueMessage.php
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\AsyncQueue\Annotation;

use Hyperf\Di\Annotation\AbstractAnnotation;

/**
* @Annotation
* @Target({"CLASS", "METHOD"})
*/
class AsyncQueueMessage extends AbstractAnnotation
{
/**
* @var string
*/
public $pool = 'default';

/**
* @var int
*/
public $delay = 0;
}
63 changes: 63 additions & 0 deletions src/async-queue/src/AnnotationJob.php
@@ -0,0 +1,63 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\AsyncQueue;

use Hyperf\Contract\CompressInterface;
use Hyperf\Contract\UnCompressInterface;
use Hyperf\Utils\ApplicationContext;

class AnnotationJob extends Job
{
/**
* @var string
*/
public $class;

/**
* @var string
*/
public $method;

/**
* @var array
*/
public $params = [];

public function __construct(string $class, string $method, array $params)
{
$this->class = $class;
$this->method = $method;
foreach ($params as $key => $value) {
if ($value instanceof CompressInterface) {
$value = $value->compress();
}
$this->params[$key] = $value;
}
}

public function handle()
{
$container = ApplicationContext::getContainer();

$class = $container->get($this->class);

$params = [];
foreach ($this->params as $key => $value) {
if ($value instanceof UnCompressInterface) {
$value = $value->uncompress();
}
$params[$key] = $value;
}
$class->{$this->method}(...$params);
}
}
70 changes: 70 additions & 0 deletions src/async-queue/src/Aspect/AsyncQueueAspect.php
@@ -0,0 +1,70 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\AsyncQueue\Aspect;

use Hyperf\AsyncQueue\Annotation\AsyncQueueMessage;
use Hyperf\AsyncQueue\AnnotationJob;
use Hyperf\AsyncQueue\Driver\DriverFactory;
use Hyperf\AsyncQueue\Environment;
use Hyperf\Di\Annotation\Aspect;
use Hyperf\Di\Aop\AbstractAspect;
use Hyperf\Di\Aop\ProceedingJoinPoint;
use Psr\Container\ContainerInterface;

/**
* @Aspect
*/
class AsyncQueueAspect extends AbstractAspect
{
public $annotations = [
AsyncQueueMessage::class,
];

/**
* @var ContainerInterface
*/
protected $container;

public function __construct(ContainerInterface $container)
{
$this->container = $container;
}

public function process(ProceedingJoinPoint $proceedingJoinPoint)
{
$env = $this->container->get(Environment::class);
if ($env->isAsyncQueue()) {
$proceedingJoinPoint->process();
return;
}

$class = $proceedingJoinPoint->className;
$method = $proceedingJoinPoint->methodName;
$arguments = $proceedingJoinPoint->getArguments();
$pool = 'default';
$delay = 0;

$metadata = $proceedingJoinPoint->getAnnotationMetadata();
/** @var AsyncQueueMessage $annotation */
$annotation = $metadata->method[AsyncQueueMessage::class] ?? $metadata->class[AsyncQueueMessage::class] ?? null;
if ($annotation instanceof AsyncQueueMessage) {
$pool = $annotation->pool;
$delay = $annotation->delay;
}

$factory = $this->container->get(DriverFactory::class);
$driver = $factory->get($pool);

$driver->push(new AnnotationJob($class, $method, $arguments), $delay);
}
}
3 changes: 3 additions & 0 deletions src/async-queue/src/Driver/Driver.php
Expand Up @@ -12,6 +12,7 @@

namespace Hyperf\AsyncQueue\Driver;

use Hyperf\AsyncQueue\Environment;
use Hyperf\AsyncQueue\Event\AfterHandle;
use Hyperf\AsyncQueue\Event\BeforeHandle;
use Hyperf\AsyncQueue\Event\FailedHandle;
Expand Down Expand Up @@ -53,6 +54,8 @@ public function __construct(ContainerInterface $container, $config)

public function consume(): void
{
$this->container->get(Environment::class)->setAsyncQueue(true);

while (true) {
[$data, $message] = $this->pop();

Expand Down
6 changes: 0 additions & 6 deletions src/async-queue/src/Driver/DriverInterface.php
Expand Up @@ -21,12 +21,6 @@ interface DriverInterface
*/
public function push(JobInterface $job, int $delay = 0): bool;

/**
* Push a delay job to queue.
* @deprecated v1.1
*/
public function delay(JobInterface $job, int $delay = 0): bool;

/**
* Delete a delay job to queue.
*/
Expand Down
14 changes: 0 additions & 14 deletions src/async-queue/src/Driver/RedisDriver.php
Expand Up @@ -74,20 +74,6 @@ public function push(JobInterface $job, int $delay = 0): bool
return $this->redis->zAdd($this->channel->getDelayed(), time() + $delay, $data) > 0;
}

/**
* @deprecated v1.1
*/
public function delay(JobInterface $job, int $delay = 0): bool
{
if ($delay === 0) {
return $this->push($job);
}

$message = new Message($job);
$data = $this->packer->pack($message);
return $this->redis->zAdd($this->channel->getDelayed(), time() + $delay, $data) > 0;
}

public function delete(JobInterface $job): bool
{
$message = new Message($job);
Expand Down
32 changes: 32 additions & 0 deletions src/async-queue/src/Environment.php
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);
/**
* This file is part of Hyperf.
*
* @link https://www.hyperf.io
* @document https://doc.hyperf.io
* @contact group@hyperf.io
* @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
*/

namespace Hyperf\AsyncQueue;

class Environment
{
/**
* @var bool
*/
protected $asyncQueue = false;

public function isAsyncQueue(): bool
{
return $this->asyncQueue;
}

public function setAsyncQueue(bool $asyncQueue): self
{
$this->asyncQueue = $asyncQueue;
return $this;
}
}