Skip to content
Permalink
Browse files
feat(pubsublite): assigning subscriber implementation (#3238)
assigningSubscriber uses the Pub/Sub Lite partition assignment service to listen to its assigned partition numbers and dynamically add/remove singlePartitionSubscribers.
  • Loading branch information
tmdiep committed Dec 7, 2020
1 parent c8ad113 commit d1c03dae383f5a175e4237d5f46dc1bdc2cd33f0
@@ -6,6 +6,7 @@ require (
cloud.google.com/go v0.72.0
github.com/golang/protobuf v1.4.3
github.com/google/go-cmp v0.5.4
github.com/google/uuid v1.1.2
github.com/googleapis/gax-go/v2 v2.0.5
golang.org/x/mod v0.4.0 // indirect
golang.org/x/tools v0.0.0-20201204162204-73cf035baebf // indirect
@@ -102,6 +102,7 @@ github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hf
github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM=
github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
@@ -0,0 +1,169 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"errors"
"fmt"
"reflect"

"github.com/google/uuid"
"google.golang.org/grpc"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// partitionSet is a set of partition numbers.
type partitionSet map[int]struct{}

func newPartitionSet(assignmentpb *pb.PartitionAssignment) partitionSet {
var void struct{}
partitions := make(map[int]struct{})
for _, p := range assignmentpb.GetPartitions() {
partitions[int(p)] = void
}
return partitionSet(partitions)
}

func (ps partitionSet) Ints() (partitions []int) {
for p := range ps {
partitions = append(partitions, p)
}
return
}

func (ps partitionSet) Contains(partition int) bool {
_, exists := ps[partition]
return exists
}

// A function that generates a 16-byte UUID.
type generateUUIDFunc func() (uuid.UUID, error)

// partitionAssignmentReceiver must enact the received partition assignment from
// the server, or otherwise return an error, which will break the stream. The
// receiver must not call the assigner, as this would result in a deadlock.
type partitionAssignmentReceiver func(partitionSet) error

// assigner wraps the partition assignment stream and notifies a receiver when
// the server sends a new set of partition assignments for a subscriber.
type assigner struct {
// Immutable after creation.
assignmentClient *vkit.PartitionAssignmentClient
initialReq *pb.PartitionAssignmentRequest
receiveAssignment partitionAssignmentReceiver

// Fields below must be guarded with mu.
stream *retryableStream

abstractService
}

func newAssigner(ctx context.Context, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, settings ReceiveSettings, subscriptionPath string, receiver partitionAssignmentReceiver) (*assigner, error) {
clientID, err := genUUID()
if err != nil {
return nil, fmt.Errorf("pubsublite: failed to generate client UUID: %v", err)
}

a := &assigner{
assignmentClient: assignmentClient,
initialReq: &pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Initial{
Initial: &pb.InitialPartitionAssignmentRequest{
Subscription: subscriptionPath,
ClientId: clientID[:],
},
},
},
receiveAssignment: receiver,
}
a.stream = newRetryableStream(ctx, a, settings.Timeout, reflect.TypeOf(pb.PartitionAssignment{}))
return a, nil
}

func (a *assigner) Start() {
a.mu.Lock()
defer a.mu.Unlock()
if a.unsafeUpdateStatus(serviceStarting, nil) {
a.stream.Start()
}
}

func (a *assigner) Stop() {
a.mu.Lock()
defer a.mu.Unlock()
a.unsafeInitiateShutdown(serviceTerminating, nil)
}

func (a *assigner) newStream(ctx context.Context) (grpc.ClientStream, error) {
return a.assignmentClient.AssignPartitions(ctx)
}

func (a *assigner) initialRequest() (interface{}, bool) {
return a.initialReq, false // No initial response expected
}

func (a *assigner) validateInitialResponse(_ interface{}) error {
// Should not be called.
return errors.New("pubsublite: unexpected initial response")
}

func (a *assigner) onStreamStatusChange(status streamStatus) {
a.mu.Lock()
defer a.mu.Unlock()

switch status {
case streamConnected:
a.unsafeUpdateStatus(serviceActive, nil)
case streamTerminated:
a.unsafeInitiateShutdown(serviceTerminated, a.stream.Error())
}
}

func (a *assigner) onResponse(response interface{}) {
a.mu.Lock()
defer a.mu.Unlock()

if a.status >= serviceTerminating {
return
}

assignment, _ := response.(*pb.PartitionAssignment)
if err := a.handleAssignment(assignment); err != nil {
a.unsafeInitiateShutdown(serviceTerminated, err)
}
}

func (a *assigner) handleAssignment(assignment *pb.PartitionAssignment) error {
if err := a.receiveAssignment(newPartitionSet(assignment)); err != nil {
return err
}

a.stream.Send(&pb.PartitionAssignmentRequest{
Request: &pb.PartitionAssignmentRequest_Ack{
Ack: &pb.PartitionAssignmentAck{},
},
})
return nil
}

func (a *assigner) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !a.unsafeUpdateStatus(targetStatus, err) {
return
}
// No data to send. Immediately terminate the stream.
a.stream.Stop()
}
@@ -0,0 +1,200 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"errors"
"sort"
"testing"
"time"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsublite/internal/test"
"github.com/google/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

