forked from huichen/wukong
-
Notifications
You must be signed in to change notification settings - Fork 10
/
persistent_storage_worker.go
120 lines (104 loc) · 2.66 KB
/
persistent_storage_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package engine
import (
"bytes"
"encoding/binary"
"encoding/gob"
"github.com/huichen/wukong/core"
"github.com/huichen/wukong/types"
"sync"
"sync/atomic"
)
// persistentStorageIndexDocumentRequest is one unit of work consumed by
// persistentStorageIndexDocumentWorker, which switches on typ to decide which
// payload fields to read and which per-shard database to write to.
type persistentStorageIndexDocumentRequest struct {
typ string // request kind: "info" or "index"
// Valid only when typ == "info": the document ID (written as a uvarint key)
// and its DocInfo (written gob-encoded as the value).
docId uint64
docInfo *types.DocInfo
// Valid only when typ == "index": the keyword (written as the raw key bytes)
// and its KeywordIndices (written gob-encoded as the value).
keyword string
keywordIndices *types.KeywordIndices
}
// persistentStorageIndexDocumentWorker drains the persistent-storage request
// channel for one shard and writes each request into that shard's database.
//
// For typ == "info" the key is request.docId encoded as a uvarint and the
// value is the gob encoding of request.docInfo; engine.numDocumentsStored is
// incremented whether or not encoding succeeds.
// For typ == "index" the key is the raw bytes of request.keyword and the value
// is the gob encoding of request.keywordIndices.
// Requests with any other typ are ignored.
//
// The loop never exits: an encoding failure skips only the offending request,
// so the shard's worker stays alive for subsequent requests.
func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
	for {
		request := <-engine.persistentStorageIndexDocumentChannels[shard]
		switch request.typ {
		case "info":
			// Key: docId as a uvarint (never longer than MaxVarintLen64 bytes).
			key := make([]byte, binary.MaxVarintLen64)
			length := binary.PutUvarint(key, request.docId)

			// Value: gob-encoded document info.
			var buf bytes.Buffer
			if err := gob.NewEncoder(&buf).Encode(request.docInfo); err != nil {
				// Count the document anyway (presumably so code comparing this
				// counter to the number of submitted documents does not stall —
				// confirm against callers), then move on. Returning here would
				// stop this shard's worker permanently and no later request on
				// the channel would ever be processed.
				atomic.AddUint64(&engine.numDocumentsStored, 1)
				continue
			}

			engine.dbs[shard][getDB(request.typ)].Set(key[:length], buf.Bytes())
			atomic.AddUint64(&engine.numDocumentsStored, 1)
		case "index":
			// Key: the keyword itself.
			key := []byte(request.keyword)

			// Value: gob-encoded keyword indices.
			var buf bytes.Buffer
			if err := gob.NewEncoder(&buf).Encode(request.keywordIndices); err != nil {
				// Skip this request but keep the worker alive.
				continue
			}

			engine.dbs[shard][getDB(request.typ)].Set(key, buf.Bytes())
		}
	}
}
// persistentStorageRemoveDocumentWorker deletes the persisted info record for
// docId from the given shard's "info" database. The key is docId encoded as a
// uvarint, the same encoding persistentStorageIndexDocumentWorker uses when it
// writes "info" records. Keyword-index records are not touched here.
func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard int) {
	var scratch [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(scratch[:], docId)
	infoDB := engine.dbs[shard][getDB("info")]
	infoDB.Delete(scratch[:n])
}
// persistentStorageInitWorker reloads one shard's persisted state into the
// in-memory index, then sends true on engine.persistentStorageInitChannel.
//
// Document infos (the "info" database: key = uvarint docId, value =
// gob(types.DocInfo)) and inverted-index entries (the "index" database:
// key = keyword bytes, value = gob(types.KeywordIndices)) are restored in two
// concurrent goroutines. The function waits for both before signalling.
//
// Recovery is best-effort: records whose value fails to gob-decode, and
// "info" records whose key is not a valid uvarint, are skipped rather than
// aborting the load.
//
// NOTE(review): core.AddDocInfo and core.AddKeywordIndices run concurrently
// for the same shard; this assumes they are safe to call concurrently — confirm.
func (engine *Engine) persistentStorageInitWorker(shard int) {
	var finish sync.WaitGroup
	finish.Add(2)

	// Restore docInfo records.
	go func() {
		defer finish.Done()
		engine.dbs[shard][getDB("info")].ForEach(func(key, value []byte) error {
			docId, n := binary.Uvarint(key)
			if n <= 0 {
				// Truncated or overflowing key: Uvarint returns 0 here, which
				// would wrongly load the record as docId 0. Skip it.
				return nil
			}
			var data types.DocInfo
			if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&data); err == nil {
				core.AddDocInfo(shard, docId, &data)
			}
			return nil
		})
	}()

	// Restore inverted-index records.
	go func() {
		defer finish.Done()
		engine.dbs[shard][getDB("index")].ForEach(func(key, value []byte) error {
			// string(key) copies, so the keyword stays valid after the callback.
			keyword := string(key)
			var data types.KeywordIndices
			if err := gob.NewDecoder(bytes.NewReader(value)).Decode(&data); err == nil {
				core.AddKeywordIndices(shard, keyword, &data)
			}
			return nil
		})
	}()

	finish.Wait()
	engine.persistentStorageInitChannel <- true
}