-
Notifications
You must be signed in to change notification settings - Fork 13
/
redis.go
367 lines (315 loc) · 8.99 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
// ClueGetter - Does things with mail
//
// Copyright 2016 Dolf Schimmel, Freeaqingme.
//
// This Source Code Form is subject to the terms of the two-clause BSD license.
// For its contents, please refer to the LICENSE file.
//
package core
import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"strconv"
"strings"
"time"
"github.com/glenn-brown/golang-pkg-pcre/src/pkg/pcre"
redis "gopkg.in/redis.v3"
)
// RedisClientBase lists the Redis commands this package relies on. It is
// embedded by both RedisClient and RedisClientMulti, so every command declared
// here is available on a regular client as well as inside a transaction.
//
// NOTE(review): the signatures presumably mirror gopkg.in/redis.v3 one-to-one
// so that its client types satisfy this interface — confirm before changing
// any of them.
type RedisClientBase interface {
Del(keys ...string) *redis.IntCmd
Exists(key string) *redis.BoolCmd
Expire(key string, expiration time.Duration) *redis.BoolCmd
Get(key string) *redis.StringCmd
HSet(key, field, value string) *redis.BoolCmd
HGetAllMap(key string) *redis.StringStringMapCmd
LPush(key string, values ...string) *redis.IntCmd
LPushX(key, value interface{}) *redis.IntCmd
LRange(key string, start, stop int64) *redis.StringSliceCmd
LSet(key string, index int64, value interface{}) *redis.StatusCmd
Ping() *redis.StatusCmd
RPop(key string) *redis.StringCmd
SAdd(key string, members ...string) *redis.IntCmd
SMembers(key string) *redis.StringSliceCmd
Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd
SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
ZAdd(key string, members ...redis.Z) *redis.IntCmd
ZCount(key, min, max string) *redis.IntCmd
ZRemRangeByScore(key, min, max string) *redis.IntCmd
// Server-side scripting commands.
Eval(string, []string, []string) *redis.Cmd
EvalSha(sha1 string, keys []string, args []string) *redis.Cmd
ScriptExists(scripts ...string) *redis.BoolSliceCmd
ScriptLoad(script string) *redis.StringCmd
}
// RedisClient is a regular (non-transactional) Redis connection: all commands
// of RedisClientBase plus publish/subscribe support. It is the type returned
// by redisNewClient and stored in the package-level redisClient.
type RedisClient interface {
RedisClientBase
PSubscribe(patterns ...string) (*redis.PubSub, error)
Publish(channel, message string) *redis.IntCmd
}
// RedisClientMulti is a Redis transaction handle as returned by
// redisNewTransaction: all commands of RedisClientBase plus Exec, which takes
// a callback and returns the resulting commands.
type RedisClientMulti interface {
RedisClientBase
Exec(f func() error) ([]redis.Cmder, error)
}
// RedisKeyValue is a unit of work for redisLPush: value is pushed onto the
// Redis list named by key.
//
// The field order is significant: redisListSubscriptionQueueHandler builds
// this struct with a positional literal.
type RedisKeyValue struct {
key string
value []byte
}
var (
// redisClient is the shared connection; it is assigned in redisStart.
redisClient RedisClient
// RedisLPushChan queues items to LPUSH; it is created in redisStart and
// drained by redisChannelListener.
RedisLPushChan chan *RedisKeyValue
// redisDumpKeys holds the compiled Config.Redis.Dump_Key patterns that
// redisDumpPublish matches channel names against.
redisDumpKeys = make([]*pcre.Regexp, 0)
)
func redisStart() {
if !Config.Redis.Enabled {
return
}
if len(Config.Redis.Host) == 0 {
Log.Fatal("No Redis.Host specified")
}
if Config.Redis.Dump_Dir != "" {
if len(Config.Redis.Dump_Key) == 0 {
Config.Redis.Dump_Key = []string{"^cluegetter!"}
}
for _, regexStr := range Config.Redis.Dump_Key {
regex, err := pcre.Compile(regexStr, 0)
if err != nil {
Log.Fatal("Could not compile redis key regex: /%s/. Error: %s", regexStr, err.String())
}
redisDumpKeys = append(redisDumpKeys, ®ex)
}
}
RedisLPushChan = make(chan *RedisKeyValue, 255)
redisClient = redisNewClient()
cg.Redis = redisClient
go redisChannelListener()
go func() {
ticker := time.NewTicker(30 * time.Second)
for {
select {
case <-ticker.C:
redisUpdateRunningList()
}
}
}()
go redisUpdateRunningList()
go redisRpc()
Log.Info("Redis module started successfully")
}
// redisNewClient builds a new client according to Config.Redis.Method:
// "standalone" connects to the first configured host, "sentinel" uses all
// configured hosts as sentinel addresses for the master named "master".
// Any other method is reported through Log.Fatal, and a nil client is the
// fall-through return value.
func redisNewClient() RedisClient {
	switch Config.Redis.Method {
	case "standalone":
		opts := &redis.Options{
			Addr:     Config.Redis.Host[0],
			Password: "",
			DB:       0,
		}
		return redis.NewClient(opts)

	case "sentinel":
		opts := &redis.FailoverOptions{
			MasterName:    "master",
			SentinelAddrs: Config.Redis.Host,
		}
		return redis.NewFailoverClient(opts)
	}

	Log.Fatal("Unknown redis connection method specified")
	return nil
}
// redisNewTransaction starts a transaction that WATCHes the given keys.
//
// Because a transaction can block the rest of the connection, a separate
// client is set up for every transaction.
//
// On failure the dedicated client is closed and a literal nil interface is
// returned, so callers never receive a non-nil interface wrapping a nil
// pointer.
//
// NOTE(review): on success the dedicated client is not closed here; check
// whether callers release it when they are done with the transaction.
func redisNewTransaction(keys ...string) (RedisClientMulti, error) {
	client := redisNewClient().(*redis.Client)

	multi, err := client.Watch(keys...)
	if err != nil {
		// Nothing else references this throwaway client; a Close error would
		// add nothing to the Watch error that is already being returned.
		_ = client.Close()
		return nil, err
	}
	return multi, nil
}
// redisChannelListener runs forever, taking items off RedisLPushChan and
// handing each one to redisLPush in its own goroutine.
func redisChannelListener() {
	for {
		item := <-RedisLPushChan
		go redisLPush(item)
	}
}
// redisLPush pushes cmd.value onto the Redis list cmd.key. A failed push is
// logged as an error rather than being reported as a successful insert; on
// success the new list length is logged at debug level.
func redisLPush(cmd *RedisKeyValue) {
	res := redisClient.LPush(cmd.key, string(cmd.value))
	if err := res.Err(); err != nil {
		Log.Error("Could not add item to Redis Queue %s: %s", cmd.key, err.Error())
		return
	}
	Log.Debug("Added 1 item to Redis Queue %s. New size: %d", cmd.key, res.Val())
}
// redisListSubscribe wires input and output together through the Redis list
// named list: everything received on input is pushed onto the list, and
// everything popped from the list is delivered on output. When Redis is
// disabled, input is forwarded straight to output instead. All work happens
// in background goroutines; the call itself returns immediately.
func redisListSubscribe(list string, input chan []byte, output chan []byte) {
	if Config.Redis.Enabled {
		go redisListSubscriptionPoller(list, output)
		go redisListSubscriptionQueueHandler(list, input)
		return
	}

	go redisListSubscribeBypass(input, output)
}
func redisListSubscriptionQueueHandler(list string, input chan []byte) {
for {
data := <-input
go redisLPush(&RedisKeyValue{list, data})
}
}
// redisListSubscriptionPoller runs forever. Every 100ms it drains the Redis
// list named list with RPOP and delivers each popped item on output.
//
// Draining stops for the current tick when the list is empty, or when Redis
// returns an error (which is logged, followed by a 5 second pause). Before an
// item is delivered, the function blocks until Rdbms.Ping succeeds, retrying
// every 2.5 seconds.
func redisListSubscriptionPoller(list string, output chan []byte) {
	tick := time.NewTicker(100 * time.Millisecond)

	for range tick.C {
	drain:
		for {
			item, popErr := redisClient.RPop(list).Bytes()
			switch {
			case popErr == redis.Nil:
				// List is empty; wait for the next tick.
				break drain
			case popErr != nil:
				Log.Error("Error while polling from Redis: %s", popErr.Error())
				time.Sleep(5 * time.Second)
				break drain
			}

			// The item has already left Redis, so hold on to it until the
			// database answers again.
			for pingErr := Rdbms.Ping(); pingErr != nil; pingErr = Rdbms.Ping() {
				Log.Error("Mysql seems down: %s", pingErr.Error())
				time.Sleep(2500 * time.Millisecond)
			}

			output <- item
		}
	}
}
// redisListSubscribeBypass runs forever, forwarding every payload received on
// input to output unchanged and in order. It is used in place of the Redis
// round trip when Redis is disabled.
func redisListSubscribeBypass(input chan []byte, output chan []byte) {
	for {
		output <- <-input
	}
}
// service identifies one running process by host name and instance number.
// redisUpdateRunningList stores it in Redis as JSON and redisGetServices
// parses it back, so field names and order are part of the stored format.
type service struct {
Hostname string
Instance uint
}
// redisUpdateRunningList registers this process in Redis as a running
// service. For the previous, the current and the next minute it writes:
//
//   - hash  "cluegetter-<instance>-service-<minute>", field <hostname>
//   - set   "cluegetter-service-<minute>"
//
// where <minute> is the Unix timestamp of the start of that minute and the
// stored value is the JSON form of this process's service record. Every key
// expires after 120 seconds.
func redisUpdateRunningList() {
	Log.Debug("Running redisUpdateRunningList()")

	const ttl = 120 * time.Second

	unixNow := float64(time.Now().Unix())
	minuteStart := int(unixNow - math.Mod(unixNow, 60.0))

	// The error is discarded as in the original; the record holds only a host
	// name and an instance number.
	encoded, _ := json.Marshal(&service{
		Hostname: hostname,
		Instance: instance,
	})
	member := string(encoded)

	for _, offset := range []int{-60, 0, 60} {
		minute := minuteStart + offset

		perInstanceKey := fmt.Sprintf("cluegetter-%d-service-%d", instance, minute)
		redisClient.HSet(perInstanceKey, hostname, member)
		redisClient.Expire(perInstanceKey, ttl)

		globalKey := fmt.Sprintf("cluegetter-service-%d", minute)
		redisClient.SAdd(globalKey, member)
		redisClient.Expire(globalKey, ttl)
	}
}
// redisGetServices returns the services registered in Redis for the current
// minute, i.e. the members of the set "cluegetter-service-<minute>" decoded
// from JSON.
//
// A failure to read the set is logged and yields an empty result. Members
// that cannot be parsed are logged and skipped. The returned slice is never
// nil and never contains nil entries.
func redisGetServices() []*service {
	now := time.Now().Unix()
	start := int(float64(now) - math.Mod(float64(now), 60.0))
	key := fmt.Sprintf("cluegetter-service-%d", start)

	cmd := redisClient.SMembers(key)
	if err := cmd.Err(); err != nil {
		Log.Error("Could not fetch services from Redis: %s", err.Error())
	}
	members := cmd.Val()

	out := make([]*service, 0, len(members))
	for _, jsonStr := range members {
		value := &service{}
		// Decode into the struct itself. Passing a pointer to the pointer
		// would let a JSON null reset it to nil, which would then be appended.
		if err := json.Unmarshal([]byte(jsonStr), value); err != nil {
			Log.Error("Could not parse json service string: %s", err.Error())
			continue
		}
		out = append(out, value)
	}

	return out
}
// redisPublish publishes msg on the Redis channel key and returns the error
// reported by Redis, if any. Before publishing, the message is handed to
// redisDumpPublish (direction "out") and logged; the logged copy is cut off
// after 128 bytes and suffixed with "...".
func redisPublish(key string, msg []byte) error {
	// Build the log excerpt from the payload itself. Only the excerpt is
	// converted to a string, so large messages are not copied just for logging.
	excerpt, suffix := msg, ""
	if len(excerpt) > 128 {
		excerpt, suffix = excerpt[:128], "..."
	}
	logMsg := string(excerpt) + suffix

	redisDumpPublish("out", key, msg)
	Log.Info("Publising on Redis channel '%s': %s", key, logMsg)
	return redisClient.Publish(key, string(msg)).Err()
}
// redisRpc subscribes to all "cluegetter!*" channels and dispatches incoming
// messages to the RPC listeners registered by the modules. It runs until the
// process exits.
//
// Channel names have the form "cluegetter!<target>!<pattern>":
//
//   - a numeric <target> addresses a single instance and is ignored unless it
//     equals this process's instance number,
//   - a non-numeric, non-empty <target> addresses a host and is ignored
//     unless it equals this process's hostname,
//   - an empty <target> is accepted by everyone.
//
// <pattern> selects the listeners, as collected from every module's Rpc()
// map. Each listener receives the payload from its own goroutine, so a slow
// listener does not hold up the receive loop.
func redisRpc() {
	pubsub, err := redisClient.PSubscribe("cluegetter!*")
	if err != nil {
		Log.Fatal("Could not connect to Redis or subscribe to the RPC channels: ", err.Error())
	}
	defer pubsub.Close()

	listeners := make(map[string][]chan string)
	for _, module := range cg.Modules() {
		for pattern, channel := range module.Rpc() {
			listeners[pattern] = append(listeners[pattern], channel)
		}
	}

	for {
		msg, err := pubsub.ReceiveMessage()
		if err != nil {
			Log.Error("Error from redis/pubsub.ReceiveMessage(): %s", err.Error())
			time.Sleep(1 * time.Second)
			continue
		}

		// Shortened, single-line copy of the payload for log output only.
		logMsg := msg.Payload
		if len(logMsg) > 128 {
			logMsg = logMsg[:128] + "..."
		}
		logMsg = strings.Replace(logMsg, "\n", "\\n", -1)

		elements := strings.SplitN(msg.Channel, "!", 3)
		if len(elements) < 3 || elements[0] != "cluegetter" {
			Log.Notice("Received invalid RPC channel <%s>%s", msg.Channel, logMsg)
			continue
		}

		// Atoi fails on an empty target, so an empty target falls through to
		// the else branch and is accepted.
		if msgInstance, err := strconv.Atoi(elements[1]); err == nil {
			if msgInstance != int(instance) {
				Log.Debug("Received RPC message for other instance (%d). Ignoring: <%s>%s", msgInstance, msg.Channel, logMsg)
				continue
			}
		} else if len(elements[1]) > 0 && elements[1] != hostname {
			Log.Debug("Received RPC message for other service (%s). Ignoring: <%s>%s", elements[1], msg.Channel, logMsg)
			continue
		}

		targets := listeners[elements[2]]
		if len(targets) == 0 {
			Log.Debug("Received RPC message but no such pattern was registered, ignoring: <%s>%s", msg.Channel, logMsg)
			continue
		}

		Log.Info("Received RPC Message: <%s>%s", msg.Channel, logMsg)
		redisDumpPublish("in", msg.Channel, []byte(msg.Payload))

		for _, channel := range targets {
			// Pass the channel as an argument: capturing the loop variable
			// would let every goroutine send to whichever listener the loop
			// reached last.
			go func(target chan string, payload string) {
				target <- payload
			}(channel, msg.Payload)
		}
	}
}
// redisDumpPublish writes msg to a new file in Config.Redis.Dump_Dir when
// dumping is enabled (a dump directory is configured) and key matches at
// least one of the compiled dump-key patterns. direction is embedded in the
// file name; callers in this file pass "in" or "out".
//
// The file is named "cluegetter-redisPublish-<key>-<direction>-<random>".
// Path separators in key are replaced by "_" so that a channel name can never
// point outside the dump directory. Failures are logged and not returned.
func redisDumpPublish(direction, key string, msg []byte) {
	if Config.Redis.Dump_Dir == "" {
		return
	}

	keyBytes := []byte(key)
	dump := false
	for _, pattern := range redisDumpKeys {
		if len(pattern.FindIndex(keyBytes, 0)) != 0 {
			dump = true
			break
		}
	}
	if !dump {
		return
	}

	// Channel names arrive over Redis pub/sub, so do not trust them as file
	// name components.
	safeKey := strings.NewReplacer("/", "_", "\\", "_").Replace(key)
	prefix := fmt.Sprintf("cluegetter-redisPublish-%s-%s-", safeKey, direction)

	f, err := ioutil.TempFile(Config.Redis.Dump_Dir, prefix)
	if err != nil {
		Log.Error("Could not open file for dump file: %s", err.Error())
		return
	}
	defer f.Close()

	count, err := f.Write(msg)
	if err != nil {
		Log.Error("Wrote %d bytes to '%s', then got error: %s", count, f.Name(), err.Error())
		return
	}

	Log.Debug("Wrote %d bytes to '%s'", count, f.Name())
}