func TestPartitionSet(t *testing.T) {
partitions := newPartitionSet(&pb.PartitionAssignment{
Partitions: []int64{8, 5, 8, 1},
})

wantPartitions := []int{1, 5, 8}
for _, partition := range wantPartitions {
if !partitions.Contains(partition) {
t.Errorf("Contains(%d) got false, want true", partition)
}
}
for _, partition := range []int{2, 3, 4, 6, 7} {
if partitions.Contains(partition) {
t.Errorf("Contains(%d) got true, want false", partition)
}
}

gotPartitions := partitions.Ints()
sort.Ints(gotPartitions)
if !testutil.Equal(gotPartitions, wantPartitions) {
t.Errorf("Ints() got %v, want %v", gotPartitions, wantPartitions)
}
}

var fakeUUID = [16]byte{'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', '1', '2', '3', '4', '5'}

func fakeGenerateUUID() (uuid.UUID, error) {
return fakeUUID, nil
}

// testAssigner wraps an assigner for ease of testing.
type testAssigner struct {
// Fake error to simulate receiver unable to handle assignment.
RetError error

t *testing.T
asn *assigner
partitions chan []int

serviceTestProxy
}

func newTestAssigner(t *testing.T, subscription string) *testAssigner {
ctx := context.Background()
assignmentClient, err := newPartitionAssignmentClient(ctx, "ignored", testClientOpts...)
if err != nil {
t.Fatal(err)
}

ta := &testAssigner{
t: t,
partitions: make(chan []int, 1),
}
asn, err := newAssigner(ctx, assignmentClient, fakeGenerateUUID, testReceiveSettings(), subscription, ta.receiveAssignment)
if err != nil {
t.Fatal(err)
}
ta.asn = asn
ta.initAndStart(t, ta.asn, "Assigner")
return ta
}

func (ta *testAssigner) receiveAssignment(partitions partitionSet) error {
p := partitions.Ints()
sort.Ints(p)
ta.partitions <- p

if ta.RetError != nil {
return ta.RetError
}
return nil
}

func (ta *testAssigner) NextPartitions() []int {
select {
case <-time.After(serviceTestWaitTimeout):
ta.t.Errorf("%s partitions not received within %v", ta.name, serviceTestWaitTimeout)
return nil
case p := <-ta.partitions:
return p
}
}

func TestAssignerNoInitialResponse(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"

verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
barrier := stream.PushWithBarrier(initAssignmentReq(subscription, fakeUUID[:]), nil, nil)
verifiers.AddAssignmentStream(subscription, stream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)

// Assigner starts even though no initial response was received from the
// server.
if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
// To ensure test is deterministic, i.e. server must receive initial request
// before stopping the client.
barrier.Release()
asn.StopVerifyNoError()
}

func TestAssignerReconnect(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"
permanentErr := status.Error(codes.FailedPrecondition, "failed")

verifiers := test.NewVerifiers(t)

// Simulate a transient error that results in a reconnect.
stream1 := test.NewRPCVerifier(t)
stream1.Push(initAssignmentReq(subscription, fakeUUID[:]), nil, status.Error(codes.Unavailable, "server unavailable"))
verifiers.AddAssignmentStream(subscription, stream1)

// Send 2 partition assignments before terminating with permanent error.
stream2 := test.NewRPCVerifier(t)
stream2.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{3, 2, 4}), nil)
stream2.Push(assignmentAckReq(), assignmentResp([]int64{0, 3, 3}), nil)
stream2.Push(assignmentAckReq(), nil, permanentErr)
verifiers.AddAssignmentStream(subscription, stream2)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)

if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
if got, want := asn.NextPartitions(), []int{2, 3, 4}; !testutil.Equal(got, want) {
t.Errorf("Partition assignment #1: got %v, want %v", got, want)
}
if got, want := asn.NextPartitions(), []int{0, 3}; !testutil.Equal(got, want) {
t.Errorf("Partition assignment #2: got %v, want %v", got, want)
}
if gotErr, wantErr := asn.FinalError(), permanentErr; !test.ErrorEqual(gotErr, wantErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, wantErr)
}
}

func TestAssignerHandlePartitionFailure(t *testing.T) {
subscription := "projects/123456/locations/us-central1-b/subscriptions/my-subs"

verifiers := test.NewVerifiers(t)
stream := test.NewRPCVerifier(t)
stream.Push(initAssignmentReq(subscription, fakeUUID[:]), assignmentResp([]int64{1, 2}), nil)
verifiers.AddAssignmentStream(subscription, stream)

mockServer.OnTestStart(verifiers)
defer mockServer.OnTestEnd()

asn := newTestAssigner(t, subscription)
// Simulates the assigningSubscriber discarding assignments.
asn.RetError = errors.New("subscriber shutting down")

if gotErr := asn.StartError(); gotErr != nil {
t.Errorf("Start() got err: (%v)", gotErr)
}
if got, want := asn.NextPartitions(), []int{1, 2}; !testutil.Equal(got, want) {
t.Errorf("Partition assignments: got %v, want %v", got, want)
}
if gotErr := asn.FinalError(); !test.ErrorEqual(gotErr, asn.RetError) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, asn.RetError)
}
}

0 comments on commit d1c03da

Please sign in to comment.