- WebSocket-based GraphQL server implemented on top of Django Channels.
- Graphene-like subscriptions.
- All GraphQL requests are processed concurrently (in parallel).
- Subscription notifications are delivered in the order they were issued.
- An optional subscription activation message can be sent to the client. Sometimes this is necessary to avoid race conditions on the client side. Consider the case when a client subscribes to some subscription and immediately invokes a mutation that triggers this subscription. In this case the subscription notification can be lost, because the subscription and mutation requests are processed concurrently. To avoid this, the client should wait for the subscription activation before sending the mutation request.
- Customizable notification strategies:
  - A subscription can be put into one or more subscription groups. This makes it possible to notify only selected clients or, from the client's perspective, to subscribe to a selected source of events. For example, an imaginary subscription "OnNewMessage" may accept the argument "user", so that the subscription only triggers on new messages from the selected user.
  - A notification can be suppressed in the subscription resolver method publish. For example, this is useful to avoid sending self-notifications.
- All GraphQL "resolvers" run in a thread pool, so they never block the server itself and may communicate with the database or perform other blocking tasks.
- Resolvers (including the subscription's subscribe and publish) can be written as either synchronous or asynchronous (async def) methods.
- Subscription notifications can be sent from both synchronous and asynchronous contexts. Just call MySubscription.broadcast() or await MySubscription.broadcast() depending on the context.
- Clients for the GraphQL WebSocket server:
  - AIOHTTP-based client.
  - Client for unit tests based on the Django Channels testing communicator.
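
To make the group and suppression semantics from the list above more concrete, here is a toy, library-free model in plain Python. It is only a sketch: the names ToyBroker, SKIP and make_publish are hypothetical and exist only for this illustration, while the real logic lives in channels_graphql_ws.Subscription and may differ in detail.

```python
"""Toy model of subscription groups and notification suppression."""

from collections import defaultdict

SKIP = object()  # Hypothetical stand-in for `MySubscription.SKIP`.


class ToyBroker:
    """Keeps track of which subscriber callbacks belong to which group."""

    def __init__(self):
        self._groups = defaultdict(list)

    def subscribe(self, groups, publish):
        """Register the `publish` callback in every listed group."""
        for group in groups:
            self._groups[group].append(publish)

    def broadcast(self, group, payload):
        """Call `publish` for each subscriber of `group`; drop SKIP results."""
        results = [publish(payload) for publish in self._groups[group]]
        return [result for result in results if result is not SKIP]


def make_publish(username):
    """Build a `publish` callback that suppresses self-notifications."""

    def publish(payload):
        if payload["sender"] == username:
            return SKIP
        return f"{username} got: {payload['message']}"

    return publish


broker = ToyBroker()
broker.subscribe(["kittens"], make_publish("Luke"))
broker.subscribe(["kittens", "puppies"], make_publish("Robot"))

# Luke is the sender, so only Robot is notified.
delivered = broker.broadcast("kittens", {"sender": "Luke", "message": "Hello all!"})
```

Both subscribers are in the "kittens" group, but the callback built for the sender returns SKIP, so that client receives nothing.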
pip install django-channels-graphql-ws

Create a GraphQL schema using Graphene. Note the MySubscription class.

import channels_graphql_ws
import graphene


class MySubscription(channels_graphql_ws.Subscription):
    """Simple GraphQL subscription."""

    # Subscription payload.
    event = graphene.String()

    class Arguments:
        """That is how subscription arguments are defined."""

        arg1 = graphene.String()
        arg2 = graphene.String()

    @staticmethod
    def subscribe(root, info, arg1, arg2):
        """Called when user subscribes."""
        # Return the list of subscription group names.
        return ['group42']

    @staticmethod
    def publish(payload, info, arg1, arg2):
        """Called to notify the client."""
        # Here `payload` contains the `payload` from the `broadcast()`
        # invocation (see below). You can return `MySubscription.SKIP`
        # if you wish to suppress the notification to a particular
        # client. For example, this lets you avoid notifications for
        # the actions made by this particular client.
        return MySubscription(event='Something has happened!')


class Query(graphene.ObjectType):
    """Root GraphQL query."""

    # Check Graphene docs to see how to define queries.
    pass


class Mutation(graphene.ObjectType):
    """Root GraphQL mutation."""

    # Check Graphene docs to see how to define mutations.
    pass


class Subscription(graphene.ObjectType):
    """Root GraphQL subscription."""

    my_subscription = MySubscription.Field()


graphql_schema = graphene.Schema(
    query=Query,
    mutation=Mutation,
    subscription=Subscription,
)

Make your own WebSocket consumer subclass and set the schema it serves:

class MyGraphqlWsConsumer(channels_graphql_ws.GraphqlWsConsumer):
    """Channels WebSocket consumer which provides GraphQL API."""

    schema = graphql_schema

    # Uncomment to send keepalive message every 42 seconds.
    # send_keepalive_every = 42

    # Uncomment to process requests sequentially (useful for tests).
    # strict_ordering = True

    async def on_connect(self, payload):
        """New client connection handler."""
        # You can `raise` from here to reject the connection.
        print("New client connected!")

Set up Django Channels routing:

import channels.routing
import django.urls

application = channels.routing.ProtocolTypeRouter({
    'websocket': channels.routing.URLRouter([
        django.urls.path('graphql/', MyGraphqlWsConsumer),
    ])
})

Notify clients when some event happens using the broadcast() or broadcast_sync() method from an OS thread where there is no running event loop:

MySubscription.broadcast(
    # Subscription group to notify clients in.
    group='group42',
    # Dict delivered to the `publish` method.
    payload={},
)

Notify clients in a coroutine function using the broadcast() or broadcast_async() method:

await MySubscription.broadcast(
    # Subscription group to notify clients in.
    group='group42',
    # Dict delivered to the `publish` method.
    payload={},
)

You can find a simple usage example in the example directory.
Run:

cd example/
./manage.py runserver

Play with the API through the GraphiQL browser at http://127.0.0.1:8000.
You can start with the following GraphQL requests:

query q {
  history(chatroom: "kittens") {
    chatroom
    message
    sender
  }
}

mutation m {
  sendChatMessage(chatroom: "kittens", username: "Luke", message: "Hello all!") {
    ok
  }
}

subscription s {
  onChatMessageSent(chatroom: "kittens", username: "Robot") {
    message
    chatroom
  }
}

The channels_graphql_ws module provides the following key classes:
- GraphqlWsConsumer: Django Channels WebSocket consumer which maintains the WebSocket connection with the client.
- Subscription: Subclass this to define a GraphQL subscription. Very similar to defining mutations with Graphene. (The class itself is a "creative" copy of the Graphene Mutation class.)
- GraphqlWsClient: A client for the GraphQL backend. Executes strings with queries and receives subscription notifications.
- GraphqlWsTransport: WebSocket transport interface for the client.
- GraphqlWsTransportAiohttp: WebSocket transport implemented on the AIOHTTP library.
For details, check the source code, which is thoroughly commented. The docstrings of classes are especially useful.
Since the WebSocket handling is based on Django Channels and subscriptions are implemented in a Graphene-like style, it is recommended to have a look at the documentation of these great projects: Django Channels and Graphene.
The implemented WebSocket-based protocol was taken from the subscriptions-transport-ws library, which is used by Apollo GraphQL. Check the protocol description for details.
Subscription.broadcast uses Channels groups to deliver a message
to the Subscription's publish method. The ASGI specification
clearly states what can be sent over a channel, and Django models are
not on the list. Since it is common to notify clients about changes to
Django models, we manually serialize the payload using MessagePack
and hook into the process to automatically serialize Django models,
following Django's guide Serializing Django objects.
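
The idea of hooking a custom type into the serializer can be sketched without the library. This is only an illustration under stated assumptions: the standard json module stands in for MessagePack, and the ChatMessage dataclass is a hypothetical stand-in for a Django model instance. The library's actual encoding is not shown here.

```python
"""Sketch: making a model-like object safe to send through a channel."""

import json
from dataclasses import asdict, dataclass


@dataclass
class ChatMessage:
    """Hypothetical stand-in for a Django model instance."""

    chatroom: str
    text: str


def encode_hook(obj):
    """Turn objects the serializer does not know into tagged plain dicts."""
    if isinstance(obj, ChatMessage):
        return {"__type__": "ChatMessage", "fields": asdict(obj)}
    raise TypeError(f"Cannot serialize {type(obj).__name__}")


def decode_hook(obj):
    """Rebuild tagged dicts into objects on the receiving side."""
    if obj.get("__type__") == "ChatMessage":
        return ChatMessage(**obj["fields"])
    return obj


def pack(payload):
    """Serialize the payload to bytes, a type a channel can carry."""
    return json.dumps(payload, default=encode_hook).encode("utf-8")


def unpack(data):
    """Restore the payload before it is handed to `publish`."""
    return json.loads(data.decode("utf-8"), object_hook=decode_hook)


payload = {"message": ChatMessage(chatroom="kittens", text="Hello all!")}
restored = unpack(pack(payload))
```

The sender only ever puts bytes on the channel, and the receiver gets back an object equal to the one that was broadcast.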
- Different requests from different WebSocket clients are processed asynchronously.
- By default, different requests (WebSocket messages) from a single client are processed concurrently in different worker threads. (It is possible to change the maximum number of worker threads with the max_worker_threads setting.) So there is no guarantee that requests will be processed in the same order the client sent them. Actually, HTTP has behaved this way for decades.
- It is possible to serialize message processing by setting strict_ordering to True. But note that this disables parallel request execution: the server will not start processing another request from the client before it finishes the current one. See the comments in the GraphqlWsConsumer class.
- All subscription notifications are delivered in the order they were issued.
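
The difference between the default mode and strict ordering can be seen in a small toy model. This is a sketch rather than the consumer's implementation: it uses asyncio tasks instead of worker threads, and the request names and durations are made up.

```python
"""Toy comparison of concurrent and strictly ordered request processing."""

import asyncio

# Hypothetical requests: (request id, simulated processing time in seconds).
REQUESTS = [("first", 0.06), ("second", 0.01), ("third", 0.03)]


async def handle(request_id, duration, finished):
    """Pretend to process a request and record when it finishes."""
    await asyncio.sleep(duration)
    finished.append(request_id)


async def process_concurrently():
    """Start all requests at once, similar to the default mode."""
    finished = []
    await asyncio.gather(*(handle(rid, d, finished) for rid, d in REQUESTS))
    return finished


async def process_strictly():
    """Finish each request before starting the next one."""
    finished = []
    for rid, d in REQUESTS:
        await handle(rid, d, finished)
    return finished


concurrent_order = asyncio.run(process_concurrently())
strict_order = asyncio.run(process_strictly())
```

In the concurrent run the shortest request finishes first regardless of when it was sent, while the strict run keeps the client's order at the cost of a longer total time.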
Implementing authentication is straightforward. Follow the Channels documentation.
Here is an example. Note the channels.auth.AuthMiddlewareStack class.

import channels.auth
import channels.routing
import django.urls

application = channels.routing.ProtocolTypeRouter({
    'websocket': channels.auth.AuthMiddlewareStack(
        channels.routing.URLRouter([
            django.urls.path('graphql/', MyGraphqlWsConsumer),
        ])
    ),
})

This gives you a Django user info.context.user in all the resolvers.
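
As a minimal sketch of how a resolver might use that user, consider the function below. The resolver name resolve_whoami is hypothetical, and the SimpleNamespace objects merely imitate what Graphene and Django provide at runtime so that the snippet runs on its own; is_authenticated and username are the usual Django user attributes.

```python
"""Sketch: reading the authenticated user inside a resolver."""

from types import SimpleNamespace


def resolve_whoami(root, info):
    """Hypothetical resolver: return the login of the current user or None."""
    user = info.context.user
    if not user.is_authenticated:
        return None
    return user.username


def make_info(user):
    """Build a stand-in for the `info` argument a resolver receives."""
    return SimpleNamespace(context=SimpleNamespace(user=user))


# Stand-ins for a logged-in Django user and an anonymous one.
logged_in = SimpleNamespace(is_authenticated=True, username="luke")
anonymous = SimpleNamespace(is_authenticated=False, username="")

current = resolve_whoami(None, make_info(logged_in))
nobody = resolve_whoami(None, make_info(anonymous))
```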
GraphqlWsClient implements a GraphQL client that works over
WebSockets. The client needs a transport instance which
communicates with the server. A transport is an implementation of the
GraphqlWsTransport interface (its class must be derived from it).
GraphqlWsTransportAiohttp implements the transport on top of the
AIOHTTP library. Here is an example:

# Run this inside a coroutine. `session_id` is the value of the
# Django session cookie of an authenticated user.
transport = channels_graphql_ws.GraphqlWsTransportAiohttp(
    "ws://backend.endpoint/graphql/", cookies={"sessionid": session_id}
)
client = channels_graphql_ws.GraphqlWsClient(transport)
await client.connect_and_init()
result = await client.execute("query { users { id login email name } }")
users = result["data"]
await client.finalize()

See the GraphqlWsClient class docstring for the details.
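
One possible way to make sure the connection is always closed is to wrap the query in try/finally. The sketch below uses only the three client methods shown above. FakeClient is a hypothetical test double so that the snippet runs without a server; with the real library you would pass a GraphqlWsClient instance instead.

```python
"""Sketch: always finalizing the client, even when a query fails."""

import asyncio


async def fetch_users(client):
    """Run one query with a `GraphqlWsClient`-like object and clean up."""
    await client.connect_and_init()
    try:
        result = await client.execute("query { users { id login email name } }")
        return result["data"]
    finally:
        # Runs on success and on error, so the connection is not leaked.
        await client.finalize()


class FakeClient:
    """Hypothetical test double with the three methods used above."""

    def __init__(self):
        self.calls = []

    async def connect_and_init(self):
        self.calls.append("connect_and_init")

    async def execute(self, query):
        self.calls.append("execute")
        return {"data": {"users": []}}

    async def finalize(self):
        self.calls.append("finalize")


fake = FakeClient()
data = asyncio.run(fetch_users(fake))
```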
To test the GraphQL WebSocket API, read the appropriate page in the Channels documentation.
In order to simplify unit testing there is a GraphqlWsTransport
implementation based on the Django Channels testing communicator:
channels_graphql_ws.testing.GraphqlWsTransportChannels. Check its
docstring and take a look at the tests to see how to use it.
The original Apollo protocol does not let the client know when a
subscription activates. This inevitably leads to race conditions on
the client side. Sometimes this is not crucial, but there are cases
when it leads to serious issues. Here is the discussion
in the subscriptions-transport-ws
tracker.
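
The race can be reproduced in a toy asyncio model. This is only a sketch with made-up names: the subscribers list plays the role of the server-side subscription registry, and the short sleep imitates the time the server needs to activate a subscription.

```python
"""Toy asyncio model of the subscription activation race."""

import asyncio


async def scenario(wait_for_activation):
    """Subscribe, then trigger a notification; return what was received."""
    subscribers = []  # Callbacks of subscriptions the "server" has activated.
    received = []  # Notifications that reached the "client".
    activated = asyncio.Event()

    async def subscribe():
        await asyncio.sleep(0.01)  # Activation takes some time.
        subscribers.append(received.append)
        activated.set()

    async def mutate():
        # Notify every subscription that is active right now.
        for notify in subscribers:
            notify("new message")

    pending = asyncio.create_task(subscribe())
    if wait_for_activation:
        await activated.wait()
    await mutate()
    await pending
    return received


lost = asyncio.run(scenario(wait_for_activation=False))
delivered = asyncio.run(scenario(wait_for_activation=True))
```

Without waiting, the mutation runs before the subscription is registered and the notification is lost. With waiting, the same mutation is delivered.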
To solve this problem, there is the GraphqlWsConsumer setting
confirm_subscriptions which, when set to True, makes the consumer
issue an additional data message that confirms the subscription
activation. Please note that you have to modify the client's code to
make it consume this message, otherwise it will be mistaken for the
first subscription notification.
To customize the confirmation message itself set the GraphqlWsConsumer
setting subscription_confirmation_message. It must be a dictionary
with two keys "data" and "errors". By default it is set to
{"data": None, "errors": None}.
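
On the client side, the confirmation has to be separated from real notifications. The helper below is a minimal sketch of that step, assuming the client collects the payloads of one subscription in arrival order. The function name split_confirmation and the shape of the sample notification are hypothetical.

```python
"""Sketch: separating the activation confirmation from notifications."""

DEFAULT_CONFIRMATION = {"data": None, "errors": None}


def split_confirmation(messages, confirmation=None):
    """Return `(confirmed, notifications)` for one subscription's messages.

    `messages` is the list of payloads received for a single
    subscription, in arrival order. Pass `confirmation` if the server
    uses a customized confirmation message.
    """
    if confirmation is None:
        confirmation = DEFAULT_CONFIRMATION
    if messages and messages[0] == confirmation:
        return True, messages[1:]
    return False, list(messages)


incoming = [
    {"data": None, "errors": None},
    {"data": {"onChatMessageSent": {"message": "Hello all!"}}, "errors": None},
]
confirmed, notifications = split_confirmation(incoming)
```

Only the first message is ever treated as a confirmation, so a later notification that happens to look the same is still passed on.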
This implementation was initially based on Tomáš Ehrlich's GitHub Gist "GraphQL Subscription with django-channels".
There is a promising GraphQL WS library by the Graphene authors. In particular, this pull request gives hope that one day there will be a native Graphene implementation of the WebSocket transport with subscriptions.
Just a reminder of how to set up an environment for development:
> python3 -m venv .venv
> direnv allow
> pip install poetry
> poetry install
> pre-commit install
> pytest
Use:
This project is developed and maintained by DATADVANCE LLC. Please submit an issue if you have any questions or want to suggest an improvement.
This work is supported by the Russian Foundation for Basic Research (project No. 15-29-07043).