This repository has been archived by the owner on Jul 6, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 100
/
broker.go
133 lines (119 loc) · 3.02 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package broker
import (
"context"
"errors"
"fmt"
"github.com/Azure/open-service-broker-azure/pkg/api"
"github.com/Azure/open-service-broker-azure/pkg/service"
"github.com/Azure/open-service-broker-azure/pkg/storage"
log "github.com/Sirupsen/logrus"
"github.com/deis/async"
)
// errAsyncEngineStopped is the error the broker's Run reports when the async
// engine's own Run returns. It carries whatever error the engine returned,
// which is nil if the engine returned without reporting a failure.
type errAsyncEngineStopped struct {
	err error
}

// Error implements the error interface. The cause is appended to the message
// only when there is one, so a nil cause does not render as "%!s(<nil>)".
func (e *errAsyncEngineStopped) Error() string {
	if e.err == nil {
		return "async engine stopped"
	}
	return fmt.Sprintf("async engine stopped: %s", e.err)
}

// Unwrap returns the underlying cause (possibly nil) so that errors.Is,
// errors.As and errors.Unwrap can see through this wrapper.
func (e *errAsyncEngineStopped) Unwrap() error {
	return e.err
}
// errAPIServerStopped is the error the broker's Run reports when the API
// server's own Run returns. It carries whatever error the server returned,
// which is nil if the server returned without reporting a failure.
type errAPIServerStopped struct {
	err error
}

// Error implements the error interface. The cause is appended to the message
// only when there is one, so a nil cause does not render as "%!s(<nil>)".
func (e *errAPIServerStopped) Error() string {
	if e.err == nil {
		return "api server stopped"
	}
	return fmt.Sprintf("api server stopped: %s", e.err)
}

// Unwrap returns the underlying cause (possibly nil) so that errors.Is,
// errors.As and errors.Unwrap can see through this wrapper.
func (e *errAPIServerStopped) Unwrap() error {
	return e.err
}
// Broker is the contract for components that provide full OSB functionality.
type Broker interface {
	// Run launches every broker component (for example the API server and the
	// async execution engine) and does not return until one of those
	// components returns or fails.
	Run(ctx context.Context) error
}
// broker is this package's implementation of Broker. It holds the components
// that NewBroker receives and wires together.
type broker struct {
	// store is the storage backend supplied to NewBroker.
	store storage.Store
	// apiServer is the API server that Run starts.
	apiServer api.Server
	// asyncEngine is the async engine that NewBroker registers jobs with and
	// that Run starts.
	asyncEngine async.Engine
	// catalog is the service catalog supplied to NewBroker.
	catalog service.Catalog
}
// NewBroker returns a new Broker built from the given API server, async
// engine, store and catalog.
//
// Before returning, it registers the broker's job functions with the async
// engine under the names "executeProvisioningStep", "executeUpdatingStep",
// "executeDeprovisioningStep", "checkParentStatus" and
// "checkChildrenStatuses", in that order. If any registration fails, NewBroker
// stops immediately and returns a nil Broker together with an error that names
// the job being registered and includes the error reported by the engine.
func NewBroker(
	apiServer api.Server,
	asyncEngine async.Engine,
	store storage.Store,
	catalog service.Catalog,
) (Broker, error) {
	b := &broker{
		apiServer:   apiServer,
		store:       store,
		asyncEngine: asyncEngine,
		catalog:     catalog,
	}

	// registrationErr keeps the established message as a prefix and appends
	// the engine's own error, so a failed registration can be diagnosed
	// instead of being reduced to a fixed string.
	registrationErr := func(msg string, cause error) error {
		return errors.New(msg + ": " + cause.Error())
	}

	if err := b.asyncEngine.RegisterJob(
		"executeProvisioningStep",
		b.executeProvisioningStep,
	); err != nil {
		return nil, registrationErr(
			"error registering async job for executing provisioning steps",
			err,
		)
	}
	if err := b.asyncEngine.RegisterJob(
		"executeUpdatingStep",
		b.executeUpdatingStep,
	); err != nil {
		return nil, registrationErr(
			"error registering async job for executing updating steps",
			err,
		)
	}
	if err := b.asyncEngine.RegisterJob(
		"executeDeprovisioningStep",
		b.executeDeprovisioningStep,
	); err != nil {
		return nil, registrationErr(
			"error registering async job for executing deprovisioning steps",
			err,
		)
	}
	if err := b.asyncEngine.RegisterJob(
		"checkParentStatus",
		b.doCheckParentStatus,
	); err != nil {
		return nil, registrationErr(
			"error registering async job for executing check of parent status",
			err,
		)
	}
	if err := b.asyncEngine.RegisterJob(
		"checkChildrenStatuses",
		b.doCheckChildrenStatuses,
	); err != nil {
		return nil, registrationErr(
			"error registering async job for executing check of children "+
				"statuses",
			err,
		)
	}
	return b, nil
}
// Run launches the async engine and the API server, each in its own
// goroutine, and blocks until one of them returns or the context is done.
// When a component returns first, its result is reported wrapped in
// errAsyncEngineStopped or errAPIServerStopped respectively; when the context
// finishes first, the context's error is returned.
func (b *broker) Run(ctx context.Context) error {
	// Derive a cancelable context: returning from Run, for any reason, cancels
	// the context both components were started with and unblocks the sends
	// below once nobody is receiving anymore.
	runCtx, stop := context.WithCancel(ctx)
	defer stop()

	stopped := make(chan error)

	// Async engine: block in Run first, then try to report its result.
	go func() {
		result := &errAsyncEngineStopped{err: b.asyncEngine.Run(runCtx)}
		select {
		case stopped <- result:
		case <-runCtx.Done():
		}
	}()

	// API server: block in Run first, then try to report its result.
	go func() {
		result := &errAPIServerStopped{err: b.apiServer.Run(runCtx)}
		select {
		case stopped <- result:
		case <-runCtx.Done():
		}
	}()

	select {
	case err := <-stopped:
		return err
	case <-runCtx.Done():
		log.Debug("context canceled; broker shutting down")
		return runCtx.Err()
	}
}