diff --git a/pubsub/subscription.go b/pubsub/subscription.go index b2cf82e00a6..64097febe5d 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -25,7 +25,6 @@ import ( "cloud.google.com/go/iam" "cloud.google.com/go/internal/optional" - ipubsub "cloud.google.com/go/internal/pubsub" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" "cloud.google.com/go/pubsub/internal/scheduler" gax "github.com/googleapis/gax-go/v2" @@ -1389,24 +1388,19 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes return nil } iter.eoMu.RLock() - ackh, _ := msgAckHandler(msg, iter.enableExactlyOnceDelivery) + msgAckHandler(msg, iter.enableExactlyOnceDelivery) iter.eoMu.RUnlock() - old := ackh.doneFunc - msgLen := len(msg.Data) - ackh.doneFunc = func(ackID string, ack bool, r *ipubsub.AckResult, receiveTime time.Time) { - defer fc.release(ctx, msgLen) - old(ackID, ack, r, receiveTime) - } + wg.Add(1) // Make sure the subscription has ordering enabled before adding to scheduler. var key string if s.enableOrdering { key = msg.OrderingKey } - // TODO(deklerk): Can we have a generic handler at the - // constructor level? + msgLen := len(msg.Data) if err := sched.Add(key, msg, func(msg interface{}) { defer wg.Done() + defer fc.release(ctx, msgLen) f(ctx2, msg.(*Message)) }); err != nil { wg.Done() diff --git a/pubsub/subscription_test.go b/pubsub/subscription_test.go index 5ad98a756ff..6208b408daf 100644 --- a/pubsub/subscription_test.go +++ b/pubsub/subscription_test.go @@ -787,3 +787,53 @@ func TestExactlyOnceDelivery_ReceiptModackError(t *testing.T) { t.Fatal("expected message to not have been delivered when exactly once enabled") }) } + +func TestSubscribeMessageExpirationFlowControl(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "t") + subConfig := SubscriptionConfig{ + Topic: topic, + } + s, err := client.CreateSubscription(ctx, "s", subConfig) + if err != nil { + t.Fatalf("create sub err: %v", err) + } + + s.ReceiveSettings.NumGoroutines = 1 + s.ReceiveSettings.MaxOutstandingMessages = 1 + s.ReceiveSettings.MaxExtension = 10 * time.Second + s.ReceiveSettings.MaxExtensionPeriod = 10 * time.Second + r := topic.Publish(ctx, &Message{ + Data: []byte("redelivered-message"), + }) + if _, err := r.Get(ctx); err != nil { + t.Fatalf("failed to publish message: %v", err) + } + + deliveryCount := 0 + ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + err = s.Receive(ctx, func(ctx context.Context, msg *Message) { + // Only acknowledge the message on the 2nd invocation of the callback (2nd delivery). + if deliveryCount == 1 { + msg.Ack() + } + // Otherwise, do nothing and let the message expire. + deliveryCount++ + if deliveryCount == 2 { + cancel() + } + }) + if deliveryCount != 2 { + t.Fatalf("expected 2 iterations of the callback, got %d", deliveryCount) + } + if err != nil { + t.Fatalf("s.Receive err: %v", err) + } +}