From e89fd6cc3b4489537c71cca1e40547313c24924b Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Mon, 26 Feb 2024 14:34:30 -0800 Subject: [PATCH] fix(pubsub): fix out of order issue when exactly once is enabled (#9472) * fix(pubsub): fix out of ordering issue with exactly once * add ordering test and fix race condition with resource cleanup * remove TODO comment --- pubsub/integration_test.go | 124 +++++++++++++++++++++++++++++++------ pubsub/iterator.go | 8 ++- 2 files changed, 112 insertions(+), 20 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index a0853e29fd1..8eff7912121 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -1177,29 +1177,32 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) { } received := make(chan string, numItems) + ctx, cancel := context.WithCancel(ctx) go func() { - if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { - defer msg.Ack() - if msg.OrderingKey != orderingKey { - t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey) - } - - received <- string(msg.Data) - }); err != nil { - if c := status.Code(err); c != codes.Canceled { - t.Error(err) + for i := 0; i < numItems; i++ { + select { + case r := <-received: + if got, want := r, fmt.Sprintf("item-%d", i); got != want { + t.Errorf("%d: got %s, want %s", i, got, want) + } + case <-time.After(30 * time.Second): + t.Errorf("timed out after 30s waiting for item %d", i) + cancel() } } + cancel() }() - for i := 0; i < numItems; i++ { - select { - case r := <-received: - if got, want := r, fmt.Sprintf("item-%d", i); got != want { - t.Fatalf("%d: got %s, want %s", i, got, want) - } - case <-time.After(30 * time.Second): - t.Fatalf("timed out after 30s waiting for item %d", i) + if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { + defer msg.Ack() + if msg.OrderingKey != orderingKey { + t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey) + } + + received <- string(msg.Data) + }); err != nil { + if c := status.Code(err); c != codes.Canceled { + t.Error(err) } } } @@ -1445,6 +1448,91 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) { } } +func TestIntegration_OrderingWithExactlyOnce(t *testing.T) { + ctx := context.Background() + client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443")) + defer client.Close() + + topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil) + if err != nil { + t.Fatal(err) + } + defer topic.Delete(ctx) + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Fatalf("topic %v should exist, but it doesn't", topic) + } + var sub *Subscription + if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{ + Topic: topic, + EnableMessageOrdering: true, + EnableExactlyOnceDelivery: true, + }); err != nil { + t.Fatal(err) + } + defer sub.Delete(ctx) + exists, err = sub.Exists(ctx) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Fatalf("subscription %s should exist, but it doesn't", sub.ID()) + } + + topic.PublishSettings.DelayThreshold = time.Second + topic.EnableMessageOrdering = true + + orderingKey := "some-ordering-key" + numItems := 10 + for i := 0; i < numItems; i++ { + r := topic.Publish(ctx, &Message{ + ID: fmt.Sprintf("id-%d", i), + Data: []byte(fmt.Sprintf("item-%d", i)), + OrderingKey: orderingKey, + }) + go func() { + if _, err := r.Get(ctx); err != nil { + t.Error(err) + } + }() + } + + received := make(chan string, numItems) + ctx, cancel := context.WithCancel(ctx) + go func() { + for i := 0; i < numItems; i++ { + select { + case r := <-received: + if got, want := r, fmt.Sprintf("item-%d", i); got != want { + t.Errorf("%d: got %s, want %s", i, got, want) + } + case <-time.After(30 * time.Second): + t.Errorf("timed out after 30s waiting for item %d", i) + cancel() + } + } + cancel() + }() + + if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) { + defer msg.Ack() + if msg.OrderingKey != orderingKey { + t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey) + } + + received <- string(msg.Data) + }); err != nil { + if c := status.Code(err); c != codes.Canceled { + t.Error(err) + } + } + +} + func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) { t.Parallel() ctx := context.Background() diff --git a/pubsub/iterator.go b/pubsub/iterator.go index f45f1b995a5..ca3069a0cd8 100644 --- a/pubsub/iterator.go +++ b/pubsub/iterator.go @@ -313,9 +313,13 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) { } } // Only return for processing messages that were successfully modack'ed. + // Iterate over the original messages slice for ordering. v := make([]*ipubsub.Message, 0, len(pendingMessages)) - for _, m := range pendingMessages { - v = append(v, m) + for _, m := range msgs { + ackID := msgAckID(m) + if _, ok := pendingMessages[ackID]; ok { + v = append(v, m) + } } return v, nil }