Consume Server-Sent Event (SSE) messages with HTTPX.
NOTE: This is beta software. Please be sure to pin your dependencies.
pip install httpx-sse=="0.4.*"
httpx-sse provides the connect_sse and aconnect_sse helpers for connecting to an SSE endpoint. The resulting EventSource object exposes the .iter_sse() and .aiter_sse() methods to iterate over the server-sent events.
Example usage:
import httpx
from httpx_sse import connect_sse

with httpx.Client() as client:
    with connect_sse(client, "GET", "http://localhost:8000/sse") as event_source:
        for sse in event_source.iter_sse():
            print(sse.event, sse.data, sse.id, sse.retry)
You can try this against this example Starlette server (credit):
# Requirements: pip install uvicorn starlette sse-starlette
import asyncio

import uvicorn
from starlette.applications import Starlette
from starlette.routing import Route
from sse_starlette.sse import EventSourceResponse


async def numbers(minimum, maximum):
    for i in range(minimum, maximum + 1):
        await asyncio.sleep(0.9)
        yield {"data": i}


async def sse(request):
    generator = numbers(1, 5)
    return EventSourceResponse(generator)


routes = [
    Route("/sse", endpoint=sse)
]

app = Starlette(routes=routes)

if __name__ == "__main__":
    uvicorn.run(app)
You can call into Python web apps with HTTPX and httpx-sse to test SSE endpoints directly.
Here's an example of calling into a Starlette ASGI app...
import asyncio

import httpx
from httpx_sse import aconnect_sse
from sse_starlette.sse import EventSourceResponse
from starlette.applications import Starlette
from starlette.routing import Route


async def auth_events(request):
    async def events():
        yield {
            "event": "login",
            "data": '{"user_id": "4135"}',
        }

    return EventSourceResponse(events())


app = Starlette(routes=[Route("/sse/auth/", endpoint=auth_events)])


async def main():
    # Route requests straight into the ASGI app. Recent HTTPX releases
    # no longer accept the `app=` shortcut on the client itself.
    transport = httpx.ASGITransport(app=app)
    async with httpx.AsyncClient(transport=transport) as client:
        async with aconnect_sse(
            client, "GET", "http://localhost:8000/sse/auth/"
        ) as event_source:
            events = [sse async for sse in event_source.aiter_sse()]
            (sse,) = events
            assert sse.event == "login"
            assert sse.json() == {"user_id": "4135"}


asyncio.run(main())
(Advanced)
SSETransport and AsyncSSETransport don't have reconnection built-in. This is because the right way to perform retries generally depends on your use case. As a result, if the connection breaks while attempting to read from the server, you will get an httpx.ReadError from iter_sse() (or aiter_sse()).
However, httpx-sse does allow implementing reconnection by using the Last-Event-ID and reconnection time (in milliseconds), exposed as sse.id and sse.retry respectively.
Here's how you might achieve this using stamina...
import time
from typing import Iterator

import httpx
from httpx_sse import connect_sse, ServerSentEvent
from stamina import retry


def iter_sse_retrying(client, method, url):
    last_event_id = ""
    reconnection_delay = 0.0

    # `stamina` will apply jitter and exponential backoff on top of
    # the `retry` reconnection delay sent by the server.
    @retry(on=httpx.ReadError)
    def _iter_sse():
        nonlocal last_event_id, reconnection_delay

        time.sleep(reconnection_delay)

        headers = {"Accept": "text/event-stream"}

        if last_event_id:
            headers["Last-Event-ID"] = last_event_id

        with connect_sse(client, method, url, headers=headers) as event_source:
            for sse in event_source.iter_sse():
                last_event_id = sse.id

                if sse.retry is not None:
                    reconnection_delay = sse.retry / 1000

                yield sse

    return _iter_sse()
Usage:
with httpx.Client() as client:
    for sse in iter_sse_retrying(client, "GET", "http://localhost:8000/sse"):
        print(sse.event, sse.data)
def connect_sse(
    client: httpx.Client,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> ContextManager[EventSource]
Connect to an SSE endpoint and return an EventSource context manager.
This sets Cache-Control: no-store on the request, as per the SSE spec, as well as Accept: text/event-stream.
If the response Content-Type is not text/event-stream, this will raise an SSEError.
async def aconnect_sse(
    client: httpx.AsyncClient,
    method: str,
    url: Union[str, httpx.URL],
    **kwargs,
) -> AsyncContextManager[EventSource]
An async equivalent to connect_sse.
def __init__(response: httpx.Response)
Helper for working with an SSE response.
The response attribute is the underlying httpx.Response.
You may use this to perform more operations and checks on the response, such as checking for HTTP status errors:
with connect_sse(...) as event_source:
    event_source.response.raise_for_status()

    for sse in event_source.iter_sse():
        ...
def iter_sse() -> Iterator[ServerSentEvent]
Decode the response content and yield corresponding ServerSentEvent.
Example usage:
for sse in event_source.iter_sse():
    ...
async def aiter_sse() -> AsyncIterator[ServerSentEvent]
An async equivalent to iter_sse.
Represents a server-sent event.
event: str - Defaults to "message".
data: str - Defaults to "".
id: str - Defaults to "".
retry: int | None - Defaults to None.
Methods:
json() -> Any - Returns sse.data decoded as JSON.
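To make the defaults above concrete, here is a simplified, standard-library-only sketch of how an SSE text stream maps onto these fields. It is not httpx-sse's decoder: SketchEvent and parse_sse are hypothetical names, and the dispatch rule is a simplification of the SSE spec.

```python
import json
import re
from dataclasses import dataclass
from typing import Any, Iterator, List, Optional


@dataclass
class SketchEvent:
    # Same field names and defaults as listed above.
    event: str = "message"
    data: str = ""
    id: str = ""
    retry: Optional[int] = None

    def json(self) -> Any:
        return json.loads(self.data)


def parse_sse(text: str) -> Iterator[SketchEvent]:
    event = ""
    data_lines: List[str] = []
    last_id = ""  # the last event ID persists across events
    retry: Optional[int] = None

    for line in re.split(r"\r\n|\r|\n", text):
        if line == "":
            # A blank line ends the current event.
            if event or data_lines or retry is not None:
                yield SketchEvent(
                    event or "message", "\n".join(data_lines), last_id, retry
                )
            event, data_lines, retry = "", [], None
            continue
        if line.startswith(":"):
            continue  # comment line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # strip the single optional leading space
        if field == "event":
            event = value
        elif field == "data":
            data_lines.append(value)
        elif field == "id" and "\0" not in value:
            last_id = value
        elif field == "retry" and re.fullmatch(r"[0-9]+", value):
            retry = int(value)  # reconnection time in milliseconds


stream = 'event: login\ndata: {"user_id": "4135"}\nid: 7\n\ndata: 1\nretry: 3000\n\n'
events = list(parse_sse(stream))
print(events)
```

The second event in the sample carries no event field, so it falls back to "message", and it inherits the id set by the first event.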
An error that occurred while making a request to an SSE endpoint.
Parents:
httpx.TransportError
MIT