Skip to content

Commit 749473e

Browse files
authored
feat(pubsublite): Pub/Sub Lite admin client (#3036)
Implements pubsublite.Client, which wraps the Pub/Sub Lite Admin Service. Includes integration tests for admin operations.
1 parent 5130694 commit 749473e

File tree

7 files changed

+535
-53
lines changed

7 files changed

+535
-53
lines changed

CONTRIBUTING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ Next, ensure the following APIs are enabled in the general project:
104104
- Google Compute Engine Instance Groups API
105105
- Kubernetes Engine API
106106
- Cloud Error Reporting API
107+
- Pub/Sub Lite API
107108

108109
Next, create a Datastore database in the general project, and a Firestore
109110
database in the Firestore project.

pubsublite/admin.go

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
14+
package pubsublite
15+
16+
import (
17+
"context"
18+
19+
"google.golang.org/api/option"
20+
"google.golang.org/api/option/internaloption"
21+
22+
vkit "cloud.google.com/go/pubsublite/apiv1"
23+
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
24+
)
25+
26+
// AdminClient provides admin operations for Google Pub/Sub Lite resources
27+
// within a Google Cloud region. An AdminClient may be shared by multiple
28+
// goroutines.
29+
type AdminClient struct {
30+
admin *vkit.AdminClient
31+
}
32+
33+
// NewAdminClient creates a new Cloud Pub/Sub Lite client to perform admin
34+
// operations for resources within a given region.
35+
// See https://cloud.google.com/pubsub/lite/docs/locations for the list of
36+
// regions and zones where Google Pub/Sub Lite is available.
37+
func NewAdminClient(ctx context.Context, region string, opts ...option.ClientOption) (*AdminClient, error) {
38+
if err := validateRegion(region); err != nil {
39+
return nil, err
40+
}
41+
options := []option.ClientOption{internaloption.WithDefaultEndpoint(region + "-pubsublite.googleapis.com:443")}
42+
options = append(options, opts...)
43+
admin, err := vkit.NewAdminClient(ctx, options...)
44+
if err != nil {
45+
return nil, err
46+
}
47+
return &AdminClient{admin: admin}, nil
48+
}
49+
50+
// CreateTopic creates a new topic from the given config.
51+
func (ac *AdminClient) CreateTopic(ctx context.Context, config TopicConfig) (*TopicConfig, error) {
52+
req := &pb.CreateTopicRequest{
53+
Parent: config.Name.location().String(),
54+
Topic: config.toProto(),
55+
TopicId: config.Name.TopicID,
56+
}
57+
topicpb, err := ac.admin.CreateTopic(ctx, req)
58+
if err != nil {
59+
return nil, err
60+
}
61+
return protoToTopicConfig(topicpb)
62+
}
63+
64+
// UpdateTopic updates an existing topic from the given config and returns the
65+
// new topic config.
66+
func (ac *AdminClient) UpdateTopic(ctx context.Context, config TopicConfigToUpdate) (*TopicConfig, error) {
67+
topicpb, err := ac.admin.UpdateTopic(ctx, config.toUpdateRequest())
68+
if err != nil {
69+
return nil, err
70+
}
71+
return protoToTopicConfig(topicpb)
72+
}
73+
74+
// DeleteTopic deletes a topic.
75+
func (ac *AdminClient) DeleteTopic(ctx context.Context, topic TopicPath) error {
76+
return ac.admin.DeleteTopic(ctx, &pb.DeleteTopicRequest{Name: topic.String()})
77+
}
78+
79+
// Topic retrieves the configuration of a topic.
80+
func (ac *AdminClient) Topic(ctx context.Context, topic TopicPath) (*TopicConfig, error) {
81+
topicpb, err := ac.admin.GetTopic(ctx, &pb.GetTopicRequest{Name: topic.String()})
82+
if err != nil {
83+
return nil, err
84+
}
85+
return protoToTopicConfig(topicpb)
86+
}
87+
88+
// TopicPartitions returns the number of partitions for a topic.
89+
func (ac *AdminClient) TopicPartitions(ctx context.Context, topic TopicPath) (int, error) {
90+
partitions, err := ac.admin.GetTopicPartitions(ctx, &pb.GetTopicPartitionsRequest{Name: topic.String()})
91+
if err != nil {
92+
return 0, err
93+
}
94+
return int(partitions.GetPartitionCount()), nil
95+
}
96+
97+
// TopicSubscriptions retrieves the list of subscription paths for a topic.
98+
func (ac *AdminClient) TopicSubscriptions(ctx context.Context, topic TopicPath) (*SubscriptionPathIterator, error) {
99+
subsPathIt := ac.admin.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{Name: topic.String()})
100+
return &SubscriptionPathIterator{it: subsPathIt}, nil
101+
}
102+
103+
// Topics retrieves the list of topic configs for a given project and zone.
104+
func (ac *AdminClient) Topics(ctx context.Context, location LocationPath) *TopicIterator {
105+
return &TopicIterator{
106+
it: ac.admin.ListTopics(ctx, &pb.ListTopicsRequest{Parent: location.String()}),
107+
}
108+
}
109+
110+
// CreateSubscription creates a new subscription from the given config.
111+
func (ac *AdminClient) CreateSubscription(ctx context.Context, config SubscriptionConfig) (*SubscriptionConfig, error) {
112+
req := &pb.CreateSubscriptionRequest{
113+
Parent: config.Name.location().String(),
114+
Subscription: config.toProto(),
115+
SubscriptionId: config.Name.SubscriptionID,
116+
}
117+
subspb, err := ac.admin.CreateSubscription(ctx, req)
118+
if err != nil {
119+
return nil, err
120+
}
121+
return protoToSubscriptionConfig(subspb)
122+
}
123+
124+
// UpdateSubscription updates an existing subscription from the given config and
125+
// returns the new subscription config.
126+
func (ac *AdminClient) UpdateSubscription(ctx context.Context, config SubscriptionConfigToUpdate) (*SubscriptionConfig, error) {
127+
subspb, err := ac.admin.UpdateSubscription(ctx, config.toUpdateRequest())
128+
if err != nil {
129+
return nil, err
130+
}
131+
return protoToSubscriptionConfig(subspb)
132+
}
133+
134+
// DeleteSubscription deletes a subscription.
135+
func (ac *AdminClient) DeleteSubscription(ctx context.Context, subscription SubscriptionPath) error {
136+
return ac.admin.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Name: subscription.String()})
137+
}
138+
139+
// Subscription retrieves the configuration of a subscription.
140+
func (ac *AdminClient) Subscription(ctx context.Context, subscription SubscriptionPath) (*SubscriptionConfig, error) {
141+
subspb, err := ac.admin.GetSubscription(ctx, &pb.GetSubscriptionRequest{Name: subscription.String()})
142+
if err != nil {
143+
return nil, err
144+
}
145+
return protoToSubscriptionConfig(subspb)
146+
}
147+
148+
// Subscriptions retrieves the list of subscription configs for a given project
149+
// and zone.
150+
func (ac *AdminClient) Subscriptions(ctx context.Context, location LocationPath) *SubscriptionIterator {
151+
return &SubscriptionIterator{
152+
it: ac.admin.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{Parent: location.String()}),
153+
}
154+
}
155+
156+
// Close releases any resources held by the client when it is no longer
157+
// required. If the client is available for the lifetime of the program, then
158+
// Close need not be called at exit.
159+
func (ac *AdminClient) Close() error {
160+
return ac.admin.Close()
161+
}
162+
163+
// TopicIterator is an iterator that returns a list of topic configs.
164+
type TopicIterator struct {
165+
it *vkit.TopicIterator
166+
}
167+
168+
// Next returns the next topic config. The second return value will be
169+
// iterator.Done if there are no more topic configs.
170+
func (t *TopicIterator) Next() (*TopicConfig, error) {
171+
topicpb, err := t.it.Next()
172+
if err != nil {
173+
return nil, err
174+
}
175+
return protoToTopicConfig(topicpb)
176+
}
177+
178+
// SubscriptionIterator is an iterator that returns a list of subscription
179+
// configs.
180+
type SubscriptionIterator struct {
181+
it *vkit.SubscriptionIterator
182+
}
183+
184+
// Next returns the next subscription config. The second return value will be
185+
// iterator.Done if there are no more subscription configs.
186+
func (s *SubscriptionIterator) Next() (*SubscriptionConfig, error) {
187+
subspb, err := s.it.Next()
188+
if err != nil {
189+
return nil, err
190+
}
191+
return protoToSubscriptionConfig(subspb)
192+
}
193+
194+
// SubscriptionPathIterator is an iterator that returns a list of subscription
195+
// paths.
196+
type SubscriptionPathIterator struct {
197+
it *vkit.StringIterator
198+
}
199+
200+
// Next returns the next subscription path. The second return value will be
201+
// iterator.Done if there are no more subscription paths.
202+
func (sp *SubscriptionPathIterator) Next() (SubscriptionPath, error) {
203+
subsPath, err := sp.it.Next()
204+
if err != nil {
205+
return SubscriptionPath{}, err
206+
}
207+
return parseSubscriptionPath(subsPath)
208+
}

