From 430376fe32e919440f434903a379d2745a90aa33 Mon Sep 17 00:00:00 2001 From: Jatin Dev <64803093+JatinDevDG@users.noreply.github.com> Date: Mon, 23 Nov 2020 19:17:56 +0530 Subject: [PATCH] fix(GraphQL): This PR fixes issue of multiple responses in a subscription for an update. (#6868) (#6935) Fixes GRAPHQL-764 If we start and Terminate multiple subscriptions and at the end start a new subscription with a 1-second duration between each terminate and start then for any update we were getting multiple responses for the last running subscription whereas we should be getting the only one response. Co-authored-by: Pawan Rawal (cherry picked from commit 734bbe151ea94b6c2d3ab4c51f969c26288eaec6) --- graphql/e2e/subscription/subscription_test.go | 122 ++++++++++++++++++ graphql/subscription/poller.go | 13 +- 2 files changed, 129 insertions(+), 6 deletions(-) diff --git a/graphql/e2e/subscription/subscription_test.go b/graphql/e2e/subscription/subscription_test.go index 724b0505bcd..77fa144194f 100644 --- a/graphql/e2e/subscription/subscription_test.go +++ b/graphql/e2e/subscription/subscription_test.go @@ -950,3 +950,125 @@ func TestSubscriptionAuthHeaderCaseInsensitive(t *testing.T) { require.JSONEq(t, `{"queryTodo":[{"owner":"jatin","text":"GraphQL is exciting!!"}]}`, string(resp.Data)) } + +func TestSubscriptionAuth_MultiSubscriptionResponses(t *testing.T) { + dg, err := testutil.DgraphClient(groupOnegRPC) + require.NoError(t, err) + testutil.DropAll(t, dg) + + // Upload schema + add := &common.GraphQLParams{ + Query: `mutation updateGQLSchema($sch: String!) { + updateGQLSchema(input: { set: { schema: $sch }}) { + gqlSchema { + schema + } + } + }`, + Variables: map[string]interface{}{"sch": schAuth}, + } + addResult := add.ExecuteAsPost(t, adminEndpoint) + require.Nil(t, addResult.Errors) + time.Sleep(time.Second * 2) + + metaInfo := &testutil.AuthMeta{ + PublicKey: "secret", + Namespace: "https://dgraph.io", + Algo: "HS256", + Header: "Authorization", + } + metaInfo.AuthVars = map[string]interface{}{ + "USER": "jatin", + "ROLE": "USER", + } + + jwtToken, err := metaInfo.GetSignedToken("secret", 5*time.Second) + require.NoError(t, err) + + // first subscription + payload := fmt.Sprintf(`{"Authorization": "%s"}`, jwtToken) + subscriptionClient, err := common.NewGraphQLSubscription(subscriptionEndpoint, &schema.Request{ + Query: `subscription{ + queryTodo{ + owner + text + } + }`, + }, payload) + require.Nil(t, err) + + res, err := subscriptionClient.RecvMsg() + require.NoError(t, err) + + var resp common.GraphQLResponse + err = json.Unmarshal(res, &resp) + require.NoError(t, err) + + require.Nil(t, resp.Errors) + require.JSONEq(t, `{"queryTodo":[]}`, + string(resp.Data)) + // Terminate subscription and wait for 1 second before starting new subscription + subscriptionClient.Terminate() + time.Sleep(time.Second) + + jwtToken1, err := metaInfo.GetSignedToken("secret", 5*time.Second) + require.NoError(t, err) + + // Second subscription + payload = fmt.Sprintf(`{"Authorization": "%s"}`, jwtToken1) + subscriptionClient1, err := common.NewGraphQLSubscription(subscriptionEndpoint, &schema.Request{ + Query: `subscription{ + queryTodo{ + owner + text + } + }`, + }, payload) + require.Nil(t, err) + + res, err = subscriptionClient1.RecvMsg() + require.NoError(t, err) + + err = json.Unmarshal(res, &resp) + require.NoError(t, err) + + require.Nil(t, resp.Errors) + require.JSONEq(t, `{"queryTodo":[]}`, + string(resp.Data)) + + // for user jatin + add = &common.GraphQLParams{ + Query: `mutation{ + addTodo(input: [ + {text : "GraphQL is exciting!!", + owner : "jatin"} + ]) + { + todo{ + text + owner + } + } + }`, + } + + addResult = add.ExecuteAsPost(t, graphQLEndpoint) + require.Nil(t, addResult.Errors) + time.Sleep(time.Second) + + // 1st response + res, err = subscriptionClient1.RecvMsg() + require.NoError(t, err) + err = json.Unmarshal(res, &resp) + require.NoError(t, err) + + require.Nil(t, resp.Errors) + require.JSONEq(t, `{"queryTodo":[{"owner":"jatin","text":"GraphQL is exciting!!"}]}`, + string(resp.Data)) + + // second response should be nil + res, err = subscriptionClient1.RecvMsg() + require.NoError(t, err) + require.Nil(t, res) + subscriptionClient1.Terminate() +} diff --git a/graphql/subscription/poller.go b/graphql/subscription/poller.go index b613ad5f44f..722ca96621c 100644 --- a/graphql/subscription/poller.go +++ b/graphql/subscription/poller.go @@ -114,10 +114,9 @@ func (p *Poller) AddSubscriber( expiry: customClaims.StandardClaims.ExpiresAt.Time, updateCh: updateCh} p.pollRegistry[bucketID] = subscriptions - if len(subscriptions) != 1 { - // Already there is subscription for this bucket. So,no need to poll the server. We can - // use the existing polling routine to publish the update. - + if ok { + // Already there is a running go routine for this bucket. So,no need to poll the server. + // We can use the existing polling routine to publish the update. return &SubscriberResponse{ BucketID: bucketID, SubscriptionID: subscriptionID, @@ -176,11 +175,12 @@ func (p *Poller) poll(req *pollRequest) { // Don't update if there is no change in response. continue } - // Every thirty poll. We'll check there is any active subscription for the - // current poll. If not we'll terminate this poll. + // Every second poll, we'll check if there is any active subscription for the + // current goroutine. If not we'll terminate this poll. p.Lock() subscribers, ok := p.pollRegistry[req.bucketID] if !ok || len(subscribers) == 0 { + delete(p.pollRegistry, req.bucketID) p.Unlock() return } @@ -200,6 +200,7 @@ func (p *Poller) poll(req *pollRequest) { if !ok || len(subscribers) == 0 { // There is no subscribers to push the update. So, kill the current polling // go routine. + delete(p.pollRegistry, req.bucketID) p.Unlock() return }