forked from go-micro/go-micro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kubernetes.go
319 lines (279 loc) · 7.23 KB
/
kubernetes.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
// Package kubernetes implements the micro runtime on top of Kubernetes.
package kubernetes
import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/arun-spire/go-micro/runtime"
"github.com/arun-spire/go-micro/runtime/kubernetes/client"
"github.com/arun-spire/go-micro/util/log"
)
// kubernetes is the runtime.Runtime implementation returned by NewRuntime.
type kubernetes struct {
	// embedded lock taken by the runtime's methods before touching state
	sync.RWMutex
	// options holds the runtime configuration
	options runtime.Options
	// running is set by Start and cleared by Stop
	running bool
	// start queues the services pushed by Create for the run loop
	start chan *runtime.Service
	// closed is closed by Stop to terminate the run loop
	closed chan bool
	// services tracks registered services in memory, keyed by service name
	services map[string]*runtime.Service
	// client is the kubernetes API client
	client client.Kubernetes
}
// NewRuntime builds a kubernetes runtime. The supplied options are applied in
// order on top of a zero-value runtime.Options, and the kubernetes client is
// obtained from client.NewClientInCluster.
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
	// start from empty options and layer the requested ones on top
	var cfg runtime.Options
	for i := range opts {
		opts[i](&cfg)
	}

	// kubernetes client; named so it does not shadow the client package
	kube := client.NewClientInCluster()

	k := &kubernetes{
		options:  cfg,
		closed:   make(chan bool),
		start:    make(chan *runtime.Service, 128),
		services: make(map[string]*runtime.Service),
		client:   kube,
	}
	return k
}
// Init applies the given options to the runtime's current options while
// holding the write lock. It always returns nil.
func (k *kubernetes) Init(opts ...runtime.Option) error {
	k.Lock()
	defer k.Unlock()

	for i := range opts {
		opts[i](&k.options)
	}
	return nil
}
// Create registers a service with the runtime and queues it for starting.
//
// The service name is rewritten in place to "micro-" followed by the last
// dot-separated segment of the original name; the rewritten name is the key
// under which the service is tracked in memory.
//
// An error is returned when s is nil, when a service is already registered
// under the rewritten name, or when the start queue is full.
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
	if s == nil {
		return errors.New("service is nil")
	}

	k.Lock()
	defer k.Unlock()

	// TODO:
	// * create service
	// * create deployment
	// NOTE: our services have micro- prefix
	// LastIndex returns -1 when there is no dot, which keeps the whole name.
	s.Name = "micro-" + s.Name[strings.LastIndex(s.Name, ".")+1:]

	// NOTE: we are tracking this in memory for now
	if _, ok := k.services[s.Name]; ok {
		return errors.New("service already registered")
	}

	// the create options are applied but nothing reads them yet
	var options runtime.CreateOptions
	for _, o := range opts {
		o(&options)
	}

	// Push into the start queue without blocking. The write lock is held
	// here, so a blocking send on a full queue would stall every other
	// method of the runtime (including Start) until the queue is drained.
	select {
	case k.start <- s:
	default:
		return errors.New("service start queue is full")
	}

	// save service only once it has been queued
	k.services[s.Name] = s

	return nil
}
// Delete removes the service registered under s.Name from the in-memory
// registry. Deleting a service that is not registered is not an error.
// An error is returned only when s is nil.
func (k *kubernetes) Delete(s *runtime.Service) error {
	if s == nil {
		return errors.New("service is nil")
	}

	k.Lock()
	defer k.Unlock()

	// TODO:
	// * delete service
	// * delete deployment
	// NOTE: we are tracking this in memory for now

	// Delete by the requested key rather than by the Name of the stored
	// service: the stored value may have been renamed since it was
	// registered, in which case its Name no longer matches its key.
	// The built-in delete is a no-op when the key is absent.
	delete(k.services, s.Name)

	return nil
}
// Update patches the deployment named s.Name with a "build" annotation
// derived from s.Version. The version must be a base-10 unix timestamp in
// seconds; a version that fails to parse is returned as an error and no
// request is made.
func (k *kubernetes) Update(s *runtime.Service) error {
	// the version carries the build time as unix seconds
	seconds, err := strconv.ParseInt(s.Version, 10, 64)
	if err != nil {
		return err
	}

	// render the build time as an RFC 3339 timestamp for the annotation
	annotations := map[string]string{
		"build": time.Unix(seconds, 0).Format(time.RFC3339),
	}

	// metadata which we will PATCH the deployment with
	type body struct {
		Spec *client.Spec `json:"spec"`
	}
	metadata := &client.Metadata{Annotations: annotations}
	template := &client.Template{Metadata: metadata}
	reqBody := body{Spec: &client.Spec{Template: template}}

	return k.client.UpdateDeployment(s.Name, reqBody)
}
// List returns the deployments labelled "micro": "service" as runtime
// services. Each service takes its name from the deployment metadata and its
// version from the "build" annotation, converted from RFC 3339 to unix
// seconds. Deployments whose "build" annotation does not parse are logged
// and left out of the result.
func (k *kubernetes) List() ([]*runtime.Service, error) {
	// selector for all micro core deployments
	selector := map[string]string{
		"micro": "service",
	}

	deployments, err := k.client.ListDeployments(selector)
	if err != nil {
		return nil, err
	}

	found := len(deployments.Items)
	log.Debugf("Runtime found %d micro deployments with labels %v", found, selector)

	result := make([]*runtime.Service, 0, found)
	for _, dep := range deployments.Items {
		built, err := time.Parse(time.RFC3339, dep.Metadata.Annotations["build"])
		if err != nil {
			log.Debugf("Runtime error parsing build time for %s: %v", dep.Metadata.Name, err)
			continue
		}

		// add the service to the list of services
		result = append(result, &runtime.Service{
			Name:    dep.Metadata.Name,
			Version: fmt.Sprintf("%d", built.Unix()),
		})
	}

	return result, nil
}
// run is the runtime management loop. It returns once the stop channel is
// closed and otherwise reacts to:
//   - a 10 second ticker, on which the services returned by List are logged;
//   - services queued on k.start, which are only logged for now;
//   - notifier events, of which only runtime.Update is handled.
//
// events may be nil, in which case no notifier events are ever received.
func (k *kubernetes) run(events <-chan runtime.Event) {
	// Snapshot the stop channel under the lock. Start replaces k.closed on
	// every start, so reading the field on each iteration would be an
	// unsynchronized read of a field that is written under the lock.
	k.RLock()
	closed := k.closed
	k.RUnlock()

	t := time.NewTicker(time.Second * 10)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			// check running services
			services, err := k.List()
			if err != nil {
				log.Debugf("Runtime failed listing running services: %v", err)
				continue
			}
			// TODO: for now we just log the running services
			// * make sure all core deployments exist
			// * make sure all core services are exposed
			for _, service := range services {
				log.Debugf("Runtime found running service: %v", service)
			}
		case service := <-k.start:
			// TODO: this is a noop for now
			// * create a deployment
			// * expose a service
			log.Debugf("Runtime starting service: %s", service.Name)
		case event, ok := <-events:
			if !ok {
				// A closed channel is always ready to receive, so selecting
				// on it again would spin on zero-value events. A nil channel
				// is never ready, which disables this case.
				events = nil
				continue
			}
			// NOTE: we only handle Update events for now
			log.Debugf("Runtime received notification event: %v", event)
			switch event.Type {
			case runtime.Update:
				k.processUpdate(event)
			}
		case <-closed:
			log.Debugf("Runtime stopped")
			return
		}
	}
}

