Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature -- Message Queue #686

Merged
merged 17 commits into from
Jul 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
221 changes: 221 additions & 0 deletions app/Middleware/Queue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
<?php
/**
* =========================================================================
* Program: CDash - Cross-Platform Dashboard System
* Module: $Id$
* Language: PHP
* Date: $Date$
* Version: $Revision$
* Copyright (c) Kitware, Inc. All rights reserved.
* See LICENSE or http://www.cdash.org/licensing/ for details.
* This software is distributed WITHOUT ANY WARRANTY; without even
* the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the above copyright notices for more information.
* =========================================================================
*/

namespace CDash\Middleware;

use Bernard\Driver;
use Bernard\Message;
use Bernard\Middleware;
use Bernard\Middleware\MiddlewareBuilder;
use Bernard\Producer;
use Bernard\QueueFactory;
use Bernard\QueueFactory\PersistentFactory;
use Bernard\Router;
use Bernard\Router\SimpleRouter;
use Bernard\Serializer;
use Bernard\Serializer\SimpleSerializer;
use CDash\Middleware\Queue\Consumer;
use CDash\Middleware\Queue\DriverFactory;

/**
* Class Queue
* @package CDash\Middleware
* @see https://github.com/bernardphp/bernard/blob/c452caa6de208b0274449cde1c48eb32fb9f59f9/example/bootstrap.php
*/
class Queue
{
/** @var Driver $driver */
protected $driver;

/** @var Serializer $serializer */
protected $serializer;

/** @var MiddlewareBuilder $middlewareBuilder */
protected $middlewareBuilder;

/** @var Middleware\ErrorLogFactory $errorLogFactory */
protected $errorLogFactory;

/** @var Middleware\FailuresFactory $failuresFactory */
protected $failuresFactory;

/** @var QueueFactory $queueFactory*/
protected $queueFactory;

/** @var Producer $producer */
protected $producer;

/** @var Router $router*/
protected $router;

/** @var Consumer $consumer */
protected $consumer;

/** @var */
protected $services;

/**
* Queue constructor.
* @param Driver $driver
*/
public function __construct(Driver $driver = null, array $services = [])
{
$this->driver = $driver;
$this->services = $services;
}

/**
* @return Driver|Driver\AppEngineDriver|Driver\DoctrineDriver|Driver\FlatFileDriver|Driver\PhpRedisDriver|Driver\PredisDriver|Driver\SqsDriver|null
* @throws \Doctrine\DBAL\DBALException
*/
protected function getDriver()
{
if (!$this->driver) {
$this->driver = DriverFactory::create();
}
return $this->driver;
}

/**
* @return Serializer|SimpleSerializer
*/
protected function getSerializer()
{
if (!$this->serializer) {
$this->serializer = new SimpleSerializer();
}
return $this->serializer;
}

/**
* @return MiddlewareBuilder
*/
protected function getMiddlewareBuilder()
{
if (!$this->middlewareBuilder) {
$this->middlewareBuilder = new MiddlewareBuilder();
}
return $this->middlewareBuilder;
}

/**
* @return Middleware\ErrorLogFactory
*/
protected function getErrorLogFactory()
{
if (!$this->errorLogFactory) {
$this->errorLogFactory = new Middleware\ErrorLogFactory();
}
return $this->errorLogFactory;
}

/**
* @return Middleware\FailuresFactory
*/
protected function getFailuresFactory()
{
if (!$this->failuresFactory) {
$this->failuresFactory = new Middleware\FailuresFactory($this->getQueueFactory());
}
return $this->failuresFactory;
}

/**
* @return MiddlewareBuilder
*/
protected function getConsumerMiddleware()
{
$chain = $this->getMiddlewareBuilder();
$chain->push($this->getErrorLogFactory());
$chain->push($this->getFailuresFactory());

return $chain;
}

/**
* @return QueueFactory|PersistentFactory
*/
protected function getQueueFactory()
{
if (!$this->queueFactory) {
$this->queueFactory = new PersistentFactory($this->getDriver(), $this->getSerializer());
}
return $this->queueFactory;
}

/**
* @return Producer
*/
protected function getProducer()
{
if (!$this->producer) {
$this->producer = new Producer($this->getQueueFactory(), $this->getMiddlewareBuilder());
}
return $this->producer;
}

/**
* @return Router|SimpleRouter
*/
protected function getRouter()
{
if (!$this->router) {
$this->router = new SimpleRouter($this->services);
}
return $this->router;
}

/**
* @return Consumer
*/
protected function getConsumer()
{
if (!$this->consumer) {
$this->consumer = new Consumer($this->getRouter(), $this->getConsumerMiddleware());
}
return $this->consumer;
}

/**
* @param Message $message
*/
public function produce(Message $message)
{
$queue = $this->getProducer();
$queue->produce($message);
}

/**
* @param $name
* @param array $options
*/
public function consume($name, array $options = [])
{
$queues = $this->getQueueFactory();
$consumer = $this->getConsumer();

$consumer->consume($queues->create($name), $options);
}

/**
* @param $name
* @param $service
*/
public function addService($name, $service)
{
$this->services[$name] = $service;
}
}
29 changes: 29 additions & 0 deletions app/Middleware/Queue/Consumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php
/**
* =========================================================================
* Program: CDash - Cross-Platform Dashboard System
* Module: $Id$
* Language: PHP
* Date: $Date$
* Version: $Revision$
* Copyright (c) Kitware, Inc. All rights reserved.
* See LICENSE or http://www.cdash.org/licensing/ for details.
* This software is distributed WITHOUT ANY WARRANTY; without even
* the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the above copyright notices for more information.
* =========================================================================
*/

