Skip to content
Permalink
Browse files
fix(pubsublite): fix committer races (#3810)
Fixed 2 races in the committer and simplified TestAssigningSubscriberIgnoreOutstandingAcks.
  • Loading branch information
tmdiep committed Mar 16, 2021
1 parent 4188b73 commit d8689f1d32be83f9bbbacb9dd24ce085d81d79e8
@@ -51,6 +51,7 @@ type committer struct {
acks *ackTracker
cursorTracker *commitCursorTracker
pollCommits *periodicTask
enableCommits bool

abstractService
}
@@ -135,6 +136,7 @@ func (c *committer) onStreamStatusChange(status streamStatus) {

switch status {
case streamConnected:
c.enableCommits = true
c.unsafeUpdateStatus(serviceActive, nil)
// Once the stream connects, clear unconfirmed commits and immediately send
// the latest desired commit offset.
@@ -143,6 +145,8 @@ func (c *committer) onStreamStatusChange(status streamStatus) {
c.pollCommits.Start()

case streamReconnecting:
// Ensure there are no commits until streamConnected has been handled above.
c.enableCommits = false
c.pollCommits.Stop()

case streamTerminated:
@@ -185,6 +189,9 @@ func (c *committer) commitOffsetToStream() {
}

func (c *committer) unsafeCommitOffsetToStream() {
if !c.enableCommits {
return
}
nextOffset := c.cursorTracker.NextOffset()
if nextOffset == nilCursorOffset {
return
@@ -204,6 +211,7 @@ func (c *committer) unsafeCommitOffsetToStream() {

func (c *committer) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !c.unsafeUpdateStatus(targetStatus, wrapError("committer", c.subscription.String(), err)) {
c.unsafeCheckDone()
return
}

@@ -158,6 +158,33 @@ func TestCommitterTerminateDiscardsOutstandingAcks(t *testing.T) {
}
}

func TestCommitterStopThenTerminateDiscardsOutstandingAcks(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
ack := newAckConsumer(33, 0, nil)
acks := newAckTracker()
acks.Push(ack)

verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initCommitReq(subscription), initCommitResp(), nil)
// No commits expected.
verifiers.AddCommitStream(subscription.Path, subscription.Partition, stream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

cmt := newTestCommitter(t, subscription, acks)
if gotErr := cmt.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}

cmt.Stop() // Stop waits for outstanding acks
cmt.Terminate() // Terminate should discard all outstanding acks
if gotErr := cmt.FinalError(); gotErr != nil {
t.Errorf("Final err: (%v), want: <nil>", gotErr)
}
}

func TestCommitterPermanentStreamError(t *testing.T) {
subscription := subscriptionPartition{"projects/123456/locations/us-central1-b/subscriptions/my-subs", 0}
acks := newAckTracker()
@@ -983,31 +983,20 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
// Assignment stream
asnStream := test.NewRPCVerifier(t)
asnStream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1}), nil)
assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{2}), nil)
assignmentBarrier1 := asnStream.PushWithBarrier(assignmentAckReq(), assignmentResp([]int64{}), nil)
assignmentBarrier2 := asnStream.PushWithBarrier(assignmentAckReq(), nil, nil)
verifiers.AddAssignmentStream(subscription, asnStream)

// Partition 1
subStream1 := test.NewRPCVerifier(t)
subStream1.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream1.Push(initFlowControlReq(), msgSubResp(msg1), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream1)

cmtStream1 := test.NewRPCVerifier(t)
commitBarrier := cmtStream1.PushWithBarrier(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
// Note: no commit expected.
verifiers.AddCommitStream(subscription, 1, cmtStream1)

// Partition 2
subStream2 := test.NewRPCVerifier(t)
subStream2.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 2}), initSubResp(), nil)
subStream2.Push(initFlowControlReq(), msgSubResp(msg2), nil)
verifiers.AddSubscribeStream(subscription, 2, subStream2)
subStream := test.NewRPCVerifier(t)
subStream.Push(initSubReq(subscriptionPartition{Path: subscription, Partition: 1}), initSubResp(), nil)
subStream.Push(initFlowControlReq(), msgSubResp(msg1, msg2), nil)
verifiers.AddSubscribeStream(subscription, 1, subStream)

cmtStream2 := test.NewRPCVerifier(t)
cmtStream2.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 2}), initCommitResp(), nil)
cmtStream2.Push(commitReq(23), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 2, cmtStream2)
cmtStream := test.NewRPCVerifier(t)
cmtStream.Push(initCommitReq(subscriptionPartition{Path: subscription, Partition: 1}), initCommitResp(), nil)
cmtStream.Push(commitReq(12), commitResp(1), nil)
verifiers.AddCommitStream(subscription, 1, cmtStream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()
@@ -1018,18 +1007,15 @@ func TestAssigningSubscriberIgnoreOutstandingAcks(t *testing.T) {
}

// Partition assignments are initially {1}.
ack1 := receiver.ValidateMsg(msg1)
receiver.ValidateMsg(msg1).Ack()
ack2 := receiver.ValidateMsg(msg2)

// Partition assignments will now be {2}.
// Partition assignments will now be {}.
assignmentBarrier1.Release()
receiver.ValidateMsg(msg2).Ack()
assignmentBarrier2.Release() // Wait for ack to ensure the test is deterministic

// These barriers ensure that this test is deterministic by ensuring that the
// server has received expected requests before proceeding.
commitBarrier.Release()
assignmentBarrier2.Release()
// Partition 1 has already been unassigned, so this ack is discarded.
ack1.Ack()
ack2.Ack()

sub.Stop()
if gotErr := sub.WaitStopped(); gotErr != nil {

0 comments on commit d8689f1

Please sign in to comment.