Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for subscription #1107

Merged
merged 5 commits into from Mar 14, 2020
Merged

Added support for subscription #1107

merged 5 commits into from Mar 14, 2020

Conversation

rob-blackbourn
Copy link
Contributor

This is my first ever pull request so please be kind :)

This adds subscription functionality to v3 by introducing a subscribe method to the schema.

import asyncio
from datetime import datetime
from graphene import ObjectType, String, Schema, Field

# All schema must contain a query.
class Query(ObjectType):
    hello = String()

    def resolve_hello(root, info):
        return 'Hello, world!'

class Subscription(ObjectType):
    time_of_day = Field(String)

    async def subscribe_time_of_day(root, info):
        while True:
            yield { 'time_of_day': datetime.now().isoformat()}
            await asyncio.sleep(1)

SCHEMA = Schema(query=Query, subscription=Subscription)

async def main(schema):

    subscription = 'subscription { timeOfDay }'
    result = await schema.subscribe(subscription)
    async for item in result:
        print(item.data['timeOfDay'])

asyncio.run(main(SCHEMA))

@kazamatzuri
Copy link

looks like black is failing some lines. Do you have pre-commit hooks setup?

@rob-blackbourn
Copy link
Contributor Author

Hi @kazamatzuri

I don't have pre-commit hooks setup. How do I do that?

@rob-blackbourn
Copy link
Contributor Author

Found it.

Formatting now ...

@rob-blackbourn
Copy link
Contributor Author

Checks now pass for black and flake8.

@NathHorrigan
Copy link

@kazamatzuri Can this be merged?

@rob-blackbourn
Copy link
Contributor Author

Please let me know if there are any changes you need to get this merged.

I'd really like to start using graphene, but my main use cases require subscriptions.

@jkimbo
Copy link
Member

jkimbo commented Feb 8, 2020

@rob-blackbourn this looks really good and sorry for not getting to this earlier. I'll need to try it locally to get a good idea if it works and I'll try and get to that this weekend.

@rob-blackbourn
Copy link
Contributor Author

@jkimbo Superb! Let me know about any changes you need.

Copy link
Member

@jkimbo jkimbo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay in reviewing this @rob-blackbourn but this looks great! Thanks so much.

@rob-blackbourn
Copy link
Contributor Author

I've integrated the latest changes from the master branch.

@jkimbo jkimbo merged commit 1cf303a into graphql-python:master Mar 14, 2020
@rob-blackbourn rob-blackbourn deleted the feature/add_subscription branch March 14, 2020 17:07
@rob-blackbourn
Copy link
Contributor Author

Superb

@dlai0001
Copy link

dlai0001 commented Apr 1, 2020

I have a question, how do we handle unsubscribe?

@rob-blackbourn
Copy link
Contributor Author

Hi @dlai0001

Unsubscribe happens when a client stops listening. The client is the consumer of an async generator (async for ...). It might look like this (taken from the docs):

async def main(schema):
    subscription = 'subscription { timeOfDay }'
    result = await schema.subscribe(subscription)
    async for item in result:
        print(item.data['timeOfDay'])

The subscription might exit the loop gracefully. In the main_graceful_exit function in the example below it exits on a counter, but it would more typically be the result of some event being set.

The alternative would be an ungraceful exit. Typically this is a WebSocket client in a browser exiting a page without properly closing the connection. We simulate this in the main_ungraceful_exit function below by raising an exception.

I have published a forked packaged of graphene that you can try this with:

pip install jetblack-graphene

The following is a worked example. You will find that a graceful exit raised an asyncio.CancelledError exception, while an ungraceful exit propagates the exception that was raised.

import asyncio
from datetime import datetime
from graphene import ObjectType, String, Schema, Field

# All schema require a query.


class Query(ObjectType):
    hello = String()

    def resolve_hello(root, info):
        return 'Hello, world!'


class Subscription(ObjectType):
    time_of_day = Field(String)

    async def subscribe_time_of_day(root, info):
        try:
            while True:
                yield {'time_of_day': datetime.now().isoformat()}
                await asyncio.sleep(1)
        except Exception as error:
            print(error)


SCHEMA = Schema(query=Query, subscription=Subscription)


async def main_graceful_exit(schema):
    subscription = 'subscription { timeOfDay }'
    result = await schema.subscribe(subscription)
    async for item in result:
        print(item.data['timeOfDay'])


async def main_graceful_exit(schema):
    count = 0
    subscription = 'subscription { timeOfDay }'
    result = await schema.subscribe(subscription)
    async for item in result:
        print(item.data['timeOfDay'])
        count += 1
        if count > 10:
            return


