-
Notifications
You must be signed in to change notification settings - Fork 0
/
schematable.go
213 lines (184 loc) · 5.57 KB
/
schematable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package edb
import (
"bytes"
"fmt"
"reflect"
"strings"
"go.etcd.io/bbolt"
)
// Table holds the metadata of a single table: its name, bbolt bucket, row and
// key types, the encodings used for keys and row values, and its indices.
type Table struct {
	// schema is the schema this table belongs to (see pos).
	schema *Schema
	// name is the table name returned by Name and used in panic messages.
	name string
	// latestSchemaVer is presumably the current schema version of the row
	// format (AddTable passes it to SetSchemaVersion) — confirm in the builder.
	latestSchemaVer uint64
	pos int // index in schema.tables, unstable across code changes
	// buck names the table's root bucket; rootBucketIn looks it up in a bbolt.Tx.
	buck bucketName
	// rowType is the type instantiated via reflect.New whenever a row is created.
	rowType reflect.Type
	// rowTypePtr is presumably the pointer type of rowType — TODO confirm.
	rowTypePtr reflect.Type
	// rowInfo locates the key inside a row value (see RowKeyVal).
	rowInfo *structInfo
	// indices lists the indices in the order they were added; AddIndex stores
	// each index's position in this slice into idx.pos.
	indices []*Index
	// indicesByName maps an index name to its index; maintained by AddIndex.
	indicesByName map[string]*Index
	// indexer is the index-building callback; how and when it is invoked is
	// not visible in this file section.
	indexer func(row any, ib *IndexBuilder)
	// keyEnc encodes and decodes keys and converts them to and from strings.
	keyEnc *flatEncoding
	// keyType is the Go type of the table's key.
	keyType reflect.Type
	// valueEnc encodes and decodes row values.
	valueEnc encodingMethod
	// keyStringSep separates key components in the string form of a key
	// (see RawKeyString and parseRawKeyFrom).
	keyStringSep string
	// zeroKey is the encoded form that encodeKeyVal rejects as a zero key.
	zeroKey []byte
	// migrator is the row migration callback; its call site is not visible here.
	migrator func(tx *Tx, row any, oldVer uint64)
	// suppressContent is presumably set by SuppressContentWhenLogging to keep
	// row contents out of logs — TODO confirm against the logging code.
	suppressContent bool
}
// Name reports the name of the table.
func (tbl *Table) Name() string {
	name := tbl.name
	return name
}
// tableOpt is the type of the option values that AddTable accepts through its
// variadic opts parameter.
type tableOpt int

