Skip to content
This repository was archived by the owner on Jan 3, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
language: php

cache:
apt: true
directories:
- $HOME/.composer/cache

php:
- '7.0'
- '7.1'
- '7.2'

before_script:
- composer install
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
"license": "MIT",

"require": {
"php": "^7.0",
"php": "^7.1",
"psr/log": "^1.0",

"swarrot/swarrot": "^2.0 || ^3.0",

"symfony/config": "^3.0 || ^4.0",
"symfony/console": "^3.0 || ^4.0",
"symfony/process": "^3.0 || ^4.0",
"symfony/process": "^3.4 || ^4.0",
"symfony/http-kernel": "^3.0 || ^4.0",
"symfony/event-dispatcher": "^3.0 || ^4.0",
"symfony/dependency-injection": "^3.3 || ^4.0"
Expand Down
22 changes: 8 additions & 14 deletions src/Broker/PeclBroker.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
use AMQPExchangeException;
use AMQPConnectionException;

use Swarrot\Broker\MessageProvider\MessageProviderInterface;
use Swarrot\Broker\MessageProvider\PeclPackageMessageProvider;

use Swarrot\Broker\MessagePublisher\MessagePublisherInterface;
use Swarrot\Broker\MessagePublisher\PeclPackageMessagePublisher;

use Wisembly\AmqpBundle\Gate;
Expand All @@ -39,8 +42,7 @@ class PeclBroker implements BrokerInterface
/** @var PeclPackageMessagePublisher[][] */
private $producers = [];

/** {@inheritDoc} */
public function getProvider(Gate $gate)
public function getProvider(Gate $gate): MessageProviderInterface
{
$name = $gate->getName();
$connection = $gate->getConnection()->getName();
Expand All @@ -61,8 +63,7 @@ public function getProvider(Gate $gate)
return $this->providers[$connection][$name];
}

/** {@inheritDoc} */
public function getProducer(Gate $gate)
public function getProducer(Gate $gate): MessagePublisherInterface
{
$name = $gate->getName();
$connection = $gate->getConnection()->getName();
Expand All @@ -87,7 +88,7 @@ public function getProducer(Gate $gate)
return $this->producers[$connection][$name];
}

private function declare(Gate $gate, AMQPChannel $channel)
private function declare(Gate $gate, AMQPChannel $channel): void
{
if (false === $gate->getAutoDeclare()) {
return;
Expand Down Expand Up @@ -146,8 +147,7 @@ public function __destruct()
}
}

/** {@inheritDoc} */
public function createTemporaryQueue(Gate $gate)
public function createTemporaryQueue(Gate $gate): Gate
{
$id = sha1(uniqid(mt_rand(), true));
$key = $gate->getRoutingKey();
Expand All @@ -168,13 +168,7 @@ public function createTemporaryQueue(Gate $gate)
return $gate;
}

/**
* Get a channel with the connection $connection
*
* @param Connection $connnection Connection to use
* @return AMQPChannel
*/
private function getChannel(Connection $connection)
private function getChannel(Connection $connection): AMQPChannel
{
$name = $connection->getName();

Expand Down
25 changes: 9 additions & 16 deletions src/Broker/PhpAmqpLibBroker.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Exception\AMQPProtocolConnectionException;

use Swarrot\Broker\MessageProvider\MessageProviderInterface;
use Swarrot\Broker\MessageProvider\PhpAmqpLibMessageProvider;

use Swarrot\Broker\MessagePublisher\MessagePublisherInterface;
use Swarrot\Broker\MessagePublisher\PhpAmqpLibMessagePublisher;

use Wisembly\AmqpBundle\Gate;
Expand All @@ -32,8 +35,7 @@ class PhpAmqpLibBroker implements BrokerInterface
/** @var PhpAmqpLibMessagePublisher[][] */
private $producers = [];

/** {@inheritDoc} */
public function getProvider(Gate $gate)
public function getProvider(Gate $gate): MessageProviderInterface
{
$name = $gate->getName();
$connection = $gate->getConnection()->getName();
Expand All @@ -54,8 +56,7 @@ public function getProvider(Gate $gate)
return $this->providers[$connection][$name];
}

/** {@inheritDoc} */
public function getProducer(Gate $gate)
public function getProducer(Gate $gate): MessagePublisherInterface
{
$name = $gate->getName();
$connection = $gate->getConnection()->getName();
Expand All @@ -76,7 +77,7 @@ public function getProducer(Gate $gate)
return $this->producers[$connection][$name];
}

private function declare(Gate $gate, AMQPChannel $channel)
private function declare(Gate $gate, AMQPChannel $channel): void
{
if (false === $gate->getAutoDeclare()) {
return;
Expand Down Expand Up @@ -117,8 +118,7 @@ private function declare(Gate $gate, AMQPChannel $channel)
);
}

/** {@inheritDoc} */
public function createTemporaryQueue(Gate $gate)
public function createTemporaryQueue(Gate $gate): Gate
{
$key = $gate->getRoutingKey();
$name = sha1(uniqid(mt_rand(), true));
Expand All @@ -142,12 +142,7 @@ public function createTemporaryQueue(Gate $gate)
return $gate;
}

/**
* Get a channel with the connection $connection
*
* @return AMQPChannel
*/
private function getChannel(Connection $connection)
private function getChannel(Connection $connection): AMQPChannel
{
$name = $connection->getName();

Expand All @@ -164,8 +159,7 @@ private function getChannel(Connection $connection)
}
}

/** @return AMQPLazyConnection */
private function getConnection(Connection $connection)
private function getConnection(Connection $connection): AMQPLazyConnection
{
$name = $connection->getName();

Expand All @@ -184,4 +178,3 @@ private function getConnection(Connection $connection)
return $this->connections[$name] = $connection;
}
}

