From 541997abb6a12039424dd85d3682df52781f934f Mon Sep 17 00:00:00 2001 From: farmio Date: Thu, 18 Mar 2021 08:12:04 +0100 Subject: [PATCH] the queue must go on call .task_done() even when the telegram raised an exception while processing --- xknx/core/telegram_queue.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/xknx/core/telegram_queue.py b/xknx/core/telegram_queue.py index 581f52731..f870e87ac 100644 --- a/xknx/core/telegram_queue.py +++ b/xknx/core/telegram_queue.py @@ -119,15 +119,16 @@ async def _telegram_consumer(self) -> None: self.xknx.telegrams.task_done() break - try: - if telegram.direction == TelegramDirection.INCOMING: + if telegram.direction == TelegramDirection.INCOMING: + try: await self.process_telegram_incoming(telegram) + except XKNXException as ex: + logger.error("Error while processing incoming telegram %s", ex) + finally: self.xknx.telegrams.task_done() - elif telegram.direction == TelegramDirection.OUTGOING: - self.outgoing_queue.put_nowait(telegram) - # self.xknx.telegrams.task_done() for outgoing is called in _outgoing_rate_limiter. - except XKNXException as ex: - logger.error("Error while processing telegram %s", ex) + elif telegram.direction == TelegramDirection.OUTGOING: + self.outgoing_queue.put_nowait(telegram) + # self.xknx.telegrams.task_done() for outgoing is called in _outgoing_rate_limiter. async def _outgoing_rate_limiter(self) -> None: """Endless loop for processing outgoing telegrams."""