forked from kataras/go-sessions
-
Notifications
You must be signed in to change notification settings - Fork 0
/
database.go
189 lines (155 loc) · 4.9 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package leveldb
import (
"bytes"
"errors"
"runtime"
"github.com/kataras/go-sessions"
"github.com/kataras/golog"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
// Package-level LevelDB option sets shared by every Database in this package.
// They are read at call time, so they must be adjusted before the
// corresponding operation runs (e.g. change Options before calling New).
var (
// Options is passed to leveldb.OpenFile by New when opening the database.
// The zero value leaves every setting at leveldb's default.
Options = &opt.Options{}
// WriteOptions is passed to every Put and Delete issued by this package.
// The zero value leaves every setting at leveldb's default.
WriteOptions = &opt.WriteOptions{}
// ReadOptions is passed to every iterator created by this package.
// The zero value leaves every setting at leveldb's default.
ReadOptions = &opt.ReadOptions{}
)
// Database is the LevelDB (file-based) session storage.
type Database struct {
// Service is the underlying LevelDB database connection,
// it's initialized at `New` or `NewFromDB`.
// Can be used to get stats.
Service *leveldb.DB
// async reports whether Sync should run its work on a separate goroutine;
// it is set through the Async method and defaults to false.
async bool
}
// New opens the LevelDB database located at "directoryPath" and wraps it
// in a session Database.
//
// directoryPath is the directory in which leveldb stores the sessions,
// e.g. ./sessions/ ; it must not be empty.
//
// The database is opened with the package-level `Options`. Callers that need
// a customized or already-opened leveldb instance should use `NewFromDB`.
// The resulting Database is produced by `NewFromDB`, which also runs
// `Cleanup` to drop expired session entries.
func New(directoryPath string) (*Database, error) {
	if len(directoryPath) == 0 {
		return nil, errors.New("dir is missing")
	}

	// No leveldb options are required by this constructor: the shared
	// `Options` variable is used instead. According to the original author,
	// leveldb creates the directories if necessary.
	ldb, openErr := leveldb.OpenFile(directoryPath, Options)
	if openErr == nil {
		return NewFromDB(ldb)
	}

	golog.Errorf("unable to initialize the LevelDB-based session database: %v", openErr)
	return nil, openErr
}
// NewFromDB behaves like `New` but wraps an already-opened leveldb
// connection instead of opening one from a directory.
//
// It returns an error when "service" is nil. Otherwise it registers a
// finalizer that closes the connection and runs `Cleanup`; the Database is
// returned together with whatever error `Cleanup` reported.
func NewFromDB(service *leveldb.DB) (*Database, error) {
	if service == nil {
		return nil, errors.New("underline database is missing")
	}

	database := &Database{Service: service}
	// Close the leveldb connection when the Database gets garbage collected.
	runtime.SetFinalizer(database, closeDB)

	cleanupErr := database.Cleanup()
	return database, cleanupErr
}
// Cleanup walks over every entry of the database and deletes the ones whose
// decoded session store has expired.
// It is called automatically by `NewFromDB` (and therefore by `New`).
//
// Entries that cannot be decoded are skipped, and a failed delete is only
// logged as a warning. The returned error is the iterator's accumulated
// error, if any.
func (db *Database) Cleanup() error {
	it := db.Service.NewIterator(nil, ReadOptions)

	for it.Next() {
		// The slices returned by Key and Value must not be modified and are
		// only valid until the next call to Next.
		key := it.Key()
		if len(key) == 0 {
			continue
		}

		store, decodeErr := sessions.DecodeRemoteStore(it.Value())
		if decodeErr != nil {
			continue
		}

		if !store.Lifetime.HasExpired() {
			continue
		}

		if delErr := db.Service.Delete(key, WriteOptions); delErr != nil {
			golog.Warnf("troubles when cleanup a session remote store from LevelDB: %v", delErr)
		}
	}

	it.Release()
	return it.Error()
}
// Async toggles asynchronous syncing: when "useGoRoutines" is true, every
// call to `Sync` performs its database update on a new goroutine instead of
// blocking the caller.
// It returns the same Database so that calls can be chained.
func (db *Database) Async(useGoRoutines bool) *Database {
db.async = useGoRoutines
return db
}
// Load returns the remote store saved under the session id "sid".
//
// The zero-value store is returned when "sid" is empty, when no entry is
// stored under that exact key, or when the stored value cannot be decoded.
// Decode and iterator errors are logged, not returned.
func (db *Database) Load(sid string) (storeDB sessions.RemoteStore) {
	bsid := []byte(sid)
	if len(bsid) == 0 {
		// Entries with an empty key are never treated as sessions.
		return storeDB
	}

	iter := db.Service.NewIterator(nil, ReadOptions)
	defer iter.Release()

	// Jump straight to the first key >= bsid instead of walking the whole
	// database; the session exists only if that key is an exact match.
	if iter.Seek(bsid) && bytes.Equal(iter.Key(), bsid) {
		// The session id is the key, the encoded remote store is the value.
		store, err := sessions.DecodeRemoteStore(iter.Value())
		if err != nil {
			golog.Errorf("error while trying to load from the remote store: %v", err)
		} else {
			storeDB = store
		}
	}

	if err := iter.Error(); err != nil {
		golog.Errorf("error while trying to iterate over the database: %v", err)
	}

	return storeDB
}
// Sync applies the given payload (an update or a destroy of a session's
// in-memory store) to the database.
// The work runs on a new goroutine when asynchronous mode was enabled through
// `Async`; otherwise it runs on the caller's goroutine.
func (db *Database) Sync(p sessions.SyncPayload) {
	if !db.async {
		db.sync(p)
		return
	}

	go db.sync(p)
}
// sync performs the actual database update for a payload.
//
// A destroy action deletes the entry stored under the session id. Any other
// action serializes the payload's store and writes it under the session id.
// Failures are logged; nothing is returned to the caller.
func (db *Database) sync(p sessions.SyncPayload) {
	bsid := []byte(p.SessionID)

	if p.Action == sessions.ActionDestroy {
		if err := db.destroy(bsid); err != nil {
			golog.Errorf("error while destroying a session(%s) from leveldb: %v",
				p.SessionID, err)
		}
		return
	}

	s, err := p.Store.Serialize()
	if err != nil {
		golog.Errorf("error while serializing the remote store: %v", err)
		// Do not write the result of a failed serialization: it cannot be
		// trusted and would replace the entry currently stored for the session.
		return
	}

	if err = db.Service.Put(bsid, s, WriteOptions); err != nil {
		golog.Errorf("error while writing the session(%s) to the database: %v", p.SessionID, err)
	}
}
// destroy deletes the entry stored under the key "bsid" (a session id as
// bytes) using the package-level `WriteOptions`, and returns the delete error.
func (db *Database) destroy(bsid []byte) error {
	err := db.Service.Delete(bsid, WriteOptions)
	return err
}
// Close shuts down the LevelDB connection and returns the error reported by
// the underlying database, if any (it is also logged as a warning).
func (db *Database) Close() error {
	err := closeDB(db)

	// `NewFromDB` registers closeDB as a finalizer. Once the connection has
	// been closed explicitly that finalizer must not run anymore, otherwise the
	// already-closed database would be closed a second time on collection and
	// a misleading warning would be logged.
	runtime.SetFinalizer(db, nil)

	return err
}
// closeDB closes the database's underlying LevelDB connection.
// A close failure is logged as a warning and returned to the caller.
// It is used both by `Close` and as the finalizer set in `NewFromDB`.
func closeDB(db *Database) error {
	if closeErr := db.Service.Close(); closeErr != nil {
		golog.Warnf("closing the LevelDB connection: %v", closeErr)
		return closeErr
	}

	return nil
}