Skip to content

Commit a488dbb

Browse files
committed
[FAB-10653] Cleanup ZooKeeper and Kafka names
Change-Id: I29dbe28fa1cb5e389e2bc5803bb6c254063a0a5e Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
1 parent b2a0aba commit a488dbb

File tree

9 files changed

+88
-102
lines changed

9 files changed

+88
-102
lines changed

integration/nwo/components.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ var RequiredImages = []string{
2323
"hyperledger/fabric-ccenv:latest",
2424
runner.CouchDBDefaultImage,
2525
runner.KafkaDefaultImage,
26-
runner.ZookeeperDefaultImage,
26+
runner.ZooKeeperDefaultImage,
2727
}
2828

2929
func (c *Components) Build(args ...string) {

integration/nwo/network.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ type Orderer struct {
8383

8484
// ID provides a unique identifier for an orderer instance.
8585
func (o Orderer) ID() string {
86-
return fmt.Sprintf("%s.%s", o.Name, o.Organization)
86+
return fmt.Sprintf("%s.%s", o.Organization, o.Name)
8787
}
8888

8989
// Peer defines a peer instance, it's owning organization, and the list of
@@ -103,7 +103,7 @@ type PeerChannel struct {
103103

104104
// ID provides a unique identifier for a peer instance.
105105
func (p *Peer) ID() string {
106-
return fmt.Sprintf("%s.%s", p.Name, p.Organization)
106+
return fmt.Sprintf("%s.%s", p.Organization, p.Name)
107107
}
108108

109109
// Anchor returns true if this peer is an anchor for any channel it has joined.
@@ -570,14 +570,13 @@ func (n *Network) ConfigTxGen(command Command) (*gexec.Session, error) {
570570
return n.StartSession(cmd, command.SessionName())
571571
}
572572

573-
// ZooKeeperRunner returns a runner for a zookeeper instance.
574-
func (n *Network) ZooKeeperRunner(id int) *runner.Zookeeper {
575-
id++ // TODO: revisit
573+
// ZooKeeperRunner returns a runner for a ZooKeeper instance.
574+
func (n *Network) ZooKeeperRunner(idx int) *runner.ZooKeeper {
576575
colorCode := n.nextColor()
577-
name := fmt.Sprintf("zookeeper-%d", id)
576+
name := fmt.Sprintf("zookeeper-%d-%s", idx, n.NetworkID)
578577

579-
return &runner.Zookeeper{
580-
ZooMyID: id,
578+
return &runner.ZooKeeper{
579+
ZooMyID: idx + 1, // IDs must be between 1 and 255
581580
Client: n.DockerClient,
582581
Name: name,
583582
NetworkName: n.NetworkID,
@@ -609,7 +608,7 @@ func (n *Network) defaultBrokerReplication() int {
609608
// BrokerRunner returns a runner for an kafka broker instance.
610609
func (n *Network) BrokerRunner(id int, zookeepers []string) *runner.Kafka {
611610
colorCode := n.nextColor()
612-
name := fmt.Sprintf("kafka-%d", id+1) // TODO: revisit
611+
name := fmt.Sprintf("kafka-%d-%s", id, n.NetworkID)
613612

614613
return &runner.Kafka{
615614
BrokerID: id + 1,
@@ -620,7 +619,7 @@ func (n *Network) BrokerRunner(id int, zookeepers []string) *runner.Kafka {
620619
NetworkName: n.NetworkID,
621620
MinInsyncReplicas: n.minBrokersInSync(),
622621
DefaultReplicationFactor: n.defaultBrokerReplication(),
623-
ZookeeperConnect: strings.Join(zookeepers, ","),
622+
ZooKeeperConnect: strings.Join(zookeepers, ","),
624623
OutputStream: gexec.NewPrefixedWriter(
625624
fmt.Sprintf("\x1b[32m[o]\x1b[%s[%s]\x1b[0m ", colorCode, name),
626625
ginkgo.GinkgoWriter,

integration/runner/kafka.go

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Kafka struct {
3232
HostPort int
3333
ContainerPort docker.Port
3434
Name string
35+
NetworkName string
3536
StartTimeout time.Duration
3637

3738
MessageMaxBytes int
@@ -40,16 +41,14 @@ type Kafka struct {
4041
DefaultReplicationFactor int
4142
MinInsyncReplicas int
4243
BrokerID int
43-
ZookeeperConnect string
44+
ZooKeeperConnect string
4445
ReplicaFetchResponseMaxBytes int
4546
AdvertisedListeners string
4647
LogLevel string
4748

4849
ErrorStream io.Writer
4950
OutputStream io.Writer
5051

51-
NetworkID string
52-
NetworkName string
5352
ContainerID string
5453
HostAddress string
5554
ContainerAddress string
@@ -97,8 +96,8 @@ func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
9796
k.MinInsyncReplicas = 1
9897
}
9998

100-
if k.ZookeeperConnect == "" {
101-
k.ZookeeperConnect = "zookeeper:2181/kafka"
99+
if k.ZooKeeperConnect == "" {
100+
k.ZooKeeperConnect = "zookeeper:2181/kafka"
102101
}
103102

104103
if k.MessageMaxBytes == 0 {
@@ -116,36 +115,40 @@ func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
116115
if k.LogLevel == "" {
117116
k.LogLevel = "warn"
118117
}
119-
hostConfig := &docker.HostConfig{
120-
AutoRemove: true,
121-
PortBindings: map[docker.Port][]docker.PortBinding{
122-
k.ContainerPort: []docker.PortBinding{{
123-
HostIP: k.HostIP,
124-
HostPort: strconv.Itoa(k.HostPort),
125-
}},
118+
119+
containerOptions := docker.CreateContainerOptions{
120+
Name: k.Name,
121+
Config: &docker.Config{
122+
Image: k.Image,
123+
Env: k.buildEnv(),
124+
},
125+
HostConfig: &docker.HostConfig{
126+
AutoRemove: true,
127+
PortBindings: map[docker.Port][]docker.PortBinding{
128+
k.ContainerPort: []docker.PortBinding{{
129+
HostIP: k.HostIP,
130+
HostPort: strconv.Itoa(k.HostPort),
131+
}},
132+
},
126133
},
127134
}
128135

129-
config := &docker.Config{
130-
Image: k.Image,
131-
Env: k.buildEnv(),
132-
}
136+
if k.NetworkName != "" {
137+
nw, err := k.Client.NetworkInfo(k.NetworkName)
138+
if err != nil {
139+
return err
140+
}
133141

134-
networkingConfig := &docker.NetworkingConfig{
135-
EndpointsConfig: map[string]*docker.EndpointConfig{
136-
k.NetworkName: &docker.EndpointConfig{
137-
NetworkID: k.NetworkID,
142+
containerOptions.NetworkingConfig = &docker.NetworkingConfig{
143+
EndpointsConfig: map[string]*docker.EndpointConfig{
144+
k.NetworkName: &docker.EndpointConfig{
145+
NetworkID: nw.ID,
146+
},
138147
},
139-
},
148+
}
140149
}
141150

142-
container, err := k.Client.CreateContainer(
143-
docker.CreateContainerOptions{
144-
Name: k.Name,
145-
Config: config,
146-
HostConfig: hostConfig,
147-
NetworkingConfig: networkingConfig,
148-
})
151+
container, err := k.Client.CreateContainer(containerOptions)
149152
if err != nil {
150153
return err
151154
}
@@ -214,7 +217,7 @@ func (k *Kafka) buildEnv() []string {
214217
fmt.Sprintf("KAFKA_DEFAULT_REPLICATION_FACTOR=%d", k.DefaultReplicationFactor),
215218
fmt.Sprintf("KAFKA_MIN_INSYNC_REPLICAS=%d", k.MinInsyncReplicas),
216219
fmt.Sprintf("KAFKA_BROKER_ID=%d", k.BrokerID),
217-
fmt.Sprintf("KAFKA_ZOOKEEPER_CONNECT=%s", k.ZookeeperConnect),
220+
fmt.Sprintf("KAFKA_ZOOKEEPER_CONNECT=%s", k.ZooKeeperConnect),
218221
fmt.Sprintf("KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES=%d", k.ReplicaFetchResponseMaxBytes),
219222
fmt.Sprintf("KAFKA_ADVERTISED_LISTENERS=EXTERNAL://localhost:%d,%s://%s:9093", k.HostPort, k.NetworkName, k.Name),
220223
fmt.Sprintf("KAFKA_LISTENERS=EXTERNAL://0.0.0.0:9092,%s://0.0.0.0:9093", k.NetworkName),

integration/runner/kafka_test.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ var _ = Describe("Kafka Runner", func() {
2929

3030
outBuffer *gbytes.Buffer
3131
kafka *runner.Kafka
32-
zookeeper *runner.Zookeeper
32+
zookeeper *runner.ZooKeeper
3333

3434
process ifrit.Process
3535
)
@@ -51,11 +51,10 @@ var _ = Describe("Kafka Runner", func() {
5151
},
5252
)
5353

54-
// Start a zookeeper
55-
zookeeper = &runner.Zookeeper{
54+
// Start a ZooKeeper
55+
zookeeper = &runner.ZooKeeper{
5656
Name: helpers.UniqueName(),
5757
ZooMyID: 1,
58-
NetworkID: network.ID,
5958
NetworkName: network.Name,
6059
}
6160
err = zookeeper.Start()
@@ -64,9 +63,8 @@ var _ = Describe("Kafka Runner", func() {
6463
kafka = &runner.Kafka{
6564
ErrorStream: GinkgoWriter,
6665
OutputStream: io.MultiWriter(outBuffer, GinkgoWriter),
67-
ZookeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
66+
ZooKeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
6867
BrokerID: 1,
69-
NetworkID: network.ID,
7068
NetworkName: network.Name,
7169
}
7270
})
@@ -131,44 +129,40 @@ var _ = Describe("Kafka Runner", func() {
131129

132130
It("multiples can be started and stopped", func() {
133131
k1 := &runner.Kafka{
134-
ZookeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
132+
ZooKeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
135133
BrokerID: 1,
136134
MinInsyncReplicas: 2,
137135
DefaultReplicationFactor: 3,
138-
NetworkID: network.ID,
139136
NetworkName: network.Name,
140137
}
141138
err := k1.Start()
142139
Expect(err).NotTo(HaveOccurred())
143140

144141
k2 := &runner.Kafka{
145-
ZookeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
142+
ZooKeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
146143
BrokerID: 2,
147144
MinInsyncReplicas: 2,
148145
DefaultReplicationFactor: 3,
149-
NetworkID: network.ID,
150146
NetworkName: network.Name,
151147
}
152148
err = k2.Start()
153149
Expect(err).NotTo(HaveOccurred())
154150

155151
k3 := &runner.Kafka{
156-
ZookeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
152+
ZooKeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
157153
BrokerID: 3,
158154
MinInsyncReplicas: 2,
159155
DefaultReplicationFactor: 3,
160-
NetworkID: network.ID,
161156
NetworkName: network.Name,
162157
}
163158
err = k3.Start()
164159
Expect(err).NotTo(HaveOccurred())
165160

166161
k4 := &runner.Kafka{
167-
ZookeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
162+
ZooKeeperConnect: net.JoinHostPort(zookeeper.Name, "2181"),
168163
BrokerID: 4,
169164
MinInsyncReplicas: 2,
170165
DefaultReplicationFactor: 3,
171-
NetworkID: network.ID,
172166
NetworkName: network.Name,
173167
}
174168
err = k4.Start()

integration/runner/zookeeper.go

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ import (
2020
"github.com/tedsuo/ifrit"
2121
)
2222

23-
const ZookeeperDefaultImage = "hyperledger/fabric-zookeeper:latest"
23+
const ZooKeeperDefaultImage = "hyperledger/fabric-zookeeper:latest"
2424

25-
type Zookeeper struct {
25+
type ZooKeeper struct {
2626
Client *docker.Client
2727
Image string
2828
HostIP string
@@ -31,7 +31,6 @@ type Zookeeper struct {
3131
Name string
3232
StartTimeout time.Duration
3333

34-
NetworkID string
3534
NetworkName string
3635
ClientPort docker.Port
3736
LeaderPort docker.Port
@@ -51,9 +50,9 @@ type Zookeeper struct {
5150
stopped bool
5251
}
5352

54-
func (z *Zookeeper) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
53+
func (z *ZooKeeper) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
5554
if z.Image == "" {
56-
z.Image = ZookeeperDefaultImage
55+
z.Image = ZooKeeperDefaultImage
5756
}
5857

5958
if z.Name == "" {
@@ -98,40 +97,33 @@ func (z *Zookeeper) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
9897
z.Client = client
9998
}
10099

101-
config := &docker.Config{
102-
Image: z.Image,
103-
Env: []string{
104-
fmt.Sprintf("ZOO_MY_ID=%d", z.ZooMyID),
105-
fmt.Sprintf("ZOO_SERVERS=%s", z.ZooServers),
106-
},
107-
}
108-
109100
containerOptions := docker.CreateContainerOptions{
110101
Name: z.Name,
111102
HostConfig: &docker.HostConfig{
112103
AutoRemove: true,
113104
},
114-
Config: config,
105+
Config: &docker.Config{
106+
Image: z.Image,
107+
Env: []string{
108+
fmt.Sprintf("ZOO_MY_ID=%d", z.ZooMyID),
109+
fmt.Sprintf("ZOO_SERVERS=%s", z.ZooServers),
110+
},
111+
},
115112
}
116113

117114
if z.NetworkName != "" {
118115
nw, err := z.Client.NetworkInfo(z.NetworkName)
119116
if err != nil {
120117
return err
121118
}
122-
if z.NetworkID == "" {
123-
z.NetworkID = nw.ID
124-
}
125119

126-
networkingConfig := &docker.NetworkingConfig{
120+
containerOptions.NetworkingConfig = &docker.NetworkingConfig{
127121
EndpointsConfig: map[string]*docker.EndpointConfig{
128122
z.NetworkName: &docker.EndpointConfig{
129-
NetworkID: z.NetworkID,
123+
NetworkID: nw.ID,
130124
},
131125
},
132126
}
133-
134-
containerOptions.NetworkingConfig = networkingConfig
135127
}
136128

137129
container, err := z.Client.CreateContainer(containerOptions)
@@ -187,7 +179,7 @@ func (z *Zookeeper) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
187179
}
188180
}
189181

190-
func (z *Zookeeper) wait() <-chan error {
182+
func (z *ZooKeeper) wait() <-chan error {
191183
exitCh := make(chan error)
192184
go func() {
193185
exitCode, err := z.Client.WaitContainer(z.containerID)
@@ -200,7 +192,7 @@ func (z *Zookeeper) wait() <-chan error {
200192
return exitCh
201193
}
202194

203-
func (z *Zookeeper) streamLogs(ctx context.Context) error {
195+
func (z *ZooKeeper) streamLogs(ctx context.Context) error {
204196
if z.ErrorStream == nil && z.OutputStream == nil {
205197
return nil
206198
}
@@ -217,15 +209,15 @@ func (z *Zookeeper) streamLogs(ctx context.Context) error {
217209
return z.Client.Logs(logOptions)
218210
}
219211

220-
func (z *Zookeeper) ContainerID() string {
212+
func (z *ZooKeeper) ContainerID() string {
221213
return z.containerID
222214
}
223215

224-
func (z *Zookeeper) ContainerAddress() string {
216+
func (z *ZooKeeper) ContainerAddress() string {
225217
return z.containerAddress
226218
}
227219

228-
func (z *Zookeeper) Start() error {
220+
func (z *ZooKeeper) Start() error {
229221
p := ifrit.Invoke(z)
230222

231223
select {
@@ -236,7 +228,7 @@ func (z *Zookeeper) Start() error {
236228
}
237229
}
238230

239-
func (z *Zookeeper) Stop() error {
231+
func (z *ZooKeeper) Stop() error {
240232
z.mutex.Lock()
241233
if z.stopped {
242234
z.mutex.Unlock()

0 commit comments

Comments
 (0)