Skip to content

Commit 6f7fa86

Browse files
authored
feat(pubsublite): Types for resource paths and topic/subscription configs (#3026)
{Topic|Subscription}ConfigToUpdate are really just a subset of {Topic|Subscription}Config, but I adopted the *ToUpdate pattern of some of the other client libraries and for future flexibility and to provide clearer documentation. The config fields were flattened for simplicity. Type and field names were chosen to match Cloud Pub/Sub, where there is overlap. See http://go/pubsublite-go-clientlib-interface.
1 parent 10ccca2 commit 6f7fa86

File tree

5 files changed

+997
-0
lines changed

5 files changed

+997
-0
lines changed

pubsublite/config.go

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
// Copyright 2020 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
14+
package pubsublite
15+
16+
import (
17+
"fmt"
18+
"time"
19+
20+
"cloud.google.com/go/internal/optional"
21+
"github.com/golang/protobuf/ptypes"
22+
23+
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
24+
fmpb "google.golang.org/genproto/protobuf/field_mask"
25+
)
26+
27+
// TopicConfig describes the properties of a Google Pub/Sub Lite topic.
28+
// See https://cloud.google.com/pubsub/lite/docs/topics for more information
29+
// about how topics are configured.
30+
type TopicConfig struct {
31+
// The full path of a topic.
32+
Name TopicPath
33+
34+
// The number of partitions in the topic. Must be at least 1. Cannot be
35+
// changed after creation.
36+
PartitionCount int64
37+
38+
// Publish throughput capacity per partition in MiB/s.
39+
// Must be >= 4 and <= 16.
40+
PublishCapacityMiBPerSec int32
41+
42+
// Subscribe throughput capacity per partition in MiB/s.
43+
// Must be >= 4 and <= 32.
44+
SubscribeCapacityMiBPerSec int32
45+
46+
// The provisioned storage, in bytes, per partition. If the number of bytes
47+
// stored in any of the topic's partitions grows beyond this value, older
48+
// messages will be dropped to make room for newer ones, regardless of the
49+
// value of `RetentionDuration`.
50+
PerPartitionBytes int64
51+
52+
// How long a published message is retained. If unset, messages will be
53+
// retained as long as the bytes retained for each partition is below
54+
// `PerPartitionBytes`.
55+
RetentionDuration optional.Duration
56+
}
57+
58+
func (tc *TopicConfig) toProto() *pb.Topic {
59+
topicpb := &pb.Topic{
60+
Name: tc.Name.String(),
61+
PartitionConfig: &pb.Topic_PartitionConfig{
62+
Count: tc.PartitionCount,
63+
Dimension: &pb.Topic_PartitionConfig_Capacity_{
64+
Capacity: &pb.Topic_PartitionConfig_Capacity{
65+
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
66+
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
67+
},
68+
},
69+
},
70+
RetentionConfig: &pb.Topic_RetentionConfig{
71+
PerPartitionBytes: tc.PerPartitionBytes,
72+
},
73+
}
74+
if tc.RetentionDuration != nil {
75+
duration := optional.ToDuration(tc.RetentionDuration)
76+
if duration >= 0 {
77+
topicpb.RetentionConfig.Period = ptypes.DurationProto(duration)
78+
}
79+
}
80+
return topicpb
81+
}
82+
83+
func protoToTopicConfig(t *pb.Topic) (*TopicConfig, error) {
84+
name, err := ParseTopicPath(t.GetName())
85+
if err != nil {
86+
return nil, fmt.Errorf("pubsublite: invalid topic name %q in topic config", t.GetName())
87+
}
88+
89+
partitionCfg := t.GetPartitionConfig()
90+
retentionCfg := t.GetRetentionConfig()
91+
topic := &TopicConfig{
92+
Name: name,
93+
PartitionCount: partitionCfg.GetCount(),
94+
PublishCapacityMiBPerSec: partitionCfg.GetCapacity().GetPublishMibPerSec(),
95+
SubscribeCapacityMiBPerSec: partitionCfg.GetCapacity().GetSubscribeMibPerSec(),
96+
PerPartitionBytes: retentionCfg.GetPerPartitionBytes(),
97+
}
98+
// An unset retention period proto denotes "infinite retention".
99+
if retentionCfg.Period != nil {
100+
period, err := ptypes.Duration(retentionCfg.Period)
101+
if err != nil {
102+
return nil, fmt.Errorf("pubsublite: invalid retention period in topic config: %v", err)
103+
}
104+
topic.RetentionDuration = period
105+
}
106+
return topic, nil
107+
}
108+
109+
// InfiniteRetention is sentinel used when updating topic configs to clear a
110+
// retention duration (i.e. retain messages as long as there is available
111+
// storage).
112+
const InfiniteRetention = time.Duration(-1)
113+
114+
// TopicConfigToUpdate specifies the properties to update for a topic.
115+
type TopicConfigToUpdate struct {
116+
// The full path of the topic to update. Required.
117+
Name TopicPath
118+
119+
// If non-zero, will update the publish throughput capacity per partition.
120+
PublishCapacityMiBPerSec int32
121+
122+
// If non-zero, will update the subscribe throughput capacity per partition.
123+
SubscribeCapacityMiBPerSec int32
124+
125+
// If non-zero, will update the provisioned storage per partition.
126+
PerPartitionBytes int64
127+
128+
// If specified, will update how long a published message is retained. To
129+
// clear a retention duration (i.e. retain messages as long as there is
130+
// available storage), set this to `pubsublite.InfiniteRetention`.
131+
RetentionDuration optional.Duration
132+
}
133+
134+
func (tc *TopicConfigToUpdate) toUpdateRequest() *pb.UpdateTopicRequest {
135+
updatedTopic := &pb.Topic{
136+
Name: tc.Name.String(),
137+
PartitionConfig: &pb.Topic_PartitionConfig{
138+
Dimension: &pb.Topic_PartitionConfig_Capacity_{
139+
Capacity: &pb.Topic_PartitionConfig_Capacity{
140+
PublishMibPerSec: tc.PublishCapacityMiBPerSec,
141+
SubscribeMibPerSec: tc.SubscribeCapacityMiBPerSec,
142+
},
143+
},
144+
},
145+
RetentionConfig: &pb.Topic_RetentionConfig{
146+
PerPartitionBytes: tc.PerPartitionBytes,
147+
},
148+
}
149+
150+
var fields []string
151+
if tc.PublishCapacityMiBPerSec > 0 {
152+
fields = append(fields, "partition_config.capacity.publish_mib_per_sec")
153+
}
154+
if tc.SubscribeCapacityMiBPerSec > 0 {
155+
fields = append(fields, "partition_config.capacity.subscribe_mib_per_sec")
156+
}
157+
if tc.PerPartitionBytes > 0 {
158+
fields = append(fields, "retention_config.per_partition_bytes")
159+
}
160+
if tc.RetentionDuration != nil {
161+
fields = append(fields, "retention_config.period")
162+
duration := optional.ToDuration(tc.RetentionDuration)
163+
// An unset retention period proto denotes "infinite retention".
164+
if duration >= 0 {
165+
updatedTopic.RetentionConfig.Period = ptypes.DurationProto(duration)
166+
}
167+
}
168+
169+
return &pb.UpdateTopicRequest{
170+
Topic: updatedTopic,
171+
UpdateMask: &fmpb.FieldMask{Paths: fields},
172+
}
173+
}
174+
175+
// DeliveryRequirement specifies when a subscription should send messages to
176+
// subscribers relative to persistence in storage.
177+
type DeliveryRequirement int32
178+
179+
const (
180+
// UnspecifiedDeliveryRequirement represents and unset delivery requirement.
181+
UnspecifiedDeliveryRequirement = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVERY_REQUIREMENT_UNSPECIFIED)
182+
183+
// DeliverImmediately means the server will not not wait for a published
184+
// message to be successfully written to storage before delivering it to
185+
// subscribers.
186+
DeliverImmediately = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVER_IMMEDIATELY)
187+
188+
// DeliverAfterStored means the server will not deliver a published message to
189+
// subscribers until the message has been successfully written to storage.
190+
// This will result in higher end-to-end latency, but consistent delivery.
191+
DeliverAfterStored = DeliveryRequirement(pb.Subscription_DeliveryConfig_DELIVER_AFTER_STORED)
192+
)
193+
194+
// SubscriptionConfig describes the properties of a Google Pub/Sub Lite
195+
// subscription, which is attached to a topic.
196+
// See https://cloud.google.com/pubsub/lite/docs/subscriptions for more
197+
// information about how subscriptions are configured.
198+
type SubscriptionConfig struct {
199+
// The full path of a subscription.
200+
Name SubscriptionPath
201+
202+
// The name of the topic this subscription is attached to. This cannot be
203+
// changed after creation.
204+
Topic TopicPath
205+
206+
// Whether a message should be delivered to subscribers immediately after it
207+
// has been published or after it has been successfully written to storage.
208+
DeliveryRequirement DeliveryRequirement
209+
}
210+
211+
func (sc *SubscriptionConfig) toProto() *pb.Subscription {
212+
return &pb.Subscription{
213+
Name: sc.Name.String(),
214+
Topic: sc.Topic.String(),
215+
DeliveryConfig: &pb.Subscription_DeliveryConfig{
216+
DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement),
217+
},
218+
}
219+
}
220+
221+
func protoToSubscriptionConfig(s *pb.Subscription) (*SubscriptionConfig, error) {
222+
name, err := ParseSubscriptionPath(s.GetName())
223+
if err != nil {
224+
return nil, fmt.Errorf("pubsublite: invalid subscription name %q in subscription config", s.GetName())
225+
}
226+
topic, err := ParseTopicPath(s.GetTopic())
227+
if err != nil {
228+
return nil, fmt.Errorf("pubsublite: invalid topic name %q in subscription config", s.GetTopic())
229+
}
230+
return &SubscriptionConfig{
231+
Name: name,
232+
Topic: topic,
233+
DeliveryRequirement: DeliveryRequirement(s.GetDeliveryConfig().GetDeliveryRequirement().Number()),
234+
}, nil
235+
}
236+
237+
// SubscriptionConfigToUpdate specifies the properties to update for a
238+
// subscription.
239+
type SubscriptionConfigToUpdate struct {
240+
// The full path of the subscription to update. Required.
241+
Name SubscriptionPath
242+
243+
// If non-zero, updates the message delivery requirement.
244+
DeliveryRequirement DeliveryRequirement
245+
}
246+
247+
func (sc *SubscriptionConfigToUpdate) toUpdateRequest() *pb.UpdateSubscriptionRequest {
248+
updatedSubs := &pb.Subscription{
249+
Name: sc.Name.String(),
250+
DeliveryConfig: &pb.Subscription_DeliveryConfig{
251+
DeliveryRequirement: pb.Subscription_DeliveryConfig_DeliveryRequirement(sc.DeliveryRequirement),
252+
},
253+
}
254+
255+
var fields []string
256+
if sc.DeliveryRequirement > 0 {
257+
fields = append(fields, "delivery_config.delivery_requirement")
258+
}
259+
260+
return &pb.UpdateSubscriptionRequest{
261+
Subscription: updatedSubs,
262+
UpdateMask: &fmpb.FieldMask{Paths: fields},
263+
}
264+
}

0 commit comments

Comments
 (0)