pubsublite/config.go

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ import (
2424
fmpb "google.golang.org/genproto/protobuf/field_mask"
2525
)
2626

27+
// InfiniteRetention is a sentinel used in topic configs to denote an infinite
28+
// retention duration (i.e. retain messages as long as there is available
29+
// storage).
30+
const InfiniteRetention = time.Duration(-1)
31+
2732
// TopicConfig describes the properties of a Google Pub/Sub Lite topic.
2833
// See https://cloud.google.com/pubsub/lite/docs/topics for more information
2934
// about how topics are configured.
@@ -33,55 +38,52 @@ type TopicConfig struct {
3338

3439
// The number of partitions in the topic. Must be at least 1. Cannot be
3540
// changed after creation.
36-
PartitionCount int64
41+
PartitionCount int
3742

3843
// Publish throughput capacity per partition in MiB/s.
3944
// Must be >= 4 and <= 16.
40-
PublishCapacityMiBPerSec int32
45+
PublishCapacityMiBPerSec int
4146

4247
// Subscribe throughput capacity per partition in MiB/s.
4348
// Must be >= 4 and <= 32.
44-
SubscribeCapacityMiBPerSec int32
49+
SubscribeCapacityMiBPerSec int
4550

4651
// The provisioned storage, in bytes, per partition. If the number of bytes
4752
// stored in any of the topic's partitions grows beyond this value, older
4853
// messages will be dropped to make room for newer ones, regardless of the
49-
// value of `RetentionDuration`.
54+
// value of `RetentionDuration`. Must be > 0.
5055
PerPartitionBytes int64
5156

52-
// How long a published message is retained. If unset, messages will be
53-
// retained as long as the bytes retained for each partition is below
54-
// `PerPartitionBytes`.
55-
RetentionDuration optional.Duration
57+
// How long a published message is retained. If set to `InfiniteRetention`,
58+
// messages will be retained as long as the bytes retained for each partition
59+
// is below `PerPartitionBytes`. Otherwise, must be > 0.
60+
RetentionDuration time.Duration
5661
}
5762