namespace CDash\Middleware\Queue;

class Consumer extends \Bernard\Consumer
{
public function bind()
{
if (function_exists('pcntl_signal')) {
pcntl_signal(SIGTERM, array($this, 'shutdown'));
pcntl_signal(SIGQUIT, array($this, 'shutdown'));
pcntl_signal(SIGINT, array($this, 'shutdown'));
}
}
}
126 changes: 126 additions & 0 deletions app/Middleware/Queue/DriverFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
<?php
/**
* =========================================================================
* Program: CDash - Cross-Platform Dashboard System
* Module: $Id$
* Language: PHP
* Date: $Date$
* Version: $Revision$
* Copyright (c) Kitware, Inc. All rights reserved.
* See LICENSE or http://www.cdash.org/licensing/ for details.
* This software is distributed WITHOUT ANY WARRANTY; without even
* the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the above copyright notices for more information.
* =========================================================================
*/

namespace CDash\Middleware\Queue;

use Aws\Sqs\SqsClient;
use Bernard\Driver\AppEngineDriver;
use Bernard\Driver\DoctrineDriver;
use Bernard\Driver\FlatFileDriver;
use Bernard\Driver\PhpRedisDriver;
use Bernard\Driver\PredisDriver;
use Bernard\Driver\SqsDriver;
use CDash\Config;
use CDash\Log;
use Doctrine\DBAL\DriverManager;
use Predis\Client as PredisClient;
use Redis;

class DriverFactory
{
const APP_ENGINE = 'AppEngine';
const DOCTRINE = 'Doctrine';
const FLAT_FILE = 'FlatFile';
const IRON_MQ = 'IronMQ';
const PHP_REDIS = 'PhpRedis';
const PREDIS = 'Predis';
const SQS = 'SQS';

// Begin NOT Available in Bernard ~0.12
const MEMORY = 'Memory';
const INTEROP = 'Interop';
const MONGO = 'MongoDB';
const PHEANSTALK = 'Pheanstalk';
// End NOT Available in Bernard ~0.12

/**
* @param array $configuration
* @return AppEngineDriver|DoctrineDriver|FlatFileDriver|PhpRedisDriver|PredisDriver|SqsDriver|null
* @throws \Doctrine\DBAL\DBALException
*/
public static function create(array $configuration = [])
{
if (empty($configuration)) {
$configuration = static::getConfiguration();
}

if (isset($configuration[0]) && isset($configuration[1])) {
list($key, $properties) = $configuration;
} else {
throw new \Exception("Driver not configured.");
}

$driver = null;
switch ($key) {
case self::APP_ENGINE:
// Limited functionality
$driver = new AppEngineDriver($properties['queueMap']);
break;
case self::DOCTRINE:
$connection = DriverManager::getConnection($properties);
$driver = new DoctrineDriver($connection);
break;
case self::FLAT_FILE:
$driver = new FlatFileDriver($properties['baseDirectory']);
break;
case self::PHP_REDIS:
$redis = new Redis();
$redis->connect($properties['host']);
try {
$redis->setOption(Redis::OPT_PREFIX, $properties['prefix']);
} catch (\Exception $e) {
Log::getInstance()->error($e);
}
$driver = new PhpRedisDriver($redis);
break;
case self::PREDIS:
$client = new PredisClient(null, $properties);
$driver = new PredisDriver($client);
break;
case self::SQS:
$sqs = new SqsClient($properties);
$driver = new SqsDriver($sqs);
break;
case self::IRON_MQ:
default:
throw new \Exception("{$key} Not Implemented.");
}
return $driver;
}

public static function getConfiguration()
{
$config = Config::getInstance();
$queue_config = $config->load('queue');
$key = null;
$filter = function ($v, $k) use (&$key) {
if ($v['enabled']) {
$key = $k;
return true;
}
};

$driver_config = array_filter(
$queue_config['drivers'],
$filter,
ARRAY_FILTER_USE_BOTH
);

unset($driver_config['enabled']);

return [$key, array_pop($driver_config)];
}
}
Loading