-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
receive_scheduler.go
139 lines (123 loc) · 3.89 KB
/
receive_scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"errors"
"sync"
)
// ReceiveScheduler is a scheduler which is designed for Pub/Sub's Receive flow.
//
// Each item is added with a given key. Items added to the empty string key are
// handled in random order. Items added to any other key are handled
// sequentially.
type ReceiveScheduler struct {
	// workers is a channel that represents workers. Rather than a pool, where
	// worker are "removed" until the pool is empty, the channel is more like a
	// set of work desks, where workers are "added" until all the desks are full.
	//
	// Sending on this channel claims a desk (blocking when all are busy);
	// receiving from it releases the desk. The channel's capacity is therefore
	// the concurrency limit set in NewReceiveScheduler.
	//
	// A worker taking an item from the unordered queue (key="") completes a
	// single item and then goes back to the pool.
	//
	// A worker taking an item from an ordered queue (key="something") completes
	// all work in that queue until the queue is empty, then deletes the queue,
	// then goes back to the pool.
	workers chan struct{}
	// done is closed by Shutdown to signal that no further Add calls should be
	// accepted. It is never sent on — closing is the only signal.
	done chan struct{}
	// mu guards m.
	mu sync.Mutex
	// m maps an ordering key to its queue of pending handler invocations.
	// Presence of a key indicates a worker goroutine is already draining
	// that key's queue.
	m map[string][]func()
}
// NewReceiveScheduler creates a new ReceiveScheduler.
//
// The workers arg is the number of concurrent calls to handle. If the workers
// arg is 0, then a healthy default of 10 workers is used. If less than 0, this
// will be set to a large number, similar to PublishScheduler's handler limit.
func NewReceiveScheduler(workers int) *ReceiveScheduler {
	limit := workers
	switch {
	case limit == 0:
		// A sensible default for callers who don't care.
		limit = 10
	case limit < 0:
		// Effectively unbounded, mirroring PublishScheduler's handler limit.
		limit = 1e9
	}
	return &ReceiveScheduler{
		workers: make(chan struct{}, limit),
		done:    make(chan struct{}),
		m:       make(map[string][]func()),
	}
}
// Add adds the item to be handled. Add may block.
//
// Buffering happens above the ReceiveScheduler in the form of a flow controller
// that requests batches of messages to pull. A backed up ReceiveScheduler.Add
// call causes pushback to the pubsub service (less Receive calls on the
// long-lived stream), which keeps memory footprint stable.
//
// Add returns an error only after Shutdown has been called; otherwise it
// returns nil once the item has been dispatched or queued.
func (s *ReceiveScheduler) Add(key string, item interface{}, handle func(item interface{})) error {
	// Non-blocking check: refuse new work once Shutdown has closed s.done.
	select {
	case <-s.done:
		return errors.New("draining")
	default:
	}

	if key == "" {
		// Spawn a worker. The send blocks until a "desk" (channel slot) is
		// free, which is what bounds concurrency.
		s.workers <- struct{}{}
		go func() {
			// Unordered keys can be handled immediately.
			handle(item)
			// Release the desk when done.
			<-s.workers
		}()
		return nil
	}

	// Add it to the queue. This has to happen before we enter the goroutine
	// below to prevent a race from the next iteration of the key-loop
	// adding another item before this one gets queued.
	s.mu.Lock()
	_, ok := s.m[key]
	s.m[key] = append(s.m[key], func() {
		handle(item)
	})
	s.mu.Unlock()
	if ok {
		// Someone is already working on this key. The existing worker's
		// key-loop will pick up the item we just appended.
		return nil
	}

	// Spawn a worker. As above, blocks until a concurrency slot is free.
	s.workers <- struct{}{}
	go func() {
		// Release the slot however the loop exits.
		defer func() { <-s.workers }()
		// Key-Loop: loop through the available items in the key's queue.
		for {
			s.mu.Lock()
			if len(s.m[key]) == 0 {
				// We're done processing items - the queue is empty. Delete
				// the queue from the map and free up the worker.
				//
				// Deleting under the same lock as the emptiness check is what
				// lets a concurrent Add know it must spawn a fresh worker.
				delete(s.m, key)
				s.mu.Unlock()
				return
			}
			// Pop an item from the queue.
			next := s.m[key][0]
			s.m[key] = s.m[key][1:]
			s.mu.Unlock()
			// Run outside the lock so handlers can take arbitrarily long
			// (and so other keys' Adds aren't blocked).
			next() // Handle next in queue.
		}
	}()

	return nil
}
// Shutdown begins flushing messages and stops accepting new Add calls. Shutdown
// does not block, or wait for all messages to be flushed.
//
// Shutdown is safe to call multiple times and from multiple goroutines.
func (s *ReceiveScheduler) Shutdown() {
	// Serialize on s.mu: without it, two concurrent Shutdown calls could both
	// observe s.done as still open in the select below and both attempt
	// close(s.done), and closing an already-closed channel panics.
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case <-s.done:
		// Already shut down; nothing to do.
	default:
		close(s.done)
	}
}