receive_bytes returns weird bytes; max_message_size_bytes loses messages #40

Closed
MarshalX opened this issue Jul 18, 2023 · 3 comments
Labels: bug, polar

MarshalX commented Jul 18, 2023

Describe the bug

Received bytes are truncated

To Reproduce

  1. Install required deps:
pip install dag-cbor aiohttp httpx-ws starlette
  2. Create a .py file for the client and copy-paste the code below:
import asyncio
import io

import aiohttp
import dag_cbor

from httpx_ws import aconnect_ws

URI = 'ws://127.0.0.1:8000/ws'  # for aiohttp
URL = 'http://127.0.0.1:8000/ws'  # for httpx


def _decode_bytes(msg: io.BytesIO) -> None:
    header = dag_cbor.decode(msg, allow_concat=True)
    if header.get('t') != '#commit':
        return
    dag_cbor.decode(msg)
    print('Alive')


async def _aiohttp_client() -> None:
    async with aiohttp.ClientSession() as client, client.ws_connect(URI) as ws:
        while True:
            msg = io.BytesIO(await ws.receive_bytes())
            _decode_bytes(msg)


async def _httpx_client() -> None:
    max_bytes = 1024 * 1024 * 5  # 5MB
    async with aconnect_ws(URL, max_message_size_bytes=max_bytes) as ws:
        while True:
            msg = io.BytesIO(await ws.receive_bytes())
            _decode_bytes(msg)


async def main() -> None:
    # UNCOMMENT ME
    # await _aiohttp_client()
    await _httpx_client()


if __name__ == '__main__':
    asyncio.run(main())
  3. Create a .py file for the server and copy-paste the code below:
from starlette.applications import Starlette
from starlette.routing import WebSocketRoute


PAYLOADS = []
for i in range(100):
    with open(f'payloads/{i}.txt', 'rb') as f:
        PAYLOADS.append(f.read())


async def ws_bug(websocket):
    await websocket.accept()
    for p in PAYLOADS:
        await websocket.send_bytes(p)
    await websocket.close()


app = Starlette(
    routes=[
        WebSocketRoute('/ws', ws_bug),
    ],
)
  4. Download payloads.zip and unzip it
  5. Run the server
  6. Run the client and observe the exceptions
  7. Uncomment the await _aiohttp_client() line in main() and run again. This time there are no exceptions

Expected behavior

Decoding should not raise any exceptions (Unexpected EOF, invalid length, and other length-related errors). The log should contain only "Alive" messages.
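
To illustrate why a truncated payload tends to surface as an "unexpected EOF" or "invalid length" error, here is a minimal sketch using a made-up length-prefixed format. It is only a stand-in: it is not how dag_cbor is implemented, and the `encode` and `decode` helpers are hypothetical names introduced for this example.

```python
import io
import struct


def encode(payload: bytes) -> bytes:
    """Prefix the payload with its length as a 4-byte big-endian integer."""
    return struct.pack('>I', len(payload)) + payload


def decode(stream: io.BytesIO) -> bytes:
    """Read one length-prefixed payload; raise EOFError if bytes are missing."""
    header = stream.read(4)
    if len(header) < 4:
        raise EOFError('unexpected EOF while reading the length prefix')
    (length,) = struct.unpack('>I', header)
    body = stream.read(length)
    if len(body) < length:
        # The prefix promised more bytes than the buffer actually holds.
        raise EOFError(f'unexpected EOF: wanted {length} bytes, got {len(body)}')
    return body


if __name__ == '__main__':
    full = encode(b'hello world')
    print(decode(io.BytesIO(full)))  # the complete message decodes fine
    try:
        decode(io.BytesIO(full[:-3]))  # the same message with its tail cut off
    except EOFError as exc:
        print('decode failed:', exc)
```

Any decoder that trusts an embedded length behaves similarly when the client hands it only part of a message.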

Configuration

  • Python version: 3.8.16
  • httpx-ws version: 0.4.1

Additional context

With the aiohttp WebSocket client, decoding works fine. I also tried increasing max_message_size_bytes.

MarshalX added the bug label Jul 18, 2023

MarshalX commented Jul 18, 2023

I simplified the client so that it does not decode payloads at all. The number of messages received by the httpx-ws client is not always 100, whereas the aiohttp client always receives 100.

One thing I noticed: if we pass a non-default max_message_size_bytes, messages are lost more often.

# pip install aiohttp httpx-ws starlette
import asyncio

import aiohttp
from httpx_ws import WebSocketDisconnect, aconnect_ws

URI = 'ws://127.0.0.1:8000/ws'  # for aiohttp
URL = 'http://127.0.0.1:8000/ws'  # for httpx

AIOHTTP_MSG_LENS = []
HTTPX_MSG_LENS = []


async def _aiohttp_client() -> None:
    async with aiohttp.ClientSession() as client, client.ws_connect(URI) as ws:
        for _ in range(100):
            AIOHTTP_MSG_LENS.append(len(await ws.receive_bytes()))


async def _httpx_client() -> None:
    max_bytes = 1024 * 1024 * 5  # 5MB
    async with aconnect_ws(URL, max_message_size_bytes=max_bytes) as ws:
        for _ in range(100):
            try:
                HTTPX_MSG_LENS.append(len(await ws.receive_bytes()))
            except WebSocketDisconnect:
                return


async def main() -> None:
    await _aiohttp_client()
    await _httpx_client()

    print(len(AIOHTTP_MSG_LENS))
    print(len(HTTPX_MSG_LENS))
    assert len(AIOHTTP_MSG_LENS) == len(HTTPX_MSG_LENS)
    print(sorted(AIOHTTP_MSG_LENS))
    print(sorted(HTTPX_MSG_LENS))
    assert sorted(AIOHTTP_MSG_LENS) == sorted(HTTPX_MSG_LENS)


if __name__ == '__main__':
    asyncio.run(main())

Output example:

100
100
[100, 915, 1081, 1161, 1342, 1554, 2103, 2209, 2351, 2381, 2549, 2568, 2590, 2605, 2609, 2651, 2670, 2712, 2731, 2742, 2768, 2821, 2855, 2926, 2944, 2967, 3023, 3026, 3042, 3058, 3091, 3132, 3145, 3159, 3164, 3269, 3307, 3309, 3315, 3364, 3378, 3380, 3409, 3477, 3501, 3509, 3531, 3553, 3598, 3605, 3614, 3651, 3706, 3709, 3742, 3772, 3863, 3901, 3936, 3952, 3992, 3998, 4012, 4023, 4043, 4100, 4176, 4189, 4200, 4233, 4367, 4376, 4417, 4527, 4667, 4671, 4800, 4803, 4903, 4942, 4967, 5049, 5050, 5159, 5215, 5252, 5344, 5348, 5483, 5529, 5650, 5735, 5883, 5936, 5964, 6163, 6772, 6856, 7203, 7660]
[456, 915, 921, 1081, 1161, 1271, 1342, 1550, 1608, 1707, 2103, 2137, 2185, 2209, 2351, 2381, 2488, 2549, 2590, 2605, 2609, 2651, 2670, 2712, 2731, 2742, 2768, 2855, 2864, 2926, 2967, 3023, 3026, 3042, 3091, 3132, 3145, 3159, 3164, 3269, 3307, 3309, 3364, 3378, 3380, 3409, 3477, 3501, 3509, 3531, 3553, 3598, 3605, 3614, 3651, 3706, 3709, 3742, 3772, 3863, 3901, 3936, 3952, 3992, 3998, 4012, 4023, 4043, 4100, 4176, 4189, 4200, 4233, 4367, 4376, 4417, 4527, 4667, 4671, 4803, 4903, 4942, 5050, 5159, 5215, 5252, 5344, 5348, 5483, 5529, 5650, 5735, 5883, 5936, 5964, 6163, 6772, 6856, 7203, 7660]
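
Two sorted lists of this size are hard to compare by eye. As a small sketch, the lengths can be compared as multisets to list which ones are missing from the httpx-ws run and which ones appear only there. The helper name `diff_lengths` is hypothetical, and the sample values in the usage part are made up rather than taken from the run above.

```python
from collections import Counter
from typing import List, Tuple


def diff_lengths(expected: List[int], received: List[int]) -> Tuple[List[int], List[int]]:
    """Compare two lists of message lengths as multisets.

    Returns (missing, unexpected): lengths found in `expected` but not in
    `received`, and lengths found in `received` but not in `expected`.
    Duplicates are counted, so a length seen twice must be received twice.
    """
    expected_counts = Counter(expected)
    received_counts = Counter(received)
    missing = sorted((expected_counts - received_counts).elements())
    unexpected = sorted((received_counts - expected_counts).elements())
    return missing, unexpected


if __name__ == '__main__':
    # Tiny made-up sample, not the real lists printed above.
    missing, unexpected = diff_lengths([100, 915, 4800], [456, 915, 4344])
    print('missing:', missing)
    print('unexpected:', unexpected)
```

In the simplified client, this could be called as `diff_lengths(AIOHTTP_MSG_LENS, HTTPX_MSG_LENS)` before the final assertions.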

MarshalX changed the title from "Received bytes are truncated" to "receive_bytes returns weird bytes; max_message_size_bytes loses messages" on Jul 18, 2023
frankie567 (Owner) commented

Thank you for the detailed report @MarshalX! So, it looks like we're missing a mechanism to buffer and reassemble long messages.

aiohttp does have one, which is obviously the correct approach.

I'll look into that!
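
For readers wondering what "buffer and reassemble" could look like, here is a minimal sketch, not the actual httpx-ws fix. It assumes the transport delivers partial chunks carrying a `message_finished` flag (as wsproto's message events do). `Chunk` and `MessageBuffer` are hypothetical names, and using `max_message_size_bytes` as a cap on the reassembled message is an assumption made for this example.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Chunk:
    """Hypothetical stand-in for a partial binary message event."""
    data: bytes
    message_finished: bool


class MessageBuffer:
    """Accumulates chunks and returns a message only once it is complete."""

    def __init__(self, max_message_size_bytes: int) -> None:
        self._max = max_message_size_bytes
        self._parts: List[bytes] = []
        self._size = 0

    def _reset(self) -> None:
        self._parts = []
        self._size = 0

    def feed(self, chunk: Chunk) -> Optional[bytes]:
        """Add a chunk; return the full message when finished, else None."""
        self._size += len(chunk.data)
        if self._size > self._max:
            # Drop the partial message so the buffer can be reused.
            self._reset()
            raise ValueError('message exceeds max_message_size_bytes')
        self._parts.append(chunk.data)
        if not chunk.message_finished:
            return None  # keep buffering; the caller must not see this yet
        message = b''.join(self._parts)
        self._reset()
        return message


if __name__ == '__main__':
    buf = MessageBuffer(max_message_size_bytes=1024)
    print(buf.feed(Chunk(b'first half, ', message_finished=False)))
    print(buf.feed(Chunk(b'second half', message_finished=True)))
```

The point of the design is that a receive call returns only complete messages, so a payload split across several chunks is never handed to the decoder in pieces.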

frankie567 self-assigned this Jul 19, 2023
polar-sh bot added the polar label Jul 19, 2023

frankie567 commented Jul 19, 2023

You can pledge behind and help support this effort using Polar.sh

T-256 added a commit to T-256/httpx-ws that referenced this issue Aug 30, 2023
frankie567 added a commit that referenced this issue Sep 27, 2023
Bug fixes
---------

* Fix anyio `start_blocking_portal` import. Thanks @maparent 🎉
* Fix #40: handle large message buffering
* Fix #34: handle subprotocols correctly in `ASGIWebSocketTransport`
T-256 pushed a commit to T-256/httpx-ws that referenced this issue Jun 5, 2024
Bug fixes
---------

* Fix anyio `start_blocking_portal` import. Thanks @maparent 🎉
* Fix frankie567#40: handle large message buffering
* Fix frankie567#34: handle subprotocols correctly in `ASGIWebSocketTransport`