-
Notifications
You must be signed in to change notification settings - Fork 672
/
acceptor.go
163 lines (132 loc) · 4.22 KB
/
acceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// Copyright (C) 2019-2022, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package snow
import (
"fmt"
"sync"
"go.uber.org/zap"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/logging"
)
// Compile-time checks that each type in this file satisfies the interface it
// is meant to implement. The values are discarded.
var (
	_ Acceptor = noOpAcceptor{}
	_ Acceptor = (*AcceptorTracker)(nil)
	_ Acceptor = acceptorWrapper{}

	_ AcceptorGroup = (*acceptorGroup)(nil)
)
// Acceptor is implemented by any type that wants to be notified when a
// container is accepted.
type Acceptor interface {
// Accept must be called before the container identified by [containerID] is
// committed to the VM as accepted. [container] is that container as a byte
// slice.
//
// If the returned error is non-nil, the chain associated with [ctx] should
// shut down and not commit [container] or any other container to its
// database as accepted.
Accept(ctx *ConsensusContext, containerID ids.ID, container []byte) error
}
// noOpAcceptor is an Acceptor that ignores every accept event.
type noOpAcceptor struct{}

// Accept does nothing with its arguments and always returns nil.
func (noOpAcceptor) Accept(_ *ConsensusContext, _ ids.ID, _ []byte) error {
	return nil
}
// AcceptorTracker is an Acceptor that records, per container ID, how many
// times Accept has been called. Useful for testing.
type AcceptorTracker struct {
// lock guards accepted.
lock sync.RWMutex
// accepted maps a container ID to the number of Accept calls made for it.
accepted map[ids.ID]int
}
// NewAcceptorTracker returns an AcceptorTracker whose count map is allocated
// and empty.
func NewAcceptorTracker() *AcceptorTracker {
	tracker := new(AcceptorTracker)
	tracker.accepted = make(map[ids.ID]int)
	return tracker
}
// Accept records one more accept event for [containerID].
//
// [ctx] and [container] are ignored, and the returned error is always nil.
//
// The count map is created on first use, so a zero-value AcceptorTracker (one
// that was not built with NewAcceptorTracker) can be used without panicking
// on a write to a nil map. The unlock is deferred so the mutex is released on
// every exit path.
func (a *AcceptorTracker) Accept(ctx *ConsensusContext, containerID ids.ID, container []byte) error {
	a.lock.Lock()
	defer a.lock.Unlock()

	if a.accepted == nil {
		a.accepted = make(map[ids.ID]int)
	}
	a.accepted[containerID]++
	return nil
}
// IsAccepted returns the number of times [containerID] has been passed to
// Accept, and whether an entry for it exists at all.
func (a *AcceptorTracker) IsAccepted(containerID ids.ID) (int, bool) {
	a.lock.RLock()
	defer a.lock.RUnlock()

	timesAccepted, seen := a.accepted[containerID]
	return timesAccepted, seen
}
// acceptorWrapper pairs a registered Acceptor with the policy to apply when
// that acceptor returns an error.
type acceptorWrapper struct {
// Acceptor is embedded, so its Accept method is promoted to the wrapper.
Acceptor
// If true and Accept returns an error, the chain this callback corresponds
// to will stop: acceptorGroup.Accept returns the error to its caller
// instead of only logging it.
dieOnError bool
}
// AcceptorGroup is an Acceptor that forwards each accept event to the
// acceptors registered for the event's chain.
type AcceptorGroup interface {
// Calling Accept() calls all of the registered acceptors for the relevant
// chain.
Acceptor
// RegisterAcceptor causes [acceptor] to be called every time an operation
// is accepted on chain [chainID]. [acceptorName] identifies the acceptor
// within that chain.
// If [dieOnError], chain [chainID] stops if Accept returns a non-nil error.
// The implementation in this file returns an error if [acceptorName] is
// already registered on [chainID].
RegisterAcceptor(chainID ids.ID, acceptorName string, acceptor Acceptor, dieOnError bool) error
// DeregisterAcceptor removes the acceptor named [acceptorName] on chain
// [chainID] from the group.
// The implementation in this file returns an error if no such acceptor is
// registered.
DeregisterAcceptor(chainID ids.ID, acceptorName string) error
}
// acceptorGroup is the AcceptorGroup implementation returned by
// NewAcceptorGroup.
type acceptorGroup struct {
// log receives an error entry whenever a registered acceptor fails.
log logging.Logger
// lock guards acceptors.
lock sync.RWMutex
// Chain ID --> Acceptor Name --> Acceptor
acceptors map[ids.ID]map[string]acceptorWrapper
}
// NewAcceptorGroup returns an AcceptorGroup with no registered acceptors.
// Acceptor failures are reported to [log].
func NewAcceptorGroup(log logging.Logger) AcceptorGroup {
	group := new(acceptorGroup)
	group.log = log
	group.acceptors = make(map[ids.ID]map[string]acceptorWrapper)
	return group
}
// Accept forwards the accept event to every acceptor registered for
// [ctx.ChainID], in map iteration order.
//
// Each acceptor failure is logged. A failure from an acceptor registered with
// dieOnError stops the iteration and is returned wrapped; failures from other
// acceptors are only logged. Returns nil when no dieOnError acceptor failed.
func (a *acceptorGroup) Accept(ctx *ConsensusContext, containerID ids.ID, container []byte) error {
	a.lock.RLock()
	defer a.lock.RUnlock()

	chainAcceptors := a.acceptors[ctx.ChainID]
	for name, wrapper := range chainAcceptors {
		err := wrapper.Accept(ctx, containerID, container)
		if err == nil {
			continue
		}

		a.log.Error("failed accepting container",
			zap.String("acceptorName", name),
			zap.Stringer("chainID", ctx.ChainID),
			zap.Stringer("containerID", containerID),
			zap.Error(err),
		)

		// Non-fatal acceptors only get their failure logged.
		if !wrapper.dieOnError {
			continue
		}
		return fmt.Errorf("acceptor %s on chain %s erred while accepting %s: %w", name, ctx.ChainID, containerID, err)
	}
	return nil
}
// RegisterAcceptor adds [acceptor] under [acceptorName] to the acceptors of
// chain [chainID], creating the chain's entry if it has none yet.
//
// [dieOnError] is stored with the acceptor and decides whether its failures
// are returned by Accept. Returns an error if [acceptorName] is already
// registered on [chainID].
func (a *acceptorGroup) RegisterAcceptor(chainID ids.ID, acceptorName string, acceptor Acceptor, dieOnError bool) error {
	a.lock.Lock()
	defer a.lock.Unlock()

	chainAcceptors, known := a.acceptors[chainID]
	if !known {
		chainAcceptors = make(map[string]acceptorWrapper)
		a.acceptors[chainID] = chainAcceptors
	}

	if _, taken := chainAcceptors[acceptorName]; taken {
		return fmt.Errorf("callback %s already exists on chain %s", acceptorName, chainID)
	}

	wrapped := acceptorWrapper{
		Acceptor:   acceptor,
		dieOnError: dieOnError,
	}
	chainAcceptors[acceptorName] = wrapped
	return nil
}
// DeregisterAcceptor removes the acceptor named [acceptorName] from chain
// [chainID].
//
// Returns an error if the chain has no registered acceptors or if
// [acceptorName] is not among them.
func (a *acceptorGroup) DeregisterAcceptor(chainID ids.ID, acceptorName string) error {
	a.lock.Lock()
	defer a.lock.Unlock()

	chainAcceptors, known := a.acceptors[chainID]
	if !known {
		return fmt.Errorf("chain %s has no callbacks", chainID)
	}
	if _, registered := chainAcceptors[acceptorName]; !registered {
		return fmt.Errorf("callback %s does not exist on chain %s", acceptorName, chainID)
	}

	// Other acceptors remain on this chain: remove just this one.
	if len(chainAcceptors) != 1 {
		delete(chainAcceptors, acceptorName)
		return nil
	}

	// This was the chain's last acceptor, so drop the chain's entry entirely.
	delete(a.acceptors, chainID)
	return nil
}