forked from googleapis/google-cloud-go
/
subscription.go
265 lines (226 loc) · 8.56 KB
/
subscription.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pubsub
import (
"errors"
"fmt"
"strings"
"time"
"cloud.google.com/go/iam"
"golang.org/x/net/context"
)
const (
	// DefaultMaxExtension is the period for which Message acknowledgement
	// deadlines are automatically extended when no other value is requested.
	DefaultMaxExtension = 10 * time.Minute

	// DefaultMaxPrefetch is the maximum number of messages prefetched from
	// the server when no other value is requested.
	DefaultMaxPrefetch = 100
)
// Subscription is a reference to a PubSub subscription.
//
// It pairs the subscription's fully qualified name with the service used to
// operate on it.
type Subscription struct {
// s is the service through which every operation on this subscription is performed.
s service
// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
name string
}
// Subscription returns a reference to the subscription with the given id in
// the client's project.
//
// The reference carries the client's service and the fully qualified name
// "projects/<projectID>/subscriptions/<id>".
func (c *Client) Subscription(id string) *Subscription {
	fullName := fmt.Sprintf("projects/%s/subscriptions/%s", c.projectID, id)
	sub := &Subscription{
		s:    c.s,
		name: fullName,
	}
	return sub
}
// String returns the globally unique printable name of the subscription,
// i.e. its fully qualified name in the format
// "projects/<projid>/subscriptions/<name>".
func (s *Subscription) String() string {
return s.name
}
// ID returns the unique identifier of the subscription within its project:
// everything after the last "/" of the fully qualified name.
//
// It panics if the stored name contains no "/" at all, which means the name
// is not fully qualified.
func (s *Subscription) ID() string {
	if idx := strings.LastIndex(s.name, "/"); idx >= 0 {
		return s.name[idx+1:]
	}
	// name is not a fully-qualified name.
	panic("bad subscription name")
}
// Subscriptions returns an iterator over all of the subscriptions in the
// client's project.
//
// The listing is started against the client's service using the fully
// qualified project name; the returned iterator hands that same service to
// each Subscription it produces.
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
	it := &SubscriptionIterator{s: c.s}
	it.next = c.s.listProjectSubscriptions(ctx, c.fullyQualifiedProjectName())
	return it
}
// SubscriptionIterator is an iterator that returns a series of subscriptions.
type SubscriptionIterator struct {
// s is the service handed to every Subscription the iterator produces.
s service
// next yields the name of the next subscription, or an error; Next
// propagates that error unchanged.
next nextStringFunc
}
// Next returns the next subscription. If there are no more subscriptions,
// iterator.Done will be returned.
//
// Any error from the underlying name iterator is returned unchanged, together
// with a nil Subscription.
func (subs *SubscriptionIterator) Next() (*Subscription, error) {
	name, err := subs.next()
	if err != nil {
		return nil, err
	}
	sub := &Subscription{
		s:    subs.s,
		name: name,
	}
	return sub, nil
}
// PushConfig contains configuration for subscriptions that operate in push mode.
type PushConfig struct {
// Endpoint is a URL locating the endpoint to which messages should be pushed.
Endpoint string
// Attributes holds endpoint configuration attributes. See
// https://cloud.google.com/pubsub/reference/rest/v1/projects.subscriptions#PushConfig.FIELDS.attributes
// for more details.
Attributes map[string]string
}
// SubscriptionConfig contains the configuration of a subscription.
type SubscriptionConfig struct {
// Topic is the topic the subscription is attached to. Subscription.Config
// fills it in from the topic name reported by the service.
Topic *Topic
// PushConfig is the push delivery configuration of the subscription.
PushConfig PushConfig
// AckDeadline is the default maximum time after a subscriber receives a
// message before the subscriber should acknowledge the message. Note:
// messages which are obtained via a MessageIterator need not be
// acknowledged within this deadline, as the deadline will be
// automatically extended.
AckDeadline time.Duration
}
// Delete deletes the subscription.
//
// The request is issued through the subscription's service using its fully
// qualified name; the service's error, if any, is returned as is.
func (s *Subscription) Delete(ctx context.Context) error {
return s.s.deleteSubscription(ctx, s.name)
}
// Exists reports whether the subscription exists on the server.
//
// The check is delegated to the subscription's service using its fully
// qualified name; both results are returned exactly as the service reports
// them.
func (s *Subscription) Exists(ctx context.Context) (bool, error) {
return s.s.subscriptionExists(ctx, s.name)
}
// Config fetches the current configuration for the subscription.
//
// The service reports the configuration together with the name of the topic;
// Config turns that name into a Topic reference bound to the same service and
// stores it in the returned configuration's Topic field. On failure it
// returns nil and the service's error.
func (s *Subscription) Config(ctx context.Context) (*SubscriptionConfig, error) {
	cfg, topic, err := s.s.getSubscriptionConfig(ctx, s.name)
	if err != nil {
		return nil, err
	}
	cfg.Topic = &Topic{s: s.s, name: topic}
	return cfg, nil
}
// Pull returns a MessageIterator that can be used to fetch Messages. The
// MessageIterator will automatically extend the ack deadline of all fetched
// Messages, for the period specified by DefaultMaxExtension. This may be
// overridden by supplying a MaxExtension pull option.
//
// Pull first fetches the subscription's configuration to learn its ack
// deadline; if that fetch fails, the error is returned and no iterator is
// created.
//
// If ctx is cancelled or exceeds its deadline, outstanding acks or deadline
// extensions will fail.
//
// The caller must call Stop on the MessageIterator once finished with it.
func (s *Subscription) Pull(ctx context.Context, opts ...PullOption) (*MessageIterator, error) {
	cfg, err := s.Config(ctx)
	if err != nil {
		return nil, err
	}

	// The ack deadline always comes from the subscription's configuration;
	// it cannot be set through a PullOption.
	settings := processPullOptions(opts)
	settings.ackDeadline = cfg.AckDeadline

	it := newMessageIterator(ctx, s.s, s.name, settings)
	return it, nil
}
// ModifyPushConfig updates the endpoint URL and other attributes of a push
// subscription.
//
// conf must not be nil; a nil conf is rejected with an error before the
// service is contacted. Otherwise the service's result is returned as is.
func (s *Subscription) ModifyPushConfig(ctx context.Context, conf *PushConfig) error {
	if conf != nil {
		return s.s.modifyPushConfig(ctx, s.name, conf)
	}
	return errors.New("must supply non-nil PushConfig")
}
// IAM returns the IAM handle for the subscription, obtained from the
// subscription's service using its fully qualified name.
func (s *Subscription) IAM() *iam.Handle {
return s.s.iamHandle(s.name)
}
// A PullOption is an optional argument to Subscription.Pull.
//
// The interface's only method is unexported, so implementations can only be
// defined inside this package; callers obtain options from constructors such
// as MaxPrefetch and MaxExtension.
type PullOption interface {
// setOptions applies the option to o.
setOptions(o *pullOptions)
}
// pullOptions collects the settings that govern a single Subscription.Pull
// call. processPullOptions builds it from defaults and the caller's
// PullOptions; Pull then fills in ackDeadline.
type pullOptions struct {
// maxExtension is the maximum period for which the iterator should
// automatically extend the ack deadline for each message.
maxExtension time.Duration
// maxPrefetch is the maximum number of Messages to have in flight, to
// be returned by MessageIterator.Next.
maxPrefetch int32
// ackDeadline is the default ack deadline for the subscription. Not
// configurable via a PullOption.
ackDeadline time.Duration
}
// processPullOptions builds the settings for a Pull call. It starts from
// DefaultMaxExtension and DefaultMaxPrefetch and then applies opts in order,
// so a later option overrides an earlier one that sets the same field.
//
// ackDeadline is left at its zero value; the caller is expected to set it.
func processPullOptions(opts []PullOption) *pullOptions {
	settings := new(pullOptions)
	settings.maxExtension = DefaultMaxExtension
	settings.maxPrefetch = DefaultMaxPrefetch
	for i := range opts {
		opts[i].setOptions(settings)
	}
	return settings
}
// maxPrefetch is the PullOption that carries a prefetch limit.
type maxPrefetch int32

