[BUG] when kafka consumer throw exception then message doesn't auto ack #6150

Closed
szutoutou opened this issue Sep 14, 2023 · 0 comments
Labels
bug Something isn't working


longlang\phpkafka\Consumer\Consumer

public function start(): void
    {
        $consumeCallback = $this->consumeCallback;
        if (null === $consumeCallback) {
            throw new InvalidArgumentException('consumeCallback must not null');
        }
        $interval = (int) ($this->config->getInterval() * 1000000);
        $this->started = true;
        $autoCommit = $this->config->getAutoCommit();
        while ($this->started) {
            $message = $this->consume();
            if (null === $message) {
                if ($interval > 0 && $this->emptyMessageCountInLoop === \count($this->fetchOptions)) {
                    // When the empty message count is equal with the count of fetch options,
                    // We must sleep some micro seconds to avoid dead cycle.
                    usleep($interval);
                }
            } else {
                $consumeCallback($message);
                if ($autoCommit) {
                    $this->ack($message);
                }
            }
        }
    }

Hyperf\Kafka\ConsumerManager:107

public function handle(): void
            {
                $consumerConfig = $this->getConsumerConfig();
                $consumer = $this->consumer;
                $longLangConsumer = new LongLangConsumer(
                    $consumerConfig,
                    function (ConsumeMessage $message) use ($consumer, $consumerConfig) {
                        $config = $this->getConfig();
                        wait(function () use ($consumer, $consumerConfig, $message) {
                            $this->dispatcher?->dispatch(new BeforeConsume($consumer, $message));

                            try {
                                $result = $consumer->consume($message);
                            } catch (Throwable $exception) {
                                $this->dispatcher?->dispatch(new FailConsume($consumer, $message, $exception));
                                throw $exception;
                            }

                            if (! $consumerConfig->getAutoCommit()) {
                                if (! is_string($result)) {
                                    throw new InvalidConsumeResultException('The result is invalid.');
                                }

                                if ($result === Result::ACK) {
                                    $message->getConsumer()->ack($message);
                                }

                                if ($result === Result::REQUEUE) {
                                    $this->producer->send($message->getTopic(), $message->getValue(), $message->getKey(), $message->getHeaders());
                                }
                            }

                            $this->dispatcher?->dispatch(new AfterConsume($consumer, $message, $result));
                        }, $config['consume_timeout'] ?? -1);
                    }
                );

                // stop consumer when worker exit
                Coroutine::create(function () use ($longLangConsumer) {
                    CoordinatorManager::until(Constants::WORKER_EXIT)->yield();
                    $longLangConsumer->stop();
                });

                while (true) {
                    try {
                        $longLangConsumer->start();
                    } catch (Throwable $exception) {
                        $this->stdoutLogger->warning((string) $exception);
                        $this->dispatcher?->dispatch(new FailToConsume($this->consumer, [], $exception));
                    }

                    if (CoordinatorManager::until(Constants::WORKER_EXIT)->yield(10)) {
                        break;
                    }
                }

                $longLangConsumer->close();
            }

Description:

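When $consumeCallback($message) throws an exception, the message is not acked, even with auto commit enabled: in Consumer::start() the exception propagates before $this->ack($message) is reached.
As a result, every consumer has to ack the message itself; otherwise the same message is consumed again in an endless loop.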
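
The behaviour described above can be reproduced with a small simulation. This is a minimal sketch, not the phpkafka or Hyperf implementation: `FakePartition` and `runLoop` are hypothetical names, and the sketch assumes (as the report describes) that a message whose offset was never acked is fetched again after the consumer restarts. The `$ackOnFailure` flag illustrates one possible remedy, acking even when the callback throws; whether dropping a failed message is acceptable depends on the application.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a partition: fetch() keeps returning the message
// at the committed offset until ack() advances it.
final class FakePartition
{
    public int $committed = 0;

    /** @param list<string> $messages */
    public function __construct(private array $messages)
    {
    }

    public function fetch(): ?string
    {
        return $this->messages[$this->committed] ?? null;
    }

    public function ack(): void
    {
        ++$this->committed;
    }
}

/**
 * Simplified consume loop with an outer restart loop around it.
 * Returns every message handed to the callback, in order.
 *
 * @return list<string>
 */
function runLoop(FakePartition $partition, callable $callback, bool $ackOnFailure, int $maxRestarts): array
{
    $attempts = [];
    for ($restart = 0; $restart <= $maxRestarts; ++$restart) {
        try {
            while (null !== ($message = $partition->fetch())) {
                $attempts[] = $message;
                try {
                    $callback($message);
                } catch (Throwable $exception) {
                    if (! $ackOnFailure) {
                        // Mirrors the reported behaviour: the ack below is skipped.
                        throw $exception;
                    }
                    // Otherwise fall through and ack the failed message.
                }
                $partition->ack();
            }
            break; // Partition drained, nothing left to consume.
        } catch (Throwable $exception) {
            // The outer loop restarts consumption, like the while (true) loop in handle().
        }
    }

    return $attempts;
}

// Usage: a callback that always fails on the message "bad".
$flaky = function (string $message): void {
    if ($message === 'bad') {
        throw new RuntimeException('cannot process ' . $message);
    }
};

$stuck = new FakePartition(['a', 'bad', 'c']);
$stuckAttempts = runLoop($stuck, $flaky, false, 3);   // "bad" is retried on every restart

$fixed = new FakePartition(['a', 'bad', 'c']);
$fixedAttempts = runLoop($fixed, $flaky, true, 3);    // "bad" is acked and skipped
```

With `$ackOnFailure` set to `false`, the loop never gets past `bad` and `c` is never delivered; with `true`, each message is attempted once. A real consumer would more likely log the failure or forward the message to a retry topic before acking.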
