Skip to content

Commit

Permalink
FAB-12372 Collect info on go routines
Browse files Browse the repository at this point in the history
Collect information about all of the goroutines currently executing
within a process when the process receives SIGUSR1. Handling SIGUSR1
does not terminate the process.

Change-Id: Ide5a26a8f1ee9bae93d3c6ba4b0b178661cbfd6c
Signed-off-by: Saad Karim <skarim@us.ibm.com>
Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
  • Loading branch information
Saad Karim authored and sykesm committed Nov 16, 2018
1 parent ac63a6f commit db46110
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 8 deletions.
36 changes: 36 additions & 0 deletions common/diag/goroutine.go
@@ -0,0 +1,36 @@
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package diag

import (
"bytes"
"runtime/pprof"
)

// Logger is the minimal logging surface required by LogGoRoutines. Any type
// that provides printf-style Infof and Errorf methods satisfies it.
type Logger interface {
	// Infof logs an informational message built from template and args.
	// LogGoRoutines uses it to emit the goroutine report.
	Infof(template string, args ...interface{})
	// Errorf logs an error message built from template and args.
	// LogGoRoutines uses it when the goroutine report cannot be captured.
	Errorf(template string, args ...interface{})
}

// CaptureGoRoutines returns the "goroutine" pprof profile rendered as text
// at debug level 2. If writing the profile fails, the returned string is
// empty and the error is passed back to the caller.
func CaptureGoRoutines() (string, error) {
	dump := &bytes.Buffer{}
	if err := pprof.Lookup("goroutine").WriteTo(dump, 2); err != nil {
		return "", err
	}
	return dump.String(), nil
}

// LogGoRoutines captures the goroutine report via CaptureGoRoutines and
// writes it to logger at info level. If the capture fails, the failure is
// written at error level instead and no report is logged.
func LogGoRoutines(logger Logger) {
	report, captureErr := CaptureGoRoutines()
	if captureErr == nil {
		logger.Infof("Go routines report:\n%s", report)
		return
	}

	logger.Errorf("failed to capture go routines: %s", captureErr)
}
33 changes: 33 additions & 0 deletions common/diag/goroutine_test.go
@@ -0,0 +1,33 @@
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package diag_test

import (
"testing"

"github.com/hyperledger/fabric/common/diag"
"github.com/hyperledger/fabric/common/flogging/floggingtest"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
)

// TestCaptureGoRoutines checks that the captured report contains a running
// goroutine header and a stack frame for CaptureGoRoutines itself.
func TestCaptureGoRoutines(t *testing.T) {
	g := NewGomegaWithT(t)

	report, captureErr := diag.CaptureGoRoutines()
	g.Expect(captureErr).NotTo(HaveOccurred())

	// The calling goroutine must show up as running...
	g.Expect(report).To(MatchRegexp(`goroutine \d+ \[running\]:`))
	// ...and its stack must include the capture function.
	g.Expect(report).To(ContainSubstring("github.com/hyperledger/fabric/common/diag.CaptureGoRoutines"))
}

