-
-
Notifications
You must be signed in to change notification settings - Fork 201
/
level_db.go
115 lines (100 loc) · 2.1 KB
/
level_db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package cloudbackup
import (
"os"
"path/filepath"
"strings"
"sync"
"github.com/admpub/once"
"github.com/syndtr/goleveldb/leveldb"
"github.com/webx-top/com"
"github.com/webx-top/echo"
"github.com/webx-top/echo/param"
)
var (
levelDBPool *dbPool
levelDBOnce once.Once
LevelDBDir = `data/cache/backup-db`
)
func LevelDB() *dbPool {
levelDBOnce.Do(initLevelDB)
return levelDBPool
}
func initLevelDB() {
levelDBPool = NewLevelDBPool()
}
func NewLevelDBPool() *dbPool {
return &dbPool{mp: map[uint]*leveldb.DB{}}
}
type dbPool struct {
mu sync.RWMutex
mp map[uint]*leveldb.DB
}
func (t *dbPool) OpenDB(taskId uint) (*leveldb.DB, error) {
t.mu.RLock()
db := t.mp[taskId]
t.mu.RUnlock()
if db == nil {
t.mu.Lock()
defer t.mu.Unlock()
var err error
db, err = openLevelDB(taskId)
if err != nil {
return nil, err
}
t.mp[taskId] = db
}
return db, nil
}
func openLevelDB(taskId uint) (*leveldb.DB, error) {
idKey := com.String(taskId)
dbDir := filepath.Join(echo.Wd(), LevelDBDir)
err := com.MkdirAll(dbDir, os.ModePerm)
if err != nil {
return nil, err
}
dbFile := filepath.Join(dbDir, idKey)
return leveldb.OpenFile(dbFile, nil)
}
func removeLevelDB(taskId uint) error {
idKey := com.String(taskId)
dbFile := filepath.Join(echo.Wd(), LevelDBDir, idKey)
if !com.FileExists(dbFile) {
return nil
}
return os.RemoveAll(dbFile)
}
func (t *dbPool) CloseDB(taskId uint) {
t.mu.Lock()
if db, ok := t.mp[taskId]; ok {
db.Close()
delete(t.mp, taskId)
}
t.mu.Unlock()
}
func (t *dbPool) RemoveDB(taskId uint) error {
t.mu.Lock()
if db, ok := t.mp[taskId]; ok {
db.Close()
delete(t.mp, taskId)
}
err := removeLevelDB(taskId)
t.mu.Unlock()
return err
}
func (t *dbPool) CloseAllDB() {
t.mu.Lock()
for _, db := range t.mp {
db.Close()
}
t.mu.Unlock()
}
func ParseDBValue(val []byte) (md5 string, startTs, endTs, fileModifyTs, fileSize int64) {
parts := strings.Split(com.Bytes2str(val), `||`)
md5 = parts[0]
if len(parts) > 1 {
com.SliceExtractCallback(parts[1:], func(v string) int64 {
return param.AsInt64(v)
}, &startTs, &endTs, &fileModifyTs, &fileSize)
}
return
}