/
ruler_sync_queue.go
115 lines (93 loc) · 2.31 KB
/
ruler_sync_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// SPDX-License-Identifier: AGPL-3.0-only
package ruler
import (
"context"
"sync"
"time"
"github.com/grafana/dskit/services"
)
// defaultRulerSyncPollFrequency is the default polling interval (10 seconds)
// for the ruler sync queue.
// NOTE(review): it is not referenced in the visible part of this file;
// presumably callers pass it to newRulerSyncQueue — confirm.
const defaultRulerSyncPollFrequency = 10 * time.Second
// rulerSyncQueue collects the IDs of users whose rules need to be synced and
// hands them over, in batches, through the channel returned by poll().
// Pending user IDs are kept in a set, so a user enqueued several times before
// the next hand-over appears only once in the batch.
type rulerSyncQueue struct {
	// Service is built by newRulerSyncQueue around the running() method.
	services.Service
	// queueMx guards queue.
	queueMx sync.Mutex
	// queue is the set of user IDs waiting to be sent on pollChan.
	queue map[string]struct{}
	// pollChan is the unbuffered channel on which running() sends each batch.
	pollChan chan []string
	// pollFrequency is the period of the ticker that paces the running() loop.
	pollFrequency time.Duration
}
// newRulerSyncQueue returns a rulerSyncQueue with an empty set of pending
// user IDs and an unbuffered poll channel. Its running() loop hands over the
// pending user IDs once per pollFrequency tick.
//
// A non-positive pollFrequency falls back to defaultRulerSyncPollFrequency:
// running() passes the value to time.NewTicker, which panics when the
// duration is not greater than zero.
func newRulerSyncQueue(pollFrequency time.Duration) *rulerSyncQueue {
	if pollFrequency <= 0 {
		pollFrequency = defaultRulerSyncPollFrequency
	}

	q := &rulerSyncQueue{
		pollChan:      make(chan []string),
		pollFrequency: pollFrequency,
		queue:         map[string]struct{}{},
	}

	// q must exist before the service is built because the service wraps
	// the q.running method value.
	q.Service = services.NewBasicService(nil, q.running, nil)
	return q
}
// running is the loop of the queue service. On each iteration it empties the
// set of pending user IDs and, if any were pending, sends them as one batch on
// pollChan; it then waits for the next tick before starting over. The first
// iteration runs immediately, without waiting for a tick.
//
// It always returns nil, and it returns as soon as ctx is done, including
// while blocked on sending a batch (in which case that batch is dropped).
func (q *rulerSyncQueue) running(ctx context.Context) error {
	tick := time.NewTicker(q.pollFrequency)
	defer tick.Stop()

	// drain moves every pending user ID out of the set, under the lock, and
	// returns them. The lock is released before anything is sent, so
	// enqueue() is never blocked by a slow receiver.
	drain := func() []string {
		q.queueMx.Lock()
		defer q.queueMx.Unlock()

		pending := make([]string, 0, len(q.queue))
		for id := range q.queue {
			pending = append(pending, id)
			delete(q.queue, id)
		}
		return pending
	}

	for {
		if batch := drain(); len(batch) != 0 {
			// pollChan is unbuffered: block until a receiver takes the
			// batch or the service is asked to stop.
			select {
			case <-ctx.Done():
				return nil
			case q.pollChan <- batch:
			}
		}

		// Pause until the next tick, unless the service is asked to stop.
		select {
		case <-ctx.Done():
			return nil
		case <-tick.C:
		}
	}
}
// enqueue records a rules sync request for each of the given userIDs.
// The pending requests are stored as a set, so enqueueing a user ID that is
// already pending has no further effect. The set is updated while holding
// queueMx.
func (q *rulerSyncQueue) enqueue(userIDs ...string) {
	q.queueMx.Lock()
	defer q.queueMx.Unlock()

	for i := range userIDs {
		q.queue[userIDs[i]] = struct{}{}
	}
}
// poll returns the channel from which you can receive the batches of user IDs
// to sync. Every call returns the same channel (pollChan); running() sends a
// batch on it only when at least one user ID is pending, and nothing in the
// visible code closes it.
func (q *rulerSyncQueue) poll() chan []string {
	return q.pollChan
}
// rulerSyncQueueProcessor is a service which polls from a queue and invokes
// a callback function to process the polled tenants.
type rulerSyncQueueProcessor struct {
	// Service is built by newRulerSyncQueueProcessor around the running() method.
	services.Service
	// queue is the queue whose poll() channel is read for batches of user IDs.
	queue *rulerSyncQueue
	// process is the callback invoked with each batch of user IDs received.
	process func(ctx context.Context, userIDs []string)
}
// newRulerSyncQueueProcessor returns a processor that reads batches of user
// IDs from queue and passes each of them to process. The embedded Service is
// built around the processor's running() method.
func newRulerSyncQueueProcessor(queue *rulerSyncQueue, process func(ctx context.Context, userIDs []string)) *rulerSyncQueueProcessor {
	p := new(rulerSyncQueueProcessor)
	p.queue = queue
	p.process = process
	p.Service = services.NewBasicService(nil, p.running, nil)
	return p
}
// running is the loop of the processor service. It receives batches of user
// IDs from the queue's poll() channel and calls process with ctx and each
// batch, one batch at a time. It always returns nil, and it returns once ctx
// is done.
func (p *rulerSyncQueueProcessor) running(ctx context.Context) error {
	for {
		// Stop before waiting again if ctx was cancelled while the
		// previous batch was being processed.
		if ctx.Err() != nil {
			return nil
		}

		select {
		case <-ctx.Done():
			return nil
		case userIDs := <-p.queue.poll():
			p.process(ctx, userIDs)
		}
	}
}