// TestLogGoRoutines checks that LogGoRoutines writes a goroutine report,
// identified by a running goroutine header, to the supplied logger.
func TestLogGoRoutines(t *testing.T) {
	g := NewGomegaWithT(t)

	testLogger, logOutput := floggingtest.NewTestLogger(t, floggingtest.Named("goroutine"))
	diag.LogGoRoutines(testLogger)

	g.Expect(logOutput).To(gbytes.Say(`goroutine \d+ \[running\]:`))
}
94 changes: 94 additions & 0 deletions integration/e2e/e2e_signal_test.go
@@ -0,0 +1,94 @@
/*
Copyright IBM Corp All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/

package e2e

import (
"io/ioutil"
"os"
"syscall"

docker "github.com/fsouza/go-dockerclient"
"github.com/hyperledger/fabric/integration/nwo"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/tedsuo/ifrit"
"github.com/tedsuo/ifrit/ginkgomon"
)

// SignalHandling starts one orderer and one peer and verifies how each
// process reacts to signals: SIGUSR1 must log a goroutine report without
// terminating the process, and SIGTERM must stop the process.
var _ = Describe("SignalHandling", func() {
	var (
		testDir string
		client  *docker.Client
		network *nwo.Network

		peerRunner, ordererRunner   *ginkgomon.Runner
		peerProcess, ordererProcess ifrit.Process
	)

	// Build a fresh network in a temporary directory and start the first
	// orderer and the first peer before every spec.
	BeforeEach(func() {
		var err error
		testDir, err = ioutil.TempDir("", "e2e-sigs")
		Expect(err).NotTo(HaveOccurred())

		client, err = docker.NewClientFromEnv()
		Expect(err).NotTo(HaveOccurred())

		// NOTE(review): BasePort and components are defined elsewhere in
		// this package; they are not visible from this file.
		network = nwo.New(nwo.BasicSolo(), testDir, client, BasePort(), components)
		network.GenerateConfigTree()
		network.Bootstrap()

		ordererRunner = network.OrdererRunner(network.Orderers[0])
		ordererProcess = ifrit.Invoke(ordererRunner)
		Eventually(ordererProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())

		peerRunner = network.PeerRunner(network.Peers[0])
		peerProcess = ifrit.Invoke(peerRunner)
		Eventually(peerProcess.Ready(), network.EventuallyTimeout).Should(BeClosed())
	})

	// Kill whatever is still running and remove the test artifacts. The
	// spec sets a process variable to nil once it has observed that process
	// exit, so only processes that are still alive are signalled here.
	AfterEach(func() {
		if peerProcess != nil {
			peerProcess.Signal(syscall.SIGKILL)
		}
		if ordererProcess != nil {
			ordererProcess.Signal(syscall.SIGKILL)
		}
		if network != nil {
			network.Cleanup()
		}
		os.RemoveAll(testDir)
	})

	It("handles signals", func() {
		// Every Eventually below uses network.EventuallyTimeout, matching
		// the readiness checks in BeforeEach. Gomega's default timeout is
		// short enough for a goroutine dump or a process shutdown to miss
		// it on a loaded host.
		By("verifying SIGUSR1 to the peer dumps go routines")
		peerProcess.Signal(syscall.SIGUSR1)
		Eventually(peerRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say("Received signal: "))
		Eventually(peerRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say(`Go routines report`))

		By("verifying SIGUSR1 to the orderer dumps go routines")
		ordererProcess.Signal(syscall.SIGUSR1)
		Eventually(ordererRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say("Received signal: "))
		Eventually(ordererRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say(`Go routines report`))

		By("verifying SIGUSR1 does not terminate processes")
		Consistently(peerProcess.Wait()).ShouldNot(Receive())
		Consistently(ordererProcess.Wait()).ShouldNot(Receive())

		By("verifying SIGTERM to the peer stops the process")
		peerProcess.Signal(syscall.SIGTERM)
		Eventually(peerRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say("Received signal: "))
		Eventually(peerProcess.Wait(), network.EventuallyTimeout).Should(Receive())
		// The exit has been observed; keep AfterEach from signalling it again.
		peerProcess = nil

		By("verifying SIGTERM to the orderer stops the process")
		ordererProcess.Signal(syscall.SIGTERM)
		Eventually(ordererRunner.Err(), network.EventuallyTimeout).Should(gbytes.Say("Received signal: "))
		Eventually(ordererProcess.Wait(), network.EventuallyTimeout).Should(Receive())
		// The exit has been observed; keep AfterEach from signalling it again.
		ordererProcess = nil
	})
})
22 changes: 22 additions & 0 deletions orderer/common/server/main.go
Expand Up @@ -14,11 +14,14 @@ import (
"net/http"
_ "net/http/pprof" // This is essentially the main package for the orderer
"os"
"os/signal"
"syscall"
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/diag"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/grpclogging"
"github.com/hyperledger/fabric/common/grpcmetrics"
Expand Down Expand Up @@ -129,6 +132,10 @@ func Start(cmd string, conf *localconfig.TopLevel) {
switch cmd {
case start.FullCommand(): // "start" command
logger.Infof("Starting %s", metadata.GetVersionInfo())
go handleSignals(map[os.Signal]func(){
syscall.SIGTERM: func() { grpcServer.Stop() },
syscall.SIGUSR1: func() { diag.LogGoRoutines(logger.Named("diag")) },
})
initializeProfilingService(conf)
ab.RegisterAtomicBroadcastServer(grpcServer.Server(), server)
logger.Info("Beginning to serve requests")
Expand Down Expand Up @@ -162,6 +169,21 @@ func initializeProfilingService(conf *localconfig.TopLevel) {
}
}

// handleSignals subscribes to every signal that is a key of handlers and,
// for each signal received, logs it and then runs the associated handler.
//
// Handlers run one at a time on the goroutine that called handleSignals.
// Once subscribed, the function never returns because the signal channel is
// never closed, so callers are expected to run it in its own goroutine.
//
// If handlers is empty the function returns immediately without subscribing
// to anything. Entries whose handler is nil are logged but otherwise ignored.
func handleSignals(handlers map[os.Signal]func()) {
	// signal.Notify relays *all* incoming signals when it is given an empty
	// signal list. Subscribing with no handlers would therefore swallow
	// signals such as SIGINT and then panic on the nil handler lookup.
	if len(handlers) == 0 {
		return
	}

	signals := make([]os.Signal, 0, len(handlers))
	for sig := range handlers {
		signals = append(signals, sig)
	}

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, signals...)

	for sig := range signalChan {
		logger.Infof("Received signal: %d (%s)", sig, sig)
		// Guard against a nil func so a bad map entry cannot crash the
		// process from inside the signal loop.
		if handler := handlers[sig]; handler != nil {
			handler()
		}
	}
}

func initializeClusterConfig(conf *localconfig.TopLevel) comm.ClientConfig {
cc := comm.ClientConfig{
AsyncConnect: true,
Expand Down
30 changes: 22 additions & 8 deletions peer/node/start.go
Expand Up @@ -20,6 +20,7 @@ import (
ccdef "github.com/hyperledger/fabric/common/chaincode"
"github.com/hyperledger/fabric/common/crypto/tlsgen"
"github.com/hyperledger/fabric/common/deliver"
"github.com/hyperledger/fabric/common/diag"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/grpclogging"
"github.com/hyperledger/fabric/common/grpcmetrics"
Expand Down Expand Up @@ -357,14 +358,6 @@ func serve(args []string) error {
// genesis block if needed.
serve := make(chan error)

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
sig := <-sigs
logger.Debugf("sig: %s", sig)
serve <- nil
}()

go func() {
var grpcErr error
if grpcErr = peerServer.Start(); grpcErr != nil {
Expand All @@ -385,12 +378,33 @@ func serve(args []string) error {
}()
}

go handleSignals(map[os.Signal]func(){
syscall.SIGUSR1: func() { diag.LogGoRoutines(logger.Named("diag")) },
syscall.SIGINT: func() { serve <- nil },
syscall.SIGTERM: func() { serve <- nil },
})

logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)

// Block until grpc server exits
return <-serve
}

// handleSignals subscribes to every signal that is a key of handlers and,
// for each signal received, logs it and then runs the associated handler.
//
// Handlers run one at a time on the goroutine that called handleSignals.
// Once subscribed, the function never returns because the signal channel is
// never closed, so callers are expected to run it in its own goroutine.
//
// If handlers is empty the function returns immediately without subscribing
// to anything. Entries whose handler is nil are logged but otherwise ignored.
func handleSignals(handlers map[os.Signal]func()) {
	// signal.Notify relays *all* incoming signals when it is given an empty
	// signal list. Subscribing with no handlers would therefore swallow
	// signals such as SIGINT and then panic on the nil handler lookup.
	if len(handlers) == 0 {
		return
	}

	signals := make([]os.Signal, 0, len(handlers))
	for sig := range handlers {
		signals = append(signals, sig)
	}

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, signals...)

	for sig := range signalChan {
		logger.Infof("Received signal: %d (%s)", sig, sig)
		// Guard against a nil func so a bad map entry cannot crash the
		// process from inside the signal loop.
		if handler := handlers[sig]; handler != nil {
			handler()
		}
	}
}

func localPolicy(policyObject proto.Message) policies.Policy {
localMSP := mgmt.GetLocalMSP()
pp := cauthdsl.NewPolicyProvider(localMSP)
Expand Down

0 comments on commit db46110

Please sign in to comment.