-
Notifications
You must be signed in to change notification settings - Fork 0
/
gcp.go
285 lines (246 loc) · 7.93 KB
/
gcp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
package gcp
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
"github.com/cloudchacho/taskhawk-go"
)
// Backend publishes taskhawk messages to, and consumes them from, Google Cloud Pub/Sub.
// Construct it with NewBackend so that defaults are applied.
type Backend struct {
// client is the Pub/Sub client; it is created lazily by ensureClient.
client *pubsub.Client
// settings holds the caller-supplied configuration (queue name, project, client options).
settings Settings
// getLogger returns the logger to use for a given context; initDefaults installs a
// taskhawk.StdLogger fallback when it is nil.
getLogger taskhawk.GetLoggerFunc
// NOTE(review): QueueName is not read anywhere in the visible code - topic names are derived from
// settings.QueueName instead. Confirm whether this field is still needed.
QueueName string
}
// Compile-time checks that *Backend can be converted to taskhawk.ConsumerBackend and
// taskhawk.PublisherBackend (presumably the backend interfaces declared by taskhawk - confirm).
var _ = taskhawk.ConsumerBackend(&Backend{})
var _ = taskhawk.PublisherBackend(&Backend{})
// defaultVisibilityTimeoutS is the fallback used by Receive and RequeueDLQ when the caller passes a
// zero visibilityTimeout. Despite the "S" suffix it is a time.Duration (20 seconds), not a number of
// seconds.
const defaultVisibilityTimeoutS = time.Second * 20
// Metadata is additional metadata associated with a message. Receive attaches one to every
// taskhawk.ReceivedMessage as its ProviderMetadata, and AckMessage / NackMessage expect to get it back.
type Metadata struct {
// Underlying pubsub message - ack id isn't exported so we have to store this object.
// AckMessage and NackMessage call Ack / Nack on it.
pubsubMessage *pubsub.Message
// PublishTime is the time this message was originally published to Pub/Sub
PublishTime time.Time
// DeliveryAttempt is the counter received from Pub/Sub.
// The first delivery of a given message will have this value as 1. The value
// is calculated as best effort and is approximate.
DeliveryAttempt int
}
// getTopic returns the Pub/Sub resource name used for the given priority:
// "taskhawk-<QueueName>" followed by a priority-specific suffix (none for the default priority).
// The same name is used for the topic (Publish, RequeueDLQ) and for the subscription (Receive).
// It panics on a priority value it does not know about.
func (b *Backend) getTopic(priority taskhawk.Priority) string {
	var suffix string
	switch priority {
	case taskhawk.PriorityDefault:
		// default priority uses the bare queue name
	case taskhawk.PriorityHigh:
		suffix = "-high-priority"
	case taskhawk.PriorityLow:
		suffix = "-low-priority"
	case taskhawk.PriorityBulk:
		suffix = "-bulk"
	default:
		panic(fmt.Sprintf("unhandled priority %v", priority))
	}
	return "taskhawk-" + b.settings.QueueName + suffix
}
// getDLQTopic returns the dead-letter resource name for the given priority, which is the regular
// name from getTopic with "-dlq" appended.
func (b *Backend) getDLQTopic(priority taskhawk.Priority) string {
	const dlqSuffix = "-dlq"
	base := b.getTopic(priority)
	return base + dlqSuffix
}
// Publish sends payload with the given attributes to the topic that corresponds to priority and
// blocks until Pub/Sub has accepted or rejected it. It returns the server-assigned message ID on
// success. The topic handle is created for this call only and stopped before returning.
func (b *Backend) Publish(ctx context.Context, payload []byte, attributes map[string]string, priority taskhawk.Priority) (string, error) {
	if err := b.ensureClient(ctx); err != nil {
		return "", err
	}

	topic := b.client.Topic(b.getTopic(priority))
	defer topic.Stop()

	msg := &pubsub.Message{Data: payload, Attributes: attributes}
	id, err := topic.Publish(ctx, msg).Get(ctx)
	if err != nil {
		return "", errors.Wrap(err, "Failed to publish message to Pub/Sub")
	}
	return id, nil
}
// Receive messages from configured queue(s) and provide it through the callback. This should run indefinitely
// until the context is canceled. Provider metadata should include all info necessary to ack/nack a message.
//
// Messages are pulled from the subscription named by getTopic(priority) and forwarded on messageCh with
// a Metadata value as ProviderMetadata. numMessages caps the number of outstanding messages, and
// visibilityTimeout (defaultVisibilityTimeoutS when zero) caps the ack-deadline extension period.
// Metadata.DeliveryAttempt is left at 0 when Pub/Sub does not report a delivery attempt.
// Returns the error from the Pub/Sub receive loop, or ctx.Err() once the context ends.
func (b *Backend) Receive(ctx context.Context, priority taskhawk.Priority, numMessages uint32,
	visibilityTimeout time.Duration, messageCh chan<- taskhawk.ReceivedMessage) error {
	err := b.ensureClient(ctx)
	if err != nil {
		return err
	}
	// NOTE(review): this closes the client that ensureClient caches on b, and ensureClient does not
	// recreate a client that is non-nil - confirm the Backend is not expected to be reused afterwards.
	defer b.client.Close()

	subscriptionName := b.getTopic(priority)
	pubsubSubscription := b.client.Subscription(subscriptionName)
	pubsubSubscription.ReceiveSettings.NumGoroutines = 1
	pubsubSubscription.ReceiveSettings.MaxOutstandingMessages = int(numMessages)
	if visibilityTimeout != 0 {
		pubsubSubscription.ReceiveSettings.MaxExtensionPeriod = visibilityTimeout
	} else {
		pubsubSubscription.ReceiveSettings.MaxExtensionPeriod = defaultVisibilityTimeoutS
	}

	err = pubsubSubscription.Receive(ctx, func(ctx context.Context, message *pubsub.Message) {
		metadata := Metadata{
			pubsubMessage: message,
			PublishTime:   message.PublishTime,
		}
		// DeliveryAttempt is a pointer that Pub/Sub only populates when the subscription has a
		// dead-letter policy; dereferencing it unconditionally would panic on other subscriptions.
		if message.DeliveryAttempt != nil {
			metadata.DeliveryAttempt = *message.DeliveryAttempt
		}
		messageCh <- taskhawk.ReceivedMessage{
			Payload:          message.Data,
			Attributes:       message.Attributes,
			ProviderMetadata: metadata,
		}
	})
	if err != nil {
		return err
	}
	// context cancelation doesn't return error from Receive
	return ctx.Err()
}
// RequeueDLQ re-queues everything in the taskhawk DLQ back into the taskhawk queue.
//
// It pulls from the subscription named by getDLQTopic(priority), republishes every message to the
// topic named by getTopic(priority), and acks the DLQ copy once the publish succeeded (nacks it
// otherwise). The run stops when no message has been received for 5 seconds, when a publish fails, or
// when ctx is canceled. numMessages is applied to PublishSettings.CountThreshold and to
// MaxOutstandingMessages; visibilityTimeout (defaultVisibilityTimeoutS when zero) is applied to the
// publish timeout and to MaxExtensionPeriod.
// Returns the Receive error, else a recorded publish error, else ctx.Err().
func (b *Backend) RequeueDLQ(ctx context.Context, priority taskhawk.Priority, numMessages uint32,
	visibilityTimeout time.Duration) error {
	err := b.ensureClient(ctx)
	if err != nil {
		return err
	}
	// NOTE(review): this closes the client that ensureClient caches on b, and ensureClient does not
	// recreate a client that is non-nil - confirm the Backend is not expected to be reused afterwards.
	defer b.client.Close()

	clientTopic := b.client.Topic(b.getTopic(priority))
	defer clientTopic.Stop()

	clientTopic.PublishSettings.CountThreshold = int(numMessages)
	if visibilityTimeout != 0 {
		clientTopic.PublishSettings.Timeout = visibilityTimeout
	} else {
		clientTopic.PublishSettings.Timeout = defaultVisibilityTimeoutS
	}

	pubsubSubscription := b.client.Subscription(b.getDLQTopic(priority))
	pubsubSubscription.ReceiveSettings.MaxOutstandingMessages = int(numMessages)
	pubsubSubscription.ReceiveSettings.MaxExtensionPeriod = clientTopic.PublishSettings.Timeout

	// run a ticker that will fire after timeout and shutdown subscriber
	overallTimeout := time.Second * 5
	ticker := time.NewTicker(overallTimeout)
	defer ticker.Stop()

	wg := sync.WaitGroup{}
	// Registered before `defer cancel()` on purpose: defers run last-in-first-out, so on return the
	// context is canceled first and only then do we wait for the helper goroutines to exit.
	defer wg.Wait()

	rctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// idle watchdog: cancels the receive loop once the ticker fires without having been reset
	wg.Add(1)
	go func() {
		select {
		case <-ticker.C:
			cancel()
		case <-rctx.Done():
		}
		wg.Done()
	}()

	var numMessagesRequeued uint32

	// progress reporter: logs the running count once per second until the run ends
	progressTicker := time.NewTicker(time.Second * 1)
	defer progressTicker.Stop()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-progressTicker.C:
				b.getLogger(ctx).Info("Re-queue DLQ progress", taskhawk.LoggingFields{"num_messages": atomic.LoadUint32(&numMessagesRequeued)})
			case <-rctx.Done():
				return
			}
		}
	}()

	publishErrCh := make(chan error, 10)
	defer close(publishErrCh)
	err = pubsubSubscription.Receive(rctx, func(ctx context.Context, message *pubsub.Message) {
		// a message arrived, so push the idle deadline out again
		ticker.Reset(overallTimeout)

		result := clientTopic.Publish(rctx, message)
		_, err := result.Get(rctx)
		if err != nil {
			message.Nack()
			cancel()
			// Non-blocking send. Nothing drains this channel until Receive has returned, and per the
			// Pub/Sub client documentation Receive only returns after every callback has returned.
			// Once cancel() has fired, every other in-flight callback fails as well, so with more
			// failures than buffer slots a blocking send here would deadlock the whole function.
			// Only one error is reported to the caller anyway, so dropping the overflow is harmless.
			select {
			case publishErrCh <- err:
			default:
			}
		} else {
			message.Ack()
			atomic.AddUint32(&numMessagesRequeued, 1)
		}
	})
	if err != nil {
		return err
	}
	// if publish failed, signal that
	select {
	case err = <-publishErrCh:
		return err
	default:
	}
	// context cancelation doesn't return error in Receive, don't return error from rctx since cancelation is happy
	// path
	return ctx.Err()
}
// NackMessage nacks a message on the queue.
// providerMetadata must be the Metadata value that Receive attached to the message; any other type,
// or a Metadata without an underlying Pub/Sub message, yields an error instead of a panic.
func (b *Backend) NackMessage(ctx context.Context, providerMetadata interface{}) error {
	metadata, ok := providerMetadata.(Metadata)
	if !ok {
		return fmt.Errorf("unexpected provider metadata type %T, expected gcp.Metadata", providerMetadata)
	}
	if metadata.pubsubMessage == nil {
		return fmt.Errorf("provider metadata has no underlying Pub/Sub message")
	}
	metadata.pubsubMessage.Nack()
	return nil
}
// AckMessage acknowledges a message on the queue.
// providerMetadata must be the Metadata value that Receive attached to the message; any other type,
// or a Metadata without an underlying Pub/Sub message, yields an error instead of a panic.
func (b *Backend) AckMessage(ctx context.Context, providerMetadata interface{}) error {
	metadata, ok := providerMetadata.(Metadata)
	if !ok {
		return fmt.Errorf("unexpected provider metadata type %T, expected gcp.Metadata", providerMetadata)
	}
	if metadata.pubsubMessage == nil {
		return fmt.Errorf("provider metadata has no underlying Pub/Sub message")
	}
	metadata.pubsubMessage.Ack()
	return nil
}
// ensureClient lazily creates the Pub/Sub client and caches it on b; it is a no-op when a client is
// already cached. The project comes from settings.GoogleCloudProject, falling back to the project ID
// of the default Google credentials when that setting is empty. Returns an error when no project can
// be determined or the client cannot be created.
// NOTE(review): b.client is read and written without synchronization - confirm callers do not invoke
// backend methods concurrently before the first client has been created.
func (b *Backend) ensureClient(ctx context.Context) error {
	// Check the cache first: the project is only needed to build a new client, so there is no reason
	// to run credential discovery on every Publish/Receive once a client exists.
	if b.client != nil {
		return nil
	}

	googleCloudProject := b.settings.GoogleCloudProject
	if googleCloudProject == "" {
		creds, err := google.FindDefaultCredentials(ctx)
		if err != nil {
			return errors.Wrap(
				err, "unable to discover google cloud project setting, either pass explicitly, or fix runtime environment")
		}
		if creds.ProjectID == "" {
			return errors.New(
				"unable to discover google cloud project setting, either pass explicitly, or fix runtime environment")
		}
		googleCloudProject = creds.ProjectID
	}

	// The client is created with a background context rather than ctx because it is cached on b and
	// reused by later calls.
	client, err := pubsub.NewClient(context.Background(), googleCloudProject, b.settings.PubsubClientOptions...)
	if err != nil {
		return err
	}
	b.client = client
	return nil
}
// Settings for the taskhawk GCP backend
type Settings struct {
// QueueName is the taskhawk queue name, without the `taskhawk-` prefix (getTopic prepends it).
QueueName string
// GoogleCloudProject ID that contains Pub/Sub resources. When empty, ensureClient falls back to the
// project ID of the default Google credentials.
GoogleCloudProject string
// PubsubClientOptions is a list of options to pass to pubsub.NewClient. This may be useful to customize GRPC
// behavior for example.
PubsubClientOptions []option.ClientOption
}
// initDefaults fills in the optional parts of the backend that the caller left unset: a single shared
// taskhawk.StdLogger as the logger source, and an empty (non-nil) Pub/Sub client option list.
func (b *Backend) initDefaults() {
	if b.getLogger == nil {
		fallback := &taskhawk.StdLogger{}
		b.getLogger = func(context.Context) taskhawk.Logger {
			return fallback
		}
	}
	if b.settings.PubsubClientOptions == nil {
		b.settings.PubsubClientOptions = make([]option.ClientOption, 0)
	}
}
// NewBackend builds a Backend for publishing to and consuming from GCP Pub/Sub using the given
// settings and logger source, applying defaults for anything left unset.
// Provider metadata produced by the returned Backend has concrete type gcp.Metadata.
func NewBackend(settings Settings, getLogger taskhawk.GetLoggerFunc) *Backend {
	backend := new(Backend)
	backend.settings = settings
	backend.getLogger = getLogger
	backend.initDefaults()
	return backend
}