async def main_ungraceful_exit(schema):
    count = 0
    subscription = 'subscription { timeOfDay }'
    result = await schema.subscribe(subscription)
    async for item in result:
        print(item.data['timeOfDay'])
        count += 1
        if count > 10:
            raise RuntimeError("User exception")

asyncio.run(main_graceful_exit(SCHEMA))
# asyncio.run(main_ungraceful_exit(SCHEMA))

Is this the answer you needed?

Rob

@dlai0001
Copy link

dlai0001 commented Apr 1, 2020

@rob-blackbourn - Thank you for your prompt response.

I think I'm new to async programming in python. I found this post while looking into supporting subscribe on our graphene schema.

Say I have some object I want to subscribe to, and push the latest value to the connected user. I want to be able to unsubscribe from that object when that user disconnects.

Would this be what I should be doing?

class Subscription(ObjectType):
    time_of_day = Field(String)

    async def subscribe_time_of_day(root, info):
        try:
            # say I was subscribing to an RxPy observable here
            time_observable = getTimeObservable()
            while True:
                # and I want to keep yielding the next value.
                yield await time_observable.next()
        except Exception as error:
            print(error)
            #  <<<< would this be where I put my call to unsubscribe.
            time_observable.unsubscribe()

@rob-blackbourn
Copy link
Contributor Author

Hi @dlai0001

I had a feeling there was something more to your question!

A quick bit of internet stalking leads me to believe your main focus is from the JavaScript/browser side :)

The short answer is: you don't need to do anything!

When the browser stops subscribing (either through the browser API or by navigating away from the page) an exception is raised on the python side. For a graceful termination (e.g. an explicit unsubscribe) the exception will be asyncio.CancelledError indicating the task was cancelled. For any other error (if the page just navigated away without explicitly terminating the subscription) some other exception will be raised. Either way the subscription will terminate, All is good!

I haven't used it, but I believe the strawberry package has support for subscriptions, and might be a good base for development. This would be my recommendation for good support.

If you like to live on the bleeding edge you can check out my package - bareASG-graphql-next. This is stuff I'm using at work that they let me publish. Rather than using WebSockets I'm using a streaming fetch for subscriptions (see client and server).

@rob-blackbourn
Copy link
Contributor Author

Hi @dlai0001

I think I've figured out your use case. There has to be a consumer and a producer. You're concerned with how the producer terminates.

In the trivial time example the consume triggers the producer to start sending the time. When the subscription terminates the producer loop ends "naturally".

In your example it looks like the subscription creates some kind of service which needs to be cleaned up after the subscription has ended. You are correct in handling the cleanup in the exception handler. However I think a better approach might be to use the finally clause which gets called regardless of the exception raised.

@dlai0001
Copy link

dlai0001 commented Apr 2, 2020

@rob-blackbourn Thanks for the explaination. That really helps. It turns out the project I"m working on is graphene v2. Looks like I'll have to explore options.

@jkimbo
Copy link
Member

jkimbo commented Apr 6, 2020

FYI just released v3.0.0b1 that contains these changes.

@FredeJ
Copy link

FredeJ commented May 19, 2020

Maybe I'm misunderstanding something but I'm unable to get this to work.

My implementation is as follows:

class Subscriptions(graphene.ObjectType):
    time_of_day = graphene.Field(graphene.String)

    async def subscribe_time_of_day(root, info):
        print("Subscribed")
        while True:
            print("iterating")
            yield {'times_of_day': datetime.now().isoformat() }
            await asyncio.sleep(1)

Then calling (through GraphiQL):

subscription {
  timeOfDay
}

results in:

{
  "data": {
    "timeOfDay": null
  }
}

What am I doing wrong?

@rob-blackbourn
Copy link
Contributor Author

Hi @FredeJ

I think:

yield {'times_of_day': datetime.now().isoformat() }

should be:

yield {'time_of_day': datetime.now().isoformat() }

I've posted an example here

@FredeJ
Copy link

FredeJ commented May 20, 2020

I feel stupid for that part, but the change unfortunately did not fix my issue.

Nothing is printed when I call it, so it seems the function is never entered.

@FredeJ
Copy link

FredeJ commented May 20, 2020

It seems that I can recreate your example in a test - so the subscription part of the schema works. That leaves the routing from the /graphql/ endpoint to the schema and back, I guess?

@FredeJ
Copy link

FredeJ commented May 20, 2020

I'm using django so bear with me here, but the code should be understandable:

