diff --git a/Makefile b/Makefile index 1d423da..6e698b3 100644 --- a/Makefile +++ b/Makefile @@ -52,7 +52,7 @@ phpcoverage: $(DE) ./vendor/bin/paratest -c ./vendor/hanaboso/php-check-utils/phpunit.xml.dist -p 1 --coverage-html var/coverage --whitelist src tests phpcoverage-ci: - $(DE) ./vendor/hanaboso/php-check-utils/bin/coverage.sh -p 1 -c 97 + $(DE) ./vendor/hanaboso/php-check-utils/bin/coverage.sh -p 1 -c 96 test: docker-up-force composer-install fasttest diff --git a/composer.json b/composer.json index 836329a..e4e3f48 100644 --- a/composer.json +++ b/composer.json @@ -1,7 +1,7 @@ { "name": "hanaboso/rabbit-mq-bundle", "license": "proprietary", - "version": "1.4.0", + "version": "1.4.1", "autoload": { "psr-4": { "RabbitMqBundle\\": "src" diff --git a/composer.lock b/composer.lock index 9cceb5c..66638ba 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "1800ae7aa7bb771210790722f34ca191", + "content-hash": "89eb5960a17da66708b57d268ca96aa6", "packages": [ { "name": "guzzlehttp/promises", @@ -3479,16 +3479,16 @@ }, { "name": "hanaboso/php-check-utils", - "version": "1.1.28", + "version": "1.1.29", "source": { "type": "git", "url": "https://github.com/hanaboso/php-check-utils.git", - "reference": "06050be508f0f021cadfe18512c49d520c1949e7" + "reference": "305cb28e75044f9131aa0be74444a623da52d3f5" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/hanaboso/php-check-utils/zipball/06050be508f0f021cadfe18512c49d520c1949e7", - "reference": "06050be508f0f021cadfe18512c49d520c1949e7", + "url": "https://api.github.com/repos/hanaboso/php-check-utils/zipball/305cb28e75044f9131aa0be74444a623da52d3f5", + "reference": "305cb28e75044f9131aa0be74444a623da52d3f5", "shasum": "" }, "require": { @@ -3546,20 +3546,20 @@ } ], "description": "Utils for php development - CodeSniffer, PhpStan, PhpStorm", - "time": "2020-06-19T07:10:37+00:00" + "time": "2020-06-25T11:25:00+00:00" }, { "name": "jean85/pretty-package-versions", - "version": "1.3.0", + "version": "1.5.0", "source": { "type": "git", "url": "https://github.com/Jean85/pretty-package-versions.git", - "reference": "e3517fb11b67e798239354fe8213927d012ad8f9" + "reference": "e9f4324e88b8664be386d90cf60fbc202e1f7fc9" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/Jean85/pretty-package-versions/zipball/e3517fb11b67e798239354fe8213927d012ad8f9", - "reference": "e3517fb11b67e798239354fe8213927d012ad8f9", + "url": "https://api.github.com/repos/Jean85/pretty-package-versions/zipball/e9f4324e88b8664be386d90cf60fbc202e1f7fc9", + "reference": "e9f4324e88b8664be386d90cf60fbc202e1f7fc9", "shasum": "" }, "require": { @@ -3597,7 +3597,7 @@ "release", "versions" ], - "time": "2020-04-24T14:19:45+00:00" + "time": "2020-06-23T06:23:06+00:00" }, { "name": "justinrainbow/json-schema", @@ -4267,20 +4267,19 @@ }, { "name": "nette/php-generator", - "version": "v3.4.0", + "version": "v3.4.1", "source": { "type": "git", "url": "https://github.com/nette/php-generator.git", - "reference": "ea2c8e8d6439f0a4e3cd3431c572b51a8131539b" + "reference": "7051954c534cebafd650efe8b145ac75b223cb66" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/nette/php-generator/zipball/ea2c8e8d6439f0a4e3cd3431c572b51a8131539b", - "reference": "ea2c8e8d6439f0a4e3cd3431c572b51a8131539b", + "url": "https://api.github.com/repos/nette/php-generator/zipball/7051954c534cebafd650efe8b145ac75b223cb66", + "reference": "7051954c534cebafd650efe8b145ac75b223cb66", "shasum": "" }, "require": { - "ext-tokenizer": "*", "nette/utils": "^2.4.2 || ^3.0", "php": ">=7.1" }, @@ -4328,7 +4327,7 @@ "php", "scaffolding" ], - "time": "2020-05-26T16:32:45+00:00" + "time": "2020-06-19T14:31:47+00:00" }, { "name": "nette/reflection", @@ -5202,16 +5201,16 @@ }, { "name": "phpstan/phpstan", - "version": "0.12.30", + "version": "0.12.31", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan.git", - "reference": "1f2c16d3fbb5eec6e55fbe2358e32570cefa20e5" + "reference": "776c8056b401e1b67f277b9e9fb334d1a274671d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan/zipball/1f2c16d3fbb5eec6e55fbe2358e32570cefa20e5", - "reference": "1f2c16d3fbb5eec6e55fbe2358e32570cefa20e5", + "url": "https://api.github.com/repos/phpstan/phpstan/zipball/776c8056b401e1b67f277b9e9fb334d1a274671d", + "reference": "776c8056b401e1b67f277b9e9fb334d1a274671d", "shasum": "" }, "require": { @@ -5240,20 +5239,20 @@ "MIT" ], "description": "PHPStan - PHP Static Analysis Tool", - "time": "2020-06-21T14:08:19+00:00" + "time": "2020-06-24T20:55:29+00:00" }, { "name": "phpstan/phpstan-doctrine", - "version": "0.12.16", + "version": "0.12.17", "source": { "type": "git", "url": "https://github.com/phpstan/phpstan-doctrine.git", - "reference": "65146e35905478bfb4e2ba078ffca1a16029d4ee" + "reference": "5eed42b2815f100f25bd45fe8c1a9b01f3e41657" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpstan/phpstan-doctrine/zipball/65146e35905478bfb4e2ba078ffca1a16029d4ee", - "reference": "65146e35905478bfb4e2ba078ffca1a16029d4ee", + "url": "https://api.github.com/repos/phpstan/phpstan-doctrine/zipball/5eed42b2815f100f25bd45fe8c1a9b01f3e41657", + "reference": "5eed42b2815f100f25bd45fe8c1a9b01f3e41657", "shasum": "" }, "require": { @@ -5306,7 +5305,7 @@ "MIT" ], "description": "Doctrine extensions for PHPStan", - "time": "2020-06-14T11:03:59+00:00" + "time": "2020-06-23T08:50:27+00:00" }, { "name": "phpstan/phpstan-nette", diff --git a/src/Consumer/AsyncConsumer.php b/src/Consumer/AsyncConsumer.php index 11df4ec..1fb5dd5 100644 --- a/src/Consumer/AsyncConsumer.php +++ b/src/Consumer/AsyncConsumer.php @@ -8,6 +8,7 @@ use RabbitMqBundle\Connection\Configurator; use RabbitMqBundle\Connection\ConnectionManager; use RabbitMqBundle\Consumer\Callback\Exception\CallbackException; +use RabbitMqBundle\Utils\Message; use Throwable; /** @@ -109,17 +110,19 @@ private function runAsyncConsumer(): void $this->nowait, function (AMQPMessage $message): void { try { - $this->callback->processMessage( - $message, - $this->connectionManager->getConnection(), - (int) $this->channelId - ); + $this->callback + ->processMessage( + $message, + $this->connectionManager->getConnection(), + (int) $this->channelId + ) + ->wait(); } catch (Throwable $e) { - throw new CallbackException( - sprintf('RabbitMq callback error: %s', $e->getMessage()), - $e->getCode(), - $e - ); + $m = sprintf('RabbitMq callback error: %s', $e->getMessage()); + $this->logger->error($m, ['Message' => $message]); + Message::nack($message, $this->connectionManager->getConnection(), (int) $this->channelId, TRUE); + + throw new CallbackException($m, $e->getCode(), $e); } }, NULL, diff --git a/src/Consumer/Callback/DumpAsyncCallback.php b/src/Consumer/Callback/DumpAsyncCallback.php index 59d4c45..15d9e02 100644 --- a/src/Consumer/Callback/DumpAsyncCallback.php +++ b/src/Consumer/Callback/DumpAsyncCallback.php @@ -29,14 +29,19 @@ final class DumpAsyncCallback implements AsyncCallbackInterface public function processMessage(AMQPMessage $message, Connection $connection, int $channelId): PromiseInterface { var_dump(['body' => Message::getBody($message), 'headers' => Message::getHeaders($message)]); - $promise = new Promise(); + $promise = new Promise( + static function () use (&$promise): void { + if ($promise) { + $promise->resolve('waited'); + } + }, + ); $promise ->then( - static function (AMQPMessage $message) use ($connection, $channelId): void { + static function () use (&$message, $connection, $channelId): void { Message::ack($message, $connection, $channelId); } - ) - ->resolve($message); + ); return $promise; } diff --git a/src/Consumer/Callback/NullAsyncCallback.php b/src/Consumer/Callback/NullAsyncCallback.php index 60b29d7..5968a9f 100644 --- a/src/Consumer/Callback/NullAsyncCallback.php +++ b/src/Consumer/Callback/NullAsyncCallback.php @@ -28,14 +28,19 @@ final class NullAsyncCallback implements AsyncCallbackInterface */ public function processMessage(AMQPMessage $message, Connection $connection, int $channelId): PromiseInterface { - $promise = new Promise(); + $promise = new Promise( + static function () use (&$promise): void { + if ($promise) { + $promise->resolve('waited'); + } + }, + ); $promise ->then( - static function (AMQPMessage $message) use ($connection, $channelId): void { + static function () use (&$message, $connection, $channelId): void { Message::ack($message, $connection, $channelId); } - ) - ->resolve($message); + ); return $promise; } diff --git a/src/Consumer/ConsumerAbstract.php b/src/Consumer/ConsumerAbstract.php index ad8ecdd..e38fc7a 100644 --- a/src/Consumer/ConsumerAbstract.php +++ b/src/Consumer/ConsumerAbstract.php @@ -13,6 +13,7 @@ use RabbitMqBundle\Connection\ConnectionManager; use RabbitMqBundle\Connection\SetupInterface; use RabbitMqBundle\Consumer\Callback\Exception\CallbackException; +use RabbitMqBundle\Utils\Message; use Throwable; /** @@ -165,11 +166,16 @@ function (AMQPMessage $message): void { (int) $this->channelId ); } catch (Throwable $e) { - throw new CallbackException( - sprintf('RabbitMq callback error: %s', $e->getMessage()), - $e->getCode(), - $e + $m = sprintf('RabbitMq callback error: %s', $e->getMessage()); + $this->logger->error($m, ['Message' => $message]); + Message::nack( + $message, + $this->connectionManager->getConnection(), + (int) $this->channelId, + TRUE ); + + throw new CallbackException($m, $e->getCode(), $e); } }, NULL,