-
Notifications
You must be signed in to change notification settings - Fork 1
/
generator.go
161 lines (133 loc) · 3.32 KB
/
generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
package main
import (
"fmt"
"log"
"time"
"github.com/garyburd/redigo/redis"
uuid "github.com/satori/go.uuid"
)
const (
//Key lock name.
LOCK_NAME = "lock:gen"
//Another generator is active
LOCK_OTHER_EXISTS = iota
//Lock have been successfully refreshed
LOCK_REFRESHED
//Lock exists, but belongs to another generator
LOCK_NOT_REFRESHED
//Unable to connect to redis
CONNECTION_PROBLEM
)
type Generator struct {
pool *redis.Pool
queue string
name string
interval int
pingInterval int
stop chan struct{}
isActive bool
}
//Creates new Generator
func NewGen(p *redis.Pool, queue string, interval, pinterval int) *Generator {
return &Generator{
pool: p,
queue: queue,
interval: interval,
pingInterval: pinterval,
name: uuid.NewV4().String(),
stop: make(chan struct{}),
}
}
//Tries to acquire new lock. If there is no lock, that means there is no
//active generator. Acquiring lock means cuurrent generator will be activated.
func (g *Generator) AcquireLock() bool {
pc := g.pool.Get()
defer pc.Close()
//Set lock expire time pingInterval+5 seconds.
//5 is a "magic number", time for network lag, etc.
lock, err := pc.Do("SET", LOCK_NAME, g.name, "EX", g.pingInterval+5, "NX")
if err != nil {
log.Println("acquirelock: unable to execute SET:", err)
return false
}
return lock == "OK"
}
//Refreshes it's own lock
func (g *Generator) RefreshLock() byte {
pc := g.pool.Get()
defer pc.Close()
_, err := pc.Do("WATCH", LOCK_NAME)
if err != nil {
log.Println("refreshlock: unable to execute WATCH:", err)
return CONNECTION_PROBLEM
}
cg, err := pc.Do("GET", LOCK_NAME)
if err != nil {
log.Println("refreshlock: unable to execute GET:", err)
return CONNECTION_PROBLEM
}
cg, err = redis.String(cg, err)
LogIf(err)
if cg == g.name {
pc.Send("MULTI")
pc.Send("SET", LOCK_NAME, g.name, "EX", g.pingInterval+5, "XX")
lock, err := redis.Values(pc.Do("EXEC"))
if err != nil {
log.Println("refreshlock: unable to execute EXEC:", err)
return CONNECTION_PROBLEM
}
var s string
redis.Scan(lock, &s)
if s == "OK" {
return LOCK_REFRESHED
} else {
return LOCK_NOT_REFRESHED
}
} else {
_, err := pc.Do("UNWATCH")
if err != nil {
log.Println("refreshlock: unable to execute UNWATCH:", err)
return CONNECTION_PROBLEM
}
}
return LOCK_NOT_REFRESHED
}
//Generates random string
func (g *Generator) Message() string {
return fmt.Sprintf("%s at %s", uuid.NewV4(), time.Now())
}
//Added data to redis list
func (g *Generator) Start() {
if g.RefreshLock() == LOCK_OTHER_EXISTS {
log.Println("generator run: another generator in progress")
return
}
pc := g.pool.Get()
defer pc.Close()
defer func(g *Generator) {
log.Printf("Generator stopped (name: %s).\n", g.name)
g.isActive = false
}(g)
ticker := time.NewTicker(time.Millisecond * time.Duration(g.interval))
log.Printf("Generator started (name: %s).\n", g.name)
g.isActive = true
for {
select {
case <-ticker.C:
if _, err := pc.Do("LPUSH", g.queue, g.Message()); err != nil {
log.Println("messages: unable to push to redis:", err)
return
}
case <-g.stop:
return
}
}
}
//Stops generator
func (g *Generator) Stop() {
g.stop <- struct{}{}
}
//Returns generator state
func (g *Generator) IsActive() bool {
return g.isActive
}