Skip to content

add listener callback not getting executed in asyncio pytest framework #1262

Open
@sdalmia11

Description

@sdalmia11
def _notification_callback(self, _, pid, channel, payload):
        LOG.info("_notification_callback invoked", channel=channel, pid=pid, payload_preview=str(payload)[:100])
        print(f"Notification callback invoked for channel: {channel}, pid: {pid}, payload: {payload}")

        # Create a task to process this notification asynchronously
        # This avoids blocking the callback which would block the connection
        try:
            asyncio.create_task(
                self._process_notification_callback(payload, channel, pid)
            )
        except RuntimeError as error:
            self._log_error(
                "Failed to create notification processing task",
                pid=pid,
                error=str(error),
            )
async def _process_notification_callback(self, payload, channel, pid):
        """
        Process a notification received via callback.
        This runs in a separate task to avoid blocking the connection.
        """
        try:
            notification = json.loads(payload)
            LOG.info("Raw payload received", notification=notification, channel=channel)
            await self.distribute_notification(notification)
        except asyncio.CancelledError:
            LOG.info("Notification processing cancelled", channel=channel, pid=pid)
            raise
        except json.JSONDecodeError as error:
            self._log_error(
                "Failed to decode JSON payload from notification",
                pid=pid,
                error=str(error),
                payload=payload,
            )
        except Exception as error:
            self._log_error(
                "Error processing notification from callback",
                pid=pid,
                error=str(error),
            )
async def _listen_to_channel(self):
        """
        Background task that listens to the database notification channel.
        Uses db_backoff decorator to handle reconnection logic.
        """
        LOG.info("Connecting to database for notifications", channel=self.channel)
        conn = await asyncpg.connect(
            user=SETTINGS.db_user,
            password=SETTINGS.db_password,
            host=SETTINGS.db_host,
            port=SETTINGS.db_port,
            database=SETTINGS.database,
        )
        LOG.info("Database connection established", channel=self.channel)
        try:
            # Listen to the channel
            await conn.add_listener(self.channel, self._notification_callback)
            LOG.info("Listening on channel", channel=self.channel)
            # Main notification loop
            while self.running:
                try:
                    # Just keep the connection alive
                    await asyncio.sleep(1)
                except asyncio.TimeoutError:
                    # Timeout is expected, just continue the loop
                    continue
                except asyncio.CancelledError:
                    LOG.info(
                        "Notification listener cancelled received from the running task",
                        channel=self.channel,
                    )
                    break
                except Exception as error:
                    self._log_error(
                        "Error processing notification",
                        error=str(error),
                    )
        finally:
            # Clean up listener and close connection
            try:
                await conn.remove_listener(self.channel, self._notification_callback)
                await conn.close()
                LOG.info("Database connection closed", channel=self.channel)
            except Exception as error:
                # Handle any errors during cleanup
                self._log_error(
                    "Error closing database connection",
                    error=str(error),
                )

Below is the tests setup

@pytest_asyncio.fixture(scope="session")
async def pg_conn_fixture():
    """PG Connection fixture"""
    with patch(
        "dlnspublisher.core.notification_manager.DalEngine.get_connection"
    ) as mock_get_connection:
        conn = await asyncpg.connect(
            dsn="postgres://postgres:postgres@dlnsdb:5432/postgres"
        )
        mock_get_connection.return_value = conn
        yield
        await conn.close()


@pytest_asyncio.fixture(scope="session")
async def fixture_app(pg_conn_fixture):
    yield create_app()


@pytest.fixture(scope="session")
async def fixture_async_client(fixture_app) -> AsyncGenerator[AsyncClient, None]:
    async with LifespanManager(fixture_app):
        async with AsyncClient(
            transport=ASGITransport(app=fixture_app), base_url="http://test"
        ) as async_client:
            yield async_client

@pytest.mark.asyncio
async def test_sse_event_queue_receives_inserted_notification(
    fixture_async_client: AsyncClient,
    fixture_app,
    insert_distribution_notification,
):
    """
    tests /eap/notifications/sse returns OK when valid JWT is passed.
    """
    insert_distribution_notification()
    async with fixture_async_client.stream(
        "GET",
        "/eap/notifications/sse",
        headers={HTTP_HEADER_CIS_JWT: VALID_JWT},
    ) as response:
        try:
            chunk = await asyncio.wait_for(
                response.aiter_bytes().__anext__(), timeout=0.1
            )
            # SSE events are typically in the format: b'data: ...\n\n'
            assert chunk
            # Optionally, parse the event data
            if chunk.startswith(b"data:"):
                data = chunk[len(b"data:") :].strip()
                # Try to decode JSON if possible
                try:
                    payload = json.loads(data)
                    assert payload.get("payload") == "example"
                except Exception:
                    # If not JSON, just check content
                    assert b"example" in data
        except asyncio.TimeoutError:
            assert False, "No event received from SSE endpoint after DB insert"

When I run the test case, fixture_app runs the fastAPI app and starts listening to the channel and then insert_distribution_notification inserts some data into a table which triggers the NOTIFY channel, however the callback for add_listener nevers gets executed, as I have a pdb debugger in that function but it never executes the call back and the test case gets stuck indefinitely.

I could see in my pg_stats_activity that the LISTEN channel process through fixture_app is successful, however callback nevers executed even though the NOTIFY channel name is triggered through insert_distribution_notification fixture

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions