-
Notifications
You must be signed in to change notification settings - Fork 2
/
gcp.go
328 lines (283 loc) · 10.1 KB
/
gcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
package gcp
import (
"context"
"fmt"
"math"
"regexp"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"golang.org/x/oauth2/google"
"golang.org/x/sync/errgroup"
"google.golang.org/api/option"
"github.com/cloudchacho/hedwig-go"
)
// Backend is a hedwig backend for Google Cloud Pub/Sub. It implements both hedwig.ConsumerBackend and
// hedwig.PublisherBackend. Construct it with NewBackend.
type Backend struct {
	// client is created lazily by ensureClient on first use and reused afterwards.
	client *pubsub.Client
	// settings holds the queue/subscription naming and Pub/Sub client configuration.
	settings Settings
	// logger receives progress messages; initDefaults substitutes a hedwig.StdLogger when nil.
	logger hedwig.Logger
}
// Compile-time assertions that *Backend satisfies both hedwig backend interfaces.
var (
	_ hedwig.ConsumerBackend  = (*Backend)(nil)
	_ hedwig.PublisherBackend = (*Backend)(nil)
)
// defaultVisibilityTimeoutS is used whenever a caller passes a zero visibilityTimeout: as the subscription's
// MaxExtensionPeriod in Receive, and as the publish timeout / MaxExtensionPeriod in RequeueDLQ.
// Despite the "S" suffix it is a time.Duration, not a count of seconds.
const defaultVisibilityTimeoutS = 20 * time.Second
// Metadata is additional metadata associated with a message. Receive attaches a Metadata value as the
// ProviderMetadata of every hedwig.ReceivedMessage it produces; AckMessage and NackMessage consume it.
type Metadata struct {
	// Underlying pubsub message - ack id isn't exported so we have to store this object in order to ack/nack
	pubsubMessage *pubsub.Message
	// PublishTime is the time this message was originally published to Pub/Sub
	PublishTime time.Time
	// DeliveryAttempt is the counter received from Pub/Sub.
	// The first delivery of a given message will have this value as 1. The value
	// is calculated as best effort and is approximate. Receive sets it to -1 when Pub/Sub does not
	// report a delivery attempt (subscriptions without a dead-letter queue).
	DeliveryAttempt int
	// The name of the subscription the message was received from, with the leading "hedwig-" removed
	SubscriptionName string
}
// Publish sends payload with the given attributes to the Pub/Sub topic named "hedwig-<topic>" and waits for the
// publish to complete, returning the message ID assigned by Pub/Sub. The message argument is not used by this
// backend; only payload and attributes are sent.
func (b *Backend) Publish(ctx context.Context, message *hedwig.Message, payload []byte, attributes map[string]string, topic string) (string, error) {
	if err := b.ensureClient(ctx); err != nil {
		return "", err
	}

	t := b.client.Topic("hedwig-" + topic)
	// A fresh topic handle is created per call, so release its background publishing resources on return.
	defer t.Stop()

	msg := &pubsub.Message{Data: payload, Attributes: attributes}
	id, err := t.Publish(ctx, msg).Get(ctx)
	if err != nil {
		return "", errors.Wrap(err, "Failed to publish message to Pub/Sub")
	}
	return id, nil
}
// Receive consumes messages from every Pub/Sub subscription configured for this app and sends each one to
// messageCh as a hedwig.ReceivedMessage whose ProviderMetadata is a Metadata, which carries everything needed
// to ack/nack the message. It runs until ctx is canceled (returning ctx.Err()) or until a subscription fails
// (returning that error).
//
// The following subscriptions are consumed concurrently, each with ReceiveSettings.NumGoroutines = 1:
//   - hedwig-<QueueName>-<ProjectID>-<Subscription> for each Settings.SubscriptionsCrossProject entry
//   - hedwig-<QueueName>-<topic> for each Settings.Subscriptions entry
//   - hedwig-<QueueName>, the main queue that receives messages re-queued from the DLQ
//
// numMessages caps the un-acked messages outstanding per subscription. visibilityTimeout is used as each
// subscription's MaxExtensionPeriod; zero selects defaultVisibilityTimeoutS.
//
// A message that cannot be handed to messageCh before shutdown begins is nacked. Pub/Sub can then redeliver
// it, and shutdown is not held up waiting for its ack deadline to lapse.
//
// The Pub/Sub client is closed when Receive returns.
func (b *Backend) Receive(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration, messageCh chan<- hedwig.ReceivedMessage) error {
	err := b.ensureClient(ctx)
	if err != nil {
		return err
	}
	defer b.client.Close()

	subscriptions := make([]string, 0, len(b.settings.SubscriptionsCrossProject)+len(b.settings.Subscriptions)+1)
	// all subscriptions live in an app's project, but cross-project subscriptions are named differently
	for _, subscription := range b.settings.SubscriptionsCrossProject {
		subscriptions = append(subscriptions, fmt.Sprintf("hedwig-%s-%s-%s", b.settings.QueueName, subscription.ProjectID, subscription.Subscription))
	}
	for _, subscription := range b.settings.Subscriptions {
		subscriptions = append(subscriptions, fmt.Sprintf("hedwig-%s-%s", b.settings.QueueName, subscription))
	}
	// main queue for DLQ re-queued messages
	subscriptions = append(subscriptions, fmt.Sprintf("hedwig-%s", b.settings.QueueName))

	maxExtension := visibilityTimeout
	if maxExtension == 0 {
		maxExtension = defaultVisibilityTimeoutS
	}

	hedwigRe := regexp.MustCompile(`^hedwig-`)
	group, gctx := errgroup.WithContext(ctx)
	for _, subscription := range subscriptions {
		pubsubSubscription := b.client.Subscription(subscription)
		pubsubSubscription.ReceiveSettings.NumGoroutines = 1
		pubsubSubscription.ReceiveSettings.MaxOutstandingMessages = int(numMessages)
		pubsubSubscription.ReceiveSettings.MaxExtensionPeriod = maxExtension
		// Reported in Metadata without the "hedwig-" prefix; it is constant per subscription, so compute it once.
		subscriptionName := hedwigRe.ReplaceAllString(pubsubSubscription.ID(), "")

		group.Go(func() error {
			return pubsubSubscription.Receive(gctx, func(msgCtx context.Context, message *pubsub.Message) {
				// DeliveryAttempt is nil for subscriptions without a dlq; report -1 in that case. The library-owned
				// message is deliberately left unmodified.
				deliveryAttempt := -1
				if message.DeliveryAttempt != nil {
					deliveryAttempt = *message.DeliveryAttempt
				}
				received := hedwig.ReceivedMessage{
					Payload:    message.Data,
					Attributes: message.Attributes,
					ProviderMetadata: Metadata{
						pubsubMessage:    message,
						PublishTime:      message.PublishTime,
						DeliveryAttempt:  deliveryAttempt,
						SubscriptionName: subscriptionName,
					},
				}
				// msgCtx is canceled once Receive is shutting down. Without this select, a consumer that has
				// stopped reading messageCh would block this callback forever. pubsub's Receive, and therefore
				// this method, would then never return.
				select {
				case messageCh <- received:
				case <-msgCtx.Done():
					message.Nack()
				}
			})
		})
	}

	if err = group.Wait(); err != nil {
		return err
	}
	// context cancelation doesn't return error in the group, so surface it explicitly
	return ctx.Err()
}
// RequeueDLQ re-queues everything in the Hedwig DLQ back into the Hedwig queue.
//
// It consumes the subscription "hedwig-<QueueName>-dlq" and republishes each message to the topic
// "hedwig-<QueueName>". The DLQ copy is acked only after the publish succeeds and nacked otherwise.
// Re-queueing stops when any of the following happens:
//   - no message has been received for 30 seconds (idle timeout; returns nil unless ctx itself was canceled)
//   - the first publish fails (that error is returned)
//   - ctx is canceled (ctx.Err() is returned)
//
// numMessages sets the publish batch size (CountThreshold) and the outstanding-message limits for both the
// publisher and the subscriber. numConcurrency sets NumGoroutines for both. visibilityTimeout is used as the
// publish timeout and as the subscriber's MaxExtensionPeriod; zero selects defaultVisibilityTimeoutS.
//
// The Pub/Sub client is closed when RequeueDLQ returns.
func (b *Backend) RequeueDLQ(ctx context.Context, numMessages uint32, visibilityTimeout time.Duration, numConcurrency uint32) error {
	err := b.ensureClient(ctx)
	if err != nil {
		return err
	}
	defer b.client.Close()
	// Destination: the main queue topic.
	clientTopic := b.client.Topic(fmt.Sprintf("hedwig-%s", b.settings.QueueName))
	defer clientTopic.Stop()
	clientTopic.PublishSettings.CountThreshold = int(numMessages)
	clientTopic.PublishSettings.FlowControlSettings.MaxOutstandingMessages = int(numMessages)
	clientTopic.PublishSettings.NumGoroutines = int(numConcurrency)
	if visibilityTimeout != 0 {
		clientTopic.PublishSettings.Timeout = visibilityTimeout
	} else {
		clientTopic.PublishSettings.Timeout = defaultVisibilityTimeoutS
	}
	// PublishSettings.BufferedByteLimit has no "unlimited" value. Mimic what the library itself sets
	// when PublishSettings.FlowControlSettings.MaxOutstandingBytes is set.
	// ref: https://github.com/googleapis/google-cloud-go/blob/d8b933189d677e987f408fa45b50d134a418e2b0/pubsub/topic.go#L654
	clientTopic.PublishSettings.BufferedByteLimit = math.MaxInt64
	// Source: the DLQ subscription. Ack deadlines are extended for as long as a publish is allowed to take.
	pubsubSubscription := b.client.Subscription(fmt.Sprintf("hedwig-%s-dlq", b.settings.QueueName))
	pubsubSubscription.ReceiveSettings.MaxOutstandingMessages = int(numMessages)
	pubsubSubscription.ReceiveSettings.MaxExtensionPeriod = clientTopic.PublishSettings.Timeout
	pubsubSubscription.ReceiveSettings.NumGoroutines = int(numConcurrency)
	// Run a ticker that fires after the timeout and shuts down the subscriber. It is reset on every received
	// message, so it acts as an idle timeout: re-queueing stops once the DLQ has delivered nothing for
	// overallTimeout.
	overallTimeout := time.Second * 30
	ticker := time.NewTicker(overallTimeout)
	defer ticker.Stop()
	// wg tracks the two helper goroutines below; both exit once rctx is done.
	wg := sync.WaitGroup{}
	defer wg.Wait()
	rctx, cancel := context.WithCancel(ctx)
	// Registered after wg.Wait, so it runs first on return and lets the helper goroutines exit before the wait.
	defer cancel()
	wg.Add(1)
	go func() {
		select {
		case <-ticker.C:
			cancel()
		case <-rctx.Done():
		}
		wg.Done()
	}()
	// Updated atomically from concurrent Receive callbacks and read by the progress logger.
	var numMessagesRequeued uint32
	// Log the running re-queue count once per second.
	progressTicker := time.NewTicker(time.Second * 1)
	defer progressTicker.Stop()
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-progressTicker.C:
				b.logger.Debug(ctx, "Re-queue DLQ progress", "num_messages", atomic.LoadUint32(&numMessagesRequeued))
			case <-rctx.Done():
				return
			}
		}
	}()
	// Holds at most one publish error; later errors are dropped.
	publishErrCh := make(chan error, 1)
	// Closed on return. By then Receive below has returned, and pubsub's Receive waits for outstanding
	// callbacks before returning, so no sender remains.
	defer close(publishErrCh)
	err = pubsubSubscription.Receive(rctx, func(ctx context.Context, message *pubsub.Message) {
		// Any received message postpones the idle timeout.
		ticker.Reset(overallTimeout)
		result := clientTopic.Publish(rctx, message)
		_, err := result.Get(rctx)
		if err != nil {
			// Leave the message in the DLQ and stop re-queueing on the first failure.
			message.Nack()
			cancel()
			// context.Canceled is expected whenever rctx is canceled (idle timeout, an earlier failure, or parent
			// ctx cancellation), so it is not reported as a publish failure.
			// NOTE(review): if the idle timeout fires while a publish is in flight, the message is nacked even
			// though that publish may still complete, so it could end up in both queues. Confirm this is acceptable.
			if err != context.Canceled {
				// try to send but since channel is buffered, this may not work, ignore since we anyway only want just
				// one error
				select {
				case publishErrCh <- err:
				default:
				}
			}
		} else {
			message.Ack()
			atomic.AddUint32(&numMessagesRequeued, 1)
		}
	})
	if err != nil {
		return err
	}
	// if publish failed, signal that
	select {
	case err = <-publishErrCh:
		return err
	default:
	}
	// context cancelation doesn't return error in Receive, don't return error from rctx since cancelation of rctx is
	// happy path
	return ctx.Err()
}
// NackMessage nacks a message on the queue so that Pub/Sub can redeliver it.
// providerMetadata must be the Metadata (or *Metadata) produced by Receive. Any other value, or metadata that
// carries no Pub/Sub message, results in an error rather than a panic.
func (b *Backend) NackMessage(ctx context.Context, providerMetadata interface{}) error {
	message, err := pubsubMessageFromMetadata(providerMetadata)
	if err != nil {
		return err
	}
	message.Nack()
	return nil
}

// AckMessage acknowledges a message on the queue so that Pub/Sub does not redeliver it.
// providerMetadata must be the Metadata (or *Metadata) produced by Receive. Any other value, or metadata that
// carries no Pub/Sub message, results in an error rather than a panic.
func (b *Backend) AckMessage(ctx context.Context, providerMetadata interface{}) error {
	message, err := pubsubMessageFromMetadata(providerMetadata)
	if err != nil {
		return err
	}
	message.Ack()
	return nil
}

// pubsubMessageFromMetadata extracts the underlying Pub/Sub message from provider metadata produced by Receive.
func pubsubMessageFromMetadata(providerMetadata interface{}) (*pubsub.Message, error) {
	var metadata Metadata
	switch m := providerMetadata.(type) {
	case Metadata:
		metadata = m
	case *Metadata:
		if m == nil {
			return nil, errors.New("provider metadata is a nil *gcp.Metadata")
		}
		metadata = *m
	default:
		return nil, errors.Errorf("unexpected provider metadata type %T, expected gcp.Metadata", providerMetadata)
	}
	if metadata.pubsubMessage == nil {
		return nil, errors.New("provider metadata has no associated Pub/Sub message")
	}
	return metadata.pubsubMessage, nil
}
// ensureClient lazily creates the Pub/Sub client and caches it in b.client. Once a client exists, it returns
// immediately and does not re-resolve credentials on every call.
//
// The project is Settings.GoogleCloudProject. When that is empty, the project is discovered from the default
// Google credentials. An error is returned if no project can be determined or the client cannot be created.
//
// NOTE(review): b.client is read and written without synchronization, so concurrent first calls could each
// create a client. Confirm whether callers may invoke this concurrently.
func (b *Backend) ensureClient(ctx context.Context) error {
	if b.client != nil {
		return nil
	}
	googleCloudProject := b.settings.GoogleCloudProject
	if googleCloudProject == "" {
		creds, err := google.FindDefaultCredentials(ctx)
		if err != nil {
			return errors.Wrap(
				err, "unable to discover google cloud project setting, either pass explicitly, or fix runtime environment")
		}
		if creds.ProjectID == "" {
			return errors.New(
				"unable to discover google cloud project setting, either pass explicitly, or fix runtime environment")
		}
		googleCloudProject = creds.ProjectID
	}
	// The client is cached on b and outlives this call, so it is not created with the caller's ctx.
	client, err := pubsub.NewClient(context.Background(), googleCloudProject, b.settings.PubsubClientOptions...)
	if err != nil {
		return err
	}
	b.client = client
	return nil
}
// SubscriptionProject represents a tuple of subscription name and project for cross-project Google subscriptions.
// Receive consumes each one from the subscription "hedwig-<QueueName>-<ProjectID>-<Subscription>".
type SubscriptionProject struct {
	// Subscription name (the topic name, without the `hedwig-` prefix)
	Subscription string
	// ProjectID of the GCP project the topic lives in
	ProjectID string
}
// Settings for Hedwig's GCP Backend.
type Settings struct {
	// Hedwig queue name. Exclude the `hedwig-` prefix. It is used to derive the queue topic and
	// subscription (`hedwig-<QueueName>`), the DLQ subscription (`hedwig-<QueueName>-dlq`), and the names of
	// all other consumer subscriptions.
	QueueName string
	// GoogleCloudProject ID that contains Pub/Sub resources. When empty, the project is discovered from the
	// default Google credentials.
	GoogleCloudProject string
	// PubsubClientOptions is a list of options to pass to pubsub.NewClient. This may be useful to customize gRPC
	// behavior, for example.
	PubsubClientOptions []option.ClientOption
	// Subscriptions is a list of all the Hedwig topics that the app is subscribed to (exclude the `hedwig-` prefix).
	// Each topic is consumed from the subscription `hedwig-<QueueName>-<topic>`.
	// For subscribing to cross-project topic messages, use SubscriptionsCrossProject. Google only.
	Subscriptions []string
	// SubscriptionsCrossProject is a list of tuples of topic name and GCP project for cross-project topic messages.
	// Google only.
	SubscriptionsCrossProject []SubscriptionProject
}
// initDefaults fills in defaults for optional fields that were left unset: a hedwig.StdLogger when no logger
// was supplied, and an empty (non-nil) client-option list.
func (b *Backend) initDefaults() {
	if b.logger == nil {
		b.logger = &hedwig.StdLogger{}
	}
	if b.settings.PubsubClientOptions == nil {
		b.settings.PubsubClientOptions = make([]option.ClientOption, 0)
	}
}
// NewBackend creates a Backend for publishing and consuming from GCP using the given settings and logger
// (a nil logger is replaced with hedwig.StdLogger). The Pub/Sub client itself is created lazily on first use.
// The provider metadata produced by this Backend will have concrete type: gcp.Metadata
func NewBackend(settings Settings, logger hedwig.Logger) *Backend {
	backend := new(Backend)
	backend.settings = settings
	backend.logger = logger
	backend.initDefaults()
	return backend
}