Skip to content

Commit

Permalink
fix(GraphQL): This PR fixes issue of multiple responses in a subscrip…
Browse files Browse the repository at this point in the history
…tion for an update. (#6868) (#6935)

Fixes GRAPHQL-764
If we start and Terminate multiple subscriptions and at the end start a new subscription with a 1-second duration between each terminate and start then for any update we were getting multiple responses for the last running subscription whereas we should be getting the only one response.

Co-authored-by: Pawan Rawal <pawan0201@gmail.com>
(cherry picked from commit 734bbe1)
  • Loading branch information
JatinDev543 committed Nov 23, 2020
1 parent 46d984e commit 430376f
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 6 deletions.
122 changes: 122 additions & 0 deletions graphql/e2e/subscription/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,3 +950,125 @@ func TestSubscriptionAuthHeaderCaseInsensitive(t *testing.T) {
require.JSONEq(t, `{"queryTodo":[{"owner":"jatin","text":"GraphQL is exciting!!"}]}`,
string(resp.Data))
}

func TestSubscriptionAuth_MultiSubscriptionResponses(t *testing.T) {
dg, err := testutil.DgraphClient(groupOnegRPC)
require.NoError(t, err)
testutil.DropAll(t, dg)

// Upload schema
add := &common.GraphQLParams{
Query: `mutation updateGQLSchema($sch: String!) {
updateGQLSchema(input: { set: { schema: $sch }}) {
gqlSchema {
schema
}
}
}`,
Variables: map[string]interface{}{"sch": schAuth},
}
addResult := add.ExecuteAsPost(t, adminEndpoint)
require.Nil(t, addResult.Errors)
time.Sleep(time.Second * 2)

metaInfo := &testutil.AuthMeta{
PublicKey: "secret",
Namespace: "https://dgraph.io",
Algo: "HS256",
Header: "Authorization",
}
metaInfo.AuthVars = map[string]interface{}{
"USER": "jatin",
"ROLE": "USER",
}

jwtToken, err := metaInfo.GetSignedToken("secret", 5*time.Second)
require.NoError(t, err)

// first subscription
payload := fmt.Sprintf(`{"Authorization": "%s"}`, jwtToken)
subscriptionClient, err := common.NewGraphQLSubscription(subscriptionEndpoint, &schema.Request{
Query: `subscription{
queryTodo{
owner
text
}
}`,
}, payload)
require.Nil(t, err)

res, err := subscriptionClient.RecvMsg()
require.NoError(t, err)

var resp common.GraphQLResponse
err = json.Unmarshal(res, &resp)
require.NoError(t, err)

require.Nil(t, resp.Errors)
require.JSONEq(t, `{"queryTodo":[]}`,
string(resp.Data))
// Terminate subscription and wait for 1 second before starting new subscription
subscriptionClient.Terminate()
time.Sleep(time.Second)

jwtToken1, err := metaInfo.GetSignedToken("secret", 5*time.Second)
require.NoError(t, err)

// Second subscription
payload = fmt.Sprintf(`{"Authorization": "%s"}`, jwtToken1)
subscriptionClient1, err := common.NewGraphQLSubscription(subscriptionEndpoint, &schema.Request{
Query: `subscription{
queryTodo{
owner
text
}
}`,
}, payload)
require.Nil(t, err)

res, err = subscriptionClient1.RecvMsg()
require.NoError(t, err)

err = json.Unmarshal(res, &resp)
require.NoError(t, err)

require.Nil(t, resp.Errors)
require.JSONEq(t, `{"queryTodo":[]}`,
string(resp.Data))

// for user jatin
add = &common.GraphQLParams{
Query: `mutation{
addTodo(input: [
{text : "GraphQL is exciting!!",
owner : "jatin"}
])
{
todo{
text
owner
}
}
}`,
}

addResult = add.ExecuteAsPost(t, graphQLEndpoint)
require.Nil(t, addResult.Errors)
time.Sleep(time.Second)

// 1st response
res, err = subscriptionClient1.RecvMsg()
require.NoError(t, err)
err = json.Unmarshal(res, &resp)
require.NoError(t, err)

require.Nil(t, resp.Errors)
require.JSONEq(t, `{"queryTodo":[{"owner":"jatin","text":"GraphQL is exciting!!"}]}`,
string(resp.Data))

// second response should be nil
res, err = subscriptionClient1.RecvMsg()
require.NoError(t, err)
require.Nil(t, res)
subscriptionClient1.Terminate()
}
13 changes: 7 additions & 6 deletions graphql/subscription/poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,9 @@ func (p *Poller) AddSubscriber(
expiry: customClaims.StandardClaims.ExpiresAt.Time, updateCh: updateCh}
p.pollRegistry[bucketID] = subscriptions

if len(subscriptions) != 1 {
// Already there is subscription for this bucket. So,no need to poll the server. We can
// use the existing polling routine to publish the update.

if ok {
// Already there is a running go routine for this bucket. So,no need to poll the server.
// We can use the existing polling routine to publish the update.
return &SubscriberResponse{
BucketID: bucketID,
SubscriptionID: subscriptionID,
Expand Down Expand Up @@ -176,11 +175,12 @@ func (p *Poller) poll(req *pollRequest) {
// Don't update if there is no change in response.
continue
}
// Every thirty poll. We'll check there is any active subscription for the
// current poll. If not we'll terminate this poll.
// Every second poll, we'll check if there is any active subscription for the
// current goroutine. If not we'll terminate this poll.
p.Lock()
subscribers, ok := p.pollRegistry[req.bucketID]
if !ok || len(subscribers) == 0 {
delete(p.pollRegistry, req.bucketID)
p.Unlock()
return
}
Expand All @@ -200,6 +200,7 @@ func (p *Poller) poll(req *pollRequest) {
if !ok || len(subscribers) == 0 {
// There is no subscribers to push the update. So, kill the current polling
// go routine.
delete(p.pollRegistry, req.bucketID)
p.Unlock()
return
}
Expand Down

0 comments on commit 430376f

Please sign in to comment.