Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Orderer v3: Kafka integration tests cleanup #3521

Merged
merged 1 commit
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 1 addition & 2 deletions integration/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ your machine.
Caveats, Gotchas, and Good-To-Knows
-----------------------------------
* The tests in this repository only exercise components that originate in the Fabric repository.
* Currently, docker is only used in the ginkgo tests when using chaincode, kafka, zookeeper,
or couchdb images.
* Currently, docker is only used in the ginkgo tests when using chaincode or couchdb images.


Getting Started
Expand Down
18 changes: 4 additions & 14 deletions integration/ledger/reset_rollback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ var _ = Describe("rollback, reset, pause, resume, and unjoin peer node commands"
helper.assertPausedChannel(org3peer0)

By("Checking preResetHeightFile exists for a paused channel that is also rolled back or reset")
setup.startBrokerAndOrderer()
setup.startOrderer()
preResetHeightFile := filepath.Join(setup.network.PeerLedgerDir(org3peer0), "chains/chains", helper.channelID, "__preResetHeight")
Expect(preResetHeightFile).To(BeARegularFile())

Expand All @@ -153,7 +153,7 @@ var _ = Describe("rollback, reset, pause, resume, and unjoin peer node commands"
}

By("Bringing the peers to recent height by starting the orderer")
setup.startBrokerAndOrderer()
setup.startOrderer()
for _, peer := range setup.peers {
By("Verifying endorsement is enabled and preResetHeightFile is removed on peer " + peer.ID())
helper.waitUntilEndorserEnabled(peer)
Expand Down Expand Up @@ -263,7 +263,6 @@ type setup struct {
peerProcess []ifrit.Process
orderer *nwo.Orderer
ordererProcess ifrit.Process
brokerProcess ifrit.Process
}

func initThreeOrgsSetup() *setup {
Expand Down Expand Up @@ -291,7 +290,7 @@ func initThreeOrgsSetup() *setup {
channelID: "testchannel",
}

setup.startBrokerAndOrderer()
setup.startOrderer()

setup.startPeer(peers[0])
setup.startPeer(peers[1])
Expand Down Expand Up @@ -319,10 +318,6 @@ func (s *setup) terminateAllProcess() {
Eventually(s.ordererProcess.Wait(), s.network.EventuallyTimeout).Should(Receive())
s.ordererProcess = nil

s.brokerProcess.Signal(syscall.SIGTERM)
Eventually(s.brokerProcess.Wait(), s.network.EventuallyTimeout).Should(Receive())
s.brokerProcess = nil

for _, p := range s.peerProcess {
p.Signal(syscall.SIGTERM)
Eventually(p.Wait(), s.network.EventuallyTimeout).Should(Receive())
Expand Down Expand Up @@ -351,12 +346,7 @@ func (s *setup) startPeer(peer *nwo.Peer) {
s.peerProcess = append(s.peerProcess, peerProcess)
}

func (s *setup) startBrokerAndOrderer() {
brokerRunner := s.network.BrokerGroupRunner()
brokerProcess := ifrit.Invoke(brokerRunner)
Eventually(brokerProcess.Ready(), s.network.EventuallyTimeout).Should(BeClosed())
s.brokerProcess = brokerProcess

func (s *setup) startOrderer() {
ordererRunner := s.network.OrdererGroupRunner()
ordererProcess := ifrit.Invoke(ordererRunner)
Eventually(ordererProcess.Ready(), s.network.EventuallyTimeout).Should(BeClosed())
Expand Down
4 changes: 2 additions & 2 deletions integration/ledger/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,8 @@ func initAndStartFourOrgsNetwork() *setup {
}
Expect(setup.testDir).To(Equal(setup.network.RootDir))

By("starting broker and orderer")
setup.startBrokerAndOrderer()
By("starting orderer")
setup.startOrderer()

By("starting peers")
setup.startPeers()
Expand Down
119 changes: 1 addition & 118 deletions integration/nwo/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -65,13 +64,10 @@ type Consortium struct {
Organizations []string `yaml:"organizations,omitempty"`
}

// Consensus indicates the orderer types and how many broker and zookeeper
// instances.
// Consensus indicates the orderer types.
type Consensus struct {
Type string `yaml:"type,omitempty"`
BootstrapMethod string `yaml:"bootstrap_method,omitempty"`
Brokers int `yaml:"brokers,omitempty"`
ZooKeepers int `yaml:"zookeepers,omitempty"`
ChannelParticipationEnabled bool `yaml:"channel_participation_enabled,omitempty"`
}

Expand Down Expand Up @@ -157,7 +153,6 @@ type Network struct {
TLSEnabled bool
GatewayEnabled bool

PortsByBrokerID map[string]Ports
PortsByOrdererID map[string]Ports
PortsByPeerID map[string]Ports
Organizations []*Organization
Expand Down Expand Up @@ -188,7 +183,6 @@ func New(c *Config, rootDir string, dockerClient *docker.Client, startPort int,
NetworkID: runner.UniqueName(),
EventuallyTimeout: time.Minute,
MetricsProvider: "prometheus",
PortsByBrokerID: map[string]Ports{},
PortsByOrdererID: map[string]Ports{},
PortsByPeerID: map[string]Ports{},

Expand Down Expand Up @@ -230,14 +224,6 @@ func New(c *Config, rootDir string, dockerClient *docker.Client, startPort int,
network.SessionCreateInterval = time.Second
}

for i := 0; i < network.Consensus.Brokers; i++ {
ports := Ports{}
for _, portName := range BrokerPortNames() {
ports[portName] = network.ReservePort()
}
network.PortsByBrokerID[strconv.Itoa(i)] = ports
}

for _, o := range c.Orderers {
ports := Ports{}
for _, portName := range OrdererPortNames() {
Expand Down Expand Up @@ -1245,87 +1231,6 @@ func (n *Network) Osnadmin(command Command) (*gexec.Session, error) {
return n.StartSession(cmd, command.SessionName())
}

// ZooKeeperRunner returns a runner for a ZooKeeper instance.
func (n *Network) ZooKeeperRunner(idx int) *runner.ZooKeeper {
	colorCode := n.nextColor()
	containerName := fmt.Sprintf("zookeeper-%d-%s", idx, n.NetworkID)

	stdout := gexec.NewPrefixedWriter(
		fmt.Sprintf("\x1b[32m[o]\x1b[%s[%s]\x1b[0m ", colorCode, containerName),
		ginkgo.GinkgoWriter,
	)
	stderr := gexec.NewPrefixedWriter(
		fmt.Sprintf("\x1b[91m[e]\x1b[%s[%s]\x1b[0m ", colorCode, containerName),
		ginkgo.GinkgoWriter,
	)

	return &runner.ZooKeeper{
		ZooMyID:      idx + 1, // IDs must be between 1 and 255
		Client:       n.DockerClient,
		Name:         containerName,
		NetworkName:  n.NetworkID,
		OutputStream: stdout,
		ErrorStream:  stderr,
	}
}

// minBrokersInSync returns the minimum number of in-sync Kafka replicas
// for the network: the full broker count when fewer than two brokers
// exist, capped at two otherwise.
func (n *Network) minBrokersInSync() int {
	brokers := n.Consensus.Brokers
	if brokers >= 2 {
		return 2
	}
	return brokers
}

// defaultBrokerReplication returns the default replication factor for the
// Kafka brokers: the full broker count when fewer than three brokers
// exist, capped at three otherwise.
func (n *Network) defaultBrokerReplication() int {
	brokers := n.Consensus.Brokers
	if brokers >= 3 {
		return 3
	}
	return brokers
}

// BrokerRunner returns a runner for a Kafka broker instance.
func (n *Network) BrokerRunner(id int, zookeepers []string) *runner.Kafka {
	colorCode := n.nextColor()
	containerName := fmt.Sprintf("kafka-%d-%s", id, n.NetworkID)

	stdout := gexec.NewPrefixedWriter(
		fmt.Sprintf("\x1b[32m[o]\x1b[%s[%s]\x1b[0m ", colorCode, containerName),
		ginkgo.GinkgoWriter,
	)
	stderr := gexec.NewPrefixedWriter(
		fmt.Sprintf("\x1b[91m[e]\x1b[%s[%s]\x1b[0m ", colorCode, containerName),
		ginkgo.GinkgoWriter,
	)

	return &runner.Kafka{
		BrokerID:                 id + 1, // broker IDs are 1-based
		Client:                   n.DockerClient,
		AdvertisedListeners:      "127.0.0.1",
		HostPort:                 int(n.PortsByBrokerID[strconv.Itoa(id)][HostPort]),
		Name:                     containerName,
		NetworkName:              n.NetworkID,
		MinInsyncReplicas:        n.minBrokersInSync(),
		DefaultReplicationFactor: n.defaultBrokerReplication(),
		ZooKeeperConnect:         strings.Join(zookeepers, ","),
		OutputStream:             stdout,
		ErrorStream:              stderr,
	}
}

// BrokerGroupRunner returns a runner that manages the processes that make up
// the Kafka broker network for fabric.
func (n *Network) BrokerGroupRunner() ifrit.Runner {
	members := grouper.Members{}
	zookeepers := []string{}

	// ZooKeeper members are added first so they come up before the brokers
	// in the ordered group.
	for i := 0; i < n.Consensus.ZooKeepers; i++ {
		zk := n.ZooKeeperRunner(i)
		zookeepers = append(zookeepers, fmt.Sprintf("%s:2181", zk.Name))
		members = append(members, grouper.Member{Name: zk.Name, Runner: zk})
	}

	for i := 0; i < n.Consensus.Brokers; i++ {
		broker := n.BrokerRunner(i, zookeepers)
		members = append(members, grouper.Member{Name: broker.Name, Runner: broker})
	}

	return grouper.NewOrdered(syscall.SIGTERM, members)
}

// OrdererRunner returns an ifrit.Runner for the specified orderer. The runner
// can be used to start and manage an orderer process.
func (n *Network) OrdererRunner(o *Orderer, env ...string) *ginkgomon.Runner {
Expand All @@ -1342,12 +1247,6 @@ func (n *Network) OrdererRunner(o *Orderer, env ...string) *ginkgomon.Runner {
StartCheckTimeout: 15 * time.Second,
}

// After consensus-type migration, the #brokers is >0, but the type is etcdraft
if n.Consensus.Type == "kafka" && n.Consensus.Brokers != 0 {
config.StartCheck = "Start phase completed successfully"
config.StartCheckTimeout = 30 * time.Second
}

return ginkgomon.New(config)
}

Expand Down Expand Up @@ -1396,7 +1295,6 @@ func (n *Network) PeerGroupRunner() ifrit.Runner {
// entire fabric network.
func (n *Network) NetworkGroupRunner() ifrit.Runner {
members := grouper.Members{
{Name: "brokers", Runner: n.BrokerGroupRunner()},
{Name: "orderers", Runner: n.OrdererGroupRunner()},
{Name: "peers", Runner: n.PeerGroupRunner()},
}
Expand Down Expand Up @@ -1786,21 +1684,6 @@ func OrdererPortNames() []PortName {
return []PortName{ListenPort, ProfilePort, OperationsPort, ClusterPort, AdminPort}
}

// BrokerPortNames returns the list of ports that need to be reserved for a
// Kafka broker.
func BrokerPortNames() []PortName {
	names := []PortName{HostPort}
	return names
}

// BrokerAddresses returns the list of broker addresses for the network.
// NOTE(review): ordering follows Go map iteration and is therefore not
// deterministic across calls.
func (n *Network) BrokerAddresses(portName PortName) []string {
	addresses := []string{}
	for _, portMap := range n.PortsByBrokerID {
		addr := fmt.Sprintf("127.0.0.1:%d", portMap[portName])
		addresses = append(addresses, addr)
	}
	return addresses
}

// OrdererAddress returns the address (host and port) exposed by the Orderer
// for the named port. Command line tools should use the returned address when
// connecting to the orderer.
Expand Down