forked from csigo/test
/
service_launcher.go
179 lines (157 loc) · 4.49 KB
/
service_launcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package test
import (
"fmt"
"sync"
"sync/atomic"
"github.com/fsouza/go-dockerclient"
)
// supported service types
const (
ZooKeeper ServiceType = "zookeeper"
HBase ServiceType = "hbase"
Redis ServiceType = "redis"
Etcd ServiceType = "etcd"
Gnatsd ServiceType = "gnatsd"
Disque ServiceType = "disque"
Consul ServiceType = "consul"
ElasticSearch ServiceType = "elasticsearch"
)
// service running state
const (
stateNew int32 = iota
stateStarting = iota
stateReady = iota
stateStopped = iota
)
var (
srvFactories = struct {
sync.RWMutex
facs map[ServiceType]ServiceFactory
}{facs: map[ServiceType]ServiceFactory{}}
)
// ServiceType defines type
type ServiceType string
// ServiceOption defines option function to setup service
type ServiceOption func(Service) error
// ServiceLauncher defines an interface to create service
type ServiceLauncher interface {
// Start creates and starts an instance of supported service by the give type. It
// returns its listening ip:port and the corresponding stop function.
Start(ServiceType, ...ServiceOption) (ipport string, stopFunc func() error, err error)
// StopAll stop all created services
StopAll() error
// Get retruns service, return nil if no service for the given ipport
Get(ipport string) interface{}
}
// Service represents a service
type Service interface {
// Start launches the service and return its listening port
Start() (string, error)
// Stop stops the service
Stop() error
// StartDocker launches the service and return its listening port via docker
StartDocker(*docker.Client) (string, error)
// Stop stops the service via docker
StopDocker(*docker.Client) error
}
// ServiceFactory represents service factory
type ServiceFactory func() Service
// RegisterService registers a service factory of the given type
func RegisterService(t ServiceType, f ServiceFactory) {
srvFactories.Lock()
defer srvFactories.Unlock()
if _, ok := srvFactories.facs[t]; ok {
panic(fmt.Errorf("aready register service type %s", t))
}
srvFactories.facs[t] = f
}
// NewServiceLauncher returns an instance of ServiceLauncher
func NewServiceLauncher() ServiceLauncher {
return &serviceLauncherImpl{
services: map[string]Service{},
}
}
// serviceLauncherImpl implements ServiceLauncher
type serviceLauncherImpl struct {
// service stores created services
services map[string]Service
// mutx to protected services
sync.Mutex
}
// Create returns an instance of supported service by the give type
func (s *serviceLauncherImpl) Start(t ServiceType, options ...ServiceOption) (string, func() error, error) {
s.Lock()
defer s.Unlock()
srvFactories.RLock()
fac, ok := srvFactories.facs[t]
srvFactories.RUnlock()
if !ok {
return "", nil, fmt.Errorf("unsupported service type %v", t)
}
// guard with state checker
srv := &stateChkService{
state: stateNew,
Service: fac(),
}
// apply option functions
for _, opt := range options {
if err := opt(srv.Service); err != nil {
return "", nil, fmt.Errorf("failed to apply option %v", opt)
}
}
// start service
ipport, err := srv.Start()
if err != nil {
return "", nil, fmt.Errorf("unable to start service %v, err %v", t, err)
}
// store raw service
s.services[ipport] = srv.Service
return ipport, srv.Stop, nil
}
// StopAll stop all created services
func (s *serviceLauncherImpl) StopAll() error {
s.Lock()
defer s.Unlock()
errs := []error{}
for _, s := range s.services {
errs = append(errs, s.Stop())
}
s.services = map[string]Service{}
return CombineError(errs...)
}
// Get return service by givne ip port
func (s *serviceLauncherImpl) Get(ipport string) interface{} {
s.Lock()
defer s.Unlock()
return s.services[ipport]
}
// stateChkService helps to guard status of the embed service
// state machine: new -> starting -> ready -> stopped
type stateChkService struct {
Service
state int32
cl *docker.Client
}
func (s *stateChkService) Start() (ipport string, err error) {
if !atomic.CompareAndSwapInt32(&s.state, stateNew, stateStarting) {
return "", fmt.Errorf("state is not ready")
}
if s.cl != nil {
ipport, err = s.Service.StartDocker(s.cl)
} else {
ipport, err = s.Service.Start()
}
if err == nil {
atomic.StoreInt32(&s.state, stateReady)
}
return ipport, err
}
func (s *stateChkService) Stop() error {
if !atomic.CompareAndSwapInt32(&s.state, stateReady, stateStopped) {
return fmt.Errorf("state is not ready")
}
if s.cl != nil {
return s.Service.StopDocker(s.cl)
}
return s.Service.Stop()
}