-
Notifications
You must be signed in to change notification settings - Fork 24
/
zk_utils.go
140 lines (124 loc) · 3.26 KB
/
zk_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package testutils
import (
"fmt"
"net"
"runtime"
"strconv"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// ZKConfig captures configuration/runtime constraints for a containerized ZK instance.
type ZKConfig struct {
	// StartupTimeout bounds how long StartZookeeper waits for a TCP
	// connection to the container's client port to succeed.
	StartupTimeout time.Duration
	// ImageName is the docker image that is pulled and used to create
	// the container.
	ImageName string
	// Entrypoint and Command are passed to the container as its
	// entrypoint and its command arguments, respectively.
	Entrypoint, Command []string
	// ClientPort is the TCP port used to reach ZooKeeper; on Darwin it is
	// also the host port the container port is bound to.
	ClientPort int
}
// DefaultZKConfig returns a copy of the default ZK container/runtime configuration.
//
// Every call builds a fresh value (including fresh slices), so callers may
// modify the result without affecting later calls.
func DefaultZKConfig() ZKConfig {
	var cfg ZKConfig

	// How long startup may take before it is considered failed.
	cfg.StartupTimeout = 10 * time.Second

	// Image and the process to launch inside it.
	cfg.ImageName = "docker.io/jplock/zookeeper:3.4.10"
	cfg.Entrypoint = []string{"/opt/zookeeper/bin/zkServer.sh"}
	cfg.Command = []string{"start-foreground"}

	// Port on which clients connect.
	cfg.ClientPort = 2181

	return cfg
}
// StartZookeeper starts a new zookeeper container.
//
// It begins from DefaultZKConfig and applies each non-nil option in opts, in
// order, to that configuration. It then pulls the configured image, creates
// and starts the container, and polls the client port over TCP until a
// connection succeeds or config.StartupTimeout elapses.
//
// On success it returns a ZkControl holding the docker client, the container
// ID and the address that was probed. On any failure after the container has
// been created, the container is removed before the error is returned.
func StartZookeeper(opts ...func(*ZKConfig)) (*ZkControl, error) {
	const (
		// dialTimeout caps a single connection attempt so that one stalled
		// dial cannot consume the whole startup budget.
		dialTimeout = time.Second
		// pollInterval is the pause between failed connection attempts.
		pollInterval = 100 * time.Millisecond
	)

	config := DefaultZKConfig()
	for _, f := range opts {
		if f != nil {
			f(&config)
		}
	}

	dcli, err := DockerClient()
	if err != nil {
		return nil, errors.Wrap(err, "could not get docker client")
	}

	if err := pullDockerImage(dcli, config.ImageName); err != nil {
		return nil, err
	}

	clientPort := strconv.Itoa(config.ClientPort)

	// the container IP is not routable on Darwin, thus needs port
	// mapping for the container.
	hostConfig := &container.HostConfig{}
	if runtime.GOOS == "darwin" {
		hostConfig.PortBindings = nat.PortMap{
			nat.Port(fmt.Sprintf("%d/tcp", config.ClientPort)): []nat.PortBinding{{
				HostIP:   "0.0.0.0",
				HostPort: clientPort,
			}},
		}
	}

	ctx := context.Background()

	r, err := dcli.ContainerCreate(
		ctx,
		&container.Config{
			Image:      config.ImageName,
			Entrypoint: config.Entrypoint,
			Cmd:        config.Command,
		},
		hostConfig,
		nil, "")
	if err != nil {
		return nil, errors.Wrap(err, "could not create zk container")
	}

	// create a teardown that will be used here to try to tear down the
	// container if anything fails in setup
	cleanup := func() {
		removeContainer(dcli, r.ID)
	}

	// start the container
	if err := dcli.ContainerStart(ctx, r.ID, types.ContainerStartOptions{}); err != nil {
		cleanup()
		return nil, errors.Wrap(err, "could not start zk container")
	}

	info, err := dcli.ContainerInspect(ctx, r.ID)
	if err != nil {
		cleanup()
		return nil, errors.Wrap(err, "could not inspect container")
	}

	// On Darwin the port is published on the host (see above); elsewhere
	// the container's own IP address is dialed directly.
	var addr string
	if runtime.GOOS == "darwin" {
		addr = net.JoinHostPort("127.0.0.1", clientPort)
	} else {
		addr = net.JoinHostPort(info.NetworkSettings.IPAddress, clientPort)
	}

	// Poll the client port until it accepts a connection. The loop runs
	// synchronously and every wait is bounded by the deadline, so nothing
	// is left running in the background once this function returns.
	deadline := time.Now().Add(config.StartupTimeout)
	for {
		remaining := deadline.Sub(time.Now())
		if remaining <= 0 {
			cleanup()
			return nil, errors.Errorf("could not connect to zookeeper in %s", config.StartupTimeout)
		}

		attempt := dialTimeout
		if remaining < attempt {
			attempt = remaining
		}
		conn, err := net.DialTimeout("tcp", addr, attempt)
		if err == nil {
			fmt.Println("successfully connected to ZK at", addr)
			conn.Close()
			break
		}

		// Back off before retrying, but never sleep past the deadline.
		// time.Sleep returns immediately for non-positive durations.
		pause := pollInterval
		if left := deadline.Sub(time.Now()); left < pause {
			pause = left
		}
		time.Sleep(pause)
	}

	control := &ZkControl{
		dockerClient: dcli,
		containerID:  r.ID,
		addr:         addr,
	}
	return control, nil
}