Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

message.ack() not working in the 3.2.x releases #1665

Closed
uvesten opened this issue Dec 16, 2022 · 8 comments
Closed

message.ack() not working in the 3.2.x releases #1665

uvesten opened this issue Dec 16, 2022 · 8 comments
Assignees
Labels
api: pubsub Issues related to the googleapis/nodejs-pubsub API. priority: p3 Desirable enhancement or fix. May not be included in next release. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@uvesten
Copy link

uvesten commented Dec 16, 2022

(This seems related to the previous issues #1653 and #1648)

We've been using this package with no issues (and no code changes) on our side up to version 3.0.1. After upgrading to version 3.2.0 or 3.2.1 message acknowledgements are silently being ignored. It's quite an insidious bug, since the behavior of message.ack() is supposed to be unchanged (we're not using exactly-once delivery), and since it's a void function it's not really possible to see that the behavior has actually changed.

If this was intended, it should be a major version upgrade, IMHO, since it breaks existing code.

Environment details

  • OS: Ubuntu(?) (Google Cloud Functions v1)
  • Node.js version: 16
  • npm version: N/A
  • @google-cloud/pubsub version: 3.2.0, 3.2.1

Steps to reproduce

  1. Create a topic and a subscription
  2. Insert at least 5 messages
  3. Use the following code (or similar) to ack() them:
import { Message, PubSub } from "@google-cloud/pubsub";

let messageCount = 0;
const pubSubClient = new PubSub();
const subscription = pubSubClient.subscription(SUBSCRIPTION_NAME);
subscription.on("message", messageHandler);

function messageHandler(message: Message): void {
  messageCount += 1;
  // Do something with the message
  message.ack();
  if (messageCount > 5) {
    subscription.removeListener("message", messageHandler);
  }
}

The above works as expected (i.e. the messages get acknowledged) using 3.0.1, but it fails (messages remain on the topic, don't get acknowledged) using 3.2.0 and 3.2.1

@uvesten uvesten added priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Dec 16, 2022
@product-auto-label product-auto-label bot added the api: pubsub Issues related to the googleapis/nodejs-pubsub API. label Dec 16, 2022
@feywind
Copy link
Collaborator

feywind commented Dec 16, 2022

@uvesten I made a new test sample to kind of close the loop, code-wise, and it seems to be working for me:

import {Message, PubSub} from '@google-cloud/pubsub';

let messageCount = 0;
const pubSubClient = new PubSub();
const subscription = pubSubClient.subscription(subName);
subscription.on('message', messageHandler);

function messageHandler(message: Message): void {
  console.log('received', message.data.toString());
  messageCount += 1;
  message.ack();
  if (messageCount >= 5) {
    subscription.removeListener('message', messageHandler);
  }
}

const topic = pubSubClient.topic(topicName);

for (let i = 0; i < 5; i++) {
  console.log('sending', i);
  topic.publishMessage({
    data: Buffer.from('foo ' + i),
  });
}

The output I get is:

sending 0
sending 1
sending 2
sending 3
sending 4
received foo 0
received foo 1
received foo 2
received foo 3
received foo 4

(with a pause after the last 'sending', and sometimes the received messages come out of order, which is all within expectations)

Can you post some (sensitive info excluded) metadata about your topic and subscription? Some stuff definitely changed between 3.0 and 3.2, and it's possible something else is triggering it. gcloud pubsub topics describe just gives me the name of my test topic, while gcloud pubsub subscriptions describe gives me this, for comparison:

ackDeadlineSeconds: 600
expirationPolicy: {}
messageRetentionDuration: 600s
name: ...
pushConfig: {}
retryPolicy:
  maximumBackoff: 600s
  minimumBackoff: 10s
state: ACTIVE
topic: ...

@feywind
Copy link
Collaborator

feywind commented Dec 16, 2022

Forgot to say: If this was intended, it should be a major version upgrade, IMHO, since it breaks existing code. Definitely not intended :) I try to be a stickler about semver adherence.

@uvesten
Copy link
Author

uvesten commented Dec 19, 2022

Thanks for the analysis, @feywind! In your code example, though, how do you see that the messages got ack()ed? What happens if you let messageHandler listen for more than 5 messages?

Subscription metadata:

ackDeadlineSeconds: 10
expirationPolicy:
  ttl: 2678400s
messageRetentionDuration: 604800s
name: ...
pushConfig: {}
state: ACTIVE
topic: ...

(I also just get the name as metadata for the topic)

@feywind
Copy link
Collaborator

feywind commented Dec 22, 2022

Ah, you make a good point there... I commented out the if so my sample never stops on its own, changed my subscription to a 10 second ack timeout, and it doesn't look like I'm receiving any duplicates after a while, still.

I'm not quite sure what's going on there in your app, but I'll add this to a list of things to ask around about. If you don't need the new features in the versions that don't work for you, the immediate action I'd take is just downgrading versions for now. Maybe also try 3.1.x to see if that still works.

@nyxtom
Copy link

nyxtom commented Feb 2, 2023

Starting in 3.2.x the behavior for close changed

add({ackId}: Message, deadline?: number): Promise<void> {
if (this._closed) {
if (this._subscriber.isExactlyOnceDelivery) {
throw new AckError(AckResponses.Invalid, 'Subscriber closed');
} else {
return Promise.resolve();
}
}
const {maxMessages, maxMilliseconds} = this._options;
const responsePromise = defer<void>();
this._requests.push({
ackId,
deadline,
responsePromise,
retryCount: 0,
});
this.numPendingRequests++;
this.numInFlightRequests++;
if (this._requests.length >= maxMessages!) {
this.flush();
} else if (!this._timer) {
this._timer = setTimeout(() => this.flush(), maxMilliseconds!);
}
return responsePromise.promise;
}
, previously it was possible to close a subscription

https://github.com/googleapis/nodejs-pubsub/blob/v3.1.1/src/subscriber.ts#L447-L459

followed by acking a message if you had a callback for instance

https://github.com/googleapis/nodejs-pubsub/blob/v3.1.1/src/subscriber.ts#L431-L438

https://github.com/googleapis/nodejs-pubsub/blob/v3.1.1/src/message-queues.ts#L110-L122

that would cause a flush to happen on a timeout for instance ^

However, now in 3.2.x and above, the behavior for close will also close the ack queue

https://github.com/googleapis/nodejs-pubsub/blob/v3.2.0/src/subscriber.ts#L565-L580

https://github.com/googleapis/nodejs-pubsub/blob/v3.2.0/src/message-queues.ts#L142-L161

This would mean that any call to ack() after close will get discarded as so

https://github.com/googleapis/nodejs-pubsub/blob/v3.2.0/src/message-queues.ts#L180-L208

Lesson learned, in newer versions, if you wish to ack any message, you need to be sure that the subscription is open. If you have called close, prior to calling ack on any message make sure to call open. In doing so however, your ack message queue will be completely wiped clean and re-initialized. For the most part this shouldn't be a problem since you are only filling up that ack queue after open anyway.

@feywind
Copy link
Collaborator

feywind commented Jun 20, 2023

@nyxtom Thanks for your detailed comment. I'll add this to our meeting to talk about... I'm not sure if it's a breaking change or not, since we haven't been very precise up to now about what close() does, exactly. But the surprise is also not very nice, so we might want to re-visit this.

@feywind
Copy link
Collaborator

feywind commented Oct 16, 2023

fwiw, updating the close behaviour has been added to the schedule for Q4, so hopefully there will be a nice, deterministic shutdown method soon.

@feywind feywind added type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. and removed type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. labels Feb 8, 2024
@feywind feywind added priority: p3 Desirable enhancement or fix. May not be included in next release. and removed priority: p2 Moderately-important priority. Fix may not be included in next release. labels May 2, 2024
@feywind
Copy link
Collaborator

feywind commented May 2, 2024

Closed in favour of #1917

@feywind feywind closed this as completed May 2, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api: pubsub Issues related to the googleapis/nodejs-pubsub API. priority: p3 Desirable enhancement or fix. May not be included in next release. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.
Projects
None yet
Development

No branches or pull requests

3 participants