Skip to content
Permalink
Browse files
feat(pubsublite): Single and multi partition subscribers (#3221)
Implements:
- subscriberStream: directly wraps the Subscribe bidi stream
- singlePartitionSubscriber: wraps the committer and subscriberStream
- multiPartitionSubscriber: receives messages from multiple fixed partitions
  • Loading branch information
tmdiep committed Nov 19, 2020
1 parent 758a88d commit 299b803aaee9a0dc0b2ec8c81fac66341045b8b2
@@ -32,7 +32,7 @@ type flowControlTokens struct {
Messages int64
}

// A TokenCounter stores the amount of outstanding byte and message flow control
// A tokenCounter stores the amount of outstanding byte and message flow control
// tokens that the client believes exists for the stream.
type tokenCounter struct {
Bytes int64
@@ -81,7 +81,7 @@ func (tc *tokenCounter) ToFlowControlRequest() *pb.FlowControlRequest {

// flowControlBatcher tracks flow control tokens and manages batching of flow
// control requests to avoid overwhelming the server. It is only accessed by
// the wireSubscriber.
// the subscribeStream.
type flowControlBatcher struct {
// The current amount of outstanding byte and message flow control tokens.
clientTokens tokenCounter
@@ -142,7 +142,7 @@ func (fc *flowControlBatcher) ShouldExpediteBatchRequest() bool {
}

// subscriberOffsetTracker tracks the expected offset of the next message
// received from the server. It is only accessed by the wireSubscriber.
// received from the server. It is only accessed by the subscribeStream.
type subscriberOffsetTracker struct {
minNextOffset int64
}
@@ -0,0 +1,327 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and

package wire

import (
"context"
"errors"
"reflect"
"time"

"google.golang.org/grpc"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

var (
errServerNoMessages = errors.New("pubsublite: server delivered no messages")
errInvalidInitialSubscribeResponse = errors.New("pubsublite: first response from server was not an initial response for subscribe")
errInvalidSubscribeResponse = errors.New("pubsublite: received invalid subscribe response from server")
errNoInFlightSeek = errors.New("pubsublite: received seek response for no in-flight seek")
)

// ReceivedMessage stores a received Pub/Sub message and AckConsumer for
// acknowledging the message.
type ReceivedMessage struct {
Msg *pb.SequencedMessage
Ack AckConsumer
}

// MessageReceiverFunc receives a batch of Pub/Sub messages from a topic
// partition.
type MessageReceiverFunc func([]*ReceivedMessage)

// The frequency of sending batch flow control requests.
const batchFlowControlPeriod = 100 * time.Millisecond

// subscribeStream directly wraps the subscribe client stream. It passes
// messages to the message receiver and manages flow control. Flow control
// tokens are batched and sent to the stream via a periodic background task,
// although it can be expedited if the user is rapidly acking messages.
//
// Client-initiated seek unsupported.
type subscribeStream struct {
// Immutable after creation.
subClient *vkit.SubscriberClient
settings ReceiveSettings
subscription subscriptionPartition
initialReq *pb.SubscribeRequest
receiver MessageReceiverFunc

// Fields below must be guarded with mutex.
stream *retryableStream
acks *ackTracker
offsetTracker subscriberOffsetTracker
flowControl flowControlBatcher
pollFlowControl *periodicTask
seekInFlight bool

abstractService
}

func newSubscribeStream(ctx context.Context, subClient *vkit.SubscriberClient, settings ReceiveSettings,
receiver MessageReceiverFunc, subscription subscriptionPartition, acks *ackTracker, disableTasks bool) *subscribeStream {

s := &subscribeStream{
subClient: subClient,
settings: settings,
subscription: subscription,
initialReq: &pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Initial{
Initial: &pb.InitialSubscribeRequest{
Subscription: subscription.Path,
Partition: int64(subscription.Partition),
},
},
},
receiver: receiver,
acks: acks,
}
s.stream = newRetryableStream(ctx, s, settings.Timeout, reflect.TypeOf(pb.SubscribeResponse{}))

backgroundTask := s.sendBatchFlowControl
if disableTasks {
backgroundTask = func() {}
}
s.pollFlowControl = newPeriodicTask(batchFlowControlPeriod, backgroundTask)
return s
}

// Start establishes a subscribe stream connection and initializes flow control
// tokens from ReceiveSettings.
func (s *subscribeStream) Start() {
s.mu.Lock()
defer s.mu.Unlock()

if s.unsafeUpdateStatus(serviceStarting, nil) {
s.stream.Start()
s.pollFlowControl.Start()

s.flowControl.OnClientFlow(flowControlTokens{
Bytes: int64(s.settings.MaxOutstandingBytes),
Messages: int64(s.settings.MaxOutstandingMessages),
})
}
}

// Stop immediately terminates the subscribe stream.
func (s *subscribeStream) Stop() {
s.mu.Lock()
defer s.mu.Unlock()
s.unsafeInitiateShutdown(serviceTerminating, nil)
}

func (s *subscribeStream) newStream(ctx context.Context) (grpc.ClientStream, error) {
return s.subClient.Subscribe(addSubscriptionRoutingMetadata(ctx, s.subscription))
}

func (s *subscribeStream) initialRequest() (interface{}, bool) {
return s.initialReq, true
}

func (s *subscribeStream) validateInitialResponse(response interface{}) error {
subscribeResponse, _ := response.(*pb.SubscribeResponse)
if subscribeResponse.GetInitial() == nil {
return errInvalidInitialSubscribeResponse
}
return nil
}

func (s *subscribeStream) onStreamStatusChange(status streamStatus) {
s.mu.Lock()
defer s.mu.Unlock()

switch status {
case streamConnected:
s.unsafeUpdateStatus(serviceActive, nil)

// Reinitialize the offset and flow control tokens when a new subscribe
// stream instance is connected.
if seekReq := s.offsetTracker.RequestForRestart(); seekReq != nil {
// Note: If Send() returns false, the subscriber will either terminate or
// the stream will be reconnected.
if s.stream.Send(&pb.SubscribeRequest{
Request: &pb.SubscribeRequest_Seek{Seek: seekReq},
}) {
s.seekInFlight = true
}
}
s.unsafeSendFlowControl(s.flowControl.RequestForRestart())
s.pollFlowControl.Start()

case streamReconnecting:
s.seekInFlight = false
s.pollFlowControl.Stop()

case streamTerminated:
s.unsafeInitiateShutdown(serviceTerminated, s.stream.Error())
}
}

func (s *subscribeStream) onResponse(response interface{}) {
var receivedMsgs []*ReceivedMessage
var err error
s.mu.Lock()

subscribeResponse, _ := response.(*pb.SubscribeResponse)
switch {
case subscribeResponse.GetMessages() != nil:
receivedMsgs, err = s.unsafeOnMessageResponse(subscribeResponse.GetMessages())
case subscribeResponse.GetSeek() != nil:
err = s.unsafeOnSeekResponse(subscribeResponse.GetSeek())
default:
err = errInvalidSubscribeResponse
}

if receivedMsgs != nil {
// Deliver messages without holding the mutex to prevent deadlocks.
s.mu.Unlock()
s.receiver(receivedMsgs)
return
}
if err != nil {
s.unsafeInitiateShutdown(serviceTerminated, err)
}
s.mu.Unlock()
}

func (s *subscribeStream) unsafeOnSeekResponse(response *pb.SeekResponse) error {
if !s.seekInFlight {
return errNoInFlightSeek
}
s.seekInFlight = false
return nil
}

func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse) ([]*ReceivedMessage, error) {
if len(response.Messages) == 0 {
return nil, errServerNoMessages
}
if err := s.offsetTracker.OnMessages(response.Messages); err != nil {
return nil, err
}
if err := s.flowControl.OnMessages(response.Messages); err != nil {
return nil, err
}

var receivedMsgs []*ReceivedMessage
for _, msg := range response.Messages {
// Register outstanding acks, which are primarily handled by the
// `committer`.
ack := newAckConsumer(msg.GetCursor().GetOffset(), msg.GetSizeBytes(), s.onAck)
if err := s.acks.Push(ack); err != nil {
return nil, err
}
receivedMsgs = append(receivedMsgs, &ReceivedMessage{Msg: msg, Ack: ack})
}
return receivedMsgs, nil
}

func (s *subscribeStream) onAck(ac *ackConsumer) {
// Don't block the user's goroutine with potentially expensive ack processing.
go s.onAckAsync(ac.MsgBytes)
}

func (s *subscribeStream) onAckAsync(msgBytes int64) {
s.mu.Lock()
defer s.mu.Unlock()

if s.status == serviceActive {
s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1})
}
}

// sendBatchFlowControl is called by the periodic background task.
func (s *subscribeStream) sendBatchFlowControl() {
s.mu.Lock()
defer s.mu.Unlock()
s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest())
}

