Race condition when flushing message queues while a batch is in-flight #951

Closed
jeffijoe opened this issue Apr 9, 2020 · 0 comments · Fixed by #952
Labels
api: pubsub Issues related to the googleapis/nodejs-pubsub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@jeffijoe
Contributor

jeffijoe commented Apr 9, 2020

There seems to be a race condition in subscriber.close() (_waitFlush): the check for pending requests can run while a batch is already in flight, so close() can resolve before that batch has completed.

I think we should track in-flight requests as well, and resolve _waitFlush only once they have drained.
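To illustrate the idea, here is a minimal sketch of a queue that counts in-flight messages alongside pending ones and only reports itself drained when both are empty. It is not the library's code: AckQueueSketch, SendBatch, flush, drain and the counters are hypothetical names invented for this example, and the real fix may be structured differently.

```typescript
// Hypothetical sketch; none of these names come from @google-cloud/pubsub.
type SendBatch = (ackIds: string[]) => Promise<void>;

class AckQueueSketch {
  private pending: string[] = [];
  private inFlight = 0;
  private drainWaiters: Array<() => void> = [];

  constructor(private readonly send: SendBatch) {}

  get pendingCount(): number {
    return this.pending.length;
  }

  get inFlightCount(): number {
    return this.inFlight;
  }

  // Queue an ack ID for the next batch.
  add(ackId: string): void {
    this.pending.push(ackId);
  }

  // Move everything pending into one in-flight batch and send it.
  async flush(): Promise<void> {
    if (this.pending.length === 0) return;
    const batch = this.pending;
    this.pending = [];
    this.inFlight += batch.length;
    try {
      await this.send(batch);
    } finally {
      // Runs on success and on failure, so waiters are never stranded.
      this.inFlight -= batch.length;
      this.notifyIfDrained();
    }
  }

  // Send whatever is still queued, then wait for every in-flight batch.
  async drain(): Promise<void> {
    await this.flush();
    if (this.isDrained()) return;
    await new Promise<void>((resolve) => this.drainWaiters.push(resolve));
  }

  // Checking only `pending` here would reproduce the race described above.
  private isDrained(): boolean {
    return this.pending.length === 0 && this.inFlight === 0;
  }

  private notifyIfDrained(): void {
    if (!this.isDrained()) return;
    const waiters = this.drainWaiters;
    this.drainWaiters = [];
    for (const resolve of waiters) resolve();
  }
}
```

In this sketch, a close() built on drain() would not resolve while a batch sent by an earlier flush() is still awaiting its response, which is the window the repro below appears to hit.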

Environment details

  • OS: macOS Catalina
  • Node.js version: 13.8.0
  • npm version: 6.14.3
  • @google-cloud/pubsub version: 1.7.1

Steps to reproduce

  1. Add a topic and subscription.
  2. Subscribe and ack a message.
  3. Immediately after acking (message.ack() returns void, so it cannot be awaited), await subscription.close().
  4. Delete the subscription.
  5. Depending on timing, you will see an error like "Cannot ack messages: Subscription does not exist".

I used this test to reproduce the issue, and also tested a potential fix (PR incoming):

import { getEnv } from '../../../env/env'
import * as pubsub from '@google-cloud/pubsub'
import { v4 } from 'uuid'
import { defer } from 'bluebird'

jest.setTimeout(1000000)

test('errors if messages are inflight while closing, then deleting subscription causing it to fail', async () => {
  const client = createPubSubSdk(getEnv())
  // with enough iterations we should be able to trigger it.
  // NOTE: when the fix is applied, this succeeds.
  for (let i = 0; i < 100; i++) {
    const topicName = `bug-topic-${v4()}`
    const subName = `bug-sub-${v4()}`
    const [topic] = await client.createTopic(topicName)
    const [sub] = await topic.createSubscription(subName, {
      expirationPolicy: { ttl: { seconds: 60 } },
    })

    await topic.publish(Buffer.from(JSON.stringify({ hello: 'world' })))

    const onHandlerEnter = defer()
    const onError = defer()
    sub.setOptions({ batching: { maxMilliseconds: 100 } })
    sub.addListener('message', (message: pubsub.Message) => {
      message.ack()
      onHandlerEnter.resolve()
    })
    sub.addListener('error', (err) => {
      console.log(err)
      onError.reject(err)
    })
    sub.open()

    await onHandlerEnter.promise

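    // Wait out the batching window (maxMilliseconds: 100), which kicks off the flush.
    await new Promise((resolve) => setTimeout(resolve, 100))

    // Close while the ack request may still be in flight, and wait for close() to resolve.
    await sub.close()
    // Delete the subscription. If the ack is still in flight, it fails because the subscription no longer exists.
    await sub.delete()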
    await topic.delete()
    await Promise.race([
      new Promise((resolve) => setTimeout(resolve, 1000)),
      onError.promise,
    ])
  }
})

function createPubSubSdk(env: any) {
  return new pubsub.PubSub({
    projectId: env.GCLOUD_PROJECT,
    credentials: JSON.parse(env.GCLOUD_CREDENTIALS),
  })
}
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/nodejs-pubsub API. label Apr 9, 2020
@feywind feywind added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Apr 14, 2020

2 participants