// processUpdate handles an Update event. The event version is parsed as a
// unix timestamp; when event.Service is set only that service is considered,
// otherwise every tracked service is. Failures are logged, not returned.
func (k *kubernetes) processUpdate(event runtime.Event) {
	// parse returned response to timestamp
	updateTimeStamp, err := strconv.ParseInt(event.Version, 10, 64)
	if err != nil {
		log.Debugf("Runtime error parsing update build time: %v", err)
		return
	}
	buildTime := time.Unix(updateTimeStamp, 0)

	k.Lock()
	defer k.Unlock()

	if len(event.Service) > 0 {
		service, ok := k.services[event.Service]
		if !ok {
			log.Debugf("Runtime unknown service: %s", event.Service)
			return
		}
		if err := k.updateIfNewer(service, buildTime); err != nil {
			log.Debugf("Runtime error updating service %s: %v", event.Service, err)
		}
		return
	}

	// if blank service was received we update all services
	for _, service := range k.services {
		if err := k.updateIfNewer(service, buildTime); err != nil {
			log.Debugf("Runtime error updating service %s: %v", service.Name, err)
		}
	}
}

// updateIfNewer calls Update for the service when buildTime is later than the
// build time encoded in service.Version, and records the new version on the
// tracked service once Update has succeeded. The caller must hold the lock.
func (k *kubernetes) updateIfNewer(service *runtime.Service, buildTime time.Time) error {
	buildTimeStamp, err := strconv.ParseInt(service.Version, 10, 64)
	if err != nil {
		return err
	}
	if !buildTime.After(time.Unix(buildTimeStamp, 0)) {
		// the tracked build is already up to date
		return nil
	}

	version := fmt.Sprintf("%d", buildTime.Unix())
	muService := &runtime.Service{
		Name:    service.Name,
		Source:  service.Source,
		Path:    service.Path,
		Exec:    service.Exec,
		Version: version,
	}
	if err := k.Update(muService); err != nil {
		return err
	}
	service.Version = version
	return nil
}
// Start marks the runtime as running and launches the management loop in a
// new goroutine. Calling Start while the runtime is already running is a
// no-op. When a notifier is configured its event channel is handed to the
// loop; a notifier that fails to start is logged and the loop runs without
// notifier events. Start always returns nil.
func (k *kubernetes) Start() error {
	k.Lock()
	defer k.Unlock()

	// already running
	if k.running {
		return nil
	}

	// set running
	k.running = true
	// a fresh stop channel is needed because Stop closes the previous one
	k.closed = make(chan bool)

	var events <-chan runtime.Event
	if k.options.Notifier != nil {
		var err error
		events, err = k.options.Notifier.Notify()
		if err != nil {
			// TODO: should we bail here?
			log.Debugf("Runtime failed to start update notifier: %v", err)
			// the channel returned alongside an error is not meaningful;
			// a nil channel simply never delivers events to the loop
			events = nil
		}
	}

	go k.run(events)

	return nil
}
// Stop shuts the runtime down by closing the stop channel and clearing the
// running flag. It is a no-op when the runtime is not running or the stop
// channel is already closed. When a notifier is configured it is closed as
// well and its error, if any, is returned.
func (k *kubernetes) Stop() error {
	k.Lock()
	defer k.Unlock()

	if !k.running {
		return nil
	}

	// nothing to do if the stop channel has already been closed
	select {
	case <-k.closed:
		return nil
	default:
	}

	close(k.closed)
	// set not running
	k.running = false

	// stop the notifier too
	if k.options.Notifier == nil {
		return nil
	}
	return k.options.Notifier.Close()
}
// String returns the name of this runtime, "kubernetes".
func (k *kubernetes) String() string {
	const name = "kubernetes"
	return name
}