Pure Python, asynchronous, event-loop-agnostic implementation of the subscriptions-transport-ws protocol.
The GraphQLWSProtocol
class implements the subscriptions-transport-ws
server-side protocol, but leaves performing I/O and executing GraphQL subscription queries to you, the implementer.
For the sake of example, here is a no-dependency script which simulates a client/server flow:
import asyncio
import itertools
import typing
from subscriptions_transport_ws import GraphQLWSProtocol
class WebSocket:
# Simple queue-based WebSocket implementation.
# In practice, you would be given this object by your WebSocket library or
# web framework.
def __init__(self):
self._queue = asyncio.Queue()
async def receive(self) -> dict:
return await self._queue.get()
async def _put(self, value: typing.Any):
self._queue.put_nowait(value)
await asyncio.sleep(0)
async def send(self, message: dict):
await self._put(message)
async def close(self, close_code: int):
await self._put({"close_code": close_code})
async def start_counter(**kwargs):
# Example subscription generator.
# In practice, you would use an asynchronous GraphQL engine here.
for counter in itertools.count(0):
yield {"counter": counter}
await asyncio.sleep(1)
async def main():
ws = WebSocket()
# Instanciate the protocol.
proto = GraphQLWSProtocol(
send=ws.send,
close=ws.close,
subscribe=start_counter,
raised_when_closed=[asyncio.CancelledError],
)
# Simulate a client requesting the GraphQL WebSocket endpoint.
await proto({"type": "connection_init"})
print(await ws.receive()) # {"type": "connection_ack"}
task = asyncio.create_task(proto({"type": "start"}))
print(await ws.receive()) # {"type": "data", "payload": {"counter": 0}}
print(await ws.receive()) # {"type": "data", "payload": {"counter": 1}}
await proto({"type": "stop"})
await proto({"type": "connection_terminate"})
print(await ws.receive()) # {"close_code": 1011}
# Cancel the running subscription.
task.cancel()
if __name__ == "__main__":
asyncio.run(main())
In real-world setups, creating, running and keeping track of protocol handlers would be done server-side, while clients (a browser or another machine) would send protocol-compliant messages over the WebSocket.
See the example directory for a client-server example built using asyncio
and the websockets library.
This package is not yet available on PyPI. For now, you can install the latest version using:
pip install "git+https://github.com/florimondmanca/subscriptions-transport-ws-python.git"
-
await
send
: a callable for sending a protocol JSON message over the WebSocket.- Parameters:
message
(dict
) - Return type:
Coroutine[None]
- Parameters:
-
await
close
: a callable for closing the WebSocket connection.- Parameters:
close_code
(int
) - Return type:
Coroutine[None]
- Parameters:
-
subscribe
: a callable which returns an iterator of events generated by the GraphQL subscription.- Parameters:
query
(str
orNone),
variables(
dict),
operation_name(
stror
None) (as provided in the
payload` by the client). - Return type:
Union[AsyncIterator[dict], AsyncGenerator[dict, None]]
- Parameters:
-
raised_when_closed
: a list or tuple of exception classes which, if they are raised while a subscription is running, should not result in the protocol trying to send messages over the WebSocket (because it has already been closed). This should typically include exceptions associated to coroutine cancellation (depending on your chosen I/O library).
-
await
.__call__()
: start a protocol handler. Note that astart
(GQL_START
) message will start the (potentially infinite)subscribe
iterator. As a result, prefer running this in parallel (e.g., if your chosen event loops has a task system, by wrapping them in tasks) instead ofawait
ing it directly.- Parameters:
message
(dict
): a message received over the WebSocket. - Return type:
Coroutine[None]
- Parameters:
-
await
.stop()
: unregister all pending subscription operations. Should be called when the WebSocket connection has been closed and before performing any cancellation on protocol handlers.
Want to contribute? Great! Be sure to read our Contributing guidelines.
Changes to this project are recorded in the changelog.
MIT