Skip to content

Commit

Permalink
Clean up integration tests from kafka (#3521)
Browse files Browse the repository at this point in the history
Signed-off-by: Yoav Tock <tock@il.ibm.com>
Change-Id: I0e9f3afac0ba1d1c20b2f4d528ef7a4fd6be2ee0
  • Loading branch information
tock-ibm committed Jul 20, 2022
1 parent bab6e94 commit 3119076
Show file tree
Hide file tree
Showing 11 changed files with 8 additions and 742 deletions.
3 changes: 1 addition & 2 deletions integration/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ your machine.
Caveats, Gotchas, and Good-To-Knows
-----------------------------------
* The tests in this repository only exercise components that originate in the Fabric repository.
* Currently, docker is only used in the ginkgo tests when using chaincode, kafka, zookeeper,
or couchdb images.
* Currently, docker is only used in the ginkgo tests when using chaincode or couchdb images.


Getting Started
Expand Down
18 changes: 4 additions & 14 deletions integration/ledger/reset_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ var _ = Describe("rollback, reset, pause, resume, and unjoin peer node commands"
helper.assertPausedChannel(org3peer0)

By("Checking preResetHeightFile exists for a paused channel that is also rolled back or reset")
setup.startBrokerAndOrderer()
setup.startOrderer()
preResetHeightFile := filepath.Join(setup.network.PeerLedgerDir(org3peer0), "chains/chains", helper.channelID, "__preResetHeight")
Expect(preResetHeightFile).To(BeARegularFile())

Expand All @@ -153,7 +153,7 @@ var _ = Describe("rollback, reset, pause, resume, and unjoin peer node commands"
}

By("Bringing the peers to recent height by starting the orderer")
setup.startBrokerAndOrderer()
setup.startOrderer()
for _, peer := range setup.peers {
By("Verifying endorsement is enabled and preResetHeightFile is removed on peer " + peer.ID())
helper.waitUntilEndorserEnabled(peer)
Expand Down Expand Up @@ -263,7 +263,6 @@ type setup struct {
peerProcess []ifrit.Process
orderer *nwo.Orderer
ordererProcess ifrit.Process
brokerProcess ifrit.Process
}

func initThreeOrgsSetup() *setup {
Expand Down Expand Up @@ -291,7 +290,7 @@ func initThreeOrgsSetup() *setup {
channelID: "testchannel",
}

setup.startBrokerAndOrderer()
setup.startOrderer()

setup.startPeer(peers[0])
setup.startPeer(peers[1])
Expand Down Expand Up @@ -319,10 +318,6 @@ func (s *setup) terminateAllProcess() {
Eventually(s.ordererProcess.Wait(), s.network.EventuallyTimeout).Should(Receive())
s.ordererProcess = nil

s.brokerProcess.Signal(syscall.SIGTERM)
Eventually(s.brokerProcess.Wait(), s.network.EventuallyTimeout).Should(Receive())
s.brokerProcess = nil

for _, p := range s.peerProcess {
p.Signal(syscall.SIGTERM)
Eventually(p.Wait(), s.network.EventuallyTimeout).Should(Receive())
Expand Down Expand Up @@ -351,12 +346,7 @@ func (s *setup) startPeer(peer *nwo.Peer) {
s.peerProcess = append(s.peerProcess, peerProcess)
}

func (s *setup) startBrokerAndOrderer() {
brokerRunner := s.network.BrokerGroupRunner()
brokerProcess := ifrit.Invoke(brokerRunner)
Eventually(brokerProcess.Ready(), s.network.EventuallyTimeout).Should(BeClosed())
s.brokerProcess = brokerProcess

func (s *setup) startOrderer() {
ordererRunner := s.network.OrdererGroupRunner()
ordererProcess := ifrit.Invoke(ordererRunner)
Eventually(ordererProcess.Ready(), s.network.EventuallyTimeout).Should(BeClosed())
Expand Down
4 changes: 2 additions & 2 deletions integration/ledger/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,8 @@ func initAndStartFourOrgsNetwork() *setup {
}
Expect(setup.testDir).To(Equal(setup.network.RootDir))

By("starting broker and orderer")
setup.startBrokerAndOrderer()
By("starting orderer")
setup.startOrderer()

By("starting peers")
setup.startPeers()
Expand Down
119 changes: 1 addition & 118 deletions integration/nwo/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -65,13 +64,10 @@ type Consortium struct {
Organizations []string `yaml:"organizations,omitempty"`
}

// Consensus indicates the orderer types and how many broker and zookeeper
// instances.
// Consensus indicates the orderer types.
type Consensus struct {
Type string `yaml:"type,omitempty"`
BootstrapMethod string `yaml:"bootstrap_method,omitempty"`
Brokers int `yaml:"brokers,omitempty"`
ZooKeepers int `yaml:"zookeepers,omitempty"`
ChannelParticipationEnabled bool `yaml:"channel_participation_enabled,omitempty"`
}

Expand Down Expand Up @@ -157,7 +153,6 @@ type Network struct {
TLSEnabled bool
GatewayEnabled bool

PortsByBrokerID map[string]Ports
PortsByOrdererID map[string]Ports
PortsByPeerID map[string]Ports
Organizations []*Organization
Expand Down Expand Up @@ -188,7 +183,6 @@ func New(c *Config, rootDir string, dockerClient *docker.Client, startPort int,
NetworkID: runner.UniqueName(),
EventuallyTimeout: time.Minute,
MetricsProvider: "prometheus",
PortsByBrokerID: map[string]Ports{},
PortsByOrdererID: map[string]Ports{},
PortsByPeerID: map[string]Ports{},

Expand Down Expand Up @@ -230,14 +224,6 @@ func New(c *Config, rootDir string, dockerClient *docker.Client, startPort int,
network.SessionCreateInterval = time.Second
}

for i := 0; i < network.Consensus.Brokers; i++ {
ports := Ports{}
for _, portName := range BrokerPortNames() {
ports[portName] = network.ReservePort()
}
network.PortsByBrokerID[strconv.Itoa(i)] = ports
}

for _, o := range c.Orderers {
ports := Ports{}
for _, portName := range OrdererPortNames() {
Expand Down Expand Up @@ -1245,87 +1231,6 @@ func (n *Network) Osnadmin(command Command) (*gexec.Session, error) {
return n.StartSession(cmd, command.SessionName())
}

// ZooKeeperRunner returns a runner for a ZooKeeper instance.
func (n *Network) ZooKeeperRunner(idx int) *runner.ZooKeeper {
colorCode := n.nextColor()
name := fmt.Sprintf("zookeeper-%d-%s", idx, n.NetworkID)

return &runner.ZooKeeper{
ZooMyID: idx + 1, // IDs must be between 1 and 255
Client: n.DockerClient,
Name: name,
NetworkName: n.NetworkID,
OutputStream: gexec.NewPrefixedWriter(
fmt.Sprintf("\x1b[32m[o]\x1b[%s[%s]\x1b[0m ", colorCode, name),
ginkgo.GinkgoWriter,
),
ErrorStream: gexec.NewPrefixedWriter(
fmt.Sprintf("\x1b[91m[e]\x1b[%s[%s]\x1b[0m ", colorCode, name),
ginkgo.GinkgoWriter,
),
}
}

func (n *Network) minBrokersInSync() int {
if n.Consensus.Brokers < 2 {
return n.Consensus.Brokers
}
return 2
}

func (n *Network) defaultBrokerReplication() int {
if n.Consensus.Brokers < 3 {
return n.Consensus.Brokers
}
return 3
}

// BrokerRunner returns a runner for a Kafka broker instance.
func (n *Network) BrokerRunner(id int, zookeepers []string) *runner.Kafka {
colorCode := n.nextColor()
name := fmt.Sprintf("kafka-%d-%s", id, n.NetworkID)

return &runner.Kafka{
BrokerID: id + 1,
Client: n.DockerClient,
AdvertisedListeners: "127.0.0.1",
HostPort: int(n.PortsByBrokerID[strconv.Itoa(id)][HostPort]),
Name: name,
NetworkName: n.NetworkID,
MinInsyncReplicas: n.minBrokersInSync(),
DefaultReplicationFactor: n.defaultBrokerReplication(),
ZooKeeperConnect: strings.Join(zookeepers, ","),
OutputStream: gexec.NewPrefixedWriter(
fmt.Sprintf("\x1b[32m[o]\x1b[%s[%s]\x1b[0m ", colorCode, name),
ginkgo.GinkgoWriter,
),
ErrorStream: gexec.NewPrefixedWriter(
fmt.Sprintf("\x1b[91m[e]\x1b[%s[%s]\x1b[0m ", colorCode, name),
ginkgo.GinkgoWriter,
),
}
}

// BrokerGroupRunner returns a runner that manages the processes that make up
// the Kafka broker network for fabric.
func (n *Network) BrokerGroupRunner() ifrit.Runner {
members := grouper.Members{}
zookeepers := []string{}

for i := 0; i < n.Consensus.ZooKeepers; i++ {
zk := n.ZooKeeperRunner(i)
zookeepers = append(zookeepers, fmt.Sprintf("%s:2181", zk.Name))
members = append(members, grouper.Member{Name: zk.Name, Runner: zk})
}

for i := 0; i < n.Consensus.Brokers; i++ {
kafka := n.BrokerRunner(i, zookeepers)
members = append(members, grouper.Member{Name: kafka.Name, Runner: kafka})
}

return grouper.NewOrdered(syscall.SIGTERM, members)
}

// OrdererRunner returns an ifrit.Runner for the specified orderer. The runner
// can be used to start and manage an orderer process.
func (n *Network) OrdererRunner(o *Orderer, env ...string) *ginkgomon.Runner {
Expand All @@ -1342,12 +1247,6 @@ func (n *Network) OrdererRunner(o *Orderer, env ...string) *ginkgomon.Runner {
StartCheckTimeout: 15 * time.Second,
}

// After consensus-type migration, the #brokers is >0, but the type is etcdraft
if n.Consensus.Type == "kafka" && n.Consensus.Brokers != 0 {
config.StartCheck = "Start phase completed successfully"
config.StartCheckTimeout = 30 * time.Second
}

return ginkgomon.New(config)
}

Expand Down Expand Up @@ -1396,7 +1295,6 @@ func (n *Network) PeerGroupRunner() ifrit.Runner {
// entire fabric network.
func (n *Network) NetworkGroupRunner() ifrit.Runner {
members := grouper.Members{
{Name: "brokers", Runner: n.BrokerGroupRunner()},
{Name: "orderers", Runner: n.OrdererGroupRunner()},
{Name: "peers", Runner: n.PeerGroupRunner()},
}
Expand Down Expand Up @@ -1786,21 +1684,6 @@ func OrdererPortNames() []PortName {
return []PortName{ListenPort, ProfilePort, OperationsPort, ClusterPort, AdminPort}
}

// BrokerPortNames returns the list of ports that need to be reserved for a
// Kafka broker.
func BrokerPortNames() []PortName {
return []PortName{HostPort}
}

// BrokerAddresses returns the list of broker addresses for the network.
func (n *Network) BrokerAddresses(portName PortName) []string {
addresses := []string{}
for _, ports := range n.PortsByBrokerID {
addresses = append(addresses, fmt.Sprintf("127.0.0.1:%d", ports[portName]))
}
return addresses
}

// OrdererAddress returns the address (host and port) exposed by the Orderer
// for the named port. Command line tools should use the returned address when
// connecting to the orderer.
Expand Down

0 comments on commit 3119076

Please sign in to comment.