5863
func (tc *TopicConfig) toProto() *pb.Topic {
5964
topicpb := &pb.Topic{
6065
Name: tc.Name.String(),
6166
PartitionConfig: &pb.Topic_PartitionConfig{
62-
Count: tc.PartitionCount,
67+
Count: int64(tc.PartitionCount),
6368
Dimension: &pb.Topic_PartitionConfig_Capacity_{
6469
Capacity: &pb.Topic_PartitionConfig_Capacity{
65-
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
66-
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
70+
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
71+
SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
6772
},
6873
},
6974
},
7075
RetentionConfig: &pb.Topic_RetentionConfig{
7176
PerPartitionBytes: tc.PerPartitionBytes,
7277
},
7378
}
74-
if tc.RetentionDuration != nil {
75-
duration := optional.ToDuration(tc.RetentionDuration)
76-
if duration >= 0 {
77-
topicpb.RetentionConfig.Period = ptypes.DurationProto(duration)
78-
}
79+
if tc.RetentionDuration >= 0 {
80+
topicpb.RetentionConfig.Period = ptypes.DurationProto(tc.RetentionDuration)
7981
}
8082
return topicpb
8183
}
8284

8385
func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
84-
name, err := ParseTopicPath(t.GetName())
86+
name, err := parseTopicPath(t.GetName())
8587
if err != nil {
8688
return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName())
8789
}
@@ -90,10 +92,11 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
9092
retentionCfg := t.GetRetentionConfig()
9193
topic := &TopicConfig{
9294
Name: name,
93-
PartitionCount: partitionCfg.GetCount(),
94-
PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(),
95-
SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(),
95+
PartitionCount: int(partitionCfg.GetCount()),
96+
PublishCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetPublishMibPerSec()),
97+
SubscribeCapacityMiBPerSec: int(partitionCfg.GetCapacity().GetSubscribeMibPerSec()),
9698
PerPartitionBytes: retentionCfg.GetPerPartitionBytes(),
99+
RetentionDuration: InfiniteRetention,
97100
}
98101
// An unset retention period proto denotes "infinite retention".
99102
if retentionCfg.Period != nil {
@@ -106,28 +109,23 @@ func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
106109
return topic, nil
107110
}
108111

