Open
Description
def _notification_callback(self, _, pid, channel, payload):
    """
    Schedule asynchronous processing of a notification delivered to the listener.

    This is the callback registered with ``conn.add_listener``. It must not
    block, so the actual work is handed to
    ``_process_notification_callback`` running as its own task.

    Args:
        _: First positional argument supplied by the listener machinery
            (unused here).
        pid: Identifier of the backend process that sent the notification.
        channel: Name of the channel the notification arrived on.
        payload: Raw notification payload (expected to be a JSON string).
    """
    LOG.info(
        "_notification_callback invoked",
        channel=channel,
        pid=pid,
        payload_preview=str(payload)[:100],
    )

    # Build the coroutine first so it can be closed explicitly if scheduling
    # fails; otherwise Python emits a "coroutine was never awaited" warning.
    coro = self._process_notification_callback(payload, channel, pid)
    try:
        task = asyncio.create_task(coro)
    except RuntimeError as error:
        # Raised when there is no running event loop in the current thread.
        coro.close()
        self._log_error(
            "Failed to create notification processing task",
            pid=pid,
            error=str(error),
        )
        return

    # The event loop only keeps weak references to tasks, so hold a strong
    # reference until the task finishes to stop it being garbage-collected
    # mid-execution.
    pending = getattr(self, "_pending_notification_tasks", None)
    if pending is None:
        pending = set()
        self._pending_notification_tasks = pending
    pending.add(task)
    task.add_done_callback(pending.discard)
async def _process_notification_callback(self, payload, channel, pid):
    """
    Decode one notification payload and hand it to ``distribute_notification``.

    Intended to run as its own task so the listener connection is never
    blocked. Cancellation is logged and re-raised; malformed JSON and any
    other failure are logged through ``self._log_error`` and swallowed.

    Args:
        payload: Raw notification payload, parsed with ``json.loads``.
        channel: Channel name, used for logging only.
        pid: Sender process id, used for logging only.
    """
    try:
        decoded = json.loads(payload)
        LOG.info("Raw payload received", notification=decoded, channel=channel)
        await self.distribute_notification(decoded)
    except json.JSONDecodeError as exc:
        # The payload was not valid JSON; include it in the log for diagnosis.
        self._log_error(
            "Failed to decode JSON payload from notification",
            pid=pid,
            error=str(exc),
            payload=payload,
        )
    except asyncio.CancelledError:
        # Cancellation must keep propagating so the task ends as cancelled.
        LOG.info("Notification processing cancelled", channel=channel, pid=pid)
        raise
    except Exception as exc:
        self._log_error(
            "Error processing notification from callback",
            pid=pid,
            error=str(exc),
        )
async def _listen_to_channel(self):
    """
    Background task that listens to the database notification channel.

    Opens a dedicated asyncpg connection, registers
    ``self._notification_callback`` on ``self.channel`` and then idles in
    one-second sleeps until ``self.running`` becomes false or the task is
    cancelled while sleeping. On the way out the listener is removed and the
    connection is closed; each cleanup step is attempted independently.

    NOTE(review): the previous docstring said a ``db_backoff`` decorator
    handles reconnection. No decorator is visible on this function, and
    errors raised after the connection is established are logged and
    swallowed below rather than propagated — confirm how reconnection is
    expected to be triggered.
    """
    LOG.info("Connecting to database for notifications", channel=self.channel)
    conn = await asyncpg.connect(
        user=SETTINGS.db_user,
        password=SETTINGS.db_password,
        host=SETTINGS.db_host,
        port=SETTINGS.db_port,
        database=SETTINGS.database,
    )
    LOG.info("Database connection established", channel=self.channel)
    try:
        # Listen to the channel
        await conn.add_listener(self.channel, self._notification_callback)
        LOG.info("Listening on channel", channel=self.channel)
        # Main loop: notifications arrive through the callback, so this only
        # has to keep the task (and therefore the connection) alive.
        while self.running:
            try:
                await asyncio.sleep(1)
            except asyncio.CancelledError:
                # Cancellation while idle is treated as a normal stop request:
                # leave the loop and fall through to cleanup.
                LOG.info(
                    "Notification listener cancelled received from the running task",
                    channel=self.channel,
                )
                break
    except Exception as error:
        self._log_error(
            "Error processing notification",
            error=str(error),
        )
    finally:
        # Remove the listener and close the connection as two independent
        # steps, so a failure to remove the listener cannot leak the
        # connection.
        try:
            await conn.remove_listener(self.channel, self._notification_callback)
        except Exception as error:
            self._log_error(
                "Error removing notification listener",
                error=str(error),
            )
        try:
            await conn.close()
            LOG.info("Database connection closed", channel=self.channel)
        except Exception as error:
            # Handle any errors during cleanup
            self._log_error(
                "Error closing database connection",
                error=str(error),
            )
