diff --git a/graphql/e2e/subscription/subscription_test.go b/graphql/e2e/subscription/subscription_test.go index 724b0505bcd..77fa144194f 100644 --- a/graphql/e2e/subscription/subscription_test.go +++ b/graphql/e2e/subscription/subscription_test.go @@ -950,3 +950,125 @@ func TestSubscriptionAuthHeaderCaseInsensitive(t *testing.T) { require.JSONEq(t, `{"queryTodo":[{"owner":"jatin","text":"GraphQL is exciting!!"}]}`, string(resp.Data)) } + +func TestSubscriptionAuth_MultiSubscriptionResponses(t *testing.T) { + dg, err := testutil.DgraphClient(groupOnegRPC) + require.NoError(t, err) + testutil.DropAll(t, dg) + + // Upload schema + add := &common.GraphQLParams{ + Query: `mutation updateGQLSchema($sch: String!) { + updateGQLSchema(input: { set: { schema: $sch }}) { + gqlSchema { + schema + } + } + }`, + Variables: map[string]interface{}{"sch": schAuth}, + } + addResult := add.ExecuteAsPost(t, adminEndpoint) + require.Nil(t, addResult.Errors) + time.Sleep(time.Second * 2) + + metaInfo := &testutil.AuthMeta{ + PublicKey: "secret", + Namespace: "https://dgraph.io", + Algo: "HS256", + Header: "Authorization", + } + metaInfo.AuthVars = map[string]interface{}{ + "USER": "jatin", + "ROLE": "USER", + } + + jwtToken, err := metaInfo.GetSignedToken("secret", 5*time.Second) + require.NoError(t, err) + + // first subscription + payload := fmt.Sprintf(`{"Authorization": "%s"}`, jwtToken) + subscriptionClient, err := common.NewGraphQLSubscription(subscriptionEndpoint, &schema.Request{ + Query: `subscription{ + queryTodo{ + owner + text + } + }`, + }, payload) + require.Nil(t, err) + + res, err := subscriptionClient.RecvMsg() + require.NoError(t, err) + + var resp common.GraphQLResponse + err = json.Unmarshal(res, &resp) + require.NoError(t, err) + + require.Nil(t, resp.Errors) + require.JSONEq(t, `{"queryTodo":[]}`, + string(resp.Data)) + // Terminate subscription and wait for 1 second before starting new subscription + subscriptionClient.Terminate() + time.Sleep(time.Second) + + jwtToken1, err := metaInfo.GetSignedToken("secret", 5*time.Second) + require.NoError(t, err) + + // Second subscription + payload = fmt.Sprintf(`{"Authorization": "%s"}`, jwtToken1) + subscriptionClient1, err := common.NewGraphQLSubscription(subscriptionEndpoint, &schema.Request{ + Query: `subscription{ + queryTodo{ + owner + text + } + }`, + }, payload) + require.Nil(t, err) + + res, err = subscriptionClient1.RecvMsg() + require.NoError(t, err) + + err = json.Unmarshal(res, &resp) + require.NoError(t, err) + + require.Nil(t, resp.Errors) + require.JSONEq(t, `{"queryTodo":[]}`, + string(resp.Data)) + + // for user jatin + add = &common.GraphQLParams{ + Query: `mutation{ + addTodo(input: [ + {text : "GraphQL is exciting!!", + owner : "jatin"} + ]) + { + todo{ + text + owner + } + } + }`, + } + + addResult = add.ExecuteAsPost(t, graphQLEndpoint) + require.Nil(t, addResult.Errors) + time.Sleep(time.Second) + + // 1st response + res, err = subscriptionClient1.RecvMsg() + require.NoError(t, err) + err = json.Unmarshal(res, &resp) + require.NoError(t, err) + + require.Nil(t, resp.Errors) + require.JSONEq(t, `{"queryTodo":[{"owner":"jatin","text":"GraphQL is exciting!!"}]}`, + string(resp.Data)) + + // second response should be nil + res, err = subscriptionClient1.RecvMsg() + require.NoError(t, err) + require.Nil(t, res) + subscriptionClient1.Terminate() +} diff --git a/graphql/subscription/poller.go b/graphql/subscription/poller.go index b613ad5f44f..722ca96621c 100644 --- a/graphql/subscription/poller.go +++ b/graphql/subscription/poller.go @@ -114,10 +114,9 @@ func (p *Poller) AddSubscriber( expiry: customClaims.StandardClaims.ExpiresAt.Time, updateCh: updateCh} p.pollRegistry[bucketID] = subscriptions - if len(subscriptions) != 1 { - // Already there is subscription for this bucket. So,no need to poll the server. We can - // use the existing polling routine to publish the update. - + if ok { + // Already there is a running go routine for this bucket. So,no need to poll the server. + // We can use the existing polling routine to publish the update. return &SubscriberResponse{ BucketID: bucketID, SubscriptionID: subscriptionID, @@ -176,11 +175,12 @@ func (p *Poller) poll(req *pollRequest) { // Don't update if there is no change in response. continue } - // Every thirty poll. We'll check there is any active subscription for the - // current poll. If not we'll terminate this poll. + // Every second poll, we'll check if there is any active subscription for the + // current goroutine. If not we'll terminate this poll. p.Lock() subscribers, ok := p.pollRegistry[req.bucketID] if !ok || len(subscribers) == 0 { + delete(p.pollRegistry, req.bucketID) p.Unlock() return } @@ -200,6 +200,7 @@ func (p *Poller) poll(req *pollRequest) { if !ok || len(subscribers) == 0 { // There is no subscribers to push the update. So, kill the current polling // go routine. + delete(p.pollRegistry, req.bucketID) p.Unlock() return }