Skip to content
Permalink
Browse files
fix(pubsublite): ignore outstanding acks for unassigned partition sub…
…scribers (#3597)

When the assigningSubscriber removes a singlePartitionSubscriber, it should ignore unacked messages. This avoids conflicting with the commits from the new subscriber that will be assigned the partition.
  • Loading branch information
tmdiep committed Jan 28, 2021
1 parent e7ab014 commit eb91f1f3c96f4c868e523f3c43f8c22b10ad4de4
Showing with 65 additions and 0 deletions.
  1. +4 −0 pubsublite/internal/wire/subscriber.go
  2. +61 −0 pubsublite/internal/wire/subscriber_test.go
@@ -469,6 +469,10 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
// Handle removed partitions.
for partition, subscriber := range as.subscribers {
if !partitions.Contains(partition) {
// Ignore unacked messages from this point on to avoid conflicting with
// the commits of the new subscriber that will be assigned this partition.
subscriber.Terminate()

as.unsafeRemoveService(subscriber)
// Safe to delete map entry during range loop:
// https://golang.org/ref/spec#For_statements
@@ -955,6 +955,67 @@ func TestAssigningSubscriberPermanentError(t *testing.T) {
}
}

func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
receiver := newTestMessageReceiver(t)
msg1 := seqMsgWithOffsetAndSize(11, 100)
msg2 := seqMsgWithOffsetAndSize(22, 200)

verifiers := test.NewVerifiers(t)

// Assignment stream
asnStream := test.NewRPCVerifier(t)
asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil)
assignmentBarrier := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil)
asnStream.Push(assignmentAckReq(), nil, nil)
verifiers.AddAssignmentStream(subscription, asnStream)

// Partition 1
subStream1 := test.NewRPCVerifier(t)
subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream1)

cmtStream1 := test.NewRPCVerifier(t)
cmtStream1.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
// Note: no commit expected.
verifiers.AddCommitStream(subscription, 1, cmtStream1)

// Partition 2
subStream2 := test.NewRPCVerifier(t)
subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription, 2, subStream2)

cmtStream2 := test.NewRPCVerifier(t)
cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil)
cmtStream2.Push(commitReq(23), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 2, cmtStream2)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

sub := newTestAssigningSubscriber(t, receiver.onMessage, subscription)
if gotErr := sub.WaitStarted(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}

// Partition assignments are initially {1}.
ack1 := receiver.ValidateMsg(msg1)

// Partition assignments will now be {2}.
assignmentBarrier.Release()
receiver.ValidateMsg(msg2).Ack()

// Partition 1 has already been unassigned, so this ack is discarded.
ack1.Ack()

sub.Stop()
if gotErr := sub.WaitStopped(); gotErr != nil {
t.Errorf("Stop() got err: (%v)", gotErr)
}
}

func TestNewSubscriberCreatesCorrectImpl(t *testing.T) {
const subscription = "projects/123456/locations/us-central1-b/subscriptions/my-sub"
const region = "us-central1"

0 comments on commit eb91f1f

Please sign in to comment.