-
Notifications
You must be signed in to change notification settings - Fork 20
/
vardispatcher.go
77 lines (66 loc) · 2.54 KB
/
vardispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package txnengine
import (
"fmt"
"goshawkdb.io/common"
cmsgs "goshawkdb.io/common/capnp"
"goshawkdb.io/server"
msgs "goshawkdb.io/server/capnp"
"goshawkdb.io/server/configuration"
"goshawkdb.io/server/db"
"goshawkdb.io/server/dispatcher"
)
// TopologyPublisher is the interface through which a TopologySubscriber is
// attached to, or detached from, a source of topology notifications. Each
// call identifies the subscriber together with a TopologyChangeSubscriberType.
type TopologyPublisher interface {
	// AddTopologySubscriber attaches subscriber under subscriberType and
	// returns a *configuration.Topology.
	// NOTE(review): presumably the returned topology is the one current at
	// the time of the call; confirm against the implementation.
	AddTopologySubscriber(subscriberType TopologyChangeSubscriberType, subscriber TopologySubscriber) *configuration.Topology

	// RemoveTopologySubscriberAsync detaches subscriber from subscriberType.
	// It returns nothing; the "Async" suffix suggests the removal may not
	// have completed when the call returns (confirm against the
	// implementation).
	RemoveTopologySubscriberAsync(subscriberType TopologyChangeSubscriberType, subscriber TopologySubscriber)
}
// TopologySubscriber is implemented by anything that can be handed to a
// TopologyPublisher in order to receive topology notifications.
type TopologySubscriber interface {
	// TopologyChanged is given a topology and a callback taking a bool.
	// NOTE(review): presumably the subscriber invokes callback once it has
	// processed the topology, with the bool reporting the result; the exact
	// contract is not visible in this file, so confirm with the publisher.
	TopologyChanged(topology *configuration.Topology, callback func(bool))
}
// TopologyChangeSubscriberType labels the category under which a
// TopologySubscriber is registered with a TopologyPublisher.
type TopologyChangeSubscriberType uint8

// The subscriber categories are numbered consecutively from zero.
// TopologyChangeSubscriberTypeLimit is a plain int equal to the number of
// categories declared before it.
const (
	VarSubscriber TopologyChangeSubscriberType = iota
	ProposerSubscriber
	AcceptorSubscriber
	ConnectionSubscriber
	ConnectionManagerSubscriber
	EmigratorSubscriber

	// Typed as int rather than TopologyChangeSubscriberType, so the
	// expression has to be spelled out again here.
	TopologyChangeSubscriberTypeLimit int = iota
)
// VarDispatcher pairs each executor of the embedded dispatcher.Dispatcher
// with its own *VarManager: NewVarDispatcher fills varmanagers[idx] for every
// Executors[idx], and the methods in this file always use the two together.
type VarDispatcher struct {
// Embedded dispatcher; this file relies on its Init method and on its
// Executors and ExecutorCount members.
dispatcher.Dispatcher
// varmanagers holds the VarManager that goes with the executor at the same
// index.
varmanagers []*VarManager
}
// NewVarDispatcher constructs a VarDispatcher. It allocates count VarManager
// slots, initialises the embedded Dispatcher with count, and then creates one
// VarManager per executor, forwarding rmId, cm, db and lc to NewVarManager.
//
// NOTE(review): this assumes Init(count) leaves no more than count entries in
// Executors; a longer Executors slice would index past varmanagers.
func NewVarDispatcher(count uint8, rmId common.RMId, cm TopologyPublisher, db *db.Databases, lc LocalConnection) *VarDispatcher {
	result := new(VarDispatcher)
	result.varmanagers = make([]*VarManager, count)
	result.Dispatcher.Init(count)

	// Manager i is bound to executor i.
	executors := result.Executors
	for i := 0; i < len(executors); i++ {
		result.varmanagers[i] = NewVarManager(executors[i], rmId, cm, db, lc)
	}
	return result
}
// ApplyToVar forwards fun, createIfMissing and vUUId to the ApplyToVar method
// of the VarManager selected for vUUId, running that call on the manager's
// executor via withVarManager. The bool reported by withVarManager is
// discarded, so callers are not told whether the work was enqueued.
func (vd *VarDispatcher) ApplyToVar(fun func(*Var), createIfMissing bool, vUUId *common.VarUUId) {
	forward := func(manager *VarManager) {
		manager.ApplyToVar(fun, createIfMissing, vUUId)
	}
	vd.withVarManager(vUUId, forward)
}
// Status emits a "Vars" heading on sc and then, for every executor, forks sc,
// emits a "Var Manager <index>" line on the fork and enqueues a call to the
// matching VarManager's Status with that fork on the executor. Once all
// executors have been handled it calls sc.Join. The result of each Enqueue
// call is not inspected.
func (vd *VarDispatcher) Status(sc *server.StatusConsumer) {
	sc.Emit("Vars")
	for i, exe := range vd.Executors {
		child := sc.Fork()
		child.Emit(fmt.Sprintf("Var Manager %v", i))
		vm := vd.varmanagers[i]
		// The manager reports from its own executor.
		report := func() {
			vm.Status(child)
		}
		exe.Enqueue(report)
	}
	sc.Join()
}
// withVarManager picks the executor/VarManager pair for vUUId and enqueues a
// call of fun with that manager on the executor. The pair is chosen by taking
// the byte of vUUId at server.MostRandomByteIndex modulo ExecutorCount. The
// return value is whatever the executor's Enqueue reports.
//
// The modulo panics if ExecutorCount is zero, and the index expression panics
// if vUUId is nil.
func (vd *VarDispatcher) withVarManager(vUUId *common.VarUUId, fun func(*VarManager)) bool {
	slot := uint8(vUUId[server.MostRandomByteIndex]) % vd.ExecutorCount
	var (
		exe = vd.Executors[slot]
		vm  = vd.varmanagers[slot]
	)
	task := func() {
		fun(vm)
	}
	return exe.Enqueue(task)
}
// TranslationCallback is the function type taken as the last argument of
// LocalConnection.RunClientTransaction. It receives a client-side action
// (*cmsgs.ClientAction), a server-side action (*msgs.Action), a slice of RMIds
// and a map from RMId to bool, and returns an error.
// NOTE(review): what the slice and the map represent is not visible in this
// file; confirm against the implementations before relying on them.
type TranslationCallback func(*cmsgs.ClientAction, *msgs.Action, []common.RMId, map[common.RMId]bool) error
// LocalConnection is the connection dependency handed to NewVarDispatcher and
// passed on unchanged to every VarManager it creates.
type LocalConnection interface {
	// RunClientTransaction takes a client transaction, a map from VarUUId to
	// *common.Positions and a TranslationCallback, and returns a *TxnReader,
	// a *msgs.Outcome and an error.
	// NOTE(review): how the positions map and the callback are used is not
	// visible in this file; confirm against the implementation.
	RunClientTransaction(txn *cmsgs.ClientTxn, positions map[common.VarUUId]*common.Positions, translate TranslationCallback) (*TxnReader, *msgs.Outcome, error)

	// Status is given a *server.StatusConsumer, presumably to report the
	// connection's state through it (confirm against the implementation).
	Status(sc *server.StatusConsumer)
}