Worker does not seem to close the LISTEN/NOTIFY connection on stop #78

Description

@mikz

Hey,

I'm starting some workers in tests to validate the system end-to-end, and then I have trouble tearing down the database. It seems Worker.stop does not close the connection created in _maintain_notifications, because the connection is never stored:

chancy/chancy/worker.py

Lines 445 to 470 in b0e3e94

```python
async def _maintain_notifications(self):
    """
    Listen for notifications from the database.

    Improves the reactivity of a worker by allowing it to almost
    immediately react to cluster events using Postgres's LISTEN/NOTIFY
    feature.

    .. note::

        This feature utilizes a permanent connection to the database
        separate from the shared connection pool and is not counted
        against the pool's connection limit.
    """
    connection = await AsyncConnection.connect(
        self.chancy.dsn, autocommit=True
    )
    await connection.execute(
        sql.SQL("LISTEN {channel};").format(
            channel=sql.Identifier(f"{self.chancy.prefix}events")
        )
    )
    self.chancy.log.info("Started listening for realtime notifications.")
    self._notifications_ready_event.set()
    async for notification in connection.notifies():
        j = json.loads(notification.payload)
        await self.hub.emit(j.pop("t"), j)
```
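
For context, this is how it shows up in my tests: after Worker.stop() returns, the LISTEN session is still visible on the server side, which blocks dropping the test database. A quick diagnostic sketch I used (the DSN is a placeholder for my test database):

```python
# Diagnostic sketch: list sessions whose last query was a LISTEN,
# which is what keeps DROP DATABASE from succeeding during teardown.
import asyncio

from psycopg import AsyncConnection


async def show_listeners(dsn: str) -> None:
    async with await AsyncConnection.connect(dsn) as conn:
        cursor = await conn.execute(
            "SELECT pid, state, query FROM pg_stat_activity"
            " WHERE query LIKE 'LISTEN%'"
        )
        for pid, state, query in await cursor.fetchall():
            print(pid, state, query)


asyncio.run(show_listeners("postgresql://localhost/chancy_test"))
```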

so stop has no way to close it later:

chancy/chancy/worker.py

Lines 805 to 849 in b0e3e94

```python
async def stop(self) -> bool:
    """
    Stop the worker.

    Attempts to stop the worker gracefully, sending a CancelledError to
    all running tasks and waiting up to `shutdown_timeout` seconds for
    them to complete before returning.

    Returns True if the worker was stopped cleanly, or False if the
    worker returned due to the timeout expiring.
    """
    try:
        async with asyncio.timeout(self.shutdown_timeout) as cm:
            # Stop accepting new queues and queue changes.
            try:
                await self.manager.cancel("queues")
            except KeyError:
                pass

            # Delete all the queues we know about so the executors can
            # clean up.
            self._queues.clear()
            while self._executors:
                await asyncio.sleep(0.1)

            # And finally axe everything else started by this worker.
            await self.manager.cancel_all()
    except TimeoutError:
        # We check this instead of depending on the exception in case
        # the exception wasn't really raised by us but a nested timeout.
        if cm.expired():
            await self.hub.emit(
                "worker.shutdown_timeout",
                {
                    "worker": self,
                },
            )
            return False
        raise

    await self.hub.emit(
        "worker.stopped",
        {
            "worker": self,
        },
    )
    return True
```

I'd like to confirm this is not intentional before I open a PR to fix it.
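
If it's just an oversight, the fix I have in mind is roughly the following sketch (`_notifications_connection` is a placeholder name for an attribute that would be initialised to None in `__init__`): store the connection on the worker, then close it during shutdown.

```python
# Rough sketch of a possible fix, not a drop-in patch;
# `_notifications_connection` is a hypothetical attribute
# initialised to None in Worker.__init__.
async def _maintain_notifications(self):
    self._notifications_connection = await AsyncConnection.connect(
        self.chancy.dsn, autocommit=True
    )
    await self._notifications_connection.execute(
        sql.SQL("LISTEN {channel};").format(
            channel=sql.Identifier(f"{self.chancy.prefix}events")
        )
    )
    self.chancy.log.info("Started listening for realtime notifications.")
    self._notifications_ready_event.set()
    async for notification in self._notifications_connection.notifies():
        j = json.loads(notification.payload)
        await self.hub.emit(j.pop("t"), j)
```

And in `stop()`, after the running tasks have been cancelled:

```python
# Release the server-side LISTEN session so e.g. a test database
# can be dropped after the worker shuts down.
if self._notifications_connection is not None:
    await self._notifications_connection.close()
    self._notifications_connection = None
```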

Metadata

Labels

bug (Something isn't working)
