/
broker.go
122 lines (111 loc) · 2.46 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package keybroker
import (
"context"
"fmt"
"log"
"sync"
"time"
)
// Renewer is implemented by types that can be asked to renew.
// In this file *broker satisfies it through its Renew method, which queues a
// request for the key to be fetched again.
type Renewer interface {
// Renew takes no arguments and returns nothing.
Renew()
}
// Closer is implemented by types that can be closed.
// In this file *broker satisfies it through its Close method, which stops the
// ticker and closes the cancelled channel.
type Closer interface {
// Close takes no arguments and reports no error.
Close()
}
// Config holds the settings newBroker uses to build a broker.
type Config struct {
// Interval is the period of the broker's ticker. newBroker falls back to
// 5 seconds when it is zero.
Interval time.Duration
// Source is the value the broker calls Get on to fetch the key bytes.
// NOTE(review): the Source type is not declared in the visible part of this
// file; its exact contract should be confirmed at its declaration.
Source Source
}
// newBroker builds a broker for the given key type from config.
//
// A non-positive config.Interval falls back to 5 seconds, because
// time.NewTicker panics when handed a non-positive duration. The fallback is
// applied to a local copy, so the caller's Config is never modified and can be
// shared between brokers. config must not be nil.
//
// The returned broker holds a running ticker; call Close (or Halt) to stop it.
func newBroker(keyType string, config *Config) *broker {
	interval := config.Interval
	if interval <= 0 {
		interval = 5 * time.Second
	}

	return &broker{
		interval:  interval,
		source:    config.Source,
		ticker:    time.NewTicker(interval),
		renew:     make(chan struct{}, 1),
		cancelled: make(chan struct{}, 1),
		res:       make(chan []byte, 1),
		err:       make(chan error, 1),
		keyType:   keyType,
	}
}
// broker waits on a ticker and, when a renewal request is pending at a tick,
// fetches the key from source and publishes the bytes on res.
type broker struct {
// interval is the ticker period; Run reports it in its start-up log line.
interval time.Duration
// source is asked for the key via Get.
source Source
// ticker paces the loop in Run; Close stops it.
ticker *time.Ticker
// renew carries a pending renewal request from Renew to Run. newBroker
// creates it with a buffer of 1.
renew chan struct{}
// cancelled is closed by Close to tell Run and Renew to stop.
cancelled chan struct{}
// res receives the bytes returned by source.Get.
res chan []byte
// err receives the error Run reports just before it returns.
err chan error
// running is true while Run is executing; guarded by mu.
running bool
// keyType is a label used in log and error messages.
keyType string
// mu guards running.
mu sync.Mutex
}
// isRunning reports whether Run is currently executing. The flag is read
// while holding mu, the same lock Run takes when it sets and clears it.
func (b *broker) isRunning() bool {
	b.mu.Lock()
	active := b.running
	b.mu.Unlock()
	return active
}
// Renew asks the broker to fetch the key again on one of its next ticks.
//
// It never blocks. If the broker has been closed the call is a no-op, and if
// a renewal request is already pending the new one is dropped, since a single
// pending request is enough to trigger a fetch.
func (b *broker) Renew() {
	// Check for cancellation in its own select before attempting the send.
	// When several cases of one select are ready Go picks among them at
	// random, so combining the two would let a closed broker still attempt
	// the send, which panics if the renew channel has been closed.
	select {
	case <-b.cancelled:
		return
	default:
	}

	select {
	case b.renew <- struct{}{}:
	default:
		// The buffer already holds a request; nothing more to do.
	}
}
// Close stops the ticker and closes the cancelled channel, which tells Run and
// Renew to stop.
//
// Close is idempotent and safe to call from several goroutines: only the first
// call has any effect. Without that guard a second call (for example Halt
// followed by a deferred Close) would panic on closing an already-closed
// channel.
func (b *broker) Close() {
	// mu serialises concurrent callers so the closed check and the close
	// itself happen as one step.
	b.mu.Lock()
	defer b.mu.Unlock()

	select {
	case <-b.cancelled:
		// Already closed by an earlier call.
		return
	default:
	}

	b.ticker.Stop()
	close(b.cancelled)
}
// Run starts the broker loop and blocks until the broker is closed or ctx is
// done. Just before returning it sends an error describing the reason on the
// err channel.
//
// On every tick Run checks whether a renewal was requested through Renew. If
// so, it fetches the key from the source and publishes the bytes on the res
// channel. When the fetch fails the error is logged, nothing is published, and
// a new renewal is queued so the fetch is retried on a later tick.
//
// The running flag is set for the duration of the call.
func (b *broker) Run(ctx context.Context) {
	log.Printf("running %s broker checking for new key every %d second(s)\n", b.keyType, b.interval/time.Second)

	b.mu.Lock()
	b.running = true
	b.mu.Unlock()
	defer func() {
		b.mu.Lock()
		b.running = false
		b.mu.Unlock()
	}()

	// The renew channel is deliberately NOT closed when Run returns. Run is
	// the receiver; Renew is the sender and may still be called afterwards,
	// and a send on a closed channel panics.

	for {
		select {
		case <-b.cancelled:
			err := fmt.Errorf("%s broker cancelled", b.keyType)
			log.Println(err)
			b.err <- err
			return

		case <-ctx.Done():
			b.err <- fmt.Errorf("%s broker quit due to context timeout", b.keyType)
			return

		case <-b.ticker.C:
			// Only do work when a renewal has been requested.
			select {
			case <-b.renew:
			default:
				continue
			}

			bts, err := b.source.Get(ctx)
			if err != nil {
				log.Printf("%s broker interval error: %v\n", b.keyType, err)
				// Queue a retry and do not publish: the bytes that
				// accompany a non-nil error are not a usable key.
				b.Renew()
				continue
			}

			// Publish the key, but do not let a slow or absent consumer
			// keep Run from noticing shutdown. On cancellation the key
			// is dropped and the outer select reports the reason on the
			// next iteration.
			select {
			case b.res <- bts:
			case <-b.cancelled:
			case <-ctx.Done():
			}
		}
	}
}
// Halt shuts the broker down by calling Close. The context argument is
// ignored and the returned error is always nil.
func (b *broker) Halt(_ context.Context) error {
b.Close()
return nil
}