func (s *subscribeStream) unsafeAllowFlow(allow flowControlTokens) {
s.flowControl.OnClientFlow(allow)
if s.flowControl.ShouldExpediteBatchRequest() {
s.unsafeSendFlowControl(s.flowControl.ReleasePendingRequest())
}
}

func (s *subscribeStream) unsafeSendFlowControl(req *pb.FlowControlRequest) {
if req == nil {
return
}

// Note: If Send() returns false, the stream will be reconnected and
// flowControlBatcher.RequestForRestart() will be sent when the stream
// reconnects. So its return value is ignored.
s.stream.Send(&pb.SubscribeRequest{
Request: &pb.SubscribeRequest_FlowControl{FlowControl: req},
})
}

func (s *subscribeStream) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
if !s.unsafeUpdateStatus(targetStatus, err) {
return
}

// No data to send. Immediately terminate the stream.
s.pollFlowControl.Stop()
s.stream.Stop()
}

// singlePartitionSubscriber receives messages from a single topic partition.
// It requires 2 child services:
// - subscribeStream to receive messages from the subscribe stream.
// - committer to commit cursor offsets to the streaming commit cursor stream.
type singlePartitionSubscriber struct {
compositeService
}

type singlePartitionSubscriberFactory struct {
ctx context.Context
subClient *vkit.SubscriberClient
cursorClient *vkit.CursorClient
settings ReceiveSettings
subscriptionPath string
receiver MessageReceiverFunc
disableTasks bool
}

func (f *singlePartitionSubscriberFactory) New(partition int) *singlePartitionSubscriber {
subscription := subscriptionPartition{Path: f.subscriptionPath, Partition: partition}
acks := newAckTracker()
commit := newCommitter(f.ctx, f.cursorClient, f.settings, subscription, acks, f.disableTasks)
sub := newSubscribeStream(f.ctx, f.subClient, f.settings, f.receiver, subscription, acks, f.disableTasks)
ps := new(singlePartitionSubscriber)
ps.init()
ps.unsafeAddServices(sub, commit)
return ps
}

// multiPartitionSubscriber receives messages from a fixed set of topic
// partitions.
type multiPartitionSubscriber struct {
compositeService
}

func newMultiPartitionSubscriber(subFactory *singlePartitionSubscriberFactory) *multiPartitionSubscriber {
ms := new(multiPartitionSubscriber)
ms.init()

for _, partition := range subFactory.settings.Partitions {
subscriber := subFactory.New(partition)
ms.unsafeAddServices(subscriber)
}
return ms
}

0 comments on commit 299b803

Please sign in to comment.