109-
// InfiniteRetention is sentinel used when updating topic configs to clear a
110-
// retention duration (i.e. retain messages as long as there is available
111-
// storage).
112-
const InfiniteRetention = time.Duration(-1)
113-
114112
// TopicConfigToUpdate specifies the properties to update for a topic.
115113
type TopicConfigToUpdate struct {
116114
// The full path of the topic to update. Required.
117115
Name TopicPath
118116

119117
// If non-zero, will update the publish throughput capacity per partition.
120-
PublishCapacityMiBPerSec int32
118+
PublishCapacityMiBPerSec int
121119

122120
// If non-zero, will update the subscribe throughput capacity per partition.
123-
SubscribeCapacityMiBPerSec int32
121+
SubscribeCapacityMiBPerSec int
124122

125123
// If non-zero, will update the provisioned storage per partition.
126124
PerPartitionBytes int64
127125

128126
// If specified, will update how long a published message is retained. To
129127
// clear a retention duration (i.e. retain messages as long as there is
130-
// available storage), set this to `pubsublite.InfiniteRetention`.
128+
// available storage), set this to `InfiniteRetention`.
131129
RetentionDuration optional.Duration
132130
}
133131

@@ -137,8 +135,8 @@ func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
137135
PartitionConfig: &pb.Topic_PartitionConfig{
138136
Dimension: &pb.Topic_PartitionConfig_Capacity_{
139137
Capacity: &pb.Topic_PartitionConfig_Capacity{
140-
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
141-
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
138+
PublishMibPerSec: int32(tc.PublishCapacityMiBPerSec),
139+
SubscribeMibPerSec: int32(tc.SubscribeCapacityMiBPerSec),
142140
},
143141
},
144142
},
@@ -219,11 +217,11 @@ func (sc *SubscriptionConfig) toProto() *pb.Subscription {
219217
}
220218

221219
func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) {
222-
name, err := ParseSubscriptionPath(s.GetName())
220+
name, err := parseSubscriptionPath(s.GetName())
223221
if err != nil {
224222
return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName())
225223
}
226-
topic, err := ParseTopicPath(s.GetTopic())
224+
topic, err := parseTopicPath(s.GetTopic())
227225
if err != nil {
228226
return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic())
229227
}

pubsublite/config_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ func TestTopicConfigToProtoConversion(t *testing.T) {
9292
PublishCapacityMiBPerSec: 4,
9393
SubscribeCapacityMiBPerSec: 8,
9494
PerPartitionBytes: 4294967296,
95+
RetentionDuration: InfiniteRetention,
9596
},
9697
},
9798
{

0 commit comments

Comments
 (0)