/
loop.go
96 lines (86 loc) · 2.48 KB
/
loop.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package servicebroker
import (
"context"
"errors"
"github.com/deis/steward-framework"
"github.com/deis/steward-framework/k8s/data"
"k8s.io/client-go/pkg/watch"
)
// Sentinel errors returned by the service broker control loop and its event handlers.
var (
// ErrCancelled is returned by RunLoop when its context is done.
ErrCancelled = errors.New("stopped")
// ErrNotAServiceBroker is returned by the add-event handler when the object carried by a
// watch event cannot be translated into a service broker resource.
ErrNotAServiceBroker = errors.New("not a service broker")
// ErrWatchClosed is returned by RunLoop when the watch channel was closed and the watch
// could not be re-established.
ErrWatchClosed = errors.New("watch closed")
)
// RunLoop starts a blocking control loop that watches service broker resources in namespace
// and takes action on them.
//
// fn opens the watch. For every watch.Added event the loop calls handleAddServiceBroker with
// cataloger, updateFn and createSvcClassFunc; handler failures are logged and do not stop the
// loop. Events of any other type are ignored.
//
// If the watch channel is closed, the loop tries to open a new watch with fn and carries on.
//
// RunLoop returns:
//   - the error from fn if the initial watch cannot be opened,
//   - ErrCancelled once ctx is done,
//   - ErrWatchClosed if the watch channel was closed and re-opening the watch failed.
func RunLoop(
	ctx context.Context,
	namespace string,
	fn WatchServiceBrokerFunc,
	updateFn UpdateServiceBrokerFunc,
	cataloger framework.Cataloger,
	createSvcClassFunc CreateServiceClassFunc) error {
	watcher, err := fn(namespace)
	if err != nil {
		return err
	}
	// Wrap Stop in a closure so that the watcher that is current when RunLoop returns is the
	// one that gets stopped. A plain `defer watcher.Stop()` would bind the initial watcher and
	// leak any watcher opened later by the re-watch path below.
	defer func() { watcher.Stop() }()
	ch := watcher.ResultChan()

	for {
		select {
		case <-ctx.Done():
			return ErrCancelled
		case evt, open := <-ch:
			if !open {
				// Open the replacement into a temporary so that, on failure, the deferred
				// Stop still sees the previous (non-nil) watcher.
				next, watchErr := fn(namespace)
				if watchErr != nil {
					logger.Errorf("service broker loop watch channel was closed and could not be re-opened (%s)", watchErr)
					return ErrWatchClosed
				}
				// Release the exhausted watcher before switching to the new one.
				watcher.Stop()
				watcher = next
				ch = watcher.ResultChan()
				// evt is the zero value from the closed channel; there is nothing to handle.
				continue
			}
			logger.Debugf("service broker loop received event")
			switch evt.Type {
			case watch.Added:
				if err := handleAddServiceBroker(ctx, cataloger, updateFn, createSvcClassFunc, evt); err != nil {
					// TODO: try the handler again. See https://github.com/deis/steward-framework/issues/26
					logger.Errorf("add service broker event handler failed (%s)", err)
				}
			}
		}
	}
}
// handleAddServiceBroker reacts to a single "added" watch event for a service broker.
//
// Steps, in order:
//  1. Translate evt.Object into a data.ServiceBroker; on failure return ErrNotAServiceBroker.
//  2. Mark the broker as pending and persist it through updateFn; on failure return that error.
//  3. List the broker's services through cataloger and call createServiceClass for the
//     service class translated from each one, stopping at the first error.
//
// Once step 2 has succeeded, a deferred update always runs on exit: it persists the broker as
// available when every service class was created, and as failed otherwise. A failure of that
// final update is only logged.
func handleAddServiceBroker(
	ctx context.Context,
	cataloger framework.Cataloger,
	updateFn UpdateServiceBrokerFunc,
	createServiceClass CreateServiceClassFunc,
	evt watch.Event) error {
	broker := new(data.ServiceBroker)
	translateErr := data.TranslateToTPR(evt.Object, broker, data.ServiceBrokerKind)
	if translateErr != nil {
		return ErrNotAServiceBroker
	}

	// Record that work on this broker has started, and continue with whatever updateFn
	// hands back.
	broker.Status.State = data.ServiceBrokerStatePending
	broker, err := updateFn(broker)
	if err != nil {
		return err
	}

	// Assume failure until the whole catalog has been processed; the deferred update
	// persists whichever state is current when the function exits.
	endState := data.ServiceBrokerStateFailed
	defer func() {
		broker.Status.State = endState
		_, updateErr := updateFn(broker)
		if updateErr != nil {
			logger.Errorf("failed to update service broker to state %s (%s)", endState, updateErr)
		}
	}()

	catalog, err := cataloger.List(ctx, broker.Spec)
	if err != nil {
		return err
	}
	for _, entry := range catalog {
		if err := createServiceClass(translateServiceClass(broker, entry)); err != nil {
			return err
		}
	}

	endState = data.ServiceBrokerStateAvailable
	return nil
}