// setOptions stores the prefetch limit in o, raising any value below 1 to 1.
func (p maxPrefetch) setOptions(o *pullOptions) {
	limit := int32(p)
	if limit < 1 {
		limit = 1
	}
	o.maxPrefetch = limit
}
// MaxPrefetch returns a PullOption that limits Message prefetching.
//
// For performance reasons, the pubsub library may prefetch a pool of Messages
// to be returned serially from MessageIterator.Next. MaxPrefetch is used to
// limit the size of this pool.
//
// If num is less than 1, it will be treated as if it were 1.
func MaxPrefetch(num int) PullOption {
// NOTE(review): trunc32 is defined elsewhere; presumably it clamps the value
// into the int32 range before it is stored — confirm.
return maxPrefetch(trunc32(int64(num)))
}
// maxExtension is the PullOption that carries an ack deadline extension
// period.
type maxExtension time.Duration

// setOptions stores the extension period in o, replacing a negative period
// with 0.
func (e maxExtension) setOptions(o *pullOptions) {
	period := time.Duration(e)
	if period < 0 {
		period = 0
	}
	o.maxExtension = period
}
// MaxExtension returns a PullOption that limits how long ack deadlines are
// extended for.
//
// A MessageIterator will automatically extend the ack deadline of all fetched
// Messages for the duration specified. Automatic deadline extension may be
// disabled by specifying a duration of 0. A negative duration is treated as 0
// when the option is applied.
func MaxExtension(duration time.Duration) PullOption {
return maxExtension(duration)
}
// CreateSubscription creates a new subscription on a topic.
//
// id is the ID of the subscription to create. It must start with a letter,
// and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
// underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
// must be between 3 and 255 characters in length, and must not start with
// "goog".
//
// topic is the topic from which the subscription should receive messages. It
// need not belong to the same project as the subscription. It must not be
// nil; a nil topic is rejected with an error.
//
// ackDeadline is the maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. It must be between 10 and 600
// seconds (inclusive), and is rounded down to the nearest second. If the
// provided ackDeadline is 0, then the default value of 10 seconds is used.
// Note: messages which are obtained via a MessageIterator need not be
// acknowledged within this deadline, as the deadline will be automatically
// extended.
//
// pushConfig may be set to configure this subscription for push delivery.
//
// If the subscription already exists an error will be returned.
//
// Argument validation failures return a nil Subscription. When the service
// call itself fails, the Subscription reference for id is still returned
// alongside the error.
func (c *Client) CreateSubscription(ctx context.Context, id string, topic *Topic, ackDeadline time.Duration, pushConfig *PushConfig) (*Subscription, error) {
	// topic.name is dereferenced below; fail with an error rather than a
	// nil-pointer panic when the caller passes no topic.
	if topic == nil {
		return nil, errors.New("must supply non-nil Topic")
	}
	if ackDeadline == 0 {
		ackDeadline = 10 * time.Second
	}
	if d := ackDeadline.Seconds(); d < 10 || d > 600 {
		return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
	}

	sub := c.Subscription(id)
	err := c.s.createSubscription(ctx, topic.name, sub.name, ackDeadline, pushConfig)
	// sub is returned even on error to preserve the existing contract for
	// callers that inspect the error and keep using the reference.
	return sub, err
}