Skip to content

Commit

Permalink
[FAB-13239] terminate container streaming output loop
Browse files Browse the repository at this point in the history
When vm.docker.attachStdout is enabled, output from chaincode containers
is written to the peer log. A recent change went in that broke the error
handling behavior and resulted in a tight loop of reading from a closed
reader and issuing an error message. This change fixes the error
handling and back-fills test.

Change-Id: Icda853dba90b873f2fbddde990c5f61f7834f1c9
Signed-off-by: Matthew Sykes <sykesmat@us.ibm.com>
  • Loading branch information
sykesm committed Dec 12, 2018
1 parent e485f77 commit 0b7aefd
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 8 deletions.
18 changes: 10 additions & 8 deletions core/container/dockercontroller/dockercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload ma

// stream stdout and stderr to chaincode logger
if attachStdout {
go vm.streamOutput(client, containerName)
containerLogger := flogging.MustGetLogger("peer.chaincode." + containerName)
streamOutput(dockerLogger, client, containerName, containerLogger)
}

// upload specified files to the container before starting it
Expand Down Expand Up @@ -309,7 +310,7 @@ func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload ma
}

// streamOutput mirrors output from the named container to a fabric logger.
func (vm *DockerVM) streamOutput(client dockerClient, containerName string) {
func streamOutput(logger *flogging.FabricLogger, client dockerClient, containerName string, containerLogger *flogging.FabricLogger) {
// Launch a few go routines to manage output streams from the container.
// They will be automatically destroyed when the container exits
attached := make(chan struct{})
Expand Down Expand Up @@ -337,19 +338,18 @@ func (vm *DockerVM) streamOutput(client dockerClient, containerName string) {
}()

go func() {
defer r.Close() // ensure the pipe reader gets closed

// Block here until the attachment completes or we timeout
select {
case <-attached: // successful attach
close(attached) // close indicates the streams can now be copied

case <-time.After(10 * time.Second):
dockerLogger.Errorf("Timeout while attaching to IO channel in container %s", containerName)
logger.Errorf("Timeout while attaching to IO channel in container %s", containerName)
return
}

// create a logger for the chaincode
containerLogger := flogging.MustGetLogger("peer.chaincode." + containerName)

is := bufio.NewReader(r)
for {
// Loop forever dumping lines of text into the containerLogger
Expand All @@ -359,9 +359,11 @@ func (vm *DockerVM) streamOutput(client dockerClient, containerName string) {
case nil:
containerLogger.Info(line)
case io.EOF:
dockerLogger.Infof("Container %s has closed its IO channel", containerName)
logger.Infof("Container %s has closed its IO channel", containerName)
return
default:
dockerLogger.Errorf("Error reading container output: %s", err)
logger.Errorf("Error reading container output: %s", err)
return
}
}
}()
Expand Down
39 changes: 39 additions & 0 deletions core/container/dockercontroller/dockercontroller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

docker "github.com/fsouza/go-dockerclient"
"github.com/hyperledger/fabric/common/flogging/floggingtest"
"github.com/hyperledger/fabric/common/metrics/disabled"
"github.com/hyperledger/fabric/common/metrics/metricsfakes"
"github.com/hyperledger/fabric/common/util"
Expand All @@ -28,6 +29,7 @@ import (
coreutil "github.com/hyperledger/fabric/core/testutil"
pb "github.com/hyperledger/fabric/protos/peer"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/gbytes"
"github.com/spf13/viper"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -179,6 +181,38 @@ func Test_Start(t *testing.T) {
gt.Expect(err).NotTo(HaveOccurred())
}

func Test_streamOutput(t *testing.T) {
gt := NewGomegaWithT(t)

logger, recorder := floggingtest.NewTestLogger(t)
containerLogger, containerRecorder := floggingtest.NewTestLogger(t)

client := &mockClient{}
errCh := make(chan error, 1)
optsCh := make(chan docker.AttachToContainerOptions, 1)
client.attachToContainerStub = func(opts docker.AttachToContainerOptions) error {
optsCh <- opts
return <-errCh
}

streamOutput(logger, client, "container-name", containerLogger)

var opts docker.AttachToContainerOptions
gt.Eventually(optsCh).Should(Receive(&opts))
gt.Eventually(opts.Success).Should(BeSent(struct{}{}))
gt.Eventually(opts.Success).Should(BeClosed())

fmt.Fprintf(opts.OutputStream, "message-one\n")
fmt.Fprintf(opts.OutputStream, "message-two") // does not get written
gt.Eventually(containerRecorder).Should(gbytes.Say("message-one"))
gt.Consistently(containerRecorder.Entries).Should(HaveLen(1))

close(errCh)
gt.Eventually(recorder).Should(gbytes.Say("Container container-name has closed its IO channel"))
gt.Consistently(recorder.Entries).Should(HaveLen(1))
gt.Consistently(containerRecorder.Entries).Should(HaveLen(1))
}

func Test_BuildMetric(t *testing.T) {
ccid := ccintf.CCID{Name: "simple", Version: "1.0"}
client := &mockClient{}
Expand Down Expand Up @@ -373,6 +407,8 @@ func (m *mockBuilder) Build() (io.Reader, error) {
type mockClient struct {
noSuchImgErrReturned bool
pingErr bool

attachToContainerStub func(docker.AttachToContainerOptions) error
}

var getClientErr, createErr, uploadErr, noSuchImgErr, buildErr, removeImgErr,
Expand Down Expand Up @@ -404,6 +440,9 @@ func (c *mockClient) UploadToContainer(id string, opts docker.UploadToContainerO
}

func (c *mockClient) AttachToContainer(opts docker.AttachToContainerOptions) error {
if c.attachToContainerStub != nil {
return c.attachToContainerStub(opts)
}
if opts.Success != nil {
opts.Success <- struct{}{}
}
Expand Down

0 comments on commit 0b7aefd

Please sign in to comment.