-
Notifications
You must be signed in to change notification settings - Fork 672
/
voter.go
126 lines (107 loc) · 2.96 KB
/
voter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package avalanche
import (
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/snow/consensus/snowstorm"
"github.com/ava-labs/avalanchego/snow/engine/avalanche/vertex"
)
// voter records chits received from [vdr] once its dependencies are met.
type voter struct {
	// t is the Transitive instance whose polls, consensus, VM and error
	// state this voter reads and updates.
	t *Transitive
	// vdr is the validator the chits were received from; it is passed to
	// polls.Vote.
	vdr ids.ShortID
	// requestID is handed to polls.Vote together with vdr.
	requestID uint32
	// response holds the IDs reported by vdr; it is passed to polls.Vote as
	// the vote.
	response []ids.ID
	// deps is the set of IDs still outstanding. Update does nothing until
	// this set is empty.
	deps ids.Set
}
// Dependencies returns the set of IDs this voter is still waiting on.
func (v *voter) Dependencies() ids.Set {
	return v.deps
}
// Fulfill marks the dependency [id] as met by removing it from the
// outstanding set, then calls Update so the chits are recorded as soon as no
// dependencies remain.
func (v *voter) Fulfill(id ids.ID) {
	v.deps.Remove(id)
	v.Update()
}
// Abandon is called when the dependency [id] is given up on. It is handled
// exactly like a fulfilled dependency: it simply delegates to Fulfill.
func (v *voter) Abandon(id ids.ID) {
	v.Fulfill(id)
}
// Update attempts to record this voter's chits. It does nothing while any
// dependency is still outstanding or when v.t.errs already holds an error.
//
// Once the vote finishes its poll, the results are passed through
// bubbleVotes, recorded in consensus, orphaned transactions are re-issued
// through batch, and a repoll is triggered unless consensus reports that it
// can quiesce. Errors are stored in v.t.errs rather than returned.
func (v *voter) Update() {
	// Still waiting on at least one dependency.
	if v.deps.Len() != 0 {
		return
	}
	// A previous step already failed; do not touch consensus any further.
	if v.t.errs.Errored() {
		return
	}

	polled, done := v.t.polls.Vote(v.requestID, v.vdr, v.response)
	if !done {
		// The poll is still waiting on other votes.
		return
	}

	bubbled, err := v.bubbleVotes(polled)
	if err != nil {
		v.t.errs.Add(err)
		return
	}

	v.t.Ctx.Log.Debug("Finishing poll with:\n%s", &bubbled)
	err = v.t.Consensus.RecordPoll(bubbled)
	if err != nil {
		v.t.errs.Add(err)
		return
	}

	// Collect every orphaned transaction that can still be fetched from the
	// VM so it can be issued again.
	orphanIDs := v.t.Consensus.Orphans()
	reissue := make([]snowstorm.Tx, 0, orphanIDs.Len())
	for orphanID := range orphanIDs {
		tx, err := v.t.VM.Get(orphanID)
		if err != nil {
			v.t.Ctx.Log.Warn("Failed to fetch %s during attempted re-issuance", orphanID)
			continue
		}
		reissue = append(reissue, tx)
	}
	if len(reissue) > 0 {
		v.t.Ctx.Log.Debug("Re-issuing %d transactions", len(reissue))
	}

	// batch is invoked even when there is nothing to re-issue.
	_, err = v.t.batch(reissue, true /*=force*/, false /*empty*/, false /*=limit*/)
	if err != nil {
		v.t.errs.Add(err)
		return
	}

	if !v.t.Consensus.Quiesce() {
		v.t.Ctx.Log.Debug("Avalanche engine can't quiesce")
		v.t.errs.Add(v.t.repoll())
		return
	}
	v.t.Ctx.Log.Debug("Avalanche engine can quiesce")
}
// bubbleVotes rewrites [votes] in place so that votes cast for vertices that
// have not been issued to consensus are moved onto those vertices' parents.
//
// Every voted-for vertex that the manager can return is pushed onto a vertex
// heap and then processed one at a time:
//   - votes for a vertex whose status is not fetched are dropped;
//   - votes for a vertex that is already decided are dropped;
//   - votes for a vertex that is not issued are removed and merged into the
//     vote sets of each of its parents, which are then processed in turn.
//
// Votes whose ID the manager cannot return are left untouched. The (mutated)
// bag is returned along with any error produced while fetching parents.
func (v *voter) bubbleVotes(votes ids.UniqueBag) (ids.UniqueBag, error) {
	pending := vertex.NewHeap()
	for votedID := range votes {
		if vtx, err := v.t.Manager.Get(votedID); err == nil {
			pending.Push(vtx)
		}
	}

	for pending.Len() > 0 {
		vtx := pending.Pop()
		vtxID := vtx.ID()
		voters := votes.GetSet(vtxID)
		status := vtx.Status()

		// Cases are evaluated top to bottom; the first match wins.
		switch {
		case !status.Fetched():
			v.t.Ctx.Log.Verbo("Dropping %d vote(s) for %s because the vertex is unknown",
				voters.Len(), vtxID)
			votes.RemoveSet(vtxID)

		case status.Decided():
			v.t.Ctx.Log.Verbo("Dropping %d vote(s) for %s because the vertex is decided as %s",
				voters.Len(), vtxID, status)
			votes.RemoveSet(vtxID)

		case !v.t.Consensus.VertexIssued(vtx):
			v.t.Ctx.Log.Verbo("Bubbling %d vote(s) for %s because the vertex isn't issued",
				voters.Len(), vtxID)
			// The vertex hasn't been issued, so its votes are taken away
			// before being handed to its parents.
			votes.RemoveSet(vtxID)

			parents, err := vtx.Parents()
			if err != nil {
				return votes, err
			}
			for _, parent := range parents {
				votes.UnionSet(parent.ID(), voters)
				pending.Push(parent)
			}
		}
	}

	return votes, nil
}