/
store.go
166 lines (141 loc) · 3.46 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
// Package store provides the Redis-backed persistence layer.
package store
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/Diggs/go-backoff"
"github.com/go-redis/redis/v8"
log "github.com/sirupsen/logrus"
)
// DefaultDatabaseConnect is the default Redis connection string.
// Connection strings follow the "[[db:]password@]host:port" scheme; this
// default carries only the host and port parts.
const DefaultDatabaseConnect = "localhost:6379"
// Package-level connection state, assigned by Setup and read by the
// functions and methods of this file.
var (
// rdb is the Redis client used to issue commands.
rdb *redis.Client
// ctx is the context passed along with each Redis command.
ctx context.Context
)
// parseCnxNfo converts a connection string into Redis client options.
//
// Accepted format: "[[db:]password@]host:port", for example:
//
//	"0:password@localhost:6379"
//	"password@localhost:6379"
//	"localhost:6379"
//
// The address is everything after the LAST '@', so a password may itself
// contain '@'. When the credentials contain a ':', the text before the FIRST
// ':' must be the numeric database index and everything after it is the
// password, so a password given together with a db index may contain ':'.
//
// It returns an error when s is empty or when the db part is not an integer.
//
// NOTE(review): the error text echoes the whole connection string, password
// included. It is kept unchanged for compatibility; consider redacting it.
func parseCnxNfo(s string) (*redis.Options, error) {
	if s == "" {
		return nil, errors.New("Bad connection parameter: empty string")
	}

	// No '@' at all: the whole string is the address.
	at := strings.LastIndex(s, "@")
	if at < 0 {
		return &redis.Options{Addr: s}, nil
	}

	creds, addr := s[:at], s[at+1:]

	// No ':' in the credentials: they are the password alone, db stays 0.
	colon := strings.Index(creds, ":")
	if colon < 0 {
		return &redis.Options{Addr: addr, Password: creds}, nil
	}

	db, err := strconv.Atoi(creds[:colon])
	if err != nil {
		return nil, fmt.Errorf("Bad connection parameter: got (%s)", s)
	}

	return &redis.Options{
		Addr:     addr,
		Password: creds[colon+1:],
		DB:       db,
	}, nil
}
// Setup parses the connection string c and, on success, stores cx and a new
// Redis client built from the parsed options in the package-level state.
// On a parse error nothing is stored and the error is returned.
//
// Examples:
//
//	store.Setup(context.Background(), "db:password@host:port")
//	store.Setup(context.Background(), "0:password@localhost:6379")
//	store.Setup(context.Background(), "password@localhost:6379")
//	store.Setup(context.Background(), "localhost:6379")
func Setup(cx context.Context, c string) error {
	options, parseErr := parseCnxNfo(c)
	if parseErr == nil {
		ctx, rdb = cx, redis.NewClient(options)
	}
	return parseErr
}
// BlockedPingBakkoff pings Redis until it answers, backing off between
// attempts while the server is not reachable.
//
// It returns the server reply on the first successful ping. If the package
// context has been cancelled or has expired, it stops retrying and returns
// that context's error instead of looping forever on pings that can no longer
// succeed.
func BlockedPingBakkoff() (pong string, err error) {
	// Back off exponentially, starting at 3 milliseconds, capping at 320 seconds.
	exp := backoff.NewExponentialFullJitter(3*time.Millisecond, 320*time.Second)
	for {
		pong, err = Ping()
		if err == nil {
			return pong, nil
		}

		// A finished context makes every further attempt pointless.
		if ctx != nil {
			if cerr := ctx.Err(); cerr != nil {
				return "", cerr
			}
		}

		// Print the Duration itself: dividing by time.Second truncated every
		// sub-second delay to "0 second(s)".
		log.Errorf("%s - backing off %s", err.Error(), exp.NextDuration)
		// NOTE(review): Backoff presumably waits for the current delay and
		// computes the next one — confirm against the go-backoff package.
		exp.Backoff()
	}
}
// Ping sends a PING command to the Redis server using the package-level
// client and context.
//
// It returns the server reply on success. When the command fails, the
// returned error wraps the underlying client error so callers can inspect the
// cause with errors.Is / errors.As. An empty reply is also reported as an
// unreachable server.
func Ping() (string, error) {
	pong, err := rdb.Ping(ctx).Result()
	if err != nil {
		return "", fmt.Errorf("unreachable redis server: %w", err)
	}
	if pong == "" {
		return "", errors.New("unreachable redis server")
	}
	return pong, nil
}
// Hitable is the interface of a hits bag: a set of named counters.
// Hits is the implementation provided by this file.
type Hitable interface {
// Add adds i hits to the counter named k.
Add(k string, i int64)
// Top returns the name and count of the most popular entry.
Top() (string, int64, error)
// Reset clears the bag.
Reset()
}
// Hits is a hits bag identified by a Key.
type Hits struct {
// Key is the Redis key that the Add, Top and Reset methods operate on.
Key string
}
// NewHits builds a hits bag stored under key k and returns it as a Hitable.
func NewHits(k string) Hitable {
	bag := Hits{Key: k}
	return bag
}
// Add increments the counter of member k in the hits bag by i.
//
// The increment is sent as a ZINCRBY on the key h.Key. The method has no
// error result, so a failure is logged at error level and nothing else is
// reported; a success is logged at debug level with the new count.
func (h Hits) Add(k string, i int64) {
	// TODO make a binary string to compact the key
	total, err := rdb.ZIncrBy(ctx, h.Key, float64(i), k).Result()
	if err != nil {
		log.Errorf("error to add hit (%s)", err.Error())
		// Stop here so a failed increment is not also logged as a success.
		return
	}
	log.Debugf("success stores hit to %s count(%d)", k, int64(total))
}
// Top returns the member with the highest score in the hits bag together
// with that score truncated to an int64.
//
// It returns the command error unchanged when the lookup fails, and
// ("", 0, nil) when the bag holds no entry.
func (h Hits) Top() (string, int64, error) {
	// Reverse range 0..0 asks for the single highest-scored entry.
	ranked, err := rdb.ZRevRangeWithScores(ctx, h.Key, 0, 0).Result()
	switch {
	case err != nil:
		return "", 0, err
	case len(ranked) == 0:
		return "", 0, nil
	}
	best := ranked[0]
	return best.Member.(string), int64(best.Score), nil
}
// Reset deletes the key of the hits bag, discarding all its counters.
//
// The method has no error result, so a failed DEL is logged at error level;
// a success is logged at debug level with the value returned by DEL.
func (h Hits) Reset() {
	deleted, err := rdb.Del(ctx, h.Key).Result()
	if err != nil {
		log.Errorf("error to reset hits bag %s (%s)", h.Key, err.Error())
		return
	}
	log.Debugf("reset hits bag to %s redis(%d)", h.Key, deleted)
}