Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
feat(pubsublite): Added Pub/Sub Lite clients and routing headers (#3105)
Added functions to create all the required gapic clients. Updated the AdminClient to use these.
Added utils for attaching headers for routing to the correct backend server handling a particular topic or subscription.
  • Loading branch information
tmdiep committed Oct 30, 2020
1 parent b66727a commit 98668fa
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 6 deletions.
9 changes: 3 additions & 6 deletions pubsublite/admin.go
Expand Up @@ -17,13 +17,12 @@ import (
"context"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"

vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)

// AdminClient provides admin operations for Google Pub/Sub Lite resources
// AdminClient provides admin operations for Cloud Pub/Sub Lite resources
// within a Google Cloud region. An AdminClient may be shared by multiple
// goroutines.
type AdminClient struct {
Expand All @@ -33,14 +32,12 @@ type AdminClient struct {
// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin
// operations for resources within a given region.
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
// regions and zones where Google Pub/Sub Lite is available.
// regions and zones where Cloud Pub/Sub Lite is available.
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")}
options = append(options, opts...)
admin, err := vkit.NewAdminClient(ctx, options...)
admin, err := newAdminClient(ctx, region, opts...)
if err != nil {
return nil, err
}
Expand Down
74 changes: 74 additions & 0 deletions pubsublite/rpc.go
Expand Up @@ -14,11 +14,18 @@
package pubsublite

import (
"context"
"fmt"
"net/url"
"time"

"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

vkit "cloud.google.com/go/pubsublite/apiv1"
gax "github.com/googleapis/gax-go/v2"
)

Expand Down Expand Up @@ -98,3 +105,70 @@ func isRetryableStreamError(err error, isEligible func(codes.Code) bool) bool {
}
return isEligible(s.Code())
}

const (
pubsubLiteDefaultEndpoint = "-pubsublite.googleapis.com:443"
routingMetadataHeader = "x-goog-request-params"
)

func defaultClientOptions(region string) []option.ClientOption {
return []option.ClientOption{
internaloption.WithDefaultEndpoint(region + pubsubLiteDefaultEndpoint),
}
}

func newAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.AdminClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewAdminClient(ctx, options...)
}

func newPublisherClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PublisherClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewPublisherClient(ctx, options...)
}

func newSubscriberClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.SubscriberClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewSubscriberClient(ctx, options...)
}

func newCursorClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.CursorClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewCursorClient(ctx, options...)
}

func newPartitionAssignmentClient(ctx context.Context, region string, opts ...option.ClientOption) (*vkit.PartitionAssignmentClient, error) {
if err := validateRegion(region); err != nil {
return nil, err
}
options := append(defaultClientOptions(region), opts...)
return vkit.NewPartitionAssignmentClient(ctx, options...)
}

func addTopicRoutingMetadata(ctx context.Context, topic TopicPath, partition int) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&topic=%s", partition, url.QueryEscape(topic.String()))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
}

func addSubscriptionRoutingMetadata(ctx context.Context, subs SubscriptionPath, partition int) context.Context {
md, _ := metadata.FromOutgoingContext(ctx)
md = md.Copy()
val := fmt.Sprintf("partition=%d&subscription=%s", partition, url.QueryEscape(subs.String()))
md[routingMetadataHeader] = append(md[routingMetadataHeader], val)
return metadata.NewOutgoingContext(ctx, md)
}

0 comments on commit 98668fa

Please sign in to comment.