Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pubsublite): ensure api clients are closed when startup fails #4239

Merged
merged 2 commits into from Jun 10, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 8 additions & 10 deletions pubsublite/internal/wire/publisher.go
Expand Up @@ -279,7 +279,6 @@ func (pp *singlePartitionPublisher) unsafeCheckDone() {
// count, but not decreasing.
type routingPublisher struct {
// Immutable after creation.
clients apiClients
msgRouterFactory *messageRouterFactory
pubFactory *singlePartitionPublisherFactory
partitionWatcher *partitionCountWatcher
Expand All @@ -288,12 +287,12 @@ type routingPublisher struct {
msgRouter messageRouter
publishers []*singlePartitionPublisher

compositeService
apiClientService
}

func newRoutingPublisher(allClients apiClients, adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher {
pub := &routingPublisher{
clients: allClients,
apiClientService: apiClientService{clients: allClients},
msgRouterFactory: msgRouterFactory,
pubFactory: pubFactory,
}
Expand Down Expand Up @@ -359,12 +358,6 @@ func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePart
return rp.publishers[partition], nil
}

func (rp *routingPublisher) WaitStopped() error {
err := rp.compositeService.WaitStopped()
rp.clients.Close()
return err
}

// Publisher is the client interface exported from this package for publishing
// messages.
type Publisher interface {
Expand All @@ -385,15 +378,20 @@ func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPa
if err := validatePublishSettings(settings); err != nil {
return nil, err
}

var allClients apiClients
pubClient, err := newPublisherClient(ctx, region, opts...)
if err != nil {
return nil, err
}
allClients = append(allClients, pubClient)

adminClient, err := NewAdminClient(ctx, region, opts...)
if err != nil {
allClients.Close()
return nil, err
}
allClients := apiClients{pubClient, adminClient}
allClients = append(allClients, adminClient)

msgRouterFactory := newMessageRouterFactory(rand.New(rand.NewSource(time.Now().UnixNano())))
pubFactory := &singlePartitionPublisherFactory{
Expand Down
21 changes: 21 additions & 0 deletions pubsublite/internal/wire/service.go
Expand Up @@ -342,3 +342,24 @@ func removeFromSlice(services []service, removeIdx int) []service {
services[lastIdx] = nil
return services[:lastIdx]
}

// A compositeService that handles closing API clients on shutdown.
type apiClientService struct {
clients apiClients

compositeService
}

func (acs *apiClientService) WaitStarted() error {
err := acs.compositeService.WaitStarted()
if err != nil {
acs.WaitStopped()
}
return err
}

func (acs *apiClientService) WaitStopped() error {
err := acs.compositeService.WaitStopped()
acs.clients.Close()
return err
}
34 changes: 13 additions & 21 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -402,15 +402,14 @@ func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSu
// partitions.
type multiPartitionSubscriber struct {
// Immutable after creation.
clients apiClients
subscribers []*singlePartitionSubscriber

compositeService
apiClientService
}

func newMultiPartitionSubscriber(allClients apiClients, subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
ms := &multiPartitionSubscriber{
clients: allClients,
apiClientService: apiClientService{clients: allClients},
}
ms.init()

Expand All @@ -433,33 +432,26 @@ func (ms *multiPartitionSubscriber) Terminate() {
}
}

func (ms *multiPartitionSubscriber) WaitStopped() error {
err := ms.compositeService.WaitStopped()
ms.clients.Close()
return err
}

// assigningSubscriber uses the Pub/Sub Lite partition assignment service to
// listen to its assigned partition numbers and dynamically add/remove
// singlePartitionSubscribers.
type assigningSubscriber struct {
// Immutable after creation.
clients apiClients
subFactory *singlePartitionSubscriberFactory
assigner *assigner

// Fields below must be guarded with mu.
// Subscribers keyed by partition number. Updated as assignments change.
subscribers map[int]*singlePartitionSubscriber

compositeService
apiClientService
}

func newAssigningSubscriber(allClients apiClients, assignmentClient *vkit.PartitionAssignmentClient, genUUID generateUUIDFunc, subFactory *singlePartitionSubscriberFactory) (*assigningSubscriber, error) {
as := &assigningSubscriber{
clients: allClients,
subFactory: subFactory,
subscribers: make(map[int]*singlePartitionSubscriber),
apiClientService: apiClientService{clients: allClients},
subFactory: subFactory,
subscribers: make(map[int]*singlePartitionSubscriber),
}
as.init()

Expand Down Expand Up @@ -515,12 +507,6 @@ func (as *assigningSubscriber) Terminate() {
}
}

func (as *assigningSubscriber) WaitStopped() error {
err := as.compositeService.WaitStopped()
as.clients.Close()
return err
}

// Subscriber is the client interface exported from this package for receiving
// messages.
type Subscriber interface {
Expand All @@ -539,15 +525,20 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa
if err := validateReceiveSettings(settings); err != nil {
return nil, err
}

var allClients apiClients
subClient, err := newSubscriberClient(ctx, region, opts...)
if err != nil {
return nil, err
}
allClients = append(allClients, subClient)

cursorClient, err := newCursorClient(ctx, region, opts...)
if err != nil {
allClients.Close()
return nil, err
}
allClients := apiClients{subClient, cursorClient}
allClients = append(allClients, cursorClient)

subFactory := &singlePartitionSubscriberFactory{
ctx: ctx,
Expand All @@ -563,6 +554,7 @@ func NewSubscriber(ctx context.Context, settings ReceiveSettings, receiver Messa
}
partitionClient, err := newPartitionAssignmentClient(ctx, region, opts...)
if err != nil {
allClients.Close()
return nil, err
}
allClients = append(allClients, partitionClient)
Expand Down