forked from coreos/fleet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
generator.go
129 lines (106 loc) · 2.99 KB
/
generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
/*
Copyright 2014 CoreOS, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package unit
import (
"encoding/json"
"time"
"github.com/coreos/fleet/log"
"github.com/coreos/fleet/pkg"
)
// UnitStateHeartbeat pairs a unit name with a state for that unit. Generate
// emits heartbeats with a nil State to signal that the named unit has been
// removed from the subscription list.
type UnitStateHeartbeat struct {
// Name identifies the unit the heartbeat refers to.
Name string
// State is the state reported for the unit; nil signals removal.
State *UnitState
}
// NewUnitStateGenerator builds a UnitStateGenerator that fetches unit states
// from mgr. Its subscription set is freshly created by pkg.NewThreadsafeSet;
// the last-subscribed snapshot is left nil until Generate records one.
func NewUnitStateGenerator(mgr UnitManager) *UnitStateGenerator {
	gen := new(UnitStateGenerator)
	gen.mgr = mgr
	gen.subscribed = pkg.NewThreadsafeSet()
	return gen
}
// UnitStateGenerator produces UnitStateHeartbeat objects for the units it is
// subscribed to, using a UnitManager as the source of unit states.
type UnitStateGenerator struct {
// mgr is queried by Generate for the states of the subscribed units.
mgr UnitManager
// subscribed holds the names of the units to report on; Subscribe and
// Unsubscribe modify it.
subscribed pkg.Set
// lastSubscribed is a snapshot of subscribed recorded by a previous
// Generate call (nil until one has been recorded). Generate compares
// against it to find units that have since been unsubscribed.
lastSubscribed pkg.Set
}
// MarshalJSON encodes the generator as a JSON object with a single
// "Subscribed" field holding the values of the current subscription set.
func (g *UnitStateGenerator) MarshalJSON() ([]byte, error) {
	// view is the serialized shape of the generator; only the subscription
	// list is exposed.
	type view struct {
		Subscribed []string
	}

	names := g.subscribed.Values()
	return json.Marshal(view{Subscribed: names})
}
// Run calls Generate once per second and forwards every *UnitStateHeartbeat
// it yields to the provided receiver channel. It returns once a value is
// received from stop, or stop is closed.
//
// If Generate fails, the error is logged and that tick is skipped. Each send
// on receiver blocks until the heartbeat is consumed, and stop is only
// checked between ticks, so the receiver must keep draining for Run to remain
// stoppable.
func (g *UnitStateGenerator) Run(receiver chan<- *UnitStateHeartbeat, stop chan bool) {
	// Use an explicit Ticker instead of time.Tick so that the ticker can be
	// stopped, and its resources released, when Run returns.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			beatchan, err := g.Generate()
			if err != nil {
				log.Errorf("Failed fetching current unit states: %v", err)
				continue
			}

			// Generate closes beatchan after its last heartbeat, which
			// ends this loop.
			for ush := range beatchan {
				receiver <- ush
			}
		}
	}
}
// Generate returns a channel that is filled with *UnitStateHeartbeat objects
// and then closed. Objects will only be returned for units to which this
// generator is currently subscribed, followed by one nil-State heartbeat for
// every unit that was in the previously recorded subscription snapshot but is
// no longer subscribed.
//
// Heartbeats are sent from a separate goroutine over an unbuffered channel,
// so the caller must drain the returned channel for that goroutine to finish.
// If the unit states cannot be fetched from the UnitManager, a nil channel and
// the error are returned, and the recorded snapshot is left untouched.
func (g *UnitStateGenerator) Generate() (<-chan *UnitStateHeartbeat, error) {
	var lastSubscribed pkg.Set
	if g.lastSubscribed != nil {
		lastSubscribed = g.lastSubscribed.Copy()
	}

	subscribed := g.subscribed.Copy()

	reportable, err := g.mgr.GetUnitStates(subscribed)
	if err != nil {
		return nil, err
	}

	// Record the snapshot only after the fetch has succeeded. Overwriting it
	// before a failed fetch would discard the previous snapshot, and removal
	// heartbeats for units unsubscribed since then would never be sent.
	g.lastSubscribed = subscribed

	beatchan := make(chan *UnitStateHeartbeat)
	go func() {
		// Closing the channel tells the consumer that this batch is done.
		defer close(beatchan)

		for name, us := range reportable {
			beatchan <- &UnitStateHeartbeat{
				Name:  name,
				State: us,
			}
		}

		if lastSubscribed == nil {
			return
		}

		// For all units that were part of the subscription list in the
		// previous snapshot, but are now not part of that list, send
		// nil-State heartbeats to signal removal.
		for _, name := range lastSubscribed.Sub(subscribed).Values() {
			beatchan <- &UnitStateHeartbeat{
				Name: name,
			}
		}
	}()

	return beatchan, nil
}
// Subscribe adds a unit to the internal state filter. Generate passes a copy
// of this filter to the UnitManager, so later Generate calls request state
// for the named unit.
func (g *UnitStateGenerator) Subscribe(name string) {
g.subscribed.Add(name)
}
// Unsubscribe removes a unit from the internal state filter. A later Generate
// call signals the removal with a nil-State heartbeat when the unit is present
// in the previously recorded subscription snapshot.
func (g *UnitStateGenerator) Unsubscribe(name string) {
g.subscribed.Remove(name)
}