forked from arrikto/boltstore
/
reaper.go
162 lines (137 loc) · 3.74 KB
/
reaper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
package reaper
import (
"bytes"
"encoding/gob"
"log"
"time"
"github.com/go-extras/boltstore/shared"
"github.com/go-extras/boltstore/shared/protobuf"
bolt "go.etcd.io/bbolt"
)
//##############//
//### Public ###//
//##############//
// Run invokes a reap function as a goroutine.
func Run(db *bolt.DB, options Options) (chan<- struct{}, <-chan struct{}) {
options.setDefault()
quitC, doneC := make(chan struct{}), make(chan struct{})
go reap(db, options, quitC, doneC)
return quitC, doneC
}
// Quit terminates the reap goroutine.
func Quit(quitC chan<- struct{}, doneC <-chan struct{}) {
quitC <- struct{}{}
<-doneC
}
//###############//
//### Private ###//
//###############//
func reap(db *bolt.DB, options Options, quitC <-chan struct{}, doneC chan<- struct{}) {
// Create a new ticker
ticker := time.NewTicker(options.CheckInterval)
defer func() {
// Stop the ticker
ticker.Stop()
}()
var prevKey []byte
for {
select {
case <-quitC: // Check if a quit signal is sent.
doneC <- struct{}{}
return
case <-ticker.C: // Check if the ticker fires a signal.
// This slice is a buffer to save all expired session keys.
type kv struct {
key []byte
value *protobuf.Session
}
expiredSessionKeys := make([]kv, 0)
// Start a bolt read transaction.
err := db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(options.BucketName)
if bucket == nil {
return nil
}
c := bucket.Cursor()
var i int
var isExpired bool
for k, v := c.Seek(prevKey); ; k, v = c.Next() {
// If we hit the end of our sessions then
// exit and start over next time.
if k == nil {
prevKey = nil
return nil
}
i++
// The flag if the session is expired
isExpired = false
session, err := shared.Session(v)
if err != nil {
// Just remove the session with the invalid session data.
// Log the error first.
log.Printf("boltstore: removing session from database with invalid value: %v", err)
isExpired = true
} else if shared.Expired(session) {
isExpired = true
}
if isExpired {
// Copy the byte slice key, because this data is
// not safe outside of this transaction.
temp := make([]byte, len(k))
copy(temp, k)
// Add it to the expired sessios keys slice
kv := kv{key: temp, value: &session}
expiredSessionKeys = append(expiredSessionKeys, kv)
}
if options.BatchSize == i {
// Store the current key to the previous key.
// Copy the byte slice key, because this data is
// not safe outside of this transaction.
prevKey = make([]byte, len(k))
copy(prevKey, k)
return nil
}
}
})
if err != nil {
log.Printf("boltstore: obtain expired sessions error: %v", err)
}
if len(expiredSessionKeys) > 0 {
// Remove the expired sessions from the database
err = db.Update(func(txu *bolt.Tx) error {
// Get the bucket
b := txu.Bucket(options.BucketName)
if b == nil {
return nil
}
// Remove all expired sessions in the slice
for _, kv := range expiredSessionKeys {
if options.PreDeleteFn != nil {
var values map[interface{}]interface{}
if len(kv.value.Values) != 0 {
values = make(map[interface{}]interface{})
dec := gob.NewDecoder(bytes.NewBuffer(kv.value.Values))
err := dec.Decode(&values)
if err != nil {
return err
}
}
err = options.PreDeleteFn(values)
if err != nil {
continue
}
}
err = b.Delete(kv.key)
if err != nil {
return err
}
}
return nil
})
if err != nil {
log.Printf("boltstore: remove expired sessions error: %v", err)
}
}
}
}
}