-
Notifications
You must be signed in to change notification settings - Fork 492
/
processor_manager.go
86 lines (67 loc) · 1.74 KB
/
processor_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package worker
import (
"context"
"sync"
"time"
"go.uber.org/atomic"
"google.golang.org/grpc"
)
// notifyShutdownTimeout is the deadline applied to the context used when
// notifying the remote endpoint that this worker is shutting down.
const notifyShutdownTimeout = 5 * time.Second
// processorManager owns the processor goroutines that share one gRPC
// connection and allows the number of running goroutines to be adjusted.
type processorManager struct {
	// Processor run by every goroutine, together with the connection and
	// address that are passed to it on each call.
	p       processor
	conn    *grpc.ClientConn
	address string

	// Parent context: every goroutine runs under a child derived from it.
	ctx context.Context
	// Counts started goroutines so that stop can wait for them to return.
	wg sync.WaitGroup

	// One cancel function per started goroutine; cancelsMu guards the slice.
	cancelsMu sync.Mutex
	cancels   []context.CancelFunc

	// Number of goroutines currently inside the processing call.
	currentProcessors *atomic.Int32
}
// newProcessorManager returns a processorManager bound to the given
// processor, connection and address. ctx becomes the parent context of every
// goroutine the manager later starts. No goroutine is started here, and the
// running-processor counter begins at zero.
func newProcessorManager(ctx context.Context, p processor, conn *grpc.ClientConn, address string) *processorManager {
	pm := &processorManager{
		currentProcessors: atomic.NewInt32(0),
	}
	pm.ctx = ctx
	pm.p = p
	pm.conn = conn
	pm.address = address
	return pm
}
// stop shuts the manager down in four steps: it notifies the remote endpoint,
// cancels every processor goroutine, waits for all of them to return, and
// finally closes the gRPC connection.
func (pm *processorManager) stop() {
	// The shutdown notification for the remote query-frontend or
	// query-scheduler runs under a fresh background context with its own
	// timeout, so that it is not tied to the cancellation of pm.ctx.
	shutdownCtx, release := context.WithTimeout(context.Background(), notifyShutdownTimeout)
	defer release()

	pm.p.notifyShutdown(shutdownCtx, pm.conn, pm.address)

	// Scaling to zero cancels every goroutine; Wait blocks until they exit.
	pm.concurrency(0)
	pm.wg.Wait()

	// The close error is deliberately discarded.
	_ = pm.conn.Close()
}
// concurrency adjusts the number of processor goroutines to n. Negative
// values are treated as zero. When scaling up, each new goroutine runs
// processQueriesOnSingleStream under its own cancellable child of pm.ctx.
// When scaling down, the oldest goroutines are cancelled first. The call
// only signals cancellation; it does not wait for goroutines to exit
// (pm.wg tracks that).
func (pm *processorManager) concurrency(n int) {
	pm.cancelsMu.Lock()
	defer pm.cancelsMu.Unlock()

	if n < 0 {
		n = 0
	}

	// Scale up: start goroutines until there is one cancel function per
	// requested processor.
	for len(pm.cancels) < n {
		ctx, cancel := context.WithCancel(pm.ctx)
		pm.cancels = append(pm.cancels, cancel)

		pm.wg.Add(1)
		go func() {
			defer pm.wg.Done()

			// The counter only covers the time spent inside the processing call.
			pm.currentProcessors.Inc()
			defer pm.currentProcessors.Dec()

			pm.p.processQueriesOnSingleStream(ctx, pm.conn, pm.address)
		}()
	}

	// Scale down: cancel from the front of the slice.
	for len(pm.cancels) > n {
		pm.cancels[0]()
		// Clear the slot before reslicing; otherwise the backing array keeps
		// the cancelled CancelFunc (and the context it closes over) reachable.
		pm.cancels[0] = nil
		pm.cancels = pm.cancels[1:]
	}
}