This repository has been archived by the owner on Sep 4, 2021. It is now read-only.
/
store.go
93 lines (85 loc) · 2.08 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package main
import (
"encoding/json"
controller "github.com/flynn/flynn/controller/client"
ct "github.com/flynn/flynn/controller/types"
discoverd "github.com/flynn/flynn/discoverd/client"
"github.com/flynn/flynn/pkg/stream"
router "github.com/flynn/flynn/router/types"
)
type Store interface {
List() ([]*router.Route, error)
Watch(ch chan *router.Event) (stream.Stream, error)
}
func NewControllerStore() (*ControllerStore, error) {
instances, err := discoverd.NewService("controller").Instances()
if err != nil {
return nil, err
}
inst := instances[0]
client, err := controller.NewClient("http://"+inst.Addr, inst.Meta["AUTH_KEY"])
if err != nil {
return nil, err
}
return &ControllerStore{client}, nil
}
type ControllerStore struct {
client controller.Client
}
func (c *ControllerStore) List() ([]*router.Route, error) {
return c.client.RouteList()
}
func (c *ControllerStore) Watch(ch chan *router.Event) (stream.Stream, error) {
events := make(chan *ct.Event)
eventStream, err := c.client.StreamEvents(ct.StreamEventsOptions{
ObjectTypes: []ct.EventType{
ct.EventTypeRoute,
ct.EventTypeRouteDeletion,
},
}, events)
if err != nil {
return nil, err
}
routeStream := stream.New()
go func() {
defer close(ch)
defer eventStream.Close()
for {
select {
case event, ok := <-events:
if !ok {
routeStream.Error = eventStream.Err()
return
}
var route router.Route
if err := json.Unmarshal(event.Data, &route); err != nil {
routeStream.Error = err
return
}
routerEvent := &router.Event{
Event: c.toRouterEventType(event.ObjectType),
ID: route.ID,
Route: &route,
}
select {
case ch <- routerEvent:
case <-routeStream.StopCh:
return
}
case <-routeStream.StopCh:
return
}
}
}()
return routeStream, nil
}
func (c *ControllerStore) toRouterEventType(typ ct.EventType) router.EventType {
switch typ {
case ct.EventTypeRoute:
return router.EventTypeRouteSet
case ct.EventTypeRouteDeletion:
return router.EventTypeRouteRemove
default:
return router.EventType("")
}
}