diff --git a/integration/README.rst b/integration/README.rst
index aebbd57ef55..e9e04582c83 100644
--- a/integration/README.rst
+++ b/integration/README.rst
@@ -44,8 +44,7 @@ your machine.
 Caveats, Gotchas, and Good-To-Knows
 -----------------------------------
 * The tests in this repository only exercise components that originate in the Fabric repository.
-* Currently, docker is only used in the ginkgo tests when using chaincode, kafka, zookeeper,
-  or couchdb images.
+* Currently, docker is only used in the ginkgo tests when using chaincode or couchdb images.
 
 
 Getting Started
diff --git a/integration/ledger/reset_rollback_test.go b/integration/ledger/reset_rollback_test.go
index d44b967e0ef..df9d9182cd8 100644
--- a/integration/ledger/reset_rollback_test.go
+++ b/integration/ledger/reset_rollback_test.go
@@ -137,7 +137,7 @@ var _ = Describe("rollback, reset, pause, resume, and unjoin peer node commands"
 		helper.assertPausedChannel(org3peer0)
 
 		By("Checking preResetHeightFile exists for a paused channel that is also rolled back or reset")
-		setup.startBrokerAndOrderer()
+		setup.startOrderer()
 		preResetHeightFile := filepath.Join(setup.network.PeerLedgerDir(org3peer0), "chains/chains", helper.channelID, "__preResetHeight")
 		Expect(preResetHeightFile).To(BeARegularFile())
 
@@ -153,7 +153,7 @@ var _ = Describe("rollback, reset, pause, resume, and unjoin peer node commands"
 		}
 
 		By("Bringing the peers to recent height by starting the orderer")
-		setup.startBrokerAndOrderer()
+		setup.startOrderer()
 		for _, peer := range setup.peers {
 			By("Verifying endorsement is enabled and preResetHeightFile is removed on peer " + peer.ID())
 			helper.waitUntilEndorserEnabled(peer)
@@ -263,7 +263,6 @@ type setup struct {
 	peerProcess    []ifrit.Process
 	orderer        *nwo.Orderer
 	ordererProcess ifrit.Process
-	brokerProcess  ifrit.Process
 }
 
 func initThreeOrgsSetup() *setup {
@@ -291,7 +290,7 @@ func initThreeOrgsSetup() *setup {
 		channelID: "testchannel",
 	}
 
-	setup.startBrokerAndOrderer()
+	setup.startOrderer()
 
 	setup.startPeer(peers[0])
 	setup.startPeer(peers[1])
@@ -319,10 +318,6 @@ func (s *setup) terminateAllProcess() {
 	Eventually(s.ordererProcess.Wait(), s.network.EventuallyTimeout).Should(Receive())
 	s.ordererProcess = nil
 
-	s.brokerProcess.Signal(syscall.SIGTERM)
-	Eventually(s.brokerProcess.Wait(), s.network.EventuallyTimeout).Should(Receive())
-	s.brokerProcess = nil
-
 	for _, p := range s.peerProcess {
 		p.Signal(syscall.SIGTERM)
 		Eventually(p.Wait(), s.network.EventuallyTimeout).Should(Receive())
@@ -351,12 +346,7 @@ func (s *setup) startPeer(peer *nwo.Peer) {
 	s.peerProcess = append(s.peerProcess, peerProcess)
 }
 
-func (s *setup) startBrokerAndOrderer() {
-	brokerRunner := s.network.BrokerGroupRunner()
-	brokerProcess := ifrit.Invoke(brokerRunner)
-	Eventually(brokerProcess.Ready(), s.network.EventuallyTimeout).Should(BeClosed())
-	s.brokerProcess = brokerProcess
-
+func (s *setup) startOrderer() {
 	ordererRunner := s.network.OrdererGroupRunner()
 	ordererProcess := ifrit.Invoke(ordererRunner)
 	Eventually(ordererProcess.Ready(), s.network.EventuallyTimeout).Should(BeClosed())
diff --git a/integration/ledger/snapshot_test.go b/integration/ledger/snapshot_test.go
index b412f0bfd73..385116538ed 100644
--- a/integration/ledger/snapshot_test.go
+++ b/integration/ledger/snapshot_test.go
@@ -636,8 +636,8 @@ func initAndStartFourOrgsNetwork() *setup {
 	}
 	Expect(setup.testDir).To(Equal(setup.network.RootDir))
 
-	By("starting broker and orderer")
-	setup.startBrokerAndOrderer()
+	By("starting orderer")
+	setup.startOrderer()
 
 	By("starting peers")
 	setup.startPeers()
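Note: the renamed startOrderer helper keeps the standard nwo/ifrit lifecycle used throughout these tests; it simply no longer brings up a Kafka broker group first. A minimal sketch of that lifecycle (illustrative only, not part of the patch; it assumes a *nwo.Network named network, and the calls mirror the hunks above):

    // Start the orderer group and wait until every member reports ready.
    ordererRunner := network.OrdererGroupRunner()
    ordererProcess := ifrit.Invoke(ordererRunner)
    Eventually(ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())

    // ... exercise the ledger under test ...

    // Tear down by signalling the process and waiting for it to exit.
    ordererProcess.Signal(syscall.SIGTERM)
    Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive())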
diff --git a/integration/nwo/network.go b/integration/nwo/network.go
index 8036a6c2611..18dc0336f7c 100644
--- a/integration/nwo/network.go
+++ b/integration/nwo/network.go
@@ -18,7 +18,6 @@ import (
 	"path/filepath"
 	"runtime"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"syscall"
@@ -65,13 +64,10 @@ type Consortium struct {
 	Organizations []string `yaml:"organizations,omitempty"`
 }
 
-// Consensus indicates the orderer types and how many broker and zookeeper
-// instances.
+// Consensus indicates the orderer types.
 type Consensus struct {
 	Type                        string `yaml:"type,omitempty"`
 	BootstrapMethod             string `yaml:"bootstrap_method,omitempty"`
-	Brokers                     int    `yaml:"brokers,omitempty"`
-	ZooKeepers                  int    `yaml:"zookeepers,omitempty"`
 	ChannelParticipationEnabled bool   `yaml:"channel_participation_enabled,omitempty"`
 }
@@ -157,7 +153,6 @@ type Network struct {
 	TLSEnabled     bool
 	GatewayEnabled bool
 
-	PortsByBrokerID  map[string]Ports
 	PortsByOrdererID map[string]Ports
 	PortsByPeerID    map[string]Ports
 	Organizations    []*Organization
@@ -188,7 +183,6 @@ func New(c *Config, rootDir string, dockerClient *docker.Client, startPort int,
 		NetworkID:         runner.UniqueName(),
 		EventuallyTimeout: time.Minute,
 		MetricsProvider:   "prometheus",
-		PortsByBrokerID:   map[string]Ports{},
 		PortsByOrdererID:  map[string]Ports{},
 		PortsByPeerID:     map[string]Ports{},
 
@@ -230,14 +224,6 @@ func New(c *Config, rootDir string, dockerClient *docker.Client, startPort int,
 		network.SessionCreateInterval = time.Second
 	}
 
-	for i := 0; i < network.Consensus.Brokers; i++ {
-		ports := Ports{}
-		for _, portName := range BrokerPortNames() {
-			ports[portName] = network.ReservePort()
-		}
-		network.PortsByBrokerID[strconv.Itoa(i)] = ports
-	}
-
 	for _, o := range c.Orderers {
 		ports := Ports{}
 		for _, portName := range OrdererPortNames() {
@@ -1245,87 +1231,6 @@ func (n *Network) Osnadmin(command Command) (*gexec.Session, error) {
 	return n.StartSession(cmd, command.SessionName())
 }
 
-// ZooKeeperRunner returns a runner for a ZooKeeper instance.
-func (n *Network) ZooKeeperRunner(idx int) *runner.ZooKeeper {
-	colorCode := n.nextColor()
-	name := fmt.Sprintf("zookeeper-%d-%s", idx, n.NetworkID)
-
-	return &runner.ZooKeeper{
-		ZooMyID:     idx + 1, // IDs must be between 1 and 255
-		Client:      n.DockerClient,
-		Name:        name,
-		NetworkName: n.NetworkID,
-		OutputStream: gexec.NewPrefixedWriter(
-			fmt.Sprintf("\x1b[32m[o]\x1b[%s[%s]\x1b[0m ", colorCode, name),
-			ginkgo.GinkgoWriter,
-		),
-		ErrorStream: gexec.NewPrefixedWriter(
-			fmt.Sprintf("\x1b[91m[e]\x1b[%s[%s]\x1b[0m ", colorCode, name),
-			ginkgo.GinkgoWriter,
-		),
-	}
-}
-
-func (n *Network) minBrokersInSync() int {
-	if n.Consensus.Brokers < 2 {
-		return n.Consensus.Brokers
-	}
-	return 2
-}
-
-func (n *Network) defaultBrokerReplication() int {
-	if n.Consensus.Brokers < 3 {
-		return n.Consensus.Brokers
-	}
-	return 3
-}
-
-// BrokerRunner returns a runner for a Kafka broker instance.
-func (n *Network) BrokerRunner(id int, zookeepers []string) *runner.Kafka {
-	colorCode := n.nextColor()
-	name := fmt.Sprintf("kafka-%d-%s", id, n.NetworkID)
-
-	return &runner.Kafka{
-		BrokerID:                 id + 1,
-		Client:                   n.DockerClient,
-		AdvertisedListeners:      "127.0.0.1",
-		HostPort:                 int(n.PortsByBrokerID[strconv.Itoa(id)][HostPort]),
-		Name:                     name,
-		NetworkName:              n.NetworkID,
-		MinInsyncReplicas:        n.minBrokersInSync(),
-		DefaultReplicationFactor: n.defaultBrokerReplication(),
-		ZooKeeperConnect:         strings.Join(zookeepers, ","),
-		OutputStream: gexec.NewPrefixedWriter(
-			fmt.Sprintf("\x1b[32m[o]\x1b[%s[%s]\x1b[0m ", colorCode, name),
-			ginkgo.GinkgoWriter,
-		),
-		ErrorStream: gexec.NewPrefixedWriter(
-			fmt.Sprintf("\x1b[91m[e]\x1b[%s[%s]\x1b[0m ", colorCode, name),
-			ginkgo.GinkgoWriter,
-		),
-	}
-}
-
-// BrokerGroupRunner returns a runner that manages the processes that make up
-// the Kafka broker network for fabric.
-func (n *Network) BrokerGroupRunner() ifrit.Runner {
-	members := grouper.Members{}
-	zookeepers := []string{}
-
-	for i := 0; i < n.Consensus.ZooKeepers; i++ {
-		zk := n.ZooKeeperRunner(i)
-		zookeepers = append(zookeepers, fmt.Sprintf("%s:2181", zk.Name))
-		members = append(members, grouper.Member{Name: zk.Name, Runner: zk})
-	}
-
-	for i := 0; i < n.Consensus.Brokers; i++ {
-		kafka := n.BrokerRunner(i, zookeepers)
-		members = append(members, grouper.Member{Name: kafka.Name, Runner: kafka})
-	}
-
-	return grouper.NewOrdered(syscall.SIGTERM, members)
-}
-
 // OrdererRunner returns an ifrit.Runner for the specified orderer. The runner
 // can be used to start and manage an orderer process.
 func (n *Network) OrdererRunner(o *Orderer, env ...string) *ginkgomon.Runner {
@@ -1342,12 +1247,6 @@ func (n *Network) OrdererRunner(o *Orderer, env ...string) *ginkgomon.Runner {
 		StartCheckTimeout: 15 * time.Second,
 	}
 
-	// After consensus-type migration, the #brokers is >0, but the type is etcdraft
-	if n.Consensus.Type == "kafka" && n.Consensus.Brokers != 0 {
-		config.StartCheck = "Start phase completed successfully"
-		config.StartCheckTimeout = 30 * time.Second
-	}
-
 	return ginkgomon.New(config)
 }
@@ -1396,7 +1295,6 @@ func (n *Network) PeerGroupRunner() ifrit.Runner {
 // entire fabric network.
 func (n *Network) NetworkGroupRunner() ifrit.Runner {
 	members := grouper.Members{
-		{Name: "brokers", Runner: n.BrokerGroupRunner()},
 		{Name: "orderers", Runner: n.OrdererGroupRunner()},
 		{Name: "peers", Runner: n.PeerGroupRunner()},
 	}
@@ -1786,21 +1684,6 @@ func OrdererPortNames() []PortName {
 	return []PortName{ListenPort, ProfilePort, OperationsPort, ClusterPort, AdminPort}
 }
 
-// BrokerPortNames returns the list of ports that need to be reserved for a
-// Kafka broker.
-func BrokerPortNames() []PortName {
-	return []PortName{HostPort}
-}
-
-// BrokerAddresses returns the list of broker addresses for the network.
-func (n *Network) BrokerAddresses(portName PortName) []string {
-	addresses := []string{}
-	for _, ports := range n.PortsByBrokerID {
-		addresses = append(addresses, fmt.Sprintf("127.0.0.1:%d", ports[portName]))
-	}
-	return addresses
-}
-
 // OrdererAddress returns the address (host and port) exposed by the Orderer
 // for the named port. Command line tools should use the returned address when
 // connecting to the orderer.
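Note: with the broker group gone, NetworkGroupRunner composes only orderers and peers. For readers unfamiliar with the grouper API: members start in declaration order, and the whole group tears down on the given signal. A sketch of the equivalent manual wiring (assumed; it mirrors the pvtdata test hunk later in this patch):

    members := grouper.Members{
    	{Name: "orderers", Runner: network.OrdererGroupRunner()},
    	{Name: "peers", Runner: network.PeerGroupRunner()},
    }
    // Start orderers before peers; SIGTERM to the group stops everything.
    networkProcess := ifrit.Invoke(grouper.NewOrdered(syscall.SIGTERM, members))
    Eventually(networkProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())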
diff --git a/integration/nwo/runner/kafka.go b/integration/nwo/runner/kafka.go
deleted file mode 100644
index 91b0320064e..00000000000
--- a/integration/nwo/runner/kafka.go
+++ /dev/null
@@ -1,304 +0,0 @@
-/*
-Copyright IBM Corp. All Rights Reserved.
-
-SPDX-License-Identifier: Apache-2.0
-*/
-
-package runner
-
-import (
-	"context"
-	"fmt"
-	"io"
-	"net"
-	"os"
-	"strconv"
-	"sync"
-	"time"
-
-	docker "github.com/fsouza/go-dockerclient"
-	"github.com/pkg/errors"
-	"github.com/tedsuo/ifrit"
-)
-
-const KafkaDefaultImage = "confluentinc/cp-kafka:5.3.1"
-
-// Kafka manages the execution of an instance of a dockerized CouchDB
-// for tests.
-type Kafka struct {
-	Client        *docker.Client
-	Image         string
-	HostIP        string
-	HostPort      int
-	ContainerPort docker.Port
-	Name          string
-	NetworkName   string
-	StartTimeout  time.Duration
-
-	MessageMaxBytes              int
-	ReplicaFetchMaxBytes         int
-	UncleanLeaderElectionEnable  bool
-	DefaultReplicationFactor     int
-	MinInsyncReplicas            int
-	BrokerID                     int
-	ZooKeeperConnect             string
-	ReplicaFetchResponseMaxBytes int
-	AdvertisedListeners          string
-
-	ErrorStream  io.Writer
-	OutputStream io.Writer
-
-	ContainerID      string
-	HostAddress      string
-	ContainerAddress string
-	Address          string
-
-	mutex   sync.Mutex
-	stopped bool
-}
-
-// Run runs a Kafka container. It implements the ifrit.Runner interface
-func (k *Kafka) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
-	if k.Image == "" {
-		k.Image = KafkaDefaultImage
-	}
-
-	if k.Name == "" {
-		k.Name = DefaultNamer()
-	}
-
-	if k.HostIP == "" {
-		k.HostIP = "127.0.0.1"
-	}
-
-	if k.ContainerPort == docker.Port("") {
-		k.ContainerPort = docker.Port("9092/tcp")
-	}
-
-	if k.StartTimeout == 0 {
-		k.StartTimeout = DefaultStartTimeout
-	}
-
-	if k.Client == nil {
-		client, err := docker.NewClientFromEnv()
-		if err != nil {
-			return err
-		}
-		k.Client = client
-	}
-
-	if k.DefaultReplicationFactor == 0 {
-		k.DefaultReplicationFactor = 1
-	}
-
-	if k.MinInsyncReplicas == 0 {
-		k.MinInsyncReplicas = 1
-	}
-
-	if k.ZooKeeperConnect == "" {
-		k.ZooKeeperConnect = "zookeeper:2181/kafka"
-	}
-
-	if k.MessageMaxBytes == 0 {
-		k.MessageMaxBytes = 1000012
-	}
-
-	if k.ReplicaFetchMaxBytes == 0 {
-		k.ReplicaFetchMaxBytes = 1048576
-	}
-
-	if k.ReplicaFetchResponseMaxBytes == 0 {
-		k.ReplicaFetchResponseMaxBytes = 10485760
-	}
-
-	containerOptions := docker.CreateContainerOptions{
-		Name: k.Name,
-		Config: &docker.Config{
-			Image: k.Image,
-			Env:   k.buildEnv(),
-		},
-		HostConfig: &docker.HostConfig{
-			AutoRemove: true,
-			PortBindings: map[docker.Port][]docker.PortBinding{
-				k.ContainerPort: {{
-					HostIP:   k.HostIP,
-					HostPort: strconv.Itoa(k.HostPort),
-				}},
-			},
-		},
-	}
-
-	if k.NetworkName != "" {
-		nw, err := k.Client.NetworkInfo(k.NetworkName)
-		if err != nil {
-			return err
-		}
-
-		containerOptions.NetworkingConfig = &docker.NetworkingConfig{
-			EndpointsConfig: map[string]*docker.EndpointConfig{
-				k.NetworkName: {
-					NetworkID: nw.ID,
-				},
-			},
-		}
-	}
-
-	container, err := k.Client.CreateContainer(containerOptions)
-	if err != nil {
-		return err
-	}
-	k.ContainerID = container.ID
-
-	err = k.Client.StartContainer(container.ID, nil)
-	if err != nil {
-		return err
-	}
-	defer k.Stop()
-
-	container, err = k.Client.InspectContainer(container.ID)
-	if err != nil {
-		return err
-	}
-
-	k.HostAddress = net.JoinHostPort(
-		container.NetworkSettings.Ports[k.ContainerPort][0].HostIP,
-		container.NetworkSettings.Ports[k.ContainerPort][0].HostPort,
-	)
-	k.ContainerAddress = net.JoinHostPort(
-		container.NetworkSettings.Networks[k.NetworkName].IPAddress,
-		k.ContainerPort.Port(),
-	)
-
-	logContext, cancelLogs := context.WithCancel(context.Background())
-	defer cancelLogs()
-	go k.streamLogs(logContext)
-
-	containerExit := k.wait()
-	ctx, cancel := context.WithTimeout(context.Background(), k.StartTimeout)
-	defer cancel()
-
-	select {
-	case <-ctx.Done():
-		return errors.Wrapf(ctx.Err(), "kafka broker in container %s did not start", k.ContainerID)
-	case <-containerExit:
-		return errors.New("container exited before ready")
-	case <-k.ready(ctx, k.ContainerAddress):
-		k.Address = k.ContainerAddress
-	case <-k.ready(ctx, k.HostAddress):
-		k.Address = k.HostAddress
-	}
-
-	cancel()
-	close(ready)
-
-	for {
-		select {
-		case err := <-containerExit:
-			return err
-		case <-sigCh:
-			if err := k.Stop(); err != nil {
-				return err
-			}
-		}
-	}
-}
-
-func (k *Kafka) buildEnv() []string {
-	env := []string{
-		"KAFKA_LOG_RETENTION_MS=-1",
-		//"KAFKA_AUTO_CREATE_TOPICS_ENABLE=false",
-		fmt.Sprintf("KAFKA_MESSAGE_MAX_BYTES=%d", k.MessageMaxBytes),
-		fmt.Sprintf("KAFKA_REPLICA_FETCH_MAX_BYTES=%d", k.ReplicaFetchMaxBytes),
-		fmt.Sprintf("KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE=%s", strconv.FormatBool(k.UncleanLeaderElectionEnable)),
-		fmt.Sprintf("KAFKA_DEFAULT_REPLICATION_FACTOR=%d", k.DefaultReplicationFactor),
-		fmt.Sprintf("KAFKA_MIN_INSYNC_REPLICAS=%d", k.MinInsyncReplicas),
-		fmt.Sprintf("KAFKA_BROKER_ID=%d", k.BrokerID),
-		fmt.Sprintf("KAFKA_ZOOKEEPER_CONNECT=%s", k.ZooKeeperConnect),
-		fmt.Sprintf("KAFKA_REPLICA_FETCH_RESPONSE_MAX_BYTES=%d", k.ReplicaFetchResponseMaxBytes),
-		fmt.Sprintf("KAFKA_ADVERTISED_LISTENERS=EXTERNAL://localhost:%d,%s://%s:9093", k.HostPort, k.NetworkName, k.Name),
-		fmt.Sprintf("KAFKA_LISTENERS=EXTERNAL://0.0.0.0:9092,%s://0.0.0.0:9093", k.NetworkName),
-		fmt.Sprintf("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=EXTERNAL:PLAINTEXT,%s:PLAINTEXT", k.NetworkName),
-		fmt.Sprintf("KAFKA_INTER_BROKER_LISTENER_NAME=%s", k.NetworkName),
-	}
-	return env
-}
-
-func (k *Kafka) ready(ctx context.Context, addr string) <-chan struct{} {
-	readyCh := make(chan struct{})
-	go func() {
-		ticker := time.NewTicker(100 * time.Millisecond)
-		defer ticker.Stop()
-
-		for {
-			conn, err := net.DialTimeout("tcp", addr, 50*time.Millisecond)
-			if err == nil {
-				conn.Close()
-				close(readyCh)
-				return
-			}
-
-			select {
-			case <-ticker.C:
-			case <-ctx.Done():
-				return
-			}
-		}
-	}()
-
-	return readyCh
-}
-
-func (k *Kafka) wait() <-chan error {
-	exitCh := make(chan error)
-	go func() {
-		exitCode, err := k.Client.WaitContainer(k.ContainerID)
-		if err == nil {
-			err = fmt.Errorf("kafka: process exited with %d", exitCode)
-		}
-		exitCh <- err
-	}()
-
-	return exitCh
-}
-
-func (k *Kafka) streamLogs(ctx context.Context) error {
-	if k.ErrorStream == nil && k.OutputStream == nil {
-		return nil
-	}
-
-	logOptions := docker.LogsOptions{
-		Context:      ctx,
-		Container:    k.ContainerID,
-		ErrorStream:  k.ErrorStream,
-		OutputStream: k.OutputStream,
-		Stderr:       k.ErrorStream != nil,
-		Stdout:       k.OutputStream != nil,
-		Follow:       true,
-	}
-	return k.Client.Logs(logOptions)
-}
-
-// Start starts the Kafka container using an ifrit runner
-func (k *Kafka) Start() error {
-	p := ifrit.Invoke(k)
-
-	select {
-	case <-p.Ready():
-		return nil
-	case err := <-p.Wait():
-		return err
-	}
-}
-
-// Stop stops and removes the Kafka container
-func (k *Kafka) Stop() error {
-	k.mutex.Lock()
-	if k.stopped {
-		k.mutex.Unlock()
-		return errors.Errorf("container %s already stopped", k.ContainerID)
-	}
-	k.stopped = true
-	k.mutex.Unlock()
-
-	return k.Client.StopContainer(k.ContainerID, 0)
-}
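Note: the deleted Kafka runner (and the ZooKeeper runner below) were Docker-backed implementations of the ifrit.Runner interface. For orientation, the contract they satisfied is small; a stripped-down sketch (illustrative only; myRunner is hypothetical):

    // Run must block: start the resource, close ready once it is usable,
    // then stop when a shutdown signal arrives.
    func (r *myRunner) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
    	// ... create and start the managed resource ...
    	close(ready) // ifrit.Invoke unblocks Ready() here
    	<-sigCh      // block until the group or test signals shutdown
    	// ... stop and clean up the managed resource ...
    	return nil
    }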
diff --git a/integration/nwo/runner/zookeeper.go b/integration/nwo/runner/zookeeper.go
deleted file mode 100644
index 0d5e946396b..00000000000
--- a/integration/nwo/runner/zookeeper.go
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
-Copyright IBM Corp. All Rights Reserved.
-
-SPDX-License-Identifier: Apache-2.0
-*/
-
-package runner
-
-import (
-	"context"
-	"fmt"
-	"io"
-	"net"
-	"os"
-	"sync"
-	"time"
-
-	docker "github.com/fsouza/go-dockerclient"
-	"github.com/pkg/errors"
-	"github.com/tedsuo/ifrit"
-)
-
-const ZooKeeperDefaultImage = "confluentinc/cp-zookeeper:5.3.1"
-
-type ZooKeeper struct {
-	Client         *docker.Client
-	Image          string
-	HostIP         string
-	HostPort       []int
-	ContainerPorts []docker.Port
-	Name           string
-	StartTimeout   time.Duration
-
-	NetworkName string
-	ClientPort  docker.Port
-	LeaderPort  docker.Port
-	PeerPort    docker.Port
-	ZooMyID     int
-	ZooServers  string
-
-	ErrorStream  io.Writer
-	OutputStream io.Writer
-
-	containerID      string
-	containerAddress string
-	address          string
-
-	mutex   sync.Mutex
-	stopped bool
-}
-
-func (z *ZooKeeper) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
-	if z.Image == "" {
-		z.Image = ZooKeeperDefaultImage
-	}
-
-	if z.Name == "" {
-		z.Name = DefaultNamer()
-	}
-
-	if z.HostIP == "" {
-		z.HostIP = "127.0.0.1"
-	}
-
-	if z.ContainerPorts == nil {
-		if z.ClientPort == docker.Port("") {
-			z.ClientPort = docker.Port("2181/tcp")
-		}
-		if z.LeaderPort == docker.Port("") {
-			z.LeaderPort = docker.Port("3888/tcp")
-		}
-		if z.PeerPort == docker.Port("") {
-			z.PeerPort = docker.Port("2888/tcp")
-		}
-
-		z.ContainerPorts = []docker.Port{
-			z.ClientPort,
-			z.LeaderPort,
-			z.PeerPort,
-		}
-	}
-
-	if z.StartTimeout == 0 {
-		z.StartTimeout = DefaultStartTimeout
-	}
-
-	if z.ZooMyID == 0 {
-		z.ZooMyID = 1
-	}
-
-	if z.Client == nil {
-		client, err := docker.NewClientFromEnv()
-		if err != nil {
-			return err
-		}
-		z.Client = client
-	}
-
-	containerOptions := docker.CreateContainerOptions{
-		Name: z.Name,
-		HostConfig: &docker.HostConfig{
-			AutoRemove: true,
-		},
-		Config: &docker.Config{
-			Image: z.Image,
-			Env: []string{
-				fmt.Sprintf("ZOOKEEPER_MY_ID=%d", z.ZooMyID),
-				fmt.Sprintf("ZOOKEEPER_SERVERS=%s", z.ZooServers),
-				fmt.Sprintf("ZOOKEEPER_CLIENT_PORT=%s", z.ClientPort.Port()),
-			},
-		},
-	}
-
-	if z.NetworkName != "" {
-		nw, err := z.Client.NetworkInfo(z.NetworkName)
-		if err != nil {
-			return err
-		}
-
-		containerOptions.NetworkingConfig = &docker.NetworkingConfig{
-			EndpointsConfig: map[string]*docker.EndpointConfig{
-				z.NetworkName: {
-					NetworkID: nw.ID,
-				},
-			},
-		}
-	}
-
-	container, err := z.Client.CreateContainer(containerOptions)
-	if err != nil {
-		return err
-	}
-	z.containerID = container.ID
-
-	err = z.Client.StartContainer(container.ID, nil)
-	if err != nil {
-		return err
-	}
-	defer z.Stop()
-
-	container, err = z.Client.InspectContainer(container.ID)
-	if err != nil {
-		return err
-	}
-
-	z.containerAddress = net.JoinHostPort(
-		container.NetworkSettings.IPAddress,
-		z.ContainerPorts[0].Port(),
-	)
-
-	streamCtx, streamCancel := context.WithCancel(context.Background())
-	defer streamCancel()
-	go z.streamLogs(streamCtx)
-
-	containerExit := z.wait()
-	ctx, cancel := context.WithTimeout(context.Background(), z.StartTimeout)
-	defer cancel()
-
-	select {
-	case <-ctx.Done():
-		return errors.Wrapf(ctx.Err(), "zookeeper in container %s did not start", z.containerID)
-	case <-containerExit:
-		return errors.New("container exited before ready")
-	default:
-		z.address = z.containerAddress
-	}
-
-	close(ready)
-
-	for {
-		select {
-		case err := <-containerExit:
-			return err
-		case <-sigCh:
-			if err := z.Stop(); err != nil {
-				return err
-			}
-		}
-	}
-}
-
-func (z *ZooKeeper) wait() <-chan error {
-	exitCh := make(chan error)
-	go func() {
-		exitCode, err := z.Client.WaitContainer(z.containerID)
-		if err == nil {
-			err = fmt.Errorf("zookeeper: process exited with %d", exitCode)
-		}
-		exitCh <- err
-	}()
-
-	return exitCh
-}
-
-func (z *ZooKeeper) streamLogs(ctx context.Context) error {
-	if z.ErrorStream == nil && z.OutputStream == nil {
-		return nil
-	}
-
-	logOptions := docker.LogsOptions{
-		Context:      ctx,
-		Container:    z.ContainerID(),
-		ErrorStream:  z.ErrorStream,
-		OutputStream: z.OutputStream,
-		Stderr:       z.ErrorStream != nil,
-		Stdout:       z.OutputStream != nil,
-		Follow:       true,
-	}
-	return z.Client.Logs(logOptions)
-}
-
-func (z *ZooKeeper) ContainerID() string {
-	return z.containerID
-}
-
-func (z *ZooKeeper) ContainerAddress() string {
-	return z.containerAddress
-}
-
-func (z *ZooKeeper) Start() error {
-	p := ifrit.Invoke(z)
-
-	select {
-	case <-p.Ready():
-		return nil
-	case err := <-p.Wait():
-		return err
-	}
-}
-
-func (z *ZooKeeper) Stop() error {
-	z.mutex.Lock()
-	if z.stopped {
-		z.mutex.Unlock()
-		return errors.Errorf("container %s already stopped", z.Name)
-	}
-	z.stopped = true
-	z.mutex.Unlock()
-
-	err := z.Client.StopContainer(z.containerID, 0)
-	if err != nil {
-		return err
-	}
-
-	_, err = z.Client.PruneVolumes(docker.PruneVolumesOptions{})
-	return err
-}
diff --git a/integration/nwo/standard_networks.go b/integration/nwo/standard_networks.go
index 70ab1da1b1a..a75e8d71325 100644
--- a/integration/nwo/standard_networks.go
+++ b/integration/nwo/standard_networks.go
@@ -181,16 +181,6 @@ func MultiChannelBasicSolo() *Config {
 	return config
 }
 
-func BasicKafka() *Config {
-	config := BasicSolo()
-
-	config.Consensus.Type = "kafka"
-	config.Consensus.ZooKeepers = 1
-	config.Consensus.Brokers = 1
-
-	return config
-}
-
 func BasicEtcdRaft() *Config {
 	config := BasicSolo()
 
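Note: with BasicKafka() gone, BasicEtcdRaft() is the remaining consensus preset built on BasicSolo(). A sketch of how a test would construct a network from it (assumed wiring; testDir, dockerClient, startPort, and components come from the usual suite setup):

    // Build an etcdraft network config and bring the config tree up.
    config := nwo.BasicEtcdRaft()
    network := nwo.New(config, testDir, dockerClient, startPort, components)
    network.GenerateConfigTree()
    network.Bootstrap()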
diff --git a/integration/nwo/template/configtx_template.go b/integration/nwo/template/configtx_template.go
index 409b740e2a2..25192270742 100644
--- a/integration/nwo/template/configtx_template.go
+++ b/integration/nwo/template/configtx_template.go
@@ -151,12 +151,6 @@ Profiles:{{ range .Profiles }}
             PreferredMaxBytes: 512 KB
         Capabilities:
             V2_0: true
-        {{- if eq $w.Consensus.Type "kafka" }}
-        Kafka:
-            Brokers:{{ range $w.BrokerAddresses "HostPort" }}
-            - {{ . }}
-            {{- end }}
-        {{- end }}
         {{- if eq $w.Consensus.Type "etcdraft" }}
         EtcdRaft:
             Options:
diff --git a/integration/nwo/template/orderer_template.go b/integration/nwo/template/orderer_template.go
index 65b9f8248f0..efbbdc50a90 100644
--- a/integration/nwo/template/orderer_template.go
+++ b/integration/nwo/template/orderer_template.go
@@ -53,38 +53,6 @@ General:
         TimeWindow: 15m
 FileLedger:
     Location: {{ .OrdererDir Orderer }}/system
-{{ if eq .Consensus.Type "kafka" -}}
-Kafka:
-    Retry:
-        ShortInterval: 5s
-        ShortTotal: 10m
-        LongInterval: 5m
-        LongTotal: 12h
-        NetworkTimeouts:
-            DialTimeout: 10s
-            ReadTimeout: 10s
-            WriteTimeout: 10s
-        Metadata:
-            RetryBackoff: 250ms
-            RetryMax: 3
-        Producer:
-            RetryBackoff: 100ms
-            RetryMax: 3
-        Consumer:
-            RetryBackoff: 2s
-    Topic:
-        ReplicationFactor: 1
-    Verbose: false
-    TLS:
-        Enabled: false
-        PrivateKey:
-        Certificate:
-        RootCAs:
-    SASLPlain:
-        Enabled: false
-        User:
-        Password:
-    Version:{{ end }}
 Debug:
     BroadcastTraceDir:
     DeliverTraceDir:
diff --git a/integration/pvtdata/pvtdata_test.go b/integration/pvtdata/pvtdata_test.go
index 1012ebfb17f..584766f7e85 100644
--- a/integration/pvtdata/pvtdata_test.go
+++ b/integration/pvtdata/pvtdata_test.go
@@ -182,7 +182,6 @@ var _ bool = Describe("PrivateData", func() {
 			network.Bootstrap()
 
 			members := grouper.Members{
-				{Name: "brokers", Runner: network.BrokerGroupRunner()},
 				{Name: "orderers", Runner: network.OrdererGroupRunner()},
 			}
 			networkRunner := grouper.NewOrdered(syscall.SIGTERM, members)
diff --git a/integration/raft/migration_test.go b/integration/raft/migration_test.go
index f9b7148eec9..cd7f42b6685 100644
--- a/integration/raft/migration_test.go
+++ b/integration/raft/migration_test.go
@@ -40,7 +40,6 @@ var _ = Describe("Solo2RaftMigration", func() {
 		network *nwo.Network
 
 		process                ifrit.Process
-		brokerProc             ifrit.Process
 		o1Proc, o2Proc, o3Proc ifrit.Process
 
 		o1Runner, o2Runner *ginkgomon.Runner
@@ -68,11 +67,6 @@ var _ = Describe("Solo2RaftMigration", func() {
 			}
 		}
 
-		if brokerProc != nil {
-			brokerProc.Signal(syscall.SIGTERM)
-			Eventually(brokerProc.Wait(), network.EventuallyTimeout).Should(Receive())
-		}
-
 		if network != nil {
 			network.Cleanup()
 		}
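Note: after this change the migration test's teardown only has orderer processes to reap. The remaining shape of that cleanup, for reference (a sketch; it mirrors the process variables and nil-guard pattern visible in the surrounding hunks):

    for _, proc := range []ifrit.Process{o1Proc, o2Proc, o3Proc} {
    	if proc != nil {
    		proc.Signal(syscall.SIGTERM)
    		Eventually(proc.Wait(), network.EventuallyTimeout).Should(Receive())
    	}
    }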