PONG not received #6910

Open
1 task done
jangor1912 opened this issue Sep 7, 2022 · 2 comments
@jangor1912

Describe the bug

While using aiohttp we noticed a strange issue: the client sends a PING message and the server takes a long time to respond. The script below shows that the PONG is not sent right away. Instead, it only goes out together with the next message the server sends to the client.

To Reproduce

Run this simple script - ping_pong.py:

import argparse
import asyncio
import logging
import time
import traceback
from contextvars import ContextVar
from typing import AsyncGenerator
from typing import Awaitable
from typing import Callable
from typing import Optional
from typing import Union
from uuid import uuid4

import aiohttp
from aiohttp import ClientSession
from aiohttp import ClientWebSocketResponse
from aiohttp import web
from aiohttp import WSMessage
from aiohttp import WSMsgType
from aiohttp.web_request import Request
from aiohttp.web_ws import WebSocketResponse

logger = logging.getLogger(__name__)

RECONNECTION_TIMEOUT = 1.0
PING_SEND_TIME: ContextVar[dict[int, float]] = ContextVar('PING_SEND_TIME', default=dict())
SERVER_HOST = "localhost"
SERVER_PORT = 8080


async def ws_connect(
    url: str, reconnect_timeout: float = RECONNECTION_TIMEOUT, autoping: bool = True
) -> tuple[ClientSession, ClientWebSocketResponse]:
    while True:
        logger.info("Connection attempt...")
        session = aiohttp.ClientSession()
        try:
            ws = await session.ws_connect(url, autoping=autoping)
            logger.info(f"Connected with autoping to set to {autoping}")
            return session, ws
        except aiohttp.client.ClientConnectorError:
            logger.info("CLOSING SESSION IN ws_connect!")
            await session.close()
            await asyncio.sleep(reconnect_timeout)


async def send_ping(ws: Union[ClientWebSocketResponse, WebSocketResponse], ping_number: int) -> None:
    message = f"{ping_number}"
    logger.debug(f"[PING {ping_number}] Sending ping with message = '{message}'")
    await ws.ping(message=message.encode("utf-8"))
    PING_SEND_TIME.get()[ping_number] = time.time()
    logger.debug(f"[PING {ping_number}] Successfully sent ping with message = '{message}'")


async def stream_from_ws(url: str, sink: Callable[[str], Awaitable[None]]) -> None:
    session, ws = await ws_connect(url=url, reconnect_timeout=1, autoping=False)
    try:
        await fetch_stream_with_ping(ws=ws, sink=sink)
    except asyncio.CancelledError:
        traceback.print_exc()
    finally:
        await session.close()


async def wait_for_pong(
    ws: Union[ClientWebSocketResponse, WebSocketResponse], timeout: int = 20
) -> Optional[WSMessage]:
    listener_id = str(uuid4())
    logger.debug(f"[{listener_id}] Listening for PONG started!")
    try:
        response = await asyncio.wait_for(ws.receive(), timeout=timeout)
    except asyncio.TimeoutError:
        logger.error(f"[{listener_id}] PONG was never received and TimeoutError was raised!")
        return None

    if response.type == WSMsgType.PONG:
        pong_number = int(response.data.decode('utf-8'))
        logger.debug(f"[{listener_id}] PONG was successfully received - message = '{pong_number}'")
        pong_receive_time = time.time()
        ping_send_time = PING_SEND_TIME.get()[pong_number]
        logger.info(f"Client was waiting for PONG for - {pong_receive_time - ping_send_time}!")
        del PING_SEND_TIME.get()[pong_number]
        return None
    else:
        logger.debug(f"[{listener_id}] Received response that is not PONG")
        return response


async def ack(data: WSMessage) -> str:
    return "ok"


async def fetch_stream_with_ping(
    ws: Union[ClientWebSocketResponse, WebSocketResponse], sink: Callable[[str], Awaitable[None]]
) -> None:

    ping_number = 0

    while True:
        # send PING and wait for PONG
        await send_ping(ws, ping_number)
        ping_number += 1
        response1 = await wait_for_pong(ws, timeout=20)  # this response should be PONG

        response2 = await wait_for_pong(ws, timeout=20)  # this should be normal WS event

        responses = [response1, response2]
        true_events = [response for response in responses if response is not None]

        for event in true_events:
            await sink(event.data)
            await ws.send_str(await ack(event))


async def send_stream(
    ws: Union[ClientWebSocketResponse, WebSocketResponse],
    stream: AsyncGenerator[str, None],
    get_message_func: Callable[[], Awaitable[WSMessage]],
    send_timeout: float = 1.0,
    ack_timeout: float = 1.0,
) -> None:
    async def receive_and_ack() -> None:
        ack_response = await get_message_func()
        await ack(ack_response)

    async def send_message(msg: str) -> None:
        await asyncio.wait_for(ws.send_str(data=msg), timeout=send_timeout)
        await asyncio.wait_for(receive_and_ack(), timeout=ack_timeout)

    sleep_time = 0
    async for message in stream:
        logger.info(f"[SERVER] Sending message = '{message}'")
        await send_message(message)
        await asyncio.sleep(sleep_time)
        sleep_time = (sleep_time + 1) % 5
    return None


async def websocket_handler(request: Request) -> web.WebSocketResponse:
    ws = web.WebSocketResponse(autoping=True)
    await ws.prepare(request)

    stream = events_stream(100)

    await send_stream(ws, stream, get_message_func=ws.receive)

    async for msg in ws:
        if msg.type == aiohttp.WSMsgType.TEXT:
            if msg.data == 'close':
                await ws.close()
            else:
                await ws.send_str(msg.data + '/answer')
        elif msg.type == aiohttp.WSMsgType.ERROR:
            logger.error(f"ws connection closed with exception {ws.exception()}")

    logger.info('Websocket connection closed')

    return ws


async def websocket_handler_custom_ping(request: Request) -> web.WebSocketResponse:
    ws = web.WebSocketResponse(autoping=False)
    await ws.prepare(request)

    stream = events_stream(100)
    message_queue: asyncio.Queue[WSMessage] = asyncio.Queue()

    async def consume_events(web_socket: web.WebSocketResponse, queue: asyncio.Queue[WSMessage]) -> None:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.PING:
                message = msg.data.decode("utf-8")
                logger.debug(f"[SERVER] Received PING message = '{message}'")
                await web_socket.pong(msg.data)
            elif msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                else:
                    await queue.put(msg)
            elif msg.type == aiohttp.WSMsgType.ERROR:
                logger.error(f"ws connection closed with exception {ws.exception()}")
                raise ws.exception()

    events_consumption_task = asyncio.create_task(consume_events(ws, message_queue))
    stream_sending_task = asyncio.create_task(send_stream(ws, stream, get_message_func=message_queue.get))

    await asyncio.gather(events_consumption_task, stream_sending_task)

    logger.info('Websocket connection closed')

    return ws


async def await_termination() -> None:
    # By design, the aiohttp AppRunner does not block, so sleep forever to keep the server alive:
    # https://docs.aiohttp.org/en/stable/web_advanced.html#application-runners
    while True:
        await asyncio.sleep(3600.0)


async def server() -> None:
    app = web.Application(logger=logger)
    app.add_routes([web.get('/ws-normal', websocket_handler)])
    app.add_routes([web.get('/ws-custom', websocket_handler_custom_ping)])
    app_runner = web.AppRunner(app)
    try:
        await app_runner.setup()
        site = web.TCPSite(app_runner, SERVER_HOST, SERVER_PORT)
        await site.start()
        logger.info(f"Successfully started server {SERVER_HOST}:{SERVER_PORT}")
        await await_termination()
    finally:
        logger.info("Cleaning up the server")
        await app_runner.cleanup()


async def client(endpoint: str) -> None:
    async def client_sink(message: str) -> None:
        logger.info(f"[CLIENT] Received message = '{message}'")

    await stream_from_ws(url=f"ws://{SERVER_HOST}:{SERVER_PORT}/ws-{endpoint}", sink=client_sink)


async def events_stream(events_number: int) -> AsyncGenerator[str, None]:
    for i in range(events_number):
        yield f"Message {i} from events stream"
        await asyncio.sleep(1)


async def main(client_endpoint: str) -> None:
    server_task = asyncio.create_task(server())
    client_task = asyncio.create_task(client(client_endpoint))

    await asyncio.gather(server_task, client_task)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Show PING-PONG issue in aiohttp')
    parser.add_argument('--client-endpoint', default="normal", help="Define which endpoint client should use")
    parser.add_argument('--logging-level', default="INFO", help="Logging level as in standard Python logging library")
    args = parser.parse_args()

    endpoint = args.client_endpoint
    # basicConfig accepts level names as strings, so the private logging._nameToLevel is not needed
    logging_level = args.logging_level.upper()

    logging.basicConfig(format='%(asctime)s:%(levelname)s:%(message)s', level=logging_level)
    asyncio.run(main(endpoint))

Expected behavior

We are using a receive-acknowledge protocol:

  1. The client establishes a WebSocket connection with the server by making a simple request
  2. The server prepares an event stream for the client
  3. The server sends one message at a time and then waits for an ACK message
  4. The client receives the main message and sends the string `ok` as the ACK response
  5. After receiving the ACK, the server knows that the next message can be sent to the client
PING-PONG

  1. At any given time, the client can send a PING message to the server
  2. If the connection is established, the server should respond with a PONG right away

Logs/tracebacks

Running `python ping_pong.py --client-endpoint=normal --logging-level=INFO` shows that with `autoping=True` on the server,
the PONG delay tracks the pace of the server stream: each PONG arrives only when the next stream message is sent.


python ping_pong.py --client-endpoint=normal --logging-level=INFO
2022-09-06 10:56:58,417:INFO:Connection attempt...
2022-09-06 10:56:58,423:INFO:Successfully started server localhost:8080
2022-09-06 10:56:58,425:INFO:[SERVER] Sending message = 'Message 0 from events stream'
2022-09-06 10:56:58,425:INFO:Connected with autoping to set to False
2022-09-06 10:56:58,426:INFO:Client was waiting for PONG for - 0.0004050731658935547!
2022-09-06 10:56:58,426:INFO:[CLIENT] Received message = 'Message 0 from events stream'
2022-09-06 10:56:59,427:INFO:[SERVER] Sending message = 'Message 1 from events stream'
2022-09-06 10:56:59,429:INFO:Client was waiting for PONG for - 1.0026228427886963!
2022-09-06 10:56:59,429:INFO:[CLIENT] Received message = 'Message 1 from events stream'
2022-09-06 10:57:01,432:INFO:[SERVER] Sending message = 'Message 2 from events stream'
2022-09-06 10:57:01,434:INFO:Client was waiting for PONG for - 2.004383087158203!
2022-09-06 10:57:01,434:INFO:[CLIENT] Received message = 'Message 2 from events stream'
2022-09-06 10:57:04,437:INFO:[SERVER] Sending message = 'Message 3 from events stream'
2022-09-06 10:57:04,438:INFO:Client was waiting for PONG for - 3.004377841949463!
2022-09-06 10:57:04,438:INFO:[CLIENT] Received message = 'Message 3 from events stream'
2022-09-06 10:57:08,440:INFO:[SERVER] Sending message = 'Message 4 from events stream'
2022-09-06 10:57:08,441:INFO:Client was waiting for PONG for - 4.002522945404053!
2022-09-06 10:57:08,441:INFO:[CLIENT] Received message = 'Message 4 from events stream'
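
The pattern in the log above (PONG delays of roughly 1 s, 2 s, 3 s and 4 s, matching the growing pause between stream messages) would be consistent with the server answering a PING only while the handler is awaiting `ws.receive()`. That is an assumption, not something verified against the aiohttp source here. A toy model built on plain `asyncio` queues (no aiohttp involved; `ToyEndpoint`, `busy_handler` and `measure_pong_delay` are hypothetical names) reproduces the same symptom:

```python
import asyncio
import time


class ToyEndpoint:
    """Toy model of a socket whose PINGs are answered only inside receive()."""

    def __init__(self, inbox: asyncio.Queue, outbox: asyncio.Queue) -> None:
        self.inbox = inbox
        self.outbox = outbox

    async def receive(self):
        while True:
            kind, payload = await self.inbox.get()
            if kind == "ping":
                # The PONG is produced here, so nothing answers while
                # the handler is busy elsewhere.
                await self.outbox.put(("pong", payload))
                continue
            return kind, payload


async def busy_handler(endpoint: ToyEndpoint, delay: float) -> None:
    await asyncio.sleep(delay)                    # waiting for the next stream event
    await endpoint.outbox.put(("text", "event"))  # send it to the client
    await endpoint.receive()                      # wait for ACK; the queued PING is answered only now


async def measure_pong_delay(delay: float) -> float:
    to_server: asyncio.Queue = asyncio.Queue()
    to_client: asyncio.Queue = asyncio.Queue()
    endpoint = ToyEndpoint(inbox=to_server, outbox=to_client)
    handler = asyncio.create_task(busy_handler(endpoint, delay))

    started = time.monotonic()
    await to_server.put(("ping", b"0"))
    while True:
        kind, _payload = await to_client.get()
        if kind == "pong":
            pong_delay = time.monotonic() - started
            break
        await to_server.put(("text", "ok"))       # ACK the data message
    await handler
    return pong_delay
```

In this model `asyncio.run(measure_pong_delay(0.2))` returns a delay of about 0.2 s, i.e. the PONG waits for the handler to get back to `receive()`, just like the growing delays in the log.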


Running a separate coroutine that responds right away (the `custom` endpoint) works around the issue, but it feels like a hack:
`python ping_pong.py --client-endpoint=custom --logging-level=INFO`
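
Stripped of the aiohttp details, the workaround is a dedicated reader task that answers PINGs immediately and forwards everything else to the sending side through a queue. The sketch below models that pattern with plain `asyncio` queues; `reader`, `sender`, `measure_pong_delay_with_reader` and the `"close"` marker are hypothetical names chosen for illustration, not aiohttp APIs.

```python
import asyncio
import time


async def reader(inbox: asyncio.Queue, outbox: asyncio.Queue, data_queue: asyncio.Queue) -> None:
    """Always-on reader: answer PINGs at once, forward other messages."""
    while True:
        kind, payload = await inbox.get()
        if kind == "ping":
            await outbox.put(("pong", payload))
        elif kind == "close":
            return
        else:
            await data_queue.put((kind, payload))


async def sender(outbox: asyncio.Queue, data_queue: asyncio.Queue, delay: float) -> None:
    await asyncio.sleep(delay)               # waiting for the next stream event
    await outbox.put(("text", "event"))
    await data_queue.get()                   # ACK arrives via the reader task


async def measure_pong_delay_with_reader(delay: float) -> tuple:
    to_server: asyncio.Queue = asyncio.Queue()
    to_client: asyncio.Queue = asyncio.Queue()
    data_queue: asyncio.Queue = asyncio.Queue()
    reader_task = asyncio.create_task(reader(to_server, to_client, data_queue))
    sender_task = asyncio.create_task(sender(to_client, data_queue, delay))

    started = time.monotonic()
    await to_server.put(("ping", b"0"))
    first_kind, _ = await to_client.get()    # first thing back from the server
    pong_delay = time.monotonic() - started

    await to_client.get()                    # the stream event, after `delay`
    await to_server.put(("text", "ok"))      # ACK it
    await sender_task
    await to_server.put(("close", None))     # stop the reader
    await reader_task
    return first_kind, pong_delay
```

Here the first message back is the PONG and its delay no longer depends on `delay`, which mirrors the sub-millisecond PONG times in the `custom` endpoint log. The cost is the extra task and queue, which is presumably why it feels like a hack.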

python ping_pong.py --client-endpoint=custom --logging-level=INFO
2022-09-06 10:57:35,876:INFO:Connection attempt...
2022-09-06 10:57:35,882:INFO:Successfully started server localhost:8080
2022-09-06 10:57:35,883:INFO:[SERVER] Sending message = 'Message 0 from events stream'
2022-09-06 10:57:35,883:INFO:Connected with autoping to set to False
2022-09-06 10:57:35,884:INFO:Client was waiting for PONG for - 0.0005218982696533203!
2022-09-06 10:57:35,884:INFO:[CLIENT] Received message = 'Message 0 from events stream'
2022-09-06 10:57:35,885:INFO:Client was waiting for PONG for - 0.0004208087921142578!
2022-09-06 10:57:36,886:INFO:[SERVER] Sending message = 'Message 1 from events stream'
2022-09-06 10:57:36,887:INFO:[CLIENT] Received message = 'Message 1 from events stream'
2022-09-06 10:57:36,888:INFO:Client was waiting for PONG for - 0.0006530284881591797!
2022-09-06 10:57:38,890:INFO:[SERVER] Sending message = 'Message 2 from events stream'
2022-09-06 10:57:38,891:INFO:[CLIENT] Received message = 'Message 2 from events stream'
2022-09-06 10:57:38,892:INFO:Client was waiting for PONG for - 0.00048804283142089844!
2022-09-06 10:57:41,894:INFO:[SERVER] Sending message = 'Message 3 from events stream'
2022-09-06 10:57:41,895:INFO:[CLIENT] Received message = 'Message 3 from events stream'
2022-09-06 10:57:41,896:INFO:Client was waiting for PONG for - 0.0007541179656982422!
2022-09-06 10:57:45,897:INFO:[SERVER] Sending message = 'Message 4 from events stream'
2022-09-06 10:57:45,898:INFO:[CLIENT] Received message = 'Message 4 from events stream'
2022-09-06 10:57:45,899:INFO:Client was waiting for PONG for - 0.0006568431854248047!


Python Version

$ python --version
Python 3.9.12

aiohttp Version

$ python -m pip show aiohttp
Name: aiohttp
Version: 3.8.1
Summary: Async http client/server framework (asyncio)
Home-page: https://github.com/aio-libs/aiohttp
Author: 
Author-email: 
License: Apache 2
Location: /Users/jgorazda/anaconda3/envs/cv-test-orchestrator/lib/python3.9/site-packages
Requires: attrs, yarl, multidict, aiosignal, charset-normalizer, async-timeout, frozenlist
Required-by:

multidict Version

$ python -m pip show multidict
Name: multidict
Version: 6.0.2
Summary: multidict implementation
Home-page: https://github.com/aio-libs/multidict
Author: Andrew Svetlov
Author-email: andrew.svetlov@gmail.com
License: Apache 2
Location: /Users/jgorazda/anaconda3/envs/cv-test-orchestrator/lib/python3.9/site-packages
Requires: 
Required-by: yarl, aiohttp

yarl Version

$ python -m pip show yarl
Name: yarl
Version: 1.7.2
Summary: Yet another URL library
Home-page: https://github.com/aio-libs/yarl/
Author: Andrew Svetlov
Author-email: andrew.svetlov@gmail.com
License: Apache 2
Location: /Users/jgorazda/anaconda3/envs/cv-test-orchestrator/lib/python3.9/site-packages
Requires: multidict, idna
Required-by: aiohttp

OS

macOS

Related component

Server, Client

Additional context

No response

Code of Conduct

  • I agree to follow the aio-libs Code of Conduct
@jangor1912 jangor1912 added the bug label Sep 7, 2022
@bizzyvinci

I'd like to take this up. Does @Dreamsorcerer have any recommendation?

@Dreamsorcerer
Member

I've not looked into it, but that's quite a lengthy reproducer. So, first thing would be to try and convert that into the simplest test you can and add it to our test suite. If the fix isn't obvious, we can merge the test with xfail first and come back to the solution later.