// SuppressContentWhenLogging is a table option; AddTable reacts to it by
// calling SuppressContentWhenLogging on the table builder.
const SuppressContentWhenLogging tableOpt = 1
// AddTable defines a table in scm by translating its positional arguments
// into TableBuilder calls and handing them to DefineTable.
//
// The schema version is always set. indexer and migrator are registered only
// when non-nil, and every element of indices is added in order. Each element
// of opts must be a tableOpt: SuppressContentWhenLogging is forwarded to the
// builder, other tableOpt values are ignored, and any value of another type
// causes a panic.
func AddTable[Row any](scm *Schema, name string, latestSchemaVer uint64, indexer func(row *Row, ib *IndexBuilder), migrator func(tx *Tx, row *Row, oldVer uint64), indices []*Index, opts ...any) *Table {
	configure := func(b *TableBuilder[Row, any]) {
		b.SetSchemaVersion(latestSchemaVer)
		if indexer != nil {
			b.Indexer(indexer)
		}
		if migrator != nil {
			b.Migrate(migrator)
		}
		for i := range indices {
			b.AddIndex(indices[i])
		}
		for _, raw := range opts {
			o, isTableOpt := raw.(tableOpt)
			if !isTableOpt {
				panic(fmt.Errorf("invalid option %T %v", raw, raw))
			}
			if o == SuppressContentWhenLogging {
				b.SuppressContentWhenLogging()
			}
		}
	}
	return DefineTable[Row, any](scm, name, configure)
}
// AddIndex registers idx with the table and returns the table so that calls
// can be chained. It records the table and the index's position on idx, then
// appends idx to the index list and the by-name map.
//
// It panics if the table already has an index with the same name.
func (tbl *Table) AddIndex(idx *Index) *Table {
	if existing := tbl.indicesByName[idx.name]; existing != nil {
		panic(fmt.Errorf("table %s already has index named %q", tbl.name, idx.name))
	}

	idx.table = tbl
	idx.pos = len(tbl.indices)

	tbl.indices = append(tbl.indices, idx)
	tbl.indicesByName[idx.name] = idx
	return tbl
}
// IndexNamed looks up an index by name. The result is nil when the table has
// no index registered under that name.
func (tbl *Table) IndexNamed(name string) *Index {
	idx := tbl.indicesByName[name]
	return idx
}
// KeyType reports the Go type of the table's key.
func (tbl *Table) KeyType() reflect.Type {
	keyType := tbl.keyType
	return keyType
}
// newRow allocates a zero row of the table's row type and returns a pointer
// to it as a reflect.Value. schemaVer is currently ignored.
func (tbl *Table) newRow(schemaVer uint64) reflect.Value {
	// TODO: create legacy version if needed
	rowPtr := reflect.New(tbl.rowType)
	return rowPtr
}
// rootBucketIn looks up the table's root bucket in btx and passes it through
// nonNil (which presumably rejects a missing bucket — confirm in its definition).
func (tbl *Table) rootBucketIn(btx *bbolt.Tx) *bbolt.Bucket {
	root := btx.Bucket(tbl.buck.Raw())
	return nonNil(root)
}
// dataBucketIn returns the bucket named by dataBucket nested inside
// tableRootB, passed through nonNil (presumably rejecting a missing
// bucket — confirm in its definition).
func (tbl *Table) dataBucketIn(tableRootB *bbolt.Bucket) *bbolt.Bucket {
	data := tableRootB.Bucket(dataBucket.Raw())
	return nonNil(data)
}
// ensureCorrectKeyType returns keyVal as a value of the table's key type.
// A value that already has the key type is returned unchanged; a value of a
// convertible type is converted. Any other value causes a panic that names
// the table, the expected type, and the offending type and value.
func (tbl *Table) ensureCorrectKeyType(keyVal reflect.Value) reflect.Value {
	want := tbl.keyType
	if keyVal.Type() == want {
		return keyVal
	}
	if !keyVal.CanConvert(want) {
		panic(fmt.Errorf("%s: key must be %v, got %v %v", tbl.name, tbl.keyType, keyVal.Type(), keyVal.Interface()))
	}
	return keyVal.Convert(want)
}
// RowKeyVal returns the key of the row held in rowVal, as located by the
// table's row info.
func (tbl *Table) RowKeyVal(rowVal reflect.Value) reflect.Value {
	keyVal := tbl.rowInfo.keyValue(rowVal)
	return keyVal
}
// RowKey returns the key of row as a plain interface value. It is the
// non-reflect counterpart of RowKeyVal.
func (tbl *Table) RowKey(row any) any {
	rowVal := reflect.ValueOf(row)
	keyVal := tbl.RowKeyVal(rowVal)
	return keyVal.Interface()
}
// SetRowKey stores key as the key of row. It is the non-reflect counterpart
// of SetRowKeyVal.
func (tbl *Table) SetRowKey(row, key any) {
	rowVal := reflect.ValueOf(row)
	keyVal := reflect.ValueOf(key)
	tbl.SetRowKeyVal(rowVal, keyVal)
}
// SetRowKeyVal assigns keyVal to the key of the row held in rowVal.
// The assignment goes through reflect.Value.Set, so the key located by
// RowKeyVal must be settable and keyVal must be assignable to it.
func (tbl *Table) SetRowKeyVal(rowVal, keyVal reflect.Value) {
	dst := tbl.RowKeyVal(rowVal)
	dst.Set(keyVal)
}
// EncodeKey encodes key with the table's key encoding and returns the raw
// bytes. It is the non-reflect counterpart of EncodeKeyVal.
func (tbl *Table) EncodeKey(key any) []byte {
	keyVal := reflect.ValueOf(key)
	return tbl.EncodeKeyVal(keyVal)
}
// EncodeKeyVal encodes keyVal with the table's key encoding, passing a nil
// destination buffer to the encoder. Unlike encodeKeyVal, it performs no
// zero-key check.
func (tbl *Table) EncodeKeyVal(keyVal reflect.Value) []byte {
	raw := tbl.keyEnc.encode(nil, keyVal)
	return raw
}
// encodeKeyVal encodes key using buf as the destination buffer and returns
// the encoder's result. Unless zeroOK is set, it panics when the result is
// byte-for-byte equal to the table's zero key.
//
// NOTE(review): the zero check compares the whole returned buffer, so it
// presumably expects callers to pass an empty buf — confirm against callers.
func (tbl *Table) encodeKeyVal(buf []byte, key reflect.Value, zeroOK bool) []byte {
	buf = tbl.keyEnc.encode(buf, key)
	if zeroOK || !bytes.Equal(buf, tbl.zeroKey) {
		return buf
	}
	offending := key.Interface()
	panic(fmt.Errorf("attempt to encode zero key for table %s: %T %v", tbl.Name(), offending, offending))
}
// RowHasZeroKey reports whether the key of row is the zero value of its type.
// It is the non-reflect counterpart of RowValHasZeroKey.
func (tbl *Table) RowHasZeroKey(row any) bool {
	rowVal := reflect.ValueOf(row)
	return tbl.RowValHasZeroKey(rowVal)
}
// RowValHasZeroKey reports whether the key of the row held in rowVal is the
// zero value of its type, as determined by reflect.Value.IsZero.
func (tbl *Table) RowValHasZeroKey(rowVal reflect.Value) bool {
	// TODO: better logic here!
	keyVal := tbl.RowKeyVal(rowVal)
	return keyVal.IsZero()
}
// encodeRowVal encodes the row held in rowVal with the table's value
// encoding, using buf as the destination buffer, and returns the result.
func (tbl *Table) encodeRowVal(buf []byte, rowVal reflect.Value) []byte {
	encoded := tbl.valueEnc.EncodeValue(buf, rowVal)
	return encoded
}
// DecodeKeyVal decodes rawKey into a freshly allocated, addressable value of
// the table's key type and returns it. It panics if decoding fails (see
// DecodeKeyValInto).
func (tbl *Table) DecodeKeyVal(rawKey []byte) reflect.Value {
	keyPtr := reflect.New(tbl.keyType)
	decoded := keyPtr.Elem()
	tbl.DecodeKeyValInto(decoded, rawKey)
	return decoded
}
// DecodeKeyValInto decodes rawKey into keyVal using the table's key encoding.
// It panics with an error naming the table and the raw key if decoding fails.
func (tbl *Table) DecodeKeyValInto(keyVal reflect.Value, rawKey []byte) {
	if err := tbl.keyEnc.decodeVal(rawKey, keyVal); err != nil {
		panic(fmt.Errorf("failed to decode %s key %q: %w", tbl.name, rawKey, err))
	}
}
// RawKeyString renders an encoded key as a string: the raw bytes are decoded
// into a tuple, each component is converted to a string by the key encoding,
// and the components are joined with the table's key separator.
// It panics if keyRaw cannot be decoded as a tuple.
func (tbl *Table) RawKeyString(keyRaw []byte) string {
	tup, err := decodeTuple(keyRaw)
	if err != nil {
		panic(fmt.Errorf("%s key: %w", tbl.name, err))
	}
	// Disabled alternative kept from the original:
	// keyVal := tbl.decodeKey(keyRaw)
	// return fmt.Sprint(keyVal.Interface())
	parts := tbl.keyEnc.tupleToStrings(tup)
	return strings.Join(parts, tbl.keyStringSep)
}
// KeyString renders key as a string by encoding it and formatting the
// encoded bytes with RawKeyString.
func (tbl *Table) KeyString(key any) string {
	raw := tbl.EncodeKey(key)
	return tbl.RawKeyString(raw)
}
// parseRawKeyFrom splits s on the table's key separator and asks the key
// encoding to turn the resulting components into a raw key, using buf as the
// destination buffer.
func (tbl *Table) parseRawKeyFrom(buf []byte, s string) ([]byte, error) {
	components := strings.Split(s, tbl.keyStringSep)
	return tbl.keyEnc.stringsToRawKey(buf, components)
}
// ParseKeyVal parses the string form of a key and returns the decoded key as
// a reflect.Value of the table's key type. A scratch buffer is taken from
// keyBytesPool for the intermediate raw key and handed to releaseKeyBytes
// when the function returns.
//
// A parse failure is returned as an error together with the zero
// reflect.Value; a failure to decode the parsed raw key panics (see
// DecodeKeyVal).
func (tbl *Table) ParseKeyVal(s string) (reflect.Value, error) {
	scratch := keyBytesPool.Get().([]byte)
	defer releaseKeyBytes(scratch)

	rawKey, parseErr := tbl.parseRawKeyFrom(scratch, s)
	if parseErr != nil {
		return reflect.Value{}, parseErr
	}
	keyVal := tbl.DecodeKeyVal(rawKey)
	return keyVal, nil
}
// ParseKey parses the string form of a key and returns it as a plain
// interface value. It is the non-reflect counterpart of ParseKeyVal; on a
// parse error it returns nil and that error.
func (tbl *Table) ParseKey(s string) (any, error) {
	keyVal, err := tbl.ParseKeyVal(s)
	if err == nil {
		return keyVal.Interface(), nil
	}
	return nil, err
}
// NewRowVal allocates a zero row of the table's row type and returns a
// pointer to it as a reflect.Value.
func (tbl *Table) NewRowVal() reflect.Value {
	rowPtr := reflect.New(tbl.rowType)
	return rowPtr
}
// NewRow allocates a zero row and returns the pointer to it as a plain
// interface value. It is the non-reflect counterpart of NewRowVal.
func (tbl *Table) NewRow() any {
	rowPtr := tbl.NewRowVal()
	return rowPtr.Interface()
}
// decodeRow allocates a new row and decodes buf into it with the table's
// value encoding. The pointer to the row is returned even when decoding
// fails, alongside the decoding error.
func (tbl *Table) decodeRow(buf []byte) (reflect.Value, error) {
	rowPtr := reflect.New(tbl.rowType)
	if err := tbl.valueEnc.DecodeValue(buf, rowPtr); err != nil {
		return rowPtr, err
	}
	// log.Printf("decoded row from %q: %#v", buf, rowPtr.Elem().Interface())
	return rowPtr, nil
}