Skip to content
Permalink
Browse files
fix(pubsublite): make SubscriberClient.Receive identical to pubsub (#…
…4281)

- Ensures that pscompat.SubscriberClient.Receive and pubsub.Subscription.Receive have identical interfaces.
- Adds examples for how to declare common interfaces for pubsublite and pubsub.
  • Loading branch information
tmdiep committed Jun 22, 2021
1 parent 634847b commit 5b5d0f782b224f324dcfa13cc4145ee33a395d09
Showing with 106 additions and 38 deletions.
  1. +21 −15 pubsublite/doc.go
  2. +1 −2 pubsublite/internal/wire/README.md
  3. +7 −1 pubsublite/pscompat/doc.go
  4. +64 −0 pubsublite/pscompat/example_test.go
  5. +12 −19 pubsublite/pscompat/subscriber.go
  6. +1 −1 pubsublite/pscompat/subscriber_test.go
@@ -29,14 +29,30 @@ https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
More information about Pub/Sub Lite is available at
https://cloud.google.com/pubsub/lite.
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
Note: This library is in BETA. Backwards-incompatible changes may be made before
stable v1.0.0 is released.
Introduction
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
Examples can be found at
https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
Complete sample programs can be found at
https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite.
The cloud.google.com/go/pubsublite/pscompat subpackage contains clients for
publishing and receiving messages, which have similar interfaces to their
pubsub.Topic and pubsub.Subscription counterparts in cloud.google.com/go/pubsub.
The following examples demonstrate how to declare common interfaces:
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface.
The following imports are required for code snippets below:
@@ -46,11 +62,6 @@ The following imports are required for code snippets below:
"cloud.google.com/go/pubsublite/pscompat"
)
More complete examples can be found at
https://pkg.go.dev/cloud.google.com/go/pubsublite#pkg-examples
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-examples.
Creating Topics
@@ -83,11 +94,6 @@ where Pub/Sub Lite is available.
Publishing
The pubsublite/pscompat subpackage contains clients for publishing and receiving
messages, which have similar interfaces to their pubsub.Topic and
pubsub.Subscription counterparts in the Cloud Pub/Sub library:
https://pkg.go.dev/cloud.google.com/go/pubsub.
Pub/Sub Lite uses gRPC streams extensively for high throughput. For more
differences, see https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat.
@@ -118,7 +124,7 @@ service:
Once you've finishing publishing all messages, call Stop to flush all messages
to the service and close gRPC streams. The PublisherClient can no longer be used
after it has been stopped or has terminated due to a permanent service error.
after it has been stopped or has terminated due to a permanent error.
publisher.Stop()
@@ -167,8 +173,8 @@ subscriber client is connected to).
// TODO: Handle error.
}
Receive blocks until either the context is canceled or a fatal service error
occurs. To terminate a call to Receive, cancel its context:
Receive blocks until either the context is canceled or a permanent error occurs.
To terminate a call to Receive, cancel its context:
cancel()
@@ -1,7 +1,6 @@
# Wire

This directory contains internal implementation details for Cloud Pub/Sub Lite.
Its exported interface can change at any time.
This directory contains internal implementation details for Pub/Sub Lite.

## Conventions

@@ -19,7 +19,10 @@ This package is designed to compatible with the Cloud Pub/Sub library:
https://pkg.go.dev/cloud.google.com/go/pubsub. If interfaces are defined by the
client application, PublisherClient and SubscriberClient can be used as
substitutions for pubsub.Topic.Publish() and pubsub.Subscription.Receive(),
respectively, from the pubsub package.
respectively, from the pubsub package. See the following examples:
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewPublisherClient-Interface
and
https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#example-NewSubscriberClient-Interface.
The Cloud Pub/Sub and Pub/Sub Lite services have some differences:
- Pub/Sub Lite does not support NACK for messages. By default, this will
@@ -42,6 +45,9 @@ https://cloud.google.com/pubsub/lite.
Information about choosing between Cloud Pub/Sub vs Pub/Sub Lite is available at
https://cloud.google.com/pubsub/docs/choosing-pubsub-or-lite.
Complete sample programs can be found at
https://github.com/GoogleCloudPlatform/golang-samples/tree/master/pubsublite.
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
connection pooling and similar aspects of this package.
*/
@@ -292,6 +292,70 @@ func ExampleSubscriberClient_Receive_manualPartitionAssignment() {
cancel()
}

// This example illustrates how to declare a common interface for publisher
// clients from Cloud Pub/Sub (cloud.google.com/go/pubsub) and Pub/Sub Lite
// (cloud.google.com/go/pubsublite/pscompat).
func ExampleNewPublisherClient_interface() {
// publisherInterface is implemented by both pscompat.PublisherClient and
// pubsub.Topic.
type publisherInterface interface {
Publish(context.Context, *pubsub.Message) *pubsub.PublishResult
Stop()
}

publish := func(publisher publisherInterface) {
defer publisher.Stop()
// TODO: Publish messages.
}

// Create a Pub/Sub Lite publisher client.
ctx := context.Background()
publisher, err := pscompat.NewPublisherClient(ctx, "projects/my-project/locations/zone/topics/my-topic")
if err != nil {
// TODO: Handle error.
}
publish(publisher)

// Create a Cloud Pub/Sub topic to publish.
client, err := pubsub.NewClient(ctx, "my-project")
if err != nil {
// TODO: Handle error.
}
topic := client.Topic("my-topic")
publish(topic)
}