Below is the test setup:
@pytest_asyncio.fixture(scope="session")
async def pg_conn_fixture():
    """
    Session-wide PostgreSQL connection fixture.

    Patches ``DalEngine.get_connection`` in the notification manager module
    so that it returns a single real asyncpg connection for the duration of
    the session, then closes that connection on teardown. The fixture itself
    yields no value.
    """
    patch_target = "dlnspublisher.core.notification_manager.DalEngine.get_connection"
    with patch(patch_target) as mock_get_connection:
        connection = await asyncpg.connect(
            dsn="postgres://postgres:postgres@dlnsdb:5432/postgres"
        )
        mock_get_connection.return_value = connection
        yield
        # Teardown: release the shared connection once the session ends.
        await connection.close()
@pytest_asyncio.fixture(scope="session")
async def fixture_app(pg_conn_fixture):
    """
    Session-wide application instance built by ``create_app``.

    Depends on ``pg_conn_fixture`` so the connection patch is in place
    before the application is created.
    """
    application = create_app()
    yield application
# Async generator fixtures must be registered through pytest_asyncio (as the
# sibling fixtures are); a plain ``pytest.fixture`` is not driven as an async
# fixture when pytest-asyncio runs in strict mode.
# NOTE(review): the application lifespan (and whatever it starts) is entered
# on the event loop this fixture runs on. If tests run on a different loop
# than session-scoped fixtures, callbacks bound to this loop will not fire
# while the test is executing — confirm the configured loop scope.
@pytest_asyncio.fixture(scope="session")
async def fixture_async_client(fixture_app) -> AsyncGenerator[AsyncClient, None]:
    """
    Session-wide HTTP client bound to the application under test.

    Runs the application's lifespan via ``LifespanManager`` and yields an
    ``AsyncClient`` that talks to the app in-process through ``ASGITransport``.
    """
    async with LifespanManager(fixture_app):
        async with AsyncClient(
            transport=ASGITransport(app=fixture_app), base_url="http://test"
        ) as async_client:
            yield async_client
@pytest.mark.asyncio
async def test_sse_event_queue_receives_inserted_notification(
    fixture_async_client: AsyncClient,
    fixture_app,
    insert_distribution_notification,
):
    """
    Check that a database insert surfaces as an event on
    /eap/notifications/sse when a valid JWT is passed.
    """
    # NOTE(review): the insert happens before the SSE stream is opened; if
    # events are only delivered to already-connected clients this one could
    # be missed — confirm the intended ordering.
    insert_distribution_notification()
    # NOTE(review): if the in-process ASGI transport buffers the whole
    # response before returning it, opening a never-ending SSE stream would
    # block here indefinitely — confirm against the httpx version in use.
    async with fixture_async_client.stream(
        "GET",
        "/eap/notifications/sse",
        headers={HTTP_HEADER_CIS_JWT: VALID_JWT},
    ) as response:
        try:
            chunk = await asyncio.wait_for(
                response.aiter_bytes().__anext__(), timeout=0.1
            )
        except asyncio.TimeoutError:
            # pytest.fail is not stripped under ``python -O`` the way
            # ``assert False`` is.
            pytest.fail("No event received from SSE endpoint after DB insert")

        # SSE events are typically in the format: b'data: ...\n\n'
        assert chunk
        if chunk.startswith(b"data:"):
            data = chunk[len(b"data:"):].strip()
            # Only a decoding failure should trigger the looser content
            # check; a failed assertion on the decoded payload must not be
            # swallowed and replaced by the weaker one.
            try:
                payload = json.loads(data)
            except ValueError:
                # If not JSON, just check content
                assert b"example" in data
            else:
                if isinstance(payload, dict):
                    assert payload.get("payload") == "example"
                else:
                    # Valid JSON but not an object: fall back to content check.
                    assert b"example" in data
When I run the test case, fixture_app starts the FastAPI app, which begins listening on the channel. Then insert_distribution_notification inserts some data into a table, which triggers a NOTIFY on the channel. However, the callback registered with add_listener never gets executed: I have a pdb breakpoint in that function, but it is never hit, and the test case gets stuck indefinitely.
I can see in pg_stat_activity that the LISTEN issued through fixture_app is successful; however, the callback is never executed, even though a NOTIFY on that channel is triggered through the insert_distribution_notification fixture.
Metadata
Metadata
Assignees
Labels
No labels