from django.test import TestCase
from graphene.test import Client
from .schema import schema
import asyncio

class SubscriptionTest(TestCase):
    def setUp(self):
        self.client = Client(schema)

    def test_request_time_of_day(self):
        #This doesn't work
        result = self.client.execute('''
            subscription{
                timeOfDay
            }
        ''')
        self.assertNotEqual(result['data']['timeOfDay'],None)

    def test_time_of_day(self):
        #this works
        async def subscribe(schema):
            sub = 'subscription { timeOfDay }'
            result = await schema.subscribe(sub)
            async for item in result:
                print(item.data['timeOfDay'])
        
        asyncio.run(subscribe(schema))

Now, I realize that the reason that the test_request_time_of_day function fails is likely because it's calling execute and not subscribe. However subscribe is not available on the test client and (correct me if I'm wrong) is not how the graphql endpoint handles things?

So that means my problem is really in graphene-django and not in graphene? And the issue is that it's calling execute everytime, rather than subscribe when it is relevant? :)

@rob-blackbourn
Copy link
Contributor Author

@FredeJ,

Sadly I know nothing about Django or the test client :(

There are a couple of things which might be useful though.

  1. For graphene to find the subscription resolver the method must start with subscribe_ (just like query resolvers must start with resolve_).
  2. In the base graphql-core library, subscriptions and queries/mutations get called by different methods, so if your method gets interpreted as a query I don't think it will work.

We could really use an expert who knows Django and subscriptions. Maybe that will end up being you :P

@rob-blackbourn
Copy link
Contributor Author

@FredeJ,

Aha! If you look at the code for the test client, all it's doing is schema.execute.

So the way you call it that works (schema.subscribe) does exactly what the client would do if the subscribe method had been implemented.

So I think you have it exactly right!

When I have time I'll update the client and make a pull request. In the mean time you can just copy the client class and add the subscription call.

@jkimbo
Copy link
Member

jkimbo commented Jul 12, 2020

@rob-blackbourn so as part of the graphene v3 release notes I’ve been digging into this api for subscriptions more and I’m not sure this is the best api for defining them in graphene. I would really appreciate your feedback on the issues I've found with this api:

Firstly I find it odd that you have to return an object from a subscription resolver like this:

    async def subscribe_time_of_day(root, info):
        while True:
            yield { 'time_of_day': datetime.now().isoformat()}
            await asyncio.sleep(1)

I would expect you to only have to return the value since that fits better with the normal query resolvers.

Secondly, after reading how the subscribe function is implemented in graphql-core, I noticed that if you have 2 fields on the Subscription object and you try and select both of them only the first subscribe_ resolver will be called. See: https://github.com/graphql-python/graphql-core/blob/3267c4c5c5ea06968beb59f16425de4a4b835ee1/src/graphql/subscription/subscribe.py#L147-L151

Both of these points leads me to believe that the subscription api should be more like the mutation api where you define a subscription class with a single subscribe function rather than trying to expand on the query resolver pattern. Something like this:

class TimeOfDay(Subscription):
    Output = DateTime(required=True)

    async def subscribe(root, info):
        while True:
            yield datetime.now().isoformat()
            await asyncio.sleep(1)

class Subscription(ObjectType):
    time_of_day = TimeOfDay.Field()

schema = Schema(query=Query, subscription=Subscription)

There should also be some validation to make sure that when starting a subscription you can only select 1 field at a time.

What do you think?

@jkimbo
Copy link
Member

jkimbo commented Jul 12, 2020

An alternative api that I've been thinking about is to use decorators for subscription functions like this:

@subscription(DateTime)
async def time_of_day(root, info):
	while True:
    	yield datetime.now().isoformat()
        await asyncio.sleep(1)

schema = Schema(
	query=Query,
	subscriptions=[time_of_day] # pass all your subscriptions here and Graphene will automatically create the Subscription ObjectType for you 
)

I prefer this api because I think it's simpler and re-enforces that each subscription field is a standalone function and that subscriptions are different to queries. This decorator api can also apply to mutations quite nicely.

@rob-blackbourn
Copy link
Contributor Author

Hi @jkimbo

First I should say that I'm not precious about my implementation. I was more concerned with getting something working. A more consistent API would be fantastic.

Regarding the first issue of having to return an object: this really sucks, and is very confusing for someone getting started. I played around a bit with trying to remove this, but it seemed to be a feature of graphql-core (so presumably graphql-js).

I thought one solution might be to provide a wrapper for the output. I baulked at doing this, as:

  1. It looked like it would require an async consumer and producer with felt like it could be tricky.
  2. It would be inconsistent with the operation of graphql-core, but I'm not sure how much we care about that?
  3. It introduces another layer which will have some performance impact.

I'm happy to take a look. Do you have any other ideas or information that might help?

@rob-blackbourn
Copy link
Contributor Author

rob-blackbourn commented Jul 12, 2020

Hi @jkimbo

I don't understand the second point about two fields.

Do you mean this can't work?

class Subscription(ObjectType):
    time_of_day1 = Field(String)
    time_of_day2 = Field(String)

    async def subscribe_time_of_day1(root, info):
        try:
            count = 0
            while count < 10:
                yield {'time_of_day': datetime.now().isoformat()}
                await asyncio.sleep(1)
                count += 1
        except Exception as error:
            print(error)

    async def subscribe_time_of_day2(root, info):
        try:
            count = 0
            while count < 10:
                yield {'time_of_day': datetime.now().isoformat()}
                await asyncio.sleep(1)
                count += 1
        except Exception as error:
            print(error)

@jkimbo
Copy link
Member

jkimbo commented Jul 12, 2020

@rob-blackbourn

I don't understand the second point about two fields.
Do you mean this can't work?

In that example if you create a subscription query like this:

subscription {
	timeOfDay1
	timeOfDay2
}

you'll see that only the first subscribe_time_of_day1 function gets called and so that timeOfDay2 is always None. This seems to be fundamental to the graphql-core and graphql-js implementation. It should be possible to validate that you only ever have 1 field in a subscription query to avoid this problem.

@jkimbo
Copy link
Member

jkimbo commented Jul 12, 2020

Regarding the first issue of having to return an object: this really sucks, and is very confusing for someone getting started. I played around a bit with trying to remove this, but it seemed to be a feature of graphql-core (so presumably graphql-js).

I thought one solution might be to provide a wrapper for the output. I baulked at doing this, as:

  1. It looked like it would require an async consumer and producer with felt like it could be tricky.

I think the wrapper is the way to go. I've taken a stab at getting it to work here: https://github.com/graphql-python/graphene/compare/subscription-api-update?expand=1
I would need to add more test cases to be confident that it works properly but it seems to work for now. (it also adds the field validation that I mentioned above)

  1. It would be inconsistent with the operation of graphql-core, but I'm not sure how much we care about that?

I don't think this matters. Graphene is an opinionated way of creating a GraphQL server so we should create an API that works makes sense regardless of how it compares to GraphQL-core

  1. It introduces another layer which will have some performance impact.

I'm not sure about the performance impact this has and I'm not sure how to test it. Any ideas?

@rob-blackbourn
Copy link
Contributor Author

@jkimbo

Gotcha.

I had a quick look at the graphql-js implementation and it's functionally the same.

Sounds like a class containing a single subscription is the way to go.

@jkimbo
Copy link
Member

jkimbo commented Jul 12, 2020

@rob-blackbourn what do you think about the API I proposed here: #1107 (comment) ?

@rob-blackbourn
Copy link
Contributor Author

@jkimbo Very cool. All of this is about making features directly accessible, and that looks as good as it gets.

@jkimbo
Copy link
Member

jkimbo commented Jul 12, 2020

@rob-blackbourn thanks! I'm going to try and get some wider feedback on the API because it's quite different to what we have now (and I think it goes hand in hand with a new API for mutations).

Also just wanted to say thanks for creating this PR in the first place. Really appreciate the contribution and I'm excited to release this feature!

@rob-blackbourn
Copy link
Contributor Author

@jkimbo Nice: https://github.com/graphql-python/graphene/compare/subscription-api-update?expand=1

So it turned out to be super simple :)

I should have tried harder :(

@jkimbo
Copy link
Member

jkimbo commented Jul 12, 2020

Ok I've created new proposal here @rob-blackbourn : #1226

We should probably hold off on implementing the resolver wrapping until there is consensus on what the API should look like for subscriptions and mutations but in the meantime we should add the validation step in the subscribe method. I've created an issue to track it here: #1227

@jkimbo
Copy link
Member

jkimbo commented Jul 14, 2020

@rob-blackbourn btw since we haven't settled on a stable API for subscriptions yet and I'd like to release Graphene v3 soon, I've created this PR to rename the subscribe method to make it clear that it's experimental: #1229

Hope that's ok with you.

@rob-blackbourn
Copy link
Contributor Author

@jkimbo All good. That way we don't need to make an api change within a major version when we settle on a solution.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

6 participants