-
Notifications
You must be signed in to change notification settings - Fork 2
/
discovery.go
187 lines (153 loc) · 4.69 KB
/
discovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package server
import (
"os"
"sync"
"time"
log "github.com/cihub/seelog"
dscShared "github.com/hailo-platform/H2O/discovery-service/proto"
register "github.com/hailo-platform/H2O/discovery-service/proto/register"
"github.com/hailo-platform/H2O/platform/client"
"github.com/hailo-platform/H2O/platform/util"
"github.com/hailo-platform/H2O/service/auth"
"github.com/hailo-platform/H2O/protobuf/proto"
)
const (
	// lostContactInterval is the window handed to newHeartbeat; tick reports
	// the service as disconnected once the heartbeat is no longer healthy.
	lostContactInterval = time.Minute
	// tryDiscoveryInterval is the period of the ticker that drives tick.
	tryDiscoveryInterval = 10 * time.Second
	// maxDisconnects is the number of failed ticks in a row (the counter is
	// reset by a successful connect) after which tick exits the process,
	// provided it was started with die == true.
	maxDisconnects = 5
)
// discovery tracks this service's registration state with the discovery
// service. The embedded RWMutex is taken by tick, connect, disconnect and
// IsConnected around their accesses to connected and isMultiRegistered.
type discovery struct {
sync.RWMutex
// id is not assigned or read in the code visible in this file.
id string
// reg is not assigned or read in the code visible in this file; the methods
// here use the package-level reg instead.
reg *registry
// hostname is sent as the Hostname of every register/unregister request.
hostname string
// connected holds the success state of the last discovery call that
// succeeded (true after a register, false after an unregister); tick also
// resets it to false before each reconnect attempt.
connected bool
// hb is the heartbeat monitor whose healthy() result tick checks.
hb *heartbeat
// isMultiRegistered will be set to true if we believe we're registered with the discovery service. If this is
// false, we won't wait for a heartbeat timeout to try multiregistering again
isMultiRegistered bool
}
// newDiscovery creates a discovery in the disconnected state, gives it a
// heartbeat monitor built with lostContactInterval, and starts the tick loop
// in its own goroutine. opts.Die is passed to tick as its die flag.
//
// The goroutine started here is never stopped by anything in this file.
func newDiscovery(opts *Options) *discovery {
	disc := new(discovery)
	disc.hostname = hostname
	disc.connected = false
	disc.hb = newHeartbeat(lostContactInterval)

	go disc.tick(opts.Die)

	return disc
}
// tick runs forever, waking every tryDiscoveryInterval. On each wake-up it
// does nothing if the service is multi-registered and the heartbeat is
// healthy. Otherwise it counts a failure, marks the service as not connected
// and calls connect; a successful connect resets the failure counter.
//
// When die is true and the counter reaches maxDisconnects, the logs are
// cleaned up and the process exits with status 1.
func (self *discovery) tick(die bool) {
	ticker := time.NewTicker(tryDiscoveryInterval)
	failures := 0

	for {
		<-ticker.C

		self.Lock()
		if self.isMultiRegistered && self.hb.healthy() {
			// Registered and still hearing heartbeats: nothing to do.
			self.Unlock()
			continue
		}

		failures++
		log.Infof("[Server] Service has not received heartbeats within %v and is now disconnected", lostContactInterval)
		if die && failures >= maxDisconnects {
			log.Criticalf("[Service] Max disconnects (%d) reached, bye bye cruel world", maxDisconnects)
			cleanupLogs()
			os.Exit(1)
		}
		self.connected = false
		// connect takes the lock itself, so release it first.
		self.Unlock()

		err := self.connect()
		if err == nil {
			// Successful connection = back to zero
			failures = 0
		}
	}
}
// connect registers our service and its endpoints with the discovery service
// by issuing a "multiregister" call while holding the write lock.
//
// On failure isMultiRegistered is cleared and the error is returned. On
// success the service-to-service auth scope is set (when serviceToServiceAuth
// is enabled) and isMultiRegistered is set to true.
func (self *discovery) connect() error {
	self.Lock()
	defer self.Unlock()

	log.Trace("[Discovery] Connecting")

	err := self.callDiscoveryService("multiregister", true)
	if err != nil {
		self.isMultiRegistered = false
		return err
	}

	// Now connected - set the auth scope for service-to-service calls.
	// @todo confirm with moddie whether this is the right place for it. It
	// lives here so the login service behaves the same way, since it needs to
	// message itself.
	if serviceToServiceAuth {
		auth.SetCurrentService(Name)
	}

	self.isMultiRegistered = true
	return nil
}
// disconnect unregisters our service and its endpoints so we can quit
// cleanly. It is a no-op returning nil when we are not currently connected;
// otherwise it returns the result of the "unregister" call.
func (self *discovery) disconnect() error {
	self.Lock()
	defer self.Unlock()

	if !self.connected {
		return nil
	}
	return self.callDiscoveryService("unregister", false)
}
// callDiscoveryService sends off a request to register or unregister to the discovery service
func (self *discovery) callDiscoveryService(action string, successState bool) error {
log.Infof("[Server] Attempting to %s with the discovery service...", action)
azName, _ := util.GetAwsAZName()
regSize := reg.size()
machineClass := os.Getenv("H2O_MACHINE_CLASS")
endpoints := make([]*register.MultiRequest_Endpoint, regSize)
i := 0
for _, endpoint := range reg.iterate() {
endpoints[i] = ®ister.MultiRequest_Endpoint{
Name: proto.String(endpoint.Name),
Mean: proto.Int32(endpoint.Mean),
Upper95: proto.Int32(endpoint.Upper95),
Subscribe: proto.String(endpoint.Subscribe),
}
i++
}
service := &dscShared.Service{
Name: proto.String(Name),
Description: proto.String(Description),
Version: proto.Uint64(Version),
Source: proto.String(Source),
OwnerEmail: proto.String(OwnerEmail),
OwnerMobile: proto.String(OwnerMobile),
OwnerTeam: proto.String(OwnerTeam),
}
request, err := ScopedRequest(
"com.hailocab.kernel.discovery",
action,
®ister.MultiRequest{
InstanceId: proto.String(InstanceID),
Hostname: proto.String(self.hostname),
MachineClass: proto.String(machineClass),
AzName: proto.String(azName),
Service: service,
Endpoints: endpoints,
},
)
if err != nil {
log.Warnf("[Server] Failed to build request when %sing services", action)
return err
}
// explicitly define timeout, since we're happy to wait
clientOptions := client.Options{"retries": 0, "timeout": 5 * time.Second}
rsp := ®ister.Response{}
if err := client.Req(request, rsp, clientOptions); err != nil {
log.Warnf("[Server] Failed to %s services: %v", action, err)
return err
}
// ok -- all done!
self.connected = successState
log.Infof("[Server] Successfully %sed with the hive mind!", action)
return nil
}
// IsConnected reports the current value of the connected flag, read under
// the read lock.
func (self *discovery) IsConnected() bool {
	self.RLock()
	connected := self.connected
	self.RUnlock()

	return connected
}