diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md index c34ba0d600..c718ca109a 100644 --- a/CHANGELOG-3.0.md +++ b/CHANGELOG-3.0.md @@ -3,7 +3,11 @@ # Added - [#6062](https://github.com/hyperf/hyperf/pull/6057) Added `RequestTraceListener` for `hyperf/tracer`. -- [#6143](https://github.com/hyperf/hyperf/pull/6143) Added `ignore_exceptions` for tracer +- [#6143](https://github.com/hyperf/hyperf/pull/6143) Added `ignore_exceptions` for `hyperf/tracer`. + +## Optimized + +- [#6151](https://github.com/hyperf/hyperf/pull/6151) Optimized `FailToConsume` event for `hyperf/kafka`. ## Fixed diff --git a/src/kafka/src/ConsumerManager.php b/src/kafka/src/ConsumerManager.php index dc6c3e0fd1..53b89dd76f 100644 --- a/src/kafka/src/ConsumerManager.php +++ b/src/kafka/src/ConsumerManager.php @@ -114,7 +114,12 @@ function (ConsumeMessage $message) use ($consumer, $consumerConfig) { wait(function () use ($consumer, $consumerConfig, $message) { $this->dispatcher?->dispatch(new BeforeConsume($consumer, $message)); - $result = $consumer->consume($message); + try { + $result = $consumer->consume($message); + } catch (Throwable $exception) { + $this->dispatcher?->dispatch(new FailToConsume($consumer, $message, $exception)); + throw $exception; + } if (! $consumerConfig->getAutoCommit()) { if (! is_string($result)) { @@ -146,7 +151,6 @@ function (ConsumeMessage $message) use ($consumer, $consumerConfig) { $longLangConsumer->start(); } catch (Throwable $exception) { $this->stdoutLogger->warning((string) $exception); - $this->dispatcher?->dispatch(new FailToConsume($this->consumer, [], $exception)); } if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield(10)) {