27 changes: 3 additions & 24 deletions src/BrokerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,9 @@
*/
interface BrokerInterface
{
/**
* Return the provider $name for the connection $connection
*
* @param string $name Provider's identifier
*
* @return MessageProviderInterface
*/
public function getProvider(Gate $gate);
public function getProvider(Gate $gate): MessageProviderInterface;

/**
* Return the producer $name for the connection $connection
*
* @param string $name Producer's identifier
*
* @return MessagePublisherInterface
*/
public function getProducer(Gate $gate);
public function getProducer(Gate $gate): MessagePublisherInterface;

/**
* Create a temporary queue
*
* @param Gate original from which the data will be copied
*
* @return Gate the new associated gate
*/
public function createTemporaryQueue(Gate $origin);
public function createTemporaryQueue(Gate $origin): Gate;
}
10 changes: 2 additions & 8 deletions src/Command/ConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Output\OutputInterface;

use Symfony\Component\Process\ProcessBuilder;

use Psr\Log\NullLogger;
use Psr\Log\LoggerInterface;

Expand Down Expand Up @@ -77,13 +75,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
$gate = $this->gates->get($gate);

$provider = $this->broker->getProvider($gate);
$producer = $this->broker->getProducer($gate);

$processor = new CommandProcessor(
$this->logger,
new ProcessBuilder,
$provider,
$producer,
$this->consolePath,
$input->hasOption('env') ? $input->getOption('env') : null,
true === $input->getOption('disable-verbosity-propagation') ? OutputInterface::VERBOSITY_QUIET : $output->getVerbosity()
Expand All @@ -94,6 +89,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
$processor = new RpcServerProcessor($processor, $producer, $this->logger);
}

// Wrap processor in an Swarrot ExceptionCatcherProcessor to avoid breaking processor if an error occurs
$processor = new ExceptionCatcherProcessor($processor, $this->logger);
$options = [];

Expand All @@ -105,9 +101,7 @@ protected function execute(InputInterface $input, OutputInterface $output)
$options['memory_limit'] = (int) $input->getOption('memory-limit');
}

// Wrap processor in an Swarrot ExceptionCatcherProcessor to avoid breaking processor if an error occurs
$consumer = new Consumer($provider, $processor);

$consumer = new Consumer($provider, $processor);
$consumer->consume(['poll_interval' => $input->getOption('poll-interval')]);
}
}
33 changes: 17 additions & 16 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
/**
* Value object for an AMQP connection information
*
* @author Baptiste Clavié <baptiste@eisembly.com>
* @author Baptiste Clavié <clavie.b@gmail.com>
*/
class Connection
{
Expand All @@ -29,8 +29,15 @@ class Connection
/** @var string */
private $query;

public function __construct($name, $host, $port, $login, $password, $vhost, $query)
{
public function __construct(
string $name,
string $host,
?int $port,
?string $login,
?string $password,
?string $vhost,
?string $query
) {
$this->name = $name;
$this->host = $host;
$this->port = $port;
Expand All @@ -40,43 +47,37 @@ public function __construct($name, $host, $port, $login, $password, $vhost, $que
$this->query = $query;
}

/** @return string */
public function getName()
public function getName(): string
{
return $this->name;
}

/** @return string */
public function getHost()
public function getHost(): string
{
return $this->host;
}

/** @return string */
public function getPort()
public function getPort(): ?int
{
return $this->port;
}

/** @return string */
public function getLogin()
public function getLogin(): ?string
{
return $this->login;
}

/** @return string */
public function getPassword()
public function getPassword(): ?string
{
return $this->password;
}

/** @return string */
public function getVhost()
public function getVhost(): ?string
{
return $this->vhost;
}

public function getQuery()
public function getQuery(): ?string
{
return $this->query;
}
Expand Down
6 changes: 3 additions & 3 deletions src/EventListener/MessagePublishedEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,18 @@ public function __construct(Message $message, Datetime $publishedAt, Gate $gate)
}

/** @return Message */
public function getMessage()
public function getMessage(): Message
{
return $this->message;
}

/** @return Gate */
public function getGate()
public function getGate(): Gate
{
return $this->gate;
}

public function getPublishedAt()
public function getPublishedAt(): Datetime
{
return $this->publishedAt;
}
Expand Down
Loading