Skip to content
Permalink
Browse files
fix(pubsublite): ack assignment after removed subscribers have termin…
…ated (#4217)

This waits for pending commits to be acknowledged by the server. Avoids commits races with the next assigned client.
  • Loading branch information
tmdiep committed Jun 15, 2021
1 parent a3249e1 commit 0ad3f168b8525033e6926882059cb0b430d1f350
Showing with 46 additions and 15 deletions.
  1. +5 −12 pubsublite/internal/wire/assigner.go
  2. +19 −2 pubsublite/internal/wire/subscriber.go
  3. +22 −1 pubsublite/internal/wire/subscriber_test.go
@@ -139,30 +139,23 @@ func (a *assigner) onStreamStatusChange(status streamStatus) {
}

func (a *assigner) onResponse(response interface{}) {
assignment, _ := response.(*pb.PartitionAssignment)
err := a.receiveAssignment(newPartitionSet(assignment))

a.mu.Lock()
defer a.mu.Unlock()

if a.status >= serviceTerminating {
return
}

assignment, _ := response.(*pb.PartitionAssignment)
if err := a.handleAssignment(assignment); err != nil {
if err != nil {
a.unsafeInitiateShutdown(serviceTerminated, err)
return
}
}

func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error {
if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil {
return err
}

a.stream.Send(&pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Ack{
Ack: &pb.PartitionAssignmentAck{},
},
})
return nil
}

func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
@@ -465,6 +465,21 @@ func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.Partit
}

func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
removedSubscribers, err := as.doHandleAssignment(partitions)
if err != nil {
return err
}

// Wait for removed subscribers to completely stop (which waits for commit
// acknowledgments from the server) before acking the assignment. This avoids
// commits racing with the new assigned client.
for _, subscriber := range removedSubscribers {
subscriber.WaitStopped()
}
return nil
}

func (as *assigningSubscriber) doHandleAssignment(partitions partitionSet) ([]*singlePartitionSubscriber, error) {
as.mu.Lock()
defer as.mu.Unlock()

@@ -474,26 +489,28 @@ func (as *assigningSubscriber) handleAssignment(partitions partitionSet) error {
subscriber := as.subFactory.New(partition)
if err := as.unsafeAddServices(subscriber); err != nil {
// Occurs when the assigningSubscriber is stopping/stopped.
return err
return nil, err
}
as.subscribers[partition] = subscriber
}
}

// Handle removed partitions.
var removedSubscribers []*singlePartitionSubscriber
for partition, subscriber := range as.subscribers {
if !partitions.Contains(partition) {
// Ignore unacked messages from this point on to avoid conflicting with
// the commits of the new subscriber that will be assigned this partition.
subscriber.Terminate()
removedSubscribers = append(removedSubscribers, subscriber)

as.unsafeRemoveService(subscriber)
// Safe to delete map entry during range loop:
// https://golang.org/ref/spec#For_statements
delete(as.subscribers, partition)
}
}
return nil
return removedSubscribers, nil
}

// Terminate shuts down all singlePartitionSubscribers without waiting for
@@ -887,6 +887,17 @@ func (as *assigningSubscriber) Partitions() []int {
return partitions
}

func (as *assigningSubscriber) Subscribers() []*singlePartitionSubscriber {
as.mu.Lock()
defer as.mu.Unlock()

var subscribers []*singlePartitionSubscriber
for _, s := range as.subscribers {
subscribers = append(subscribers, s)
}
return subscribers
}

func (as *assigningSubscriber) FlushCommits() {
as.mu.Lock()
defer as.mu.Unlock()
@@ -1110,10 +1121,20 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
// Partition assignments are initially {1}.
receiver.ValidateMsg(msg1).Ack()
ack2 := receiver.ValidateMsg(msg2)
subscribers := sub.Subscribers()

// Partition assignments will now be {}.
assignmentBarrier1.Release()
assignmentBarrier2.Release() // Wait for ack to ensure the test is deterministic
assignmentBarrier2.ReleaseAfter(func() {
// Verify that the assignment is acked after the subscriber has terminated.
if got, want := len(subscribers), 1; got != want {
t.Errorf("singlePartitionSubcriber count: got %d, want %d", got, want)
return
}
if got, want := subscribers[0].Status(), serviceTerminated; got != want {
t.Errorf("singlePartitionSubcriber status: got %v, want %v", got, want)
}
})

// Partition 1 has already been unassigned, so this ack is discarded.
ack2.Ack()

0 comments on commit 0ad3f16

Please sign in to comment.