// This example illustrates how to declare a common interface for subscriber
// clients from Cloud Pub/Sub (cloud.google.com/go/pubsub) and Pub/Sub Lite
// (cloud.google.com/go/pubsublite/pscompat).
func ExampleNewSubscriberClient_interface() {
// subscriberInterface is implemented by both pscompat.SubscriberClient and
// pubsub.Subscription.
type subscriberInterface interface {
Receive(context.Context, func(context.Context, *pubsub.Message)) error
}

receive := func(subscriber subscriberInterface) {
// TODO: Receive messages.
}

// Create a Pub/Sub Lite subscriber client.
ctx := context.Background()
subscriber, err := pscompat.NewSubscriberClient(ctx, "projects/my-project/locations/zone/subscriptions/my-subscription")
if err != nil {
// TODO: Handle error.
}
receive(subscriber)

// Create a Cloud Pub/Sub subscription to receive.
client, err := pubsub.NewClient(ctx, "my-project")
if err != nil {
// TODO: Handle error.
}
subscription := client.Subscription("my-subscription")
receive(subscription)
}

func ExampleParseMessageMetadata_publisher() {
ctx := context.Background()
const topic = "projects/my-project/locations/zone/topics/my-topic"
@@ -86,11 +86,13 @@ func (f *wireSubscriberFactoryImpl) New(receiver wire.MessageReceiverFunc) (wire
return wire.NewSubscriber(context.Background(), f.settings, receiver, f.region, f.subscription.String(), f.options...)
}

type messageReceiverFunc = func(context.Context, *pubsub.Message)

// subscriberInstance wraps an instance of a wire.Subscriber. A new instance is
// created for each invocation of SubscriberClient.Receive().
type subscriberInstance struct {
settings ReceiveSettings
receiver MessageReceiverFunc
receiver messageReceiverFunc
recvCtx context.Context // Context passed to the receiver
recvCancel context.CancelFunc // Corresponding cancel func for recvCtx
wireSub wire.Subscriber
@@ -101,7 +103,7 @@ type subscriberInstance struct {
err error
}

func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, settings ReceiveSettings, receiver MessageReceiverFunc) (*subscriberInstance, error) {
func newSubscriberInstance(ctx context.Context, factory wireSubscriberFactory, settings ReceiveSettings, receiver messageReceiverFunc) (*subscriberInstance, error) {
recvCtx, recvCancel := context.WithCancel(ctx)
subInstance := &subscriberInstance{
settings: settings,
@@ -221,17 +223,6 @@ func (si *subscriberInstance) Wait(ctx context.Context) error {
return err
}

// MessageReceiverFunc handles messages sent by the Pub/Sub Lite service.
//
// The implementation must arrange for pubsub.Message.Ack() or
// pubsub.Message.Nack() to be called after processing the message.
//
// The receiver func will be called from multiple goroutines if the subscriber
// is connected to multiple partitions. Only one call from any connected
// partition will be outstanding at a time, and blocking in this receiver
// callback will block the delivery of subsequent messages for the partition.
type MessageReceiverFunc func(context.Context, *pubsub.Message)

// SubscriberClient is a Pub/Sub Lite client to receive messages for a given
// subscription.
//
@@ -292,18 +283,20 @@ func NewSubscriberClientWithSettings(ctx context.Context, subscription string, s
// If there is a fatal service error, Receive returns that error after all of
// the outstanding calls to f have returned. If ctx is done, Receive returns nil
// after all of the outstanding calls to f have returned and all messages have
// been acknowledged.
// been acknowledged. The context passed to f will be canceled when ctx is Done
// or there is a fatal service error.
//
// Receive calls f concurrently from multiple goroutines if the SubscriberClient
// is connected to multiple partitions. All messages received by f must be ACKed
// or NACKed. Failure to do so can prevent Receive from returning.
// is connected to multiple partitions. Only one call from any connected
// partition will be outstanding at a time, and blocking in the receiver
// callback f will block the delivery of subsequent messages for the partition.
//
// The context passed to f will be canceled when ctx is Done or there is a fatal
// service error.
// All messages received by f must be ACKed or NACKed. Failure to do so can
// prevent Receive from returning.
//
// Each SubscriberClient may have only one invocation of Receive active at a
// time.
func (s *SubscriberClient) Receive(ctx context.Context, f MessageReceiverFunc) error {
func (s *SubscriberClient) Receive(ctx context.Context, f func(context.Context, *pubsub.Message)) error {
if err := s.setReceiveActive(true); err != nil {
return err
}
@@ -121,7 +121,7 @@ func (f *mockWireSubscriberFactory) New(receiver wire.MessageReceiverFunc) (wire
}, nil
}

func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, receiver MessageReceiverFunc) *subscriberInstance {
func newTestSubscriberInstance(ctx context.Context, settings ReceiveSettings, receiver messageReceiverFunc) *subscriberInstance {
sub, _ := newSubscriberInstance(ctx, new(mockWireSubscriberFactory), settings, receiver)
return sub
}

0 comments on commit 5b5d0f7

Please sign in to comment.