From 354c774ee036e973c5ff5eed8acbed8f92495e79 Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Sun, 26 Jan 2020 13:30:01 -0500 Subject: [PATCH 1/2] Close streaming output channels (issue #26). Refactor NewCmd. Update blocking-streaming example. --- .travis.yml | 1 + cmd.go | 155 ++++++++++--------- cmd_test.go | 2 +- examples/blocking-streaming/main.go | 29 ++-- examples/blocking-streaming/print-some-lines | 14 ++ go.mod | 2 +- go.sum | 2 + 7 files changed, 121 insertions(+), 84 deletions(-) create mode 100755 examples/blocking-streaming/print-some-lines diff --git a/.travis.yml b/.travis.yml index a222d4d..6d4cc6b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,6 +6,7 @@ os: - windows go: + - "1.12" - "1.13" env: diff --git a/cmd.go b/cmd.go index abb5cfb..29bd9e2 100644 --- a/cmd.go +++ b/cmd.go @@ -65,17 +65,18 @@ type Cmd struct { Stdout chan string // streaming STDOUT if enabled, else nil (see Options) Stderr chan string // streaming STDERR if enabled, else nil (see Options) *sync.Mutex - started bool // cmd.Start called, no error - stopped bool // Stop called - done bool // run() done - final bool // status finalized in Status - startTime time.Time // if started true - stdout *OutputBuffer // low-level stdout buffering and streaming - stderr *OutputBuffer // low-level stderr buffering and streaming - status Status - statusChan chan Status // nil until Start() called - doneChan chan struct{} // closed when done running - buffered bool // buffer STDOUT and STDERR to Status.Stdout and Std + started bool // cmd.Start called, no error + stopped bool // Stop called + done bool // run() done + final bool // status finalized in Status + startTime time.Time // if started true + stdoutBuf *OutputBuffer + stderrBuf *OutputBuffer + stdoutStream *OutputStream + stderrStream *OutputStream + status Status + statusChan chan Status // nil until Start() called + doneChan chan struct{} // closed when done running } // Status represents the running status and consolidated return of a Cmd. It can @@ -110,21 +111,7 @@ type Status struct { // is not started until Start is called. Output buffering is on, streaming output // is off. To control output, use NewCmdOptions instead. func NewCmd(name string, args ...string) *Cmd { - return &Cmd{ - Name: name, - Args: args, - buffered: true, - Mutex: &sync.Mutex{}, - status: Status{ - Cmd: name, - PID: 0, - Complete: false, - Exit: -1, - Error: nil, - Runtime: 0, - }, - doneChan: make(chan struct{}), - } + return NewCmdOptions(Options{Buffered: true}, name, args...) } // Options represents customizations for NewCmdOptions. @@ -144,13 +131,35 @@ type Options struct { // NewCmdOptions creates a new Cmd with options. The command is not started // until Start is called. func NewCmdOptions(options Options, name string, args ...string) *Cmd { - out := NewCmd(name, args...) - out.buffered = options.Buffered + c := &Cmd{ + Name: name, + Args: args, + Mutex: &sync.Mutex{}, + status: Status{ + Cmd: name, + PID: 0, + Complete: false, + Exit: -1, + Error: nil, + Runtime: 0, + }, + doneChan: make(chan struct{}), + } + + if options.Buffered { + c.stdoutBuf = NewOutputBuffer() + c.stderrBuf = NewOutputBuffer() + } + if options.Streaming { - out.Stdout = make(chan string, DEFAULT_STREAM_CHAN_SIZE) - out.Stderr = make(chan string, DEFAULT_STREAM_CHAN_SIZE) + c.Stdout = make(chan string, DEFAULT_STREAM_CHAN_SIZE) + c.stdoutStream = NewOutputStream(c.Stdout) + + c.Stderr = make(chan string, DEFAULT_STREAM_CHAN_SIZE) + c.stderrStream = NewOutputStream(c.Stderr) } - return out + + return c } // Clone clones a Cmd. All the options are transferred, @@ -160,8 +169,8 @@ func NewCmdOptions(options Options, name string, args ...string) *Cmd { func (c *Cmd) Clone() *Cmd { clone := NewCmdOptions( Options{ - Buffered: c.buffered, - Streaming: c.Stdout != nil, + Buffered: c.stdoutBuf != nil, + Streaming: c.stdoutStream != nil, }, c.Name, c.Args..., @@ -252,20 +261,20 @@ func (c *Cmd) Status() Status { if c.done { // No longer running if !c.final { - if c.buffered { - c.status.Stdout = c.stdout.Lines() - c.status.Stderr = c.stderr.Lines() - c.stdout = nil // release buffers - c.stderr = nil + if c.stdoutBuf != nil { + c.status.Stdout = c.stdoutBuf.Lines() + c.status.Stderr = c.stderrBuf.Lines() + c.stdoutBuf = nil // release buffers + c.stderrBuf = nil } c.final = true } } else { // Still running c.status.Runtime = time.Now().Sub(c.startTime).Seconds() - if c.buffered { - c.status.Stdout = c.stdout.Lines() - c.status.Stderr = c.stderr.Lines() + if c.stdoutBuf != nil { + c.status.Stdout = c.stdoutBuf.Lines() + c.status.Stderr = c.stderrBuf.Lines() } } @@ -295,30 +304,39 @@ func (c *Cmd) run() { // Platform-specific SysProcAttr management setProcessGroupID(cmd) - // Write stdout and stderr to buffers that are safe to read while writing - // and don't cause a race condition. - if c.buffered && c.Stdout != nil { - // Buffered and streaming, create both and combine with io.MultiWriter - c.stdout = NewOutputBuffer() - c.stderr = NewOutputBuffer() - cmd.Stdout = io.MultiWriter(NewOutputStream(c.Stdout), c.stdout) - cmd.Stderr = io.MultiWriter(NewOutputStream(c.Stderr), c.stderr) - } else if c.buffered { - // Buffered only - c.stdout = NewOutputBuffer() - c.stderr = NewOutputBuffer() - cmd.Stdout = c.stdout - cmd.Stderr = c.stderr - } else if c.Stdout != nil { - // Streaming only - cmd.Stdout = NewOutputStream(c.Stdout) - cmd.Stderr = NewOutputStream(c.Stderr) - } else { - // No output (effectively >/dev/null 2>&1) + // Set exec.Cmd.Stdout and .Stderr to our concurrent-safe stdout/stderr + // buffer, stream both, or neither + switch { + case c.stdoutBuf != nil && c.stdoutStream != nil: // buffer and stream + cmd.Stdout = io.MultiWriter(c.stdoutStream, c.stdoutBuf) + cmd.Stderr = io.MultiWriter(c.stderrStream, c.stderrBuf) + case c.stdoutBuf != nil: // buffer only + cmd.Stdout = c.stdoutBuf + cmd.Stderr = c.stderrBuf + case c.stdoutStream != nil: // stream only + cmd.Stdout = c.stdoutStream + cmd.Stderr = c.stderrStream + default: // no output (cmd >/dev/null 2>&1) cmd.Stdout = nil cmd.Stderr = nil } + // Always close output streams. Do not do this after Wait because if Start + // fails and we return without closing these, it could deadlock the caller + // who's waiting for us to close them. + if c.stdoutStream != nil { + defer func() { + // exec.Cmd.Wait has already waited for all output: + // Otherwise, during the execution of the command a separate goroutine + // reads from the process over a pipe and delivers that data to the + // corresponding Writer. In this case, Wait does not complete until the + // goroutine reaches EOF or encounters an error. + // from https://golang.org/pkg/os/exec/#Cmd + close(c.Stdout) + close(c.Stderr) + }() + } + // Set the runtime environment for the command as per os/exec.Cmd. If Env // is nil, use the current process' environment. cmd.Env = c.Env @@ -497,8 +515,8 @@ func (e ErrLineBufferOverflow) Error() string { // sent to a caller-provided channel. // // The caller must begin receiving before starting the Cmd. Write blocks on the -// channel; the caller must always read the channel. The channel is not closed -// by the OutputStream. +// channel; the caller must always read the channel. The channel is closed when +// the Cmd exits and all output has been sent. // // A Cmd in this package uses an OutputStream for both STDOUT and STDERR when // created by calling NewCmdOptions and Options.Streaming is true. To use @@ -520,16 +538,7 @@ func (e ErrLineBufferOverflow) Error() string { // // // While runnableCmd is running, lines are sent to the channel as soon as they -// are written and newline-terminated by the command. After the command finishes, -// the caller should wait for the last lines to be sent: -// -// for len(stdoutChan) > 0 { -// time.Sleep(10 * time.Millisecond) -// } -// -// Since the channel is not closed by the OutputStream, the two indications that -// all lines have been sent and received are the command finishing and the -// channel size being zero. +// are written and newline-terminated by the command. type OutputStream struct { streamChan chan string bufSize int diff --git a/cmd_test.go b/cmd_test.go index 9db1341..dab05ff 100644 --- a/cmd_test.go +++ b/cmd_test.go @@ -265,7 +265,7 @@ func TestCmdNotFound(t *testing.T) { PID: 0, Complete: false, Exit: -1, - Error: errors.New(`exec: "cmd-does-not-exist": executable file not found in $PATH`), + Error: &exec.Error{Name: "cmd-does-not-exist", Err: errors.New(`executable file not found in $PATH`)}, Runtime: 0, Stdout: nil, Stderr: nil, diff --git a/examples/blocking-streaming/main.go b/examples/blocking-streaming/main.go index d87de04..f065aa3 100644 --- a/examples/blocking-streaming/main.go +++ b/examples/blocking-streaming/main.go @@ -1,9 +1,10 @@ package main +// This example requires go-cmd v1.2 or newer + import ( "fmt" "os" - "time" "github.com/go-cmd/cmd" ) @@ -16,15 +17,27 @@ func main() { } // Create Cmd with options - envCmd := cmd.NewCmdOptions(cmdOptions, "env") + envCmd := cmd.NewCmdOptions(cmdOptions, "./print-some-lines") // Print STDOUT and STDERR lines streaming from Cmd + doneChan := make(chan struct{}) go func() { - for { + defer close(doneChan) + // Done when both channels have been closed + // https://dave.cheney.net/2013/04/30/curious-channels + for envCmd.Stdout != nil || envCmd.Stderr != nil { select { - case line := <-envCmd.Stdout: + case line, open := <-envCmd.Stdout: + if !open { + envCmd.Stdout = nil + continue + } fmt.Println(line) - case line := <-envCmd.Stderr: + case line, open := <-envCmd.Stderr: + if !open { + envCmd.Stderr = nil + continue + } fmt.Fprintln(os.Stderr, line) } } @@ -33,8 +46,6 @@ func main() { // Run and wait for Cmd to return, discard Status <-envCmd.Start() - // Cmd has finished but wait for goroutine to print all lines - for len(envCmd.Stdout) > 0 || len(envCmd.Stderr) > 0 { - time.Sleep(10 * time.Millisecond) - } + // Wait for goroutine to print everything + <-doneChan } diff --git a/examples/blocking-streaming/print-some-lines b/examples/blocking-streaming/print-some-lines new file mode 100755 index 0000000..fd3d190 --- /dev/null +++ b/examples/blocking-streaming/print-some-lines @@ -0,0 +1,14 @@ +#!/bin/bash + +# This script just prints some lines to simulate random command out +# that is streamed and printed by main.go. + +for i in 1 2 3 4 5; do + echo "Line $i" + sleep 0.2 +done + +echo "One more..." +sleep 1 + +echo "Last line" diff --git a/go.mod b/go.mod index bdb4135..74c3e31 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module github.com/go-cmd/cmd go 1.13 -require github.com/go-test/deep v1.0.1 +require github.com/go-test/deep v1.0.5 diff --git a/go.sum b/go.sum index b9fa84d..e582490 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,4 @@ github.com/go-test/deep v1.0.1 h1:UQhStjbkDClarlmv0am7OXXO4/GaPdCGiUiMTvi28sg= github.com/go-test/deep v1.0.1/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= +github.com/go-test/deep v1.0.5 h1:AKODKU3pDH1RzZzm6YZu77YWtEAq6uh1rLIAQlay2qc= +github.com/go-test/deep v1.0.5/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8= From fac941337f65f29623964ebacd820dea8807eddd Mon Sep 17 00:00:00 2001 From: Daniel Nichter Date: Sun, 26 Jan 2020 14:14:54 -0500 Subject: [PATCH 2/2] Add CHANGELOG.md --- CHANGELOG.md | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..87d26f4 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,49 @@ +# go-cmd/cmd Changelog + +## v1.2 + +### v1.2.0 (2020-01-26) + +* Changed streaming output: channels are closed after command stops (issue #26) +* Updated examples/blocking-streaming + +## v1.1 + +### v1.1.0 (2019-11-24) + +* Added support for Windows (PR #24) (@hfrappier, et al.) + +## v1.0 + +### v1.0.6 (2019-09-30) + +* Use go mod (PR #37) (@akkumar) + +### v1.0.5 (2019-07-20) + +* Fixed typo in README (PR #28) (@alokrajiv) +* Added `Cmd.Clone()` (PR #35) (@croqaz) +* Code cleanup (PR #34) (@croqaz) + +### v1.0.4 (2018-11-22) + +* Fixed no output: Buffered=false, Streaming=false +* Added `Cmd.Dir` to set `os/exec.Cmd.Dir` (PR #25) (@tkschmidt) + +### v1.0.3 (2018-05-13) + +* Added `Cmd.Env` to set `os/exec.Cmd.Env` (PR #14) (@robothor) + +### v1.0.2 (2018-04-28) + +* Changed `Running()` to `Done() <-chan struct{}` to match `Context.Done()` and support multiple waiters (PR #13) + +### v1.0.1 (2018-04-22) + +* Fixed errors in example code (PR #9) (@anshap1719) +* Added NewCmdOptions, Options, OutputBuffer, and OutputStream +* Added example code + +### v1.0.0 (2017-03-22) + +* First release.