-
Notifications
You must be signed in to change notification settings - Fork 0
/
schemastate.go
134 lines (115 loc) · 3.25 KB
/
schemastate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package edb
import (
"log"
"reflect"
"time"
"go.etcd.io/bbolt"
)
func (db *DB) tableState(tbl *Table) *tableState {
return db.tableStates[tbl.pos]
}
type tableState struct {
MinSchemaVer uint64 `msgpack:"s"`
LastIndexOrdinal uint64 `msgpack:"li"`
Indices map[string]*indexState `msgpack:"i"`
LastSeen time.Time `msgpack:"t"`
DeletionCounter int `msgpack:"delcnt,omitempty"`
table *Table `msgpack:"-"`
indexStates []*indexState `msgpack:"-"`
indexStatesByOrd map[uint64]*indexState `msgpack:"-"`
}
func (ts *tableState) indexOrdinal(idx *Index) uint64 {
return ts.indexStates[idx.pos].IndexOrdinal
}
func (ts *tableState) indexByOrdinal(ord uint64) *Index {
is := ts.indexStatesByOrd[ord]
if is == nil {
return nil
}
return is.index
}
func (ts *tableState) hasPendingIndices() bool {
for _, is := range ts.Indices {
if !is.Built {
return true
}
}
return false
}
type indexState struct {
index *Index `msgpack:"-"`
IndexOrdinal uint64 `msgpack:"o"`
Built bool `msgpack:"f"`
}
var tableStateKey = []byte("_state")
const tableStateEncoding = MsgPack
func prepareTable(tx *Tx, tbl *Table, now time.Time) *tableState {
tableRootB := must(tx.btx.CreateBucketIfNotExists(tbl.buck.Raw()))
_ = must(tableRootB.CreateBucketIfNotExists(dataBucket.Raw()))
for _, idx := range tbl.indices {
_ = must(tableRootB.CreateBucketIfNotExists(idx.buck.Raw()))
}
ts := new(tableState)
if rawTS := tableRootB.Get(tableStateKey); rawTS != nil {
err := tableStateEncoding.DecodeValue(rawTS, reflect.ValueOf(ts))
if err != nil {
panic(tableErrf(tbl, nil, nil, err, "failed to decode table state"))
}
}
ts.table = tbl
if ts.Indices == nil {
ts.Indices = make(map[string]*indexState)
}
ts.LastSeen = now
ts.indexStates = make([]*indexState, len(tbl.indices))
ts.indexStatesByOrd = make(map[uint64]*indexState)
for i, idx := range tbl.indices {
is := ts.Indices[idx.name]
if is == nil {
ts.LastIndexOrdinal++
is = &indexState{
IndexOrdinal: ts.LastIndexOrdinal,
}
ts.Indices[idx.name] = is
}
is.index = idx
ts.indexStates[i] = is
ts.indexStatesByOrd[is.IndexOrdinal] = is
}
for k, is := range ts.Indices {
if is.index == nil {
dropDeletedIndex(tbl, tableRootB, k)
delete(ts.Indices, k)
}
}
return ts
}
func (ts *tableState) migrate(tx *Tx) {
tbl := ts.table
if ts.hasPendingIndices() {
for c := tx.TableScan(tbl, FullScan()); c.Next(); {
rowVal, _ := c.RowVal()
tx.PutVal(tbl, rowVal)
}
for _, is := range ts.Indices {
is.Built = true
}
}
}
func (ts *tableState) save(tx *Tx) {
// log.Printf("table state for %s: %s", ts.table.Name(), must(json.Marshal(ts)))
rawTS := tableStateEncoding.EncodeValue(nil, reflect.ValueOf(ts))
tableRootB := ts.table.rootBucketIn(tx.btx)
ensure(tableRootB.Put(tableStateKey, rawTS))
}
func dropDeletedIndex(tbl *Table, tableRootB *bbolt.Bucket, name string) {
err := tableRootB.DeleteBucket(makeIndexBucketName(name).Raw())
if err == bbolt.ErrBucketNotFound {
return
}
ensure(err)
log.Printf("deleted index %s.%s", tbl.Name(), name)
}
func prepareMap(tx *Tx, mp *KVMap) {
must(tx.btx.CreateBucketIfNotExists(mp.buck.Raw()))
}