Skip to content

Commit

Permalink
Added logger for consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
JirsaR committed Jun 26, 2020
1 parent 3062712 commit 0455b45
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 51 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -52,7 +52,7 @@ phpcoverage:
$(DE) ./vendor/bin/paratest -c ./vendor/hanaboso/php-check-utils/phpunit.xml.dist -p 1 --coverage-html var/coverage --whitelist src tests

phpcoverage-ci:
$(DE) ./vendor/hanaboso/php-check-utils/bin/coverage.sh -p 1 -c 97
$(DE) ./vendor/hanaboso/php-check-utils/bin/coverage.sh -p 1 -c 96

test: docker-up-force composer-install fasttest

Expand Down
2 changes: 1 addition & 1 deletion composer.json
@@ -1,7 +1,7 @@
{
"name": "hanaboso/rabbit-mq-bundle",
"license": "proprietary",
"version": "1.4.0",
"version": "1.4.1",
"autoload": {
"psr-4": {
"RabbitMqBundle\\": "src"
Expand Down
53 changes: 26 additions & 27 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 13 additions & 10 deletions src/Consumer/AsyncConsumer.php
Expand Up @@ -8,6 +8,7 @@
use RabbitMqBundle\Connection\Configurator;
use RabbitMqBundle\Connection\ConnectionManager;
use RabbitMqBundle\Consumer\Callback\Exception\CallbackException;
use RabbitMqBundle\Utils\Message;
use Throwable;

/**
Expand Down Expand Up @@ -109,17 +110,19 @@ private function runAsyncConsumer(): void
$this->nowait,
function (AMQPMessage $message): void {
try {
$this->callback->processMessage(
$message,
$this->connectionManager->getConnection(),
(int) $this->channelId
);
$this->callback
->processMessage(
$message,
$this->connectionManager->getConnection(),
(int) $this->channelId
)
->wait();
} catch (Throwable $e) {
throw new CallbackException(
sprintf('RabbitMq callback error: %s', $e->getMessage()),
$e->getCode(),
$e
);
$m = sprintf('RabbitMq callback error: %s', $e->getMessage());
$this->logger->error($m, ['Message' => $message]);
Message::nack($message, $this->connectionManager->getConnection(), (int) $this->channelId, TRUE);

throw new CallbackException($m, $e->getCode(), $e);
}
},
NULL,
Expand Down
13 changes: 9 additions & 4 deletions src/Consumer/Callback/DumpAsyncCallback.php
Expand Up @@ -29,14 +29,19 @@ final class DumpAsyncCallback implements AsyncCallbackInterface
public function processMessage(AMQPMessage $message, Connection $connection, int $channelId): PromiseInterface
{
var_dump(['body' => Message::getBody($message), 'headers' => Message::getHeaders($message)]);
$promise = new Promise();
$promise = new Promise(
static function () use (&$promise): void {
if ($promise) {
$promise->resolve('waited');
}
},
);
$promise
->then(
static function (AMQPMessage $message) use ($connection, $channelId): void {
static function () use (&$message, $connection, $channelId): void {
Message::ack($message, $connection, $channelId);
}
)
->resolve($message);
);

return $promise;
}
Expand Down
13 changes: 9 additions & 4 deletions src/Consumer/Callback/NullAsyncCallback.php
Expand Up @@ -28,14 +28,19 @@ final class NullAsyncCallback implements AsyncCallbackInterface
*/
public function processMessage(AMQPMessage $message, Connection $connection, int $channelId): PromiseInterface
{
$promise = new Promise();
$promise = new Promise(
static function () use (&$promise): void {
if ($promise) {
$promise->resolve('waited');
}
},
);
$promise
->then(
static function (AMQPMessage $message) use ($connection, $channelId): void {
static function () use (&$message, $connection, $channelId): void {
Message::ack($message, $connection, $channelId);
}
)
->resolve($message);
);

return $promise;
}
Expand Down
14 changes: 10 additions & 4 deletions src/Consumer/ConsumerAbstract.php
Expand Up @@ -13,6 +13,7 @@
use RabbitMqBundle\Connection\ConnectionManager;
use RabbitMqBundle\Connection\SetupInterface;
use RabbitMqBundle\Consumer\Callback\Exception\CallbackException;
use RabbitMqBundle\Utils\Message;
use Throwable;

/**
Expand Down Expand Up @@ -165,11 +166,16 @@ function (AMQPMessage $message): void {
(int) $this->channelId
);
} catch (Throwable $e) {
throw new CallbackException(
sprintf('RabbitMq callback error: %s', $e->getMessage()),
$e->getCode(),
$e
$m = sprintf('RabbitMq callback error: %s', $e->getMessage());
$this->logger->error($m, ['Message' => $message]);
Message::nack(
$message,
$this->connectionManager->getConnection(),
(int) $this->channelId,
TRUE
);

throw new CallbackException($m, $e->getCode(), $e);
}
},
NULL,
Expand Down

0 comments on commit 0455b45

Please sign in to comment.