/
replicator.go
187 lines (149 loc) · 3.88 KB
/
replicator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package turing
import (
"io"
"github.com/cockroachdb/pebble"
"github.com/lni/dragonboat/v3/statemachine"
"github.com/256dpi/turing/wire"
)
type replicator struct {
config Config
registry *registry
manager *manager
database *database
instructions []Instruction
operations []wire.Operation
references []Ref
}
func newReplicator(config Config, registry *registry, manager *manager) *replicator {
return &replicator{
config: config,
registry: registry,
manager: manager,
instructions: make([]Instruction, config.ProposalBatchSize),
operations: make([]wire.Operation, config.ProposalBatchSize),
references: make([]Ref, config.ProposalBatchSize),
}
}
func (r *replicator) Open(stop <-chan struct{}) (uint64, error) {
// open database
database, index, err := openDatabase(r.config, r.registry, r.manager)
if err != nil {
return 0, err
}
// set database
r.database = database
return index, nil
}
var replicatorUpdate = systemMetrics.WithLabelValues("replicator.Update")
func (r *replicator) Update(entries []statemachine.Entry) ([]statemachine.Entry, error) {
// observe
timer := observe(replicatorUpdate)
defer timer.finish()
// handle entries
for i, entry := range entries {
// reset lists
instructions := r.instructions[:0]
operations := r.operations[:0]
references := r.references[:0]
// decode command
err := wire.WalkCommand(entry.Cmd, func(i int, op wire.Operation) (bool, error) {
// build instruction
ins, err := r.registry.build(op.Name)
if err != nil {
return false, err
}
// decode instruction
err = ins.Decode(op.Code)
if err != nil {
return false, err
}
// add instruction
instructions = append(instructions, ins)
return true, nil
})
if err != nil {
return nil, err
}
// execute instructions
err = r.database.update(instructions, entry.Index)
if err != nil {
return nil, err
}
// encode operations
for _, ins := range instructions {
// append empty operation when no result
if ins.Describe().NoResult {
operations = append(operations, wire.Operation{
Name: ins.Describe().Name,
})
continue
}
// encode instruction
bytes, ref, err := ins.Encode()
if err != nil {
return nil, err
}
// set append operation
operations = append(operations, wire.Operation{
Name: ins.Describe().Name,
Code: bytes,
})
// append reference
if ref != nil {
references = append(references, ref)
}
// recycle instruction if possible
recycler := ins.Describe().Recycler
if recycler != nil {
recycler(ins)
}
}
// prepare command
cmd := wire.Command{
Operations: operations,
}
// TODO: Borrow slice.
// Improve dragonboat to provide a release mechanism
// encode command
bytes, _, err := cmd.Encode(false)
if err != nil {
return nil, err
}
// release references
for _, ref := range references {
ref.Release()
}
// set result
entries[i].Result.Data = bytes
}
return entries, nil
}
func (r *replicator) Sync() error {
return r.database.sync()
}
var replicatorLookup = systemMetrics.WithLabelValues("replicator.Lookup")
func (r *replicator) Lookup(data interface{}) (interface{}, error) {
// observe
timer := observe(replicatorLookup)
defer timer.finish()
// get instructions
list := data.([]Instruction)
// perform lookup
err := r.database.lookup(list)
if err != nil {
return nil, err
}
return nil, nil
}
func (r *replicator) PrepareSnapshot() (interface{}, error) {
return r.database.snapshot()
}
func (r *replicator) SaveSnapshot(snapshot interface{}, sink io.Writer, abort <-chan struct{}) error {
return r.database.backup(snapshot.(*pebble.Snapshot), sink, abort)
}
func (r *replicator) RecoverFromSnapshot(source io.Reader, abort <-chan struct{}) error {
return r.database.restore(source)
}
func (r *replicator) Close() error {
return r.database.close()
}