forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 1
/
health.go
155 lines (133 loc) · 4.08 KB
/
health.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package health
import (
"fmt"
"html/template"
"strings"
"sync"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/topo"
)
var (
defaultAggregator *Aggregator
)
const (
// ReplicationLag should be the key for any reporters
// reporting MySQL repliaction lag.
ReplicationLag = "replication_lag"
// ReplicationLagHigh should be the value for any reporters
// indicating that the replication lag is too high.
ReplicationLagHigh = "high"
)
func init() {
defaultAggregator = NewAggregator()
}
// Reporter reports the health status of a tablet.
type Reporter interface {
// Report returns a map of health states for the tablet
// assuming that its tablet type is typ. If Report returns an
// error it implies that the tablet is in a bad shape and not
// able to handle queries.
Report(typ topo.TabletType) (status map[string]string, err error)
// HTMLName returns a displayable name for the module.
// Can be used to be displayed in the status page.
HTMLName() template.HTML
}
// FunctionReporter is a function that may act as a Reporter.
type FunctionReporter func(typ topo.TabletType) (map[string]string, error)
// Report implements Reporter.Report
func (fc FunctionReporter) Report(typ topo.TabletType) (status map[string]string, err error) {
return fc(typ)
}
// HTMLName implements Reporter.HTMLName
func (fc FunctionReporter) HTMLName() template.HTML {
return template.HTML("FunctionReporter")
}
// Aggregator aggregates the results of many Reporters.
type Aggregator struct {
// mu protects all fields below its declaration.
mu sync.Mutex
reporters map[string]Reporter
}
// NewAggregator returns a new empty Aggregator
func NewAggregator() *Aggregator {
return &Aggregator{
reporters: make(map[string]Reporter),
}
}
// Run runs aggregates health statuses from all the reporters. If any
// errors occur during the reporting, they will be logged, but only
// the first error will be returned.
// It may return an empty map if no health condition is detected. Note
// it will not return nil, but an empty map.
func (ag *Aggregator) Run(typ topo.TabletType) (map[string]string, error) {
var (
wg sync.WaitGroup
rec concurrency.AllErrorRecorder
)
results := make(chan map[string]string, len(ag.reporters))
ag.mu.Lock()
for name, rep := range ag.reporters {
wg.Add(1)
go func(name string, rep Reporter) {
defer wg.Done()
status, err := rep.Report(typ)
if err != nil {
rec.RecordError(fmt.Errorf("%v: %v", name, err))
return
}
results <- status
}(name, rep)
}
ag.mu.Unlock()
wg.Wait()
close(results)
if err := rec.Error(); err != nil {
return nil, err
}
// merge and return the results
result := make(map[string]string)
for part := range results {
for k, v := range part {
if _, ok := result[k]; ok {
return nil, fmt.Errorf("duplicate key: %v", k)
}
result[k] = v
}
}
return result, nil
}
// Register registers rep with ag. Only keys specified in keys will be
// aggregated from this particular Reporter.
func (ag *Aggregator) Register(name string, rep Reporter) {
ag.mu.Lock()
defer ag.mu.Unlock()
if _, ok := ag.reporters[name]; ok {
panic("reporter named " + name + " is already registered")
}
ag.reporters[name] = rep
}
// HTMLName returns an aggregate name for all the reporters
func (ag *Aggregator) HTMLName() template.HTML {
ag.mu.Lock()
defer ag.mu.Unlock()
result := make([]string, 0, len(ag.reporters))
for _, rep := range ag.reporters {
result = append(result, string(rep.HTMLName()))
}
return template.HTML(strings.Join(result, " + "))
}
// Run collects all the health statuses from the default health
// aggregator.
func Run(typ topo.TabletType) (map[string]string, error) {
return defaultAggregator.Run(typ)
}
// Register registers rep under name with the default health
// aggregator. Only keys specified in keys will be aggregated from
// this particular Reporter.
func Register(name string, rep Reporter) {
defaultAggregator.Register(name, rep)
}
// HTMLName returns an aggregate name for the default reporter
func HTMLName() template.HTML {
return defaultAggregator.HTMLName()
}