-
Notifications
You must be signed in to change notification settings - Fork 25
/
environment.go
114 lines (95 loc) · 2.81 KB
/
environment.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package zk
import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/go-kit/kit/sd"
gokitzk "github.com/go-kit/kit/sd/zk"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/service"
)
// newService converts a Registration into the values needed to register it with
// zookeeper. The first result is the instance string produced by
// service.FormatInstance from the registration's scheme, address and port. The
// second is the go-kit zookeeper Service describing the node, whose Data payload
// is that same instance string.
func newService(r Registration) (string, gokitzk.Service) {
	instance := service.FormatInstance(r.scheme(), r.address(), r.port())

	var svc gokitzk.Service
	svc.Path = r.path()
	svc.Name = r.name()
	svc.Data = []byte(instance)

	return instance, svc
}
// clientFactory is the factory function used to create a go-kit zookeeper Client.
// It defaults to gokitzk.NewClient and is the function newClient calls to build
// the client. Tests can replace this variable to supply mocked behavior.
var clientFactory = gokitzk.NewClient
// newClient creates a go-kit zookeeper Client via clientFactory, using the
// server list and the connect and session timeouts taken from the client
// portion of zo. The supplied logger is handed to the created client.
func newClient(l log.Logger, zo Options) (gokitzk.Client, error) {
	cfg := zo.client()

	servers := cfg.servers()
	connect := gokitzk.ConnectTimeout(cfg.connectTimeout())
	session := gokitzk.SessionTimeout(cfg.sessionTimeout())

	return clientFactory(servers, l, connect, session)
}
// newInstancer creates a go-kit zookeeper Instancer watching path and decorates
// it with service.NewContextualInstancer, attaching the watched path under the
// "path" key.
//
// On failure this function returns a literal nil Instancer together with the
// error. The concrete *gokitzk.Instancer is deliberately not assigned to the
// interface-typed result until it is known to be valid: storing a nil pointer in
// an sd.Instancer would yield an interface value that compares as non-nil.
func newInstancer(l log.Logger, c gokitzk.Client, path string) (i sd.Instancer, err error) {
	zkInstancer, err := gokitzk.NewInstancer(c, path, l)
	if err != nil {
		return nil, err
	}

	return service.NewContextualInstancer(zkInstancer, map[string]interface{}{"path": path}), nil
}
// newInstancers builds one Instancer for each distinct path returned by
// zo.watches(). A path that already has an Instancer is skipped with a warning.
// If any Instancer cannot be created, every Instancer created so far is stopped
// and the error is returned.
func newInstancers(l log.Logger, c gokitzk.Client, zo Options) (i service.Instancers, err error) {
	for _, watch := range zo.watches() {
		if i.Has(watch) {
			l.Log(level.Key(), level.WarnValue(), logging.MessageKey(), "skipping duplicate watch", "path", watch)
			continue
		}

		created, createErr := newInstancer(l, c, watch)
		if createErr != nil {
			// ensure the previously created instancers are stopped
			i.Stop()
			return i, createErr
		}

		i.Set(watch, created)
	}

	return i, nil
}
// newRegistrars builds one go-kit zookeeper Registrar for each distinct instance
// among zo.registrations(). Each Registrar gets a logger derived from base with
// the instance attached under the "instance" key. A registration whose instance
// has already been added is skipped with a warning.
func newRegistrars(base log.Logger, c gokitzk.Client, zo Options) (r service.Registrars) {
	for _, reg := range zo.registrations() {
		instance, zkService := newService(reg)
		if !r.Has(instance) {
			r.Add(instance, gokitzk.NewRegistrar(c, zkService, log.With(base, "instance", instance)))
			continue
		}

		base.Log(level.Key(), level.WarnValue(), logging.MessageKey(), "skipping duplicate registration", "instance", instance)
	}

	return r
}
// NewEnvironment constructs a Zookeeper-based service.Environment using both a zookeeper Options (typically unmarshaled
// from configuration) and an optional extra set of environment options.
//
// If l is nil, logging.DefaultLogger() is used. If zo has neither watches nor
// registrations, service.ErrIncomplete is returned. Any error from creating the
// zookeeper client or the instancers is returned as is; when instancer creation
// fails, the client is stopped before returning.
//
// The caller's options in eo are applied first, followed by the registrars,
// instancers and a closer that stops the zookeeper client.
func NewEnvironment(l log.Logger, zo Options, eo ...service.Option) (service.Environment, error) {
	if l == nil {
		l = logging.DefaultLogger()
	}

	if len(zo.Watches) == 0 && len(zo.Registrations) == 0 {
		return nil, service.ErrIncomplete
	}

	c, err := newClient(l, zo)
	if err != nil {
		return nil, err
	}

	i, err := newInstancers(l, c, zo)
	if err != nil {
		// don't leak the zookeeper connection when the watches cannot be established
		c.Stop()
		return nil, err
	}

	// Build a fresh slice rather than appending to eo: when the caller passes
	// opts... with spare capacity, appending to eo would write into the caller's
	// backing array.
	options := make([]service.Option, 0, len(eo)+3)
	options = append(options, eo...)
	options = append(options,
		service.WithRegistrars(newRegistrars(l, c, zo)),
		service.WithInstancers(i),
		service.WithCloser(func() error { c.Stop(); return nil }),
	)

	return service.NewEnvironment(options...), nil
}