-
Notifications
You must be signed in to change notification settings - Fork 671
/
set.go
158 lines (134 loc) · 4.09 KB
/
set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package poll
import (
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/ava-labs/avalanchego/ids"
	"github.com/ava-labs/avalanchego/utils/logging"
	"github.com/ava-labs/avalanchego/utils/timer"
)
// poll pairs a Poll with the time at which it was added to the set, so that
// the poll's lifetime can be measured once it finishes.
type poll struct {
// Poll is embedded, so its methods are promoted onto this wrapper.
Poll
// start is when the poll was created; it is set from time.Now() in Add and
// read in Vote and Drop to compute how long the poll existed.
start time.Time
}
// set is the Set implementation returned by NewSet. It tracks outstanding
// polls by request ID and reports metrics about them.
type set struct {
// log receives the debug/verbose messages emitted while polls are added,
// voted on, and finished.
log logging.Logger
// numPolls is a gauge of pending polls: incremented in Add and decremented
// when a poll finishes in Vote or Drop.
numPolls prometheus.Gauge
// durPolls is a histogram of how long finished polls existed, in
// milliseconds.
durPolls prometheus.Histogram
// factory creates the underlying Poll for each newly added request.
factory Factory
// polls maps a request ID to its outstanding poll.
polls map[uint32]poll
}
// NewSet builds an empty poll set whose polls are produced by [factory].
//
// Two metrics are created under [namespace] and registered with [registerer]:
// a "polls" gauge and a "poll_duration" histogram. A failed registration is
// only reported through [log]; the set is still returned.
func NewSet(
	factory Factory,
	log logging.Logger,
	namespace string,
	registerer prometheus.Registerer,
) Set {
	// Gauge tracking how many polls are currently pending.
	gaugeOpts := prometheus.GaugeOpts{
		Namespace: namespace,
		Name:      "polls",
		Help:      "Number of pending network polls",
	}
	pendingGauge := prometheus.NewGauge(gaugeOpts)
	gaugeErr := registerer.Register(pendingGauge)
	if gaugeErr != nil {
		log.Error("failed to register polls statistics due to %s", gaugeErr)
	}

	// Histogram tracking how long each poll lived before finishing.
	histogramOpts := prometheus.HistogramOpts{
		Namespace: namespace,
		Name:      "poll_duration",
		Help:      "Length of time the poll existed in milliseconds",
		Buckets:   timer.MillisecondsBuckets,
	}
	durationHistogram := prometheus.NewHistogram(histogramOpts)
	histogramErr := registerer.Register(durationHistogram)
	if histogramErr != nil {
		log.Error("failed to register poll_duration statistics due to %s", histogramErr)
	}

	created := &set{
		log:      log,
		numPolls: pendingGauge,
		durPolls: durationHistogram,
		factory:  factory,
		polls:    make(map[uint32]poll),
	}
	return created
}
// Add starts tracking a new poll under [requestID] over the validators in
// [vdrs].
//
// Returns false, leaving the set untouched, when a poll with the same
// requestID is already outstanding. Returns true once the poll has been
// stored, signalling that the network sample should be made.
func (s *set) Add(requestID uint32, vdrs ids.ShortBag) bool {
	_, duplicate := s.polls[requestID]
	if duplicate {
		s.log.Debug("dropping poll due to duplicated requestID: %d", requestID)
		return false
	}

	s.log.Verbo("creating poll with requestID %d and validators %s", requestID, &vdrs)

	// Stamp the poll with its creation time so its lifetime can be measured
	// when it finishes.
	created := poll{
		Poll:  s.factory.New(vdrs),
		start: time.Now(),
	}
	s.polls[requestID] = created

	// One more pending poll.
	s.numPolls.Inc()
	return true
}
// Vote records [vdr]'s response [vote] in the poll identified by [requestID].
//
// If no poll with that requestID is outstanding, the vote is discarded. How a
// repeated response from the same validator is treated is decided by the
// underlying Poll.
//
// Returns the poll's result and true only when this vote finishes the poll; in
// that case the poll is removed from the set, its lifetime is observed in the
// duration histogram, and the pending-polls gauge is decremented. In every
// other case an empty bag and false are returned.
func (s *set) Vote(
	requestID uint32,
	vdr ids.ShortID,
	vote ids.ID,
) (ids.Bag, bool) {
	p, found := s.polls[requestID]
	if !found {
		s.log.Verbo("dropping vote from %s to an unknown poll with requestID: %d", vdr, requestID)
		return ids.Bag{}, false
	}

	s.log.Verbo("processing vote from %s in the poll with requestID: %d with the vote %s", vdr, requestID, vote)

	p.Vote(vdr, vote)
	if p.Finished() {
		s.log.Verbo("poll with requestID %d finished as %s", requestID, p)

		// The poll is done: stop tracking it and update the metrics.
		delete(s.polls, requestID)
		elapsed := time.Since(p.start)
		s.durPolls.Observe(float64(elapsed.Milliseconds()))
		s.numPolls.Dec()
		return p.Result(), true
	}

	// Still waiting on more responses.
	return ids.Bag{}, false
}
// Drop registers a dropped vote from [vdr] in the poll identified by
// [requestID].
//
// If no poll with that requestID is outstanding, the call is ignored. How a
// repeated drop from the same validator is treated is decided by the
// underlying Poll.
//
// Returns the poll's result and true only when this drop finishes the poll; in
// that case the poll is removed from the set, its lifetime is observed in the
// duration histogram, and the pending-polls gauge is decremented. In every
// other case an empty bag and false are returned.
func (s *set) Drop(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) {
	p, found := s.polls[requestID]
	if !found {
		s.log.Verbo("dropping vote from %s to an unknown poll with requestID: %d", vdr, requestID)
		return ids.Bag{}, false
	}

	s.log.Verbo("processing dropped vote from %s in the poll with requestID: %d", vdr, requestID)

	p.Drop(vdr)
	if p.Finished() {
		s.log.Verbo("poll with requestID %d finished as %s", requestID, p)

		// The poll is done: stop tracking it and update the metrics.
		delete(s.polls, requestID)
		elapsed := time.Since(p.start)
		s.durPolls.Observe(float64(elapsed.Milliseconds()))
		s.numPolls.Dec()
		return p.Result(), true
	}

	// Still waiting on more responses.
	return ids.Bag{}, false
}
// Len reports how many polls are currently outstanding, i.e. the number of
// entries held in the set's poll map.
func (s *set) Len() int {
	outstanding := len(s.polls)
	return outstanding
}
// String renders the outstanding polls as human-readable, multi-line text.
//
// The first line reports how many polls are outstanding; each following line
// shows one poll as "<requestID>: <poll>", where the poll text comes from its
// PrefixedString method. Polls are listed in ascending requestID order: Go
// does not specify map iteration order, so ranging over s.polls directly would
// let the output differ between calls on an unchanged set.
func (s *set) String() string {
	// Collect and sort the keys up front so the output is deterministic.
	requestIDs := make([]uint32, 0, len(s.polls))
	for requestID := range s.polls {
		requestIDs = append(requestIDs, requestID)
	}
	sort.Slice(requestIDs, func(i, j int) bool { return requestIDs[i] < requestIDs[j] })

	var sb strings.Builder
	// Fprintf writes straight into the builder, avoiding the intermediate
	// string that WriteString(fmt.Sprintf(...)) would allocate.
	fmt.Fprintf(&sb, "current polls: (Size = %d)", len(s.polls))
	for _, requestID := range requestIDs {
		p := s.polls[requestID]
		fmt.Fprintf(&sb, "\n %d: %s", requestID, p.PrefixedString(" "))
	}
	return sb.String()
}