/
machine.go
180 lines (149 loc) · 3.9 KB
/
machine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package turing
import (
"fmt"
)
// Options defines the options used during instruction execution.
type Options struct {
// StaleRead can be set to execute a stale read. While this is much faster,
// the instruction might read data that is stale in relation to the current
// state of the cluster. In other words, setting this value will reduce
// the default linearizable guarantee to a serializable guarantee.
StaleRead bool
}
// Machine maintains a raft cluster with members and maintains consensus about
// the executed instructions on the replicated database.
type Machine struct {
// config is the configuration the machine was started with.
config Config
// registry is built from the configuration by Start and is consulted to
// check that an instruction is known before it is executed.
registry *registry
// manager receives the observers passed to Subscribe and Unsubscribe.
manager *manager
// coordinator is created by Start only if config.Standalone is false.
coordinator *coordinator
// controller is created by Start only if config.Standalone is true.
controller *controller
}
// Start validates the supplied configuration and assembles a new machine from
// it. A registry is built from the configuration and a fresh manager is
// created; afterwards a controller is created if config.Standalone is set and
// a coordinator otherwise. The first error encountered is returned together
// with a nil machine.
func Start(config Config) (*Machine, error) {
	// refuse invalid configurations
	if err := config.Validate(); err != nil {
		return nil, err
	}

	// build the registry
	reg, err := buildRegistry(config)
	if err != nil {
		return nil, err
	}

	// assemble the parts that are shared by both modes
	m := &Machine{
		config:   config,
		registry: reg,
		manager:  newManager(),
	}

	// attach either the controller (standalone) or the coordinator
	if config.Standalone {
		m.controller, err = createController(config, reg, m.manager)
	} else {
		m.coordinator, err = createCoordinator(config, reg, m.manager)
	}
	if err != nil {
		return nil, err
	}

	return m, nil
}
// machineExecute is the systemMetrics entry labeled "Machine.Execute". It is
// handed to observe at the start of every execute call.
var machineExecute = systemMetrics.WithLabelValues("Machine.Execute")
// Execute will execute the specified instruction. It forwards to execute
// with a nil callback and returns the error reported by it.
//
// Options are only honored if exactly one value is passed; with no value or
// more than one value the zero Options are used.
func (m *Machine) Execute(ins Instruction, opts ...Options) error {
return m.execute(ins, nil, opts...)
}
// ExecuteAsync will execute the specified instruction asynchronously. The
// specified function is called once the instruction has been executed.
//
// Problems detected before the instruction is dispatched (an invalid
// description, an effect outside the allowed range or an instruction missing
// from the registry) are returned directly without fn being invoked here.
// Options are only honored if exactly one value is passed.
func (m *Machine) ExecuteAsync(ins Instruction, fn func(error), opts ...Options) error {
return m.execute(ins, fn, opts...)
}
// execute is the shared implementation behind Execute and ExecuteAsync. It
// validates the instruction's description and effect, makes sure the
// instruction is present in the registry and then hands it to the controller
// (standalone) or the coordinator (otherwise). Instructions with an effect of
// zero are dispatched as lookups, all others as updates. The callback fn is
// passed on as is and may be nil.
func (m *Machine) execute(ins Instruction, fn func(error), opts ...Options) error {
	// measure the call using the Machine.Execute metric
	timer := observe(machineExecute)
	defer timer.finish()

	// only a single options value is honored, otherwise the zero value is used
	var options Options
	if len(opts) == 1 {
		options = opts[0]
	}

	// obtain and validate the description
	desc := ins.Describe()
	if err := desc.Validate(); err != nil {
		return err
	}

	// the effect must not exceed the configured maximum and may only be
	// negative if it is the unbounded marker
	effect := ins.Effect()
	switch {
	case effect > m.config.MaxEffect:
		return fmt.Errorf("turing: instruction effect too high")
	case effect < 0 && effect != UnboundedEffect:
		return fmt.Errorf("turing: invalid instruction effect")
	}

	// the instruction must be known to the registry
	if m.registry.ins[desc.Name] == nil {
		return fmt.Errorf("turing: missing instruction: %s", desc.Name)
	}

	// an effect of zero marks a read
	readOnly := effect == 0

	// standalone machines talk to the controller directly; the options are
	// not used on this path
	if m.config.Standalone {
		if readOnly {
			return m.controller.lookup(ins, fn)
		}
		return m.controller.update(ins, fn)
	}

	// otherwise go through the coordinator, only lookups receive the options
	if readOnly {
		return m.coordinator.lookup(ins, fn, options)
	}
	return m.coordinator.update(ins, fn)
}
// Subscribe will subscribe the provided observer by registering it with the
// machine's manager.
func (m *Machine) Subscribe(observer Observer) {
m.manager.subscribe(observer)
}
// Unsubscribe will unsubscribe the provided observer by removing it from the
// machine's manager.
func (m *Machine) Unsubscribe(observer Observer) {
m.manager.unsubscribe(observer)
}
// Status reports the current status as provided by the coordinator. A machine
// without a coordinator yields the zero Status.
func (m *Machine) Status() Status {
	// nothing to ask if there is no coordinator
	if m.coordinator == nil {
		return Status{}
	}

	return m.coordinator.status()
}
// Stop shuts the machine down by closing the coordinator and then the
// controller, whichever of them are present.
func (m *Machine) Stop() {
	// shut down the coordinator
	if coord := m.coordinator; coord != nil {
		coord.close()
	}

	// shut down the controller; Stop has no way to report a failure, so the
	// error returned by close is deliberately dropped
	if ctrl := m.controller; ctrl != nil {
		_ = ctrl.close()
	}
}