-
Notifications
You must be signed in to change notification settings - Fork 1
/
redis_adapter.go
217 lines (199 loc) · 5.98 KB
/
redis_adapter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package storage
import (
"context"
"fmt"
"log"
"sort"
"strings"
"time"
r "github.com/redis/go-redis/v9"
)
type message struct {
app string
messageBody string
}
func newMessage(app string, messageBody string) *message {
return &message{
app: app,
messageBody: messageBody,
}
}
type messagePipeliner struct {
timeout time.Duration
bufferSize int
messageCount int
pipeline r.Pipeliner
timeoutTicker *time.Ticker
queuedApps map[string]bool
errCh chan error
}
func newMessagePipeliner(bufferSize int, redisClient *r.ClusterClient, timeout time.Duration, errCh chan error) *messagePipeliner {
return &messagePipeliner{
timeout: timeout,
bufferSize: bufferSize,
pipeline: redisClient.Pipeline(),
timeoutTicker: time.NewTicker(timeout),
queuedApps: map[string]bool{},
errCh: errCh,
}
}
func (mp *messagePipeliner) addMessage(message *message) {
ctx, cancel := context.WithTimeout(context.Background(), mp.timeout)
defer cancel()
if err := mp.pipeline.Publish(ctx, message.app, message.messageBody).Err(); err != nil {
mp.errCh <- fmt.Errorf("error adding publish to %s to the pipeline: %s", message.app, err)
} else if err := mp.pipeline.RPush(ctx, message.app, message.messageBody).Err(); err != nil {
mp.errCh <- fmt.Errorf("error adding rpush to %s to the pipeline: %s", message.app, err)
} else {
mp.queuedApps[message.app] = true
mp.messageCount++
}
}
func (mp messagePipeliner) execPipeline() {
ctx, cancel := context.WithTimeout(context.Background(), mp.timeout)
defer cancel()
for app := range mp.queuedApps {
if err := mp.pipeline.LTrim(ctx, app, int64(-1*mp.bufferSize), -1).Err(); err != nil {
log.Printf("error adding ltrim of %s to the pipeline: %s", app, err)
mp.errCh <- fmt.Errorf("error adding ltrim of %s to the pipeline: %s", app, err)
}
}
if _, err := mp.pipeline.Exec(ctx); err != nil {
log.Printf("error executing pipeline: %s", err)
mp.errCh <- fmt.Errorf("error executing pipeline: %s", err)
}
}
func newClusterClient(cfg *redisConfig) (*r.ClusterClient, error) {
addrs := strings.Split(cfg.Addrs, ",")
sort.Strings(addrs)
return r.NewClusterClient(&r.ClusterOptions{
ClusterSlots: func(context.Context) ([]r.ClusterSlot, error) {
const slotsSize = 16383
var size = len(addrs)
var slotsRange = slotsSize / size
var slots []r.ClusterSlot
for index, addr := range addrs {
start := slotsRange * index
end := start + slotsRange
if (slotsSize - end) < slotsRange {
end = slotsSize
}
slots = append(slots, r.ClusterSlot{
Start: start,
End: end,
Nodes: []r.ClusterNode{{Addr: addr}},
})
}
return slots, nil
},
Password: cfg.Password, // "" == no password
RouteRandomly: true,
}), nil
}
type redisAdapter struct {
started bool
bufferSize int
redisClient *r.ClusterClient
messageChannel chan *message
stopCh chan struct{}
config *redisConfig
}
// NewRedisStorageAdapter returns a pointer to a new instance of a redis-based storage.Adapter.
func NewRedisStorageAdapter(bufferSize int) (Adapter, error) {
if bufferSize <= 0 {
return nil, fmt.Errorf("invalid buffer size: %d", bufferSize)
}
cfg, err := parseConfig(appName)
if err != nil {
return nil, err
}
client, err := newClusterClient(cfg)
if err != nil {
return nil, err
}
rsa := &redisAdapter{
bufferSize: bufferSize,
redisClient: client,
messageChannel: make(chan *message, bufferSize),
stopCh: make(chan struct{}),
config: cfg,
}
return rsa, nil
}
// Start the storage adapter. Invocations of this function are not concurrency safe and multiple
// serialized invocations have no effect.
func (a *redisAdapter) Start() {
if !a.started {
a.started = true
mp := newMessagePipeliner(a.bufferSize, a.redisClient, a.config.PipelineTimeout, make(chan error, a.bufferSize))
go func() {
for {
select {
case err := <-mp.errCh:
log.Printf("select pipeline message err: %v", err)
case <-a.stopCh:
return
case message := <-a.messageChannel:
mp.addMessage(message)
if mp.messageCount == a.config.PipelineLength {
mp.execPipeline()
}
case <-mp.timeoutTicker.C:
mp.execPipeline()
}
}
}()
}
}
// Write adds a log message to to an app-specific list in redis using ring-buffer-like semantics
func (a *redisAdapter) Write(app string, messageBody string) error {
a.messageChannel <- newMessage(app, messageBody)
return nil
}
// Read retrieves a specified number of log lines from an app-specific list in redis
func (a *redisAdapter) Read(app string, lines int) ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), a.config.PipelineTimeout)
defer cancel()
stringSliceCmd := a.redisClient.LRange(ctx, app, int64(-1*lines), -1)
result, err := stringSliceCmd.Result()
if err != nil {
return nil, err
}
if len(result) > 0 {
return result, nil
}
return nil, fmt.Errorf("could not find logs for '%s'", app)
}
// Make Chan a pipeline to read logs all the time
func (a *redisAdapter) Chan(ctx context.Context, app string, size int) (chan string, error) {
channel := make(chan string, size)
go func() {
defer close(channel)
subscribe := a.redisClient.Subscribe(context.Background(), app)
defer subscribe.Close()
subscriptions := subscribe.Channel()
for len(channel) != size {
select {
case <-ctx.Done():
return
case message := <-subscriptions:
channel <- message.Payload
}
}
}()
return channel, nil
}
// Destroy deletes an app-specific list from redis
func (a *redisAdapter) Destroy(app string) error {
ctx, cancel := context.WithTimeout(context.Background(), a.config.PipelineTimeout)
defer cancel()
return a.redisClient.Del(ctx, app).Err()
}
// Reopen the storage adapter-- in the case of this implementation, a no-op
func (a *redisAdapter) Reopen() error {
return nil
}
// Stop the storage adapter. Additional writes may not be performed after stopping.
func (a *redisAdapter) Stop() {
close(a.stopCh)
}