forked from leaf-ai/go-service
/
components.go
104 lines (89 loc) · 2.48 KB
/
components.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// Copyright 2018-2021 (c) The Go Service Components authors. All rights reserved. Issued under the Apache 2.0 License.
package components // import "github.com/leaf-ai/go-service/pkg/components"
// This file contains the implementation of a map of strings and booleans that are updated
// by a servers critical components. Clients can also request that if any of the components goes into
// a down (false) condition that they will be informed using a channel and when the entire collection
// goes into a true condition that they will also be informed.
//
// This allows for a server to maintain an overall check on whether any of its critical
// components are down and when they are all alive which is useful in health checking.
import (
"sync"
"time"
"golang.org/x/net/context"
)
type Components struct {
listeners []chan bool
components map[string]bool
clientUpdate chan struct{}
sync.Mutex
}
func (comps *Components) AddListener(listener chan bool) {
comps.Lock()
defer comps.Unlock()
comps.listeners = append(comps.listeners, listener)
}
func (comps *Components) SetModule(module string, up bool) {
comps.Lock()
defer comps.Unlock()
comps.components[module] = up
select {
case comps.clientUpdate <- struct{}{}:
default:
}
}
func (comps *Components) doUpdate() {
comps.Lock()
defer comps.Unlock()
// Is the sever entirely up or not
up := true
for _, v := range comps.components {
if !v {
up = false
break
}
}
// Tell everyone what the collective state is for the server
for i, listener := range comps.listeners {
func() {
defer func() {
// A send to a closed channel will panic and so if a
// panic does occur we remove the listener
if r := recover(); r != nil {
comps.Lock()
defer comps.Unlock()
if len(comps.listeners) <= 1 {
comps.listeners = []chan bool{}
return
}
comps.listeners = append(comps.listeners[:i], comps.listeners[i+1:]...)
}
}()
select {
case <-time.After(20 * time.Millisecond):
case listener <- up:
}
}()
}
}
func InitComponentTracking(ctx context.Context) (comps *Components) {
comps = &Components{
listeners: []chan bool{},
components: map[string]bool{},
clientUpdate: make(chan struct{}),
}
go func(comps *Components) {
internalCheck := time.Duration(5 * time.Second)
for {
select {
case <-time.After(internalCheck):
comps.doUpdate()
case <-comps.clientUpdate:
comps.doUpdate()
case <-ctx.Done():
return
}
}
}(comps)
return comps
}