-
Notifications
You must be signed in to change notification settings - Fork 1.3k
/
publisher.go
404 lines (352 loc) · 12.7 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wire
import (
"context"
"errors"
"fmt"
"math/rand"
"reflect"
"time"
"google.golang.org/api/option"
"google.golang.org/grpc"
vkit "cloud.google.com/go/pubsublite/apiv1"
pb "google.golang.org/genproto/googleapis/cloud/pubsublite/v1"
)
// Sentinel errors for malformed responses received on the publish stream.
var (
	// errInvalidInitialPubResponse is returned by validateInitialResponse when
	// the first response on a publish stream is not an initial response.
	errInvalidInitialPubResponse = errors.New("pubsublite: first response from server was not an initial response for publish")
	// errInvalidMsgPubResponse is reported by onResponse when a response it
	// handles does not contain a message publish response.
	errInvalidMsgPubResponse = errors.New("pubsublite: received invalid publish response from server")
)
// singlePartitionPublisher publishes messages to a single topic partition.
//
// Life of a successfully published message:
//   - Publish() receives the message from the user.
//   - It is added to `batcher.msgBundler`, which performs batching in
//     accordance with user-configured PublishSettings.
//   - onNewBatch() receives new message batches from the bundler. The batch is
//     added to `batcher.publishQueue` (in-flight batches) and sent to the
//     publish stream, if connected. If the stream is currently reconnecting,
//     the entire queue is resent to the stream immediately after it has
//     reconnected, in onStreamStatusChange().
//   - onResponse() receives the first cursor offset for the first batch in
//     `batcher.publishQueue`. It assigns the cursor offsets for each message
//     and releases the publish results to the user.
//
// See comments for unsafeInitiateShutdown() for error scenarios.
type singlePartitionPublisher struct {
	// Immutable after creation.

	// pubClient is used by newStream to open the publish stream.
	pubClient *vkit.PublisherClient
	// topic identifies the topic path and partition this publisher serves.
	topic topicPartition
	// initialReq is returned by initialRequest as the first request for a
	// stream.
	initialReq *pb.PublishRequest
	// metadata is attached to the context of every stream opened by newStream.
	metadata pubsubMetadata

	// Fields below must be guarded with mu.

	// stream is the underlying publish stream; this publisher is passed to it
	// as the handler when the stream is constructed.
	stream *retryableStream
	// batcher batches incoming messages and tracks in-flight batches.
	batcher *publishMessageBatcher
	// enableSendToStream gates whether onNewBatch may send new batches directly
	// to the stream. It is cleared while the stream is reconnecting and on
	// immediate shutdown, and set once in-flight batches have been resent after
	// a (re)connection.
	enableSendToStream bool

	// abstractService is embedded; the mu, status and unsafeUpdateStatus used
	// by the methods of this type are promoted from it.
	abstractService
}
// singlePartitionPublisherFactory creates instances of singlePartitionPublisher
// for given partition numbers. Every publisher created by one factory shares
// the same context, publisher client, settings and topic path.
type singlePartitionPublisherFactory struct {
	// ctx is passed to the retryable stream of every created publisher.
	ctx context.Context
	// pubClient is the API client handed to every created publisher.
	pubClient *vkit.PublisherClient
	// settings supplies the batcher configuration, the stream timeout and the
	// framework reported in client info.
	settings PublishSettings
	// topicPath is the topic that created publishers publish to.
	topicPath string
}
// New builds a singlePartitionPublisher for the given partition of the
// factory's topic. The publisher is fully wired (batcher, stream, metadata) but
// is not started here.
func (f *singlePartitionPublisherFactory) New(partition int) *singlePartitionPublisher {
	// The initial request identifies the topic partition for the stream.
	initial := &pb.InitialPublishRequest{
		Topic:     f.topicPath,
		Partition: int64(partition),
	}

	publisher := &singlePartitionPublisher{
		pubClient: f.pubClient,
		topic:     topicPartition{Path: f.topicPath, Partition: partition},
		initialReq: &pb.PublishRequest{
			RequestType: &pb.PublishRequest_InitialRequest{InitialRequest: initial},
		},
		metadata: newPubsubMetadata(),
	}

	// The batcher hands completed batches back to the publisher via onNewBatch.
	publisher.batcher = newPublishMessageBatcher(&f.settings, partition, publisher.onNewBatch)

	// The publisher itself is passed to the stream as its handler.
	responseType := reflect.TypeOf(pb.PublishResponse{})
	publisher.stream = newRetryableStream(f.ctx, publisher, f.settings.Timeout, responseType)

	publisher.metadata.AddTopicRoutingMetadata(publisher.topic)
	publisher.metadata.AddClientInfo(f.settings.Framework)
	return publisher
}
// Start attempts to establish a publish stream connection. The stream is
// started only if the transition to serviceStarting is accepted.
func (pp *singlePartitionPublisher) Start() {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	if transitioned := pp.unsafeUpdateStatus(serviceStarting, nil); !transitioned {
		return
	}
	pp.stream.Start()
}
// Stop initiates shutdown of the publisher. All pending messages are flushed.
//
// It requests the serviceTerminating target status with no error; see
// unsafeInitiateShutdown for how that differs from an immediate termination.
func (pp *singlePartitionPublisher) Stop() {
	pp.mu.Lock()
	defer pp.mu.Unlock()
	// unsafeInitiateShutdown temporarily releases and re-acquires pp.mu while
	// flushing, so the lock is held again when the deferred Unlock runs.
	pp.unsafeInitiateShutdown(serviceTerminating, nil)
}
// Publish hands a pub/sub message to the batcher for publishing.
//
// On success the result is delivered later through onResult. If the message is
// rejected (service not started, service stopping or stopped, or the batcher
// refuses it), the publisher begins a flushing shutdown and onResult is invoked
// with the error.
func (pp *singlePartitionPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	// Messages are accepted while the service is starting up or active. During
	// startup, messages are queued in the batcher and will be published once
	// the stream connects. If startup fails, the error will be set for the
	// queued messages.
	var rejectErr error
	switch {
	case pp.status == serviceUninitialized:
		rejectErr = ErrServiceUninitialized
	case pp.status >= serviceTerminating:
		rejectErr = ErrServiceStopped
	default:
		if addErr := pp.batcher.AddMessage(msg, onResult); addErr != nil {
			rejectErr = addErr
		}
	}
	if rejectErr == nil {
		return
	}

	// The new message cannot be published: flush pending messages and then
	// terminate the stream once results are received.
	pp.unsafeInitiateShutdown(serviceTerminating, rejectErr)
	onResult(nil, rejectErr)
}
// newStream opens a publish stream using a context that carries the
// publisher's metadata.
func (pp *singlePartitionPublisher) newStream(ctx context.Context) (grpc.ClientStream, error) {
	streamCtx := pp.metadata.AddToContext(ctx)
	return pp.pubClient.Publish(streamCtx)
}
// initialRequest returns the request to send first on a new stream and reports
// that an initial response is required.
func (pp *singlePartitionPublisher) initialRequest() (interface{}, initialResponseRequired) {
	request := pp.initialReq
	return request, initialResponseRequired(true)
}
// validateInitialResponse checks that response is a publish response carrying
// an initial response, returning errInvalidInitialPubResponse otherwise.
func (pp *singlePartitionPublisher) validateInitialResponse(response interface{}) error {
	// A failed type assertion leaves a nil message, whose getter yields nil and
	// therefore falls through to the error below.
	if publishResp, _ := response.(*pb.PublishResponse); publishResp.GetInitialResponse() != nil {
		return nil
	}
	return errInvalidInitialPubResponse
}
// onStreamStatusChange reacts to status transitions of the publish stream:
// it pauses direct sends while reconnecting, resends in-flight batches once
// connected, and shuts the publisher down when the stream has terminated.
func (pp *singlePartitionPublisher) onStreamStatusChange(status streamStatus) {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	if status == streamReconnecting {
		// Prevent onNewBatch() from sending any new batches to the stream.
		pp.enableSendToStream = false
		return
	}
	if status == streamTerminated {
		pp.unsafeInitiateShutdown(serviceTerminated, pp.stream.Error())
		return
	}
	if status != streamConnected {
		return
	}

	pp.unsafeUpdateStatus(serviceActive, nil)

	// To ensure messages are sent in order, in-flight batches are resent to the
	// stream immediately after reconnecting, before any new batches.
	for _, inFlight := range pp.batcher.InFlightBatches() {
		if sent := pp.stream.Send(inFlight.ToPublishRequest()); !sent {
			// Leave direct sends disabled; nothing more is sent in this call.
			return
		}
	}
	pp.enableSendToStream = true
}
// onNewBatch records a new batch with the batcher and, when direct sends are
// enabled, forwards it to the stream.
func (pp *singlePartitionPublisher) onNewBatch(batch *publishBatch) {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	pp.batcher.AddBatch(batch)
	if !pp.enableSendToStream {
		return
	}

	// Note: if the underlying stream is reconnecting or Send() fails, all
	// in-flight batches will be sent to the stream once the connection has been
	// re-established. Thus the return value is ignored.
	pp.stream.Send(batch.ToPublishRequest())
}
// onResponse handles a response from the publish stream. A valid message
// response supplies the starting cursor offset, which is passed to the batcher;
// any invalid response or batcher error terminates the publisher immediately.
func (pp *singlePartitionPublisher) onResponse(response interface{}) {
	pp.mu.Lock()
	defer pp.mu.Unlock()

	publishResp, _ := response.(*pb.PublishResponse)
	msgResp := publishResp.GetMessageResponse()
	if msgResp == nil {
		pp.unsafeInitiateShutdown(serviceTerminated, errInvalidMsgPubResponse)
		return
	}

	startOffset := msgResp.GetStartCursor().GetOffset()
	if err := pp.batcher.OnPublishResponse(startOffset); err != nil {
		pp.unsafeInitiateShutdown(serviceTerminated, err)
		return
	}

	// During a flushing shutdown this may have been the last pending result.
	pp.unsafeCheckDone()
}
// unsafeInitiateShutdown begins shutting down the publisher. It must be
// provided a target serviceStatus, which must be one of:
//   - serviceTerminating: attempts to successfully publish all pending messages
//     before terminating the publisher. Occurs when:
//   - The user calls Stop().
//   - A new message fails preconditions. This should block the publish of
//     subsequent messages to ensure ordering, but all pending messages should
//     be flushed.
//   - serviceTerminated: immediately terminates the publisher and errors all
//     in-flight batches and pending messages in the bundler. Occurs when:
//   - The publish stream terminates with a non-retryable error.
//   - An inconsistency is detected in the server's publish responses. Assume
//     there is a bug on the server and terminate the publisher, as correct
//     processing of messages cannot be guaranteed.
//
// err is the cause of the shutdown (nil for a user-initiated Stop). It is
// wrapped with the publisher's topic partition before being recorded with the
// status update; the unwrapped err is what is passed to the batcher on
// immediate shutdown.
//
// Expected to be called with singlePartitionPublisher.mu held. Note that the
// mutex is temporarily released and re-acquired inside this function while the
// batcher is flushed, so state guarded by mu may change across the call.
func (pp *singlePartitionPublisher) unsafeInitiateShutdown(targetStatus serviceStatus, err error) {
	// Nothing to do if the status update is rejected.
	if !pp.unsafeUpdateStatus(targetStatus, wrapError("publisher", pp.topic.String(), err)) {
		return
	}

	// Close the stream if this is an immediate shutdown. Otherwise leave it open
	// to send pending messages.
	if targetStatus == serviceTerminated {
		pp.enableSendToStream = false
		pp.stream.Stop()
	}

	// Bundler.Flush() blocks and invokes onNewBatch(), which acquires the mutex,
	// so it cannot be held here.
	// Updating the publisher status above prevents any new messages from being
	// added to the Bundler after flush.
	pp.mu.Unlock()
	pp.batcher.Flush()
	pp.mu.Lock()

	// If flushing pending messages, close the stream if there's nothing left to
	// publish.
	if targetStatus == serviceTerminating {
		pp.unsafeCheckDone()
		return
	}

	// For immediate shutdown set the error message for all pending messages.
	pp.batcher.OnPermanentError(err)
}
// unsafeCheckDone stops the stream once a flushing shutdown
// (serviceTerminating) has no in-flight batches left. In any other state it
// does nothing.
func (pp *singlePartitionPublisher) unsafeCheckDone() {
	if pp.status != serviceTerminating || !pp.batcher.InFlightBatchesEmpty() {
		return
	}
	pp.stream.Stop()
}
// routingPublisher publishes messages to multiple topic partitions, each
// managed by a singlePartitionPublisher. It supports increasing topic partition
// count, but not decreasing.
type routingPublisher struct {
	// Immutable after creation.

	// msgRouterFactory builds a new message router whenever the partition count
	// grows.
	msgRouterFactory *messageRouterFactory
	// pubFactory creates the singlePartitionPublisher for each new partition.
	pubFactory *singlePartitionPublisherFactory
	// partitionWatcher reports partition counts via onPartitionCountChanged.
	partitionWatcher *partitionCountWatcher

	// Fields below must be guarded with mu.

	// msgRouter maps a message key to a partition number. It is nil until the
	// first partition count has been processed.
	msgRouter messageRouter
	// publishers is indexed by partition number.
	publishers []*singlePartitionPublisher

	// apiClientService is embedded and is constructed with the API clients
	// passed to newRoutingPublisher.
	apiClientService
}
// newRoutingPublisher assembles a routingPublisher from the given clients and
// factories, and registers a partition count watcher as one of its services.
// Partition publishers are created later, as the watcher reports counts.
func newRoutingPublisher(allClients apiClients, adminClient *vkit.AdminClient, msgRouterFactory *messageRouterFactory, pubFactory *singlePartitionPublisherFactory) *routingPublisher {
	rp := &routingPublisher{
		msgRouterFactory: msgRouterFactory,
		pubFactory:       pubFactory,
		apiClientService: apiClientService{clients: allClients},
	}
	rp.init()

	watcher := newPartitionCountWatcher(pubFactory.ctx, adminClient, pubFactory.settings, pubFactory.topicPath, rp.onPartitionCountChanged)
	rp.partitionWatcher = watcher
	rp.unsafeAddServices(rp.partitionWatcher)
	return rp
}
// onPartitionCountChanged grows the set of partition publishers to match a
// newly observed partition count and replaces the message router. Unchanged or
// decreased counts, and updates arriving once shutdown has begun, are ignored.
func (rp *routingPublisher) onPartitionCountChanged(partitionCount int) {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	if rp.status >= serviceTerminating {
		return
	}

	current := len(rp.publishers)
	if partitionCount <= current {
		// Either nothing changed or the count decreased, which is unsupported.
		// TODO: Log the decrease in partition count.
		return
	}

	// Create and register a publisher for each newly added partition.
	for partition := current; partition < partitionCount; partition++ {
		added := rp.pubFactory.New(partition)
		rp.publishers = append(rp.publishers, added)
		rp.unsafeAddServices(added)
	}
	rp.msgRouter = rp.msgRouterFactory.New(partitionCount)
}
// Publish routes the message to the publisher for its partition and forwards
// it. If routing fails, onResult is invoked immediately with the error.
func (rp *routingPublisher) Publish(msg *pb.PubSubMessage, onResult PublishResultFunc) {
	target, routeErr := rp.routeToPublisher(msg)
	if routeErr == nil {
		target.Publish(msg, onResult)
		return
	}
	onResult(nil, routeErr)
}
// routeToPublisher selects the partition publisher for msg based on its key.
//
// It returns an error if the service status check fails, if no message router
// has been set yet, or if the router yields a partition with no corresponding
// publisher. The latter two cases are not expected to occur and additionally
// initiate a flushing shutdown of the routing publisher.
func (rp *routingPublisher) routeToPublisher(msg *pb.PubSubMessage) (*singlePartitionPublisher, error) {
	rp.mu.Lock()
	defer rp.mu.Unlock()

	if err := rp.unsafeCheckServiceStatus(); err != nil {
		return nil, err
	}
	if rp.msgRouter == nil {
		// Should not occur.
		rp.unsafeInitiateShutdown(serviceTerminating, ErrServiceUninitialized)
		return nil, ErrServiceUninitialized
	}

	partition := rp.msgRouter.Route(msg.GetKey())
	// Check both bounds: a negative partition would otherwise panic on the
	// slice index below instead of taking this error path.
	if partition < 0 || partition >= len(rp.publishers) {
		// Should not occur.
		err := fmt.Errorf("pubsublite: publisher not found for partition %d", partition)
		rp.unsafeInitiateShutdown(serviceTerminating, err)
		return nil, err
	}
	return rp.publishers[partition], nil
}
// Publisher is the client interface exported from this package for publishing
// messages. NewPublisher returns an implementation of it.
type Publisher interface {
	// Publish submits a message for publishing. The outcome is reported through
	// the supplied PublishResultFunc.
	Publish(*pb.PubSubMessage, PublishResultFunc)
	// Start begins startup of the publisher.
	Start()
	// WaitStarted presumably blocks until startup has completed and returns
	// the startup error, if any — TODO confirm against the implementation.
	WaitStarted() error
	// Stop initiates shutdown of the publisher.
	Stop()
	// WaitStopped presumably blocks until the publisher has fully stopped and
	// returns the terminal error, if any — TODO confirm against the
	// implementation.
	WaitStopped() error
	// Error presumably returns the error that caused the publisher to
	// terminate, if any — TODO confirm against the implementation.
	Error() error
}
// NewPublisher creates a new client for publishing messages to the topic at
// topicPath in the given region.
//
// The region and settings are validated before any API client is created. If
// the admin client cannot be created, the already-created publisher client is
// closed before the error is returned.
func NewPublisher(ctx context.Context, settings PublishSettings, region, topicPath string, opts ...option.ClientOption) (Publisher, error) {
	if err := ValidateRegion(region); err != nil {
		return nil, err
	}
	if err := validatePublishSettings(settings); err != nil {
		return nil, err
	}

	pubClient, err := newPublisherClient(ctx, region, opts...)
	if err != nil {
		return nil, err
	}
	clients := apiClients{pubClient}

	adminClient, err := NewAdminClient(ctx, region, opts...)
	if err != nil {
		// Release the publisher client created above.
		clients.Close()
		return nil, err
	}
	clients = append(clients, adminClient)

	// The router factory gets its own random source, seeded from the clock.
	routerRand := rand.New(rand.NewSource(time.Now().UnixNano()))
	routerFactory := newMessageRouterFactory(routerRand)
	publisherFactory := &singlePartitionPublisherFactory{
		ctx:       ctx,
		pubClient: pubClient,
		settings:  settings,
		topicPath: topicPath,
	}
	return newRoutingPublisher(clients, adminClient, routerFactory, publisherFactory), nil
}