diff --git a/.travis.yml b/.travis.yml index 81019f2..c3edce6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,10 @@ language: go +sudo: false go: - 1.7 - 1.8 - + - 1.9 before_install: - go get github.com/mattn/goveralls - - go get golang.org/x/tools/cover - script: - - $HOME/gopath/bin/goveralls -service=travis-ci + - $GOPATH/bin/goveralls -service=travis-ci -race -package github.com/go-cmd/cmd -show diff --git a/README.md b/README.md index cc59efa..538c2b3 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,14 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/go-cmd/cmd)](https://goreportcard.com/report/github.com/go-cmd/cmd) [![Build Status](https://travis-ci.org/go-cmd/cmd.svg?branch=master)](https://travis-ci.org/go-cmd/cmd) [![Coverage Status](https://coveralls.io/repos/github/go-cmd/cmd/badge.svg?branch=master)](https://coveralls.io/github/go-cmd/cmd?branch=master) [![GoDoc](https://godoc.org/github.com/go-cmd/cmd?status.svg)](https://godoc.org/github.com/go-cmd/cmd) -This package is a small but very useful wrapper around [os/exec.Cmd](https://golang.org/pkg/os/exec/#Cmd) that makes it dead simple and safe to run commands in asynchronous, highly concurrent, real-time applications. Here's the look and feel: +This package is a small but very useful wrapper around [os/exec.Cmd](https://golang.org/pkg/os/exec/#Cmd) that makes it safe and simple to run external commands in highly concurrent, asynchronous, real-time applications. Here's the look and feel: ```go import "github.com/go-cmd/cmd" // Start a long-running process, capture stdout and stderr -c := cmd.NewCmd("find", "/", "--name" "needle") -statusChan := c.Start() +findCmd := cmd.NewCmd("find", "/", "--name" "needle") +statusChan := findCmd.Start() // non-blocking ticker := time.NewTicker(2 * time.Second) @@ -25,13 +25,13 @@ go func() { // Stop command after 1 hour go func() { <-time.After(1 * time.Hour) - c.Stop() + findCmd.Stop() }() // Check if command is done select { case finalStatus := <-statusChan: - // yes! + // done default: // no, still running } @@ -40,7 +40,7 @@ default: finalStatus := <-statusChan ``` -That's it, only three methods: `Start`, `Stop`, and `Status`. Although free, here are the selling points of `go-cmd/Cmd`: +That's it, only three methods: `Start`, `Stop`, and `Status`. When possible, it's better to use `go-cmd/Cmd` than `os/exec.Cmd` because `go-cmd/Cmd` provides: 1. Channel-based fire and forget 1. Real-time stdout and stderr @@ -58,13 +58,13 @@ As the example above shows, starting a command immediately returns a channel to c := cmd.NewCmd("foo") s := <-c.Start() ``` -To achieve similar with Go built-in `Cmd` requires everything this package already does. +To achieve similar with `os/exec.Cmd` requires everything this package already does. ### Real-time stdout and stderr -It's common to want to read stdout or stderr _while_ the command is running. The common approach is to call [StdoutPipe](https://golang.org/pkg/os/exec/#Cmd.StdoutPipe) and read from the provided `io.ReadCloser`. This works but it causes a race condition (that `go test -race` detects) and the docs say not to do it: "it is incorrect to call Wait before all reads from the pipe have completed". +It's common to want to read stdout or stderr _while_ the command is running. The common approach is to call [StdoutPipe](https://golang.org/pkg/os/exec/#Cmd.StdoutPipe) and read from the provided `io.ReadCloser`. This works but it's wrong because it causes a race condition (that `go test -race` detects) and the docs say it's wrong: "it is incorrect to call Wait before all reads from the pipe have completed. [...] it is incorrect to call Run when using StdoutPipe". -The proper solution is to set the `io.Writer` of `Stdout`. To be thread-safe and non-racey, this requires further work to write while possibly N-many goroutines read. `go-cmd/Cmd` has already done this work. +The proper solution is to set the `io.Writer` of `Stdout`. To be thread-safe and non-racey, this requires further work to write while possibly N-many goroutines read. `go-cmd/Cmd` has done this work. ### Real-time status @@ -88,11 +88,11 @@ Speaking of that struct above, Go built-in `Cmd` does not put all the return inf ### Proper process termination -It's been said that everyone love's a good mystery. Then here's one: _process group ID_. If you know, then wow, congratulations! If not, don't feel bad. It took me hours one Saturday evening to solve this mystery. Let's just say that Go built-in [Wait](https://golang.org/pkg/os/exec/#Cmd.Wait) can still block even after the command is killed. But not `go-cmd/Cmd`. You can rely on `Stop` in this package. +[os/exec/Cmd.Wait](https://golang.org/pkg/os/exec/#Cmd.Wait) can block even after the command is killed. That can be surprising and cause problems. But `go-cmd/Cmd.Stop` reliably terminates the command, no surprises. The issue has to do with process group IDs. It's common to kill the command PID, but usually one needs to kill its process group ID instead. `go-cmd/Cmd.Stop` implements the necessary low-level magic to make this happen. ### 100% test coverage, no race conditions -Enough said. +In addition to 100% test coverage and no race conditions, this package is actively used in production environments. --- diff --git a/cmd.go b/cmd.go index 6b898b5..03cdf95 100644 --- a/cmd.go +++ b/cmd.go @@ -1,11 +1,50 @@ -// Package cmd provides higher-level wrappers around os/exec.Cmd. All operations -// are thread-safe and designed to be used asynchronously by multiple goroutines. +// Package cmd runs external commands with concurrent access to output and status. +// It wraps the Go standard library os/exec.Command to correctly handle reading +// output (STDOUT and STDERR) while a command is running and killing a command. +// All operations are safe to call from multiple goroutines. +// +// A simple example that runs env and prints its output: +// +// import ( +// "fmt" +// "github.com/go-cmd/cmd" +// ) +// +// func main() { +// // Create Cmd, buffered output +// envCmd := cmd.NewCmd("env") +// +// // Run and wait for Cmd to return Status +// status := <-envCmd.Start() +// +// // Print each line of STDOUT from Cmd +// for _, line := range status.Stdout { +// fmt.Println(line) +// } +// } +// +// Commands can be ran synchronously (blocking) or asynchronously (non-blocking): +// +// envCmd := cmd.NewCmd("env") // create +// +// status := <-envCmd.Start() // run blocking +// +// statusChan := envCmd.Start() // run non-blocking +// // Do other work while Cmd is running... +// status <- statusChan // blocking +// +// Start returns a channel, so the first example is blocking because it receives +// on the channel, but the second example is non-blocking because it saves the +// channel and receives on it later. The Status function can be called while the +// Cmd is running. When the Cmd finishes, a final status is sent to the channel +// returned by Start. package cmd import ( "bufio" "bytes" "errors" + "io" "os/exec" "sync" "syscall" @@ -13,21 +52,24 @@ import ( ) // Cmd represents an external command, similar to the Go built-in os/exec.Cmd. -// A Cmd cannot be reused after calling Start. +// A Cmd cannot be reused after calling Start. Do not modify exported fields; +// they are read-only. To create a new Cmd, call NewCmd or NewCmdOptions. type Cmd struct { - Name string - Args []string - // -- + Name string + Args []string + Stdout chan string // streaming STDOUT if enabled, else nil (see Options) + Stderr chan string // streaming STDERR if enabled, else nil (see Options) *sync.Mutex - started bool // cmd.Start called, no error - stopped bool // Stop called - done bool // run() done - final bool // status finalized in Status - startTime time.Time // if started true - stdout *output - stderr *output + started bool // cmd.Start called, no error + stopped bool // Stop called + done bool // run() done + final bool // status finalized in Status + startTime time.Time // if started true + stdout *OutputBuffer // low-level stdout buffering and streaming + stderr *OutputBuffer // low-level stderr buffering and streaming status Status - doneChan chan Status + doneChan chan Status // nil until Start() called + buffered bool // buffer STDOUT and STDERR to Status.Stdout and Std } // Status represents the status of a Cmd. It is valid during the entire lifecycle @@ -42,24 +84,25 @@ type Cmd struct { type Status struct { Cmd string PID int - Complete bool // false if stopped or signaled - Exit int // exit code of process - Error error // Go error - StartTs int64 // Unix ts (nanoseconds) - StopTs int64 // Unix ts (nanoseconds) - Runtime float64 // seconds - Stdout []string - Stderr []string + Complete bool // false if stopped or signaled + Exit int // exit code of process + Error error // Go error + StartTs int64 // Unix ts (nanoseconds) + StopTs int64 // Unix ts (nanoseconds) + Runtime float64 // seconds + Stdout []string // buffered STDOUT + Stderr []string // buffered STDERR } // NewCmd creates a new Cmd for the given command name and arguments. The command -// is not started until Start is called. +// is not started until Start is called. Output buffering is on, streaming output +// is off. To control output, use NewCmdOptions instead. func NewCmd(name string, args ...string) *Cmd { return &Cmd{ - Name: name, - Args: args, - // -- - Mutex: &sync.Mutex{}, + Name: name, + Args: args, + buffered: true, + Mutex: &sync.Mutex{}, status: Status{ Cmd: name, PID: 0, @@ -71,21 +114,46 @@ func NewCmd(name string, args ...string) *Cmd { } } +// Options represents customizations for NewCmdOptions. +type Options struct { + // If Buffered is true, STDOUT and STDERR are written to Status.Stdout and + // Status.Stderr. The caller can call Cmd.Status to read output at intervals. + Buffered bool + + // If Streaming is true, Cmd.Stdout and Cmd.Stderr channels are created and + // STDOUT and STDERR output lines are written them in real time. This is + // faster and more efficient than polling Cmd.Status. The caller must read both + // streaming channels, else lines are dropped silently. + Streaming bool +} + +// NewCmdOptions creates a new Cmd with options. The command is not started +// until Start is called. +func NewCmdOptions(options Options, name string, args ...string) *Cmd { + out := NewCmd(name, args...) + out.buffered = options.Buffered + if options.Streaming { + out.Stdout = make(chan string, STREAM_CHAN_SIZE) + out.Stderr = make(chan string, STREAM_CHAN_SIZE) + } + return out +} + // Start starts the command and immediately returns a channel that the caller // can use to receive the final Status of the command when it ends. The caller // can start the command and wait like, // -// status := <-c.Start() // blocking +// status := <-myCmd.Start() // blocking // // or start the command asynchronously and be notified later when it ends, // -// statusChan := c.Start() // non-blocking -// // do other stuff... +// statusChan := myCmd.Start() // non-blocking +// // Do other work while Cmd is running... // status := <-statusChan // blocking // -// Either way, exactly one Status is sent on the channel when the command ends. -// The channel is not closed. Any error is set to Status.Error. Start is idempotent; -// it always returns the same channel. +// Exactly one Status is sent on the channel when the command ends. The channel +// is not closed. Any error is set to Status.Error. Start is idempotent; it always +// returns the same channel. func (c *Cmd) Start() <-chan Status { c.Lock() defer c.Unlock() @@ -137,19 +205,21 @@ func (c *Cmd) Status() Status { if c.done { // No longer running if !c.final { - c.status.Stdout = c.stdout.Lines() - c.status.Stderr = c.stderr.Lines() - - c.stdout = nil // release buffers - c.stderr = nil - + if c.buffered { + c.status.Stdout = c.stdout.Lines() + c.status.Stderr = c.stderr.Lines() + c.stdout = nil // release buffers + c.stderr = nil + } c.final = true } } else { // Still running c.status.Runtime = time.Now().Sub(c.startTime).Seconds() - c.status.Stdout = c.stdout.Lines() - c.status.Stderr = c.stderr.Lines() + if c.buffered { + c.status.Stdout = c.stdout.Lines() + c.status.Stderr = c.stderr.Lines() + } } return c.status @@ -174,10 +244,23 @@ func (c *Cmd) run() { // Write stdout and stderr to buffers that are safe to read while writing // and don't cause a race condition. - c.stdout = newOutput() - c.stderr = newOutput() - cmd.Stdout = c.stdout - cmd.Stderr = c.stderr + if c.buffered && c.Stdout != nil { + // Buffered and streaming, create both and combine with io.MultiWriter + c.stdout = NewOutputBuffer() + c.stderr = NewOutputBuffer() + cmd.Stdout = io.MultiWriter(NewOutputStream(c.Stdout), c.stdout) + cmd.Stderr = io.MultiWriter(NewOutputStream(c.Stderr), c.stderr) + } else if c.buffered { + // Buffered only + c.stdout = NewOutputBuffer() + c.stderr = NewOutputBuffer() + cmd.Stdout = c.stdout + cmd.Stderr = c.stderr + } else { + // Streaming only + cmd.Stdout = NewOutputStream(c.Stdout) + cmd.Stderr = NewOutputStream(c.Stderr) + } // ////////////////////////////////////////////////////////////////////// // Start command @@ -239,7 +322,9 @@ func (c *Cmd) run() { c.Unlock() } -// -------------------------------------------------------------------------- +// ////////////////////////////////////////////////////////////////////////// +// Output +// ////////////////////////////////////////////////////////////////////////// // os/exec.Cmd.StdoutPipe is usually used incorrectly. The docs are clear: // "it is incorrect to call Wait before all reads from the pipe have completed." @@ -249,35 +334,209 @@ func (c *Cmd) run() { // The proper solution is using an io.Writer for cmd.Stdout. I couldn't find // an io.Writer that's also safe for concurrent reads (as lines in a []string // no less), so I created one: -type output struct { + +// OutputBuffer represents command output that is saved, line by line, in an +// unbounded buffer. It is safe for multiple goroutines to read while the command +// is running and after it has finished. If output is small (a few megabytes) +// and not read frequently, an output buffer is a good solution. +// +// A Cmd in this package uses an OutputBuffer for both STDOUT and STDERR by +// default when created by calling NewCmd. To use OutputBuffer directly with +// a Go standard library os/exec.Command: +// +// import "os/exec" +// import "github.com/go-cmd/cmd" +// runnableCmd := exec.Command(...) +// stdout := cmd.NewOutputBuffer() +// runnableCmd.Stdout = stdout +// +// While runnableCmd is running, call stdout.Lines() to read all output +// currently written. +type OutputBuffer struct { buf *bytes.Buffer lines []string *sync.Mutex } -func newOutput() *output { - return &output{ +// NewOutputBuffer creates a new output buffer. The buffer is unbounded and safe +// for multiple goroutines to read while the command is running by calling Lines. +func NewOutputBuffer() *OutputBuffer { + out := &OutputBuffer{ buf: &bytes.Buffer{}, lines: []string{}, Mutex: &sync.Mutex{}, } + return out } -// io.Writer interface is only this method -func (rw *output) Write(p []byte) (int, error) { +// Write makes OutputBuffer implement the io.Writer interface. Do not call +// this function directly. +func (rw *OutputBuffer) Write(p []byte) (n int, err error) { rw.Lock() - defer rw.Unlock() - return rw.buf.Write(p) // and bytes.Buffer implements it, too + n, err = rw.buf.Write(p) // and bytes.Buffer implements io.Writer + rw.Unlock() + return // implicit } -func (rw *output) Lines() []string { +// Lines returns lines of output written by the Cmd. It is safe to call while +// the Cmd is running and after it has finished. Subsequent calls returns more +// lines, if more lines were written. "\r\n" are stripped from the lines. +func (rw *OutputBuffer) Lines() []string { rw.Lock() - defer rw.Unlock() // Scanners are io.Readers which effectively destroy the buffer by reading // to EOF. So once we scan the buf to lines, the buf is empty again. s := bufio.NewScanner(rw.buf) for s.Scan() { rw.lines = append(rw.lines, s.Text()) } + rw.Unlock() return rw.lines } + +// -------------------------------------------------------------------------- + +// OutputStream represents real time, line by line output from a running Cmd. +// Lines are terminated by a single newline preceded by an optional carriage +// return. Both newline and carriage return are stripped from the line when +// sent to a caller-provided channel. The caller should begin receiving before +// starting the Cmd. If the channel blocks, lines are dropped. Lines are +// discarded after being sent. The channel is not closed by the OutputStream. +// +// If output is large or written very fast, this is a better solution than +// OutputBuffer because the internal stream buffer is bounded at STREAM_BUFFER_SIZE +// bytes. For multiple goroutines to read from the same channel, the caller must +// implement its own multiplexing solution. +// +// A Cmd in this package uses an OutputStream for both STDOUT and STDERR when +// created by calling NewCmdOptions and Options.Streaming is true. To use +// OutputStream directly with a Go standard library os/exec.Command: +// +// import "os/exec" +// import "github.com/go-cmd/cmd" +// +// stdoutChan := make(chan string, 100) +// go func() { +// for line := range stdoutChan { +// // Do something with the line +// } +// }() +// +// runnableCmd := exec.Command(...) +// stdout := cmd.NewOutputStream(stdoutChan) +// runnableCmd.Stdout = stdout +// +// While runnableCmd is running, lines are sent to the channels as soon as they +// are written by the command. The channel is not closed by the OutputStream. +type OutputStream struct { + streamChan chan string + streamBuf []byte + n int + nextLine int +} + +const ( + // STREAM_BUFFER_SIZE is the size of the OutputStream internal buffer. + // Lines are truncated at this length. For example, if a line is three + // times this length, only the first third is sent. + STREAM_BUFFER_SIZE = 8192 + + // STREAM_CHAN_SIZE is the default string channel size for a Cmd when + // Options.Streaming is true. + STREAM_CHAN_SIZE = 100 +) + +// NewOutputStream creates a new streaming output on the given channel. The +// caller should begin receiving on the channel before the command is started. +// The OutputStream never closes the channel. +func NewOutputStream(streamChan chan string) *OutputStream { + out := &OutputStream{ + streamChan: streamChan, + streamBuf: make([]byte, STREAM_BUFFER_SIZE), + n: 0, + nextLine: 0, + } + return out +} + +// Write makes OutputStream implement the io.Writer interface. Do not call +// this function directly. +func (rw *OutputStream) Write(p []byte) (n int, err error) { + // Determine how much of p we can save in our buffer + total := len(p) + free := len(rw.streamBuf[rw.n:]) + if rw.n+total <= free { + // all of p + copy(rw.streamBuf[rw.n:], p) + rw.n += total + n = total // on implicit return + } else { + // part of p + copy(rw.streamBuf[rw.n:], p[0:free]) + rw.n += free + n = free // on implicit return + } + + // Send each line in streamBuf +LINES: + for { + // Find next newline in stream buffer. nextLine starts at 0, but buff + // can containe multiple lines, like "foo\nbar". So in that case nextLine + // will be 0 ("foo\nbar\n") then 4 ("bar\n") on next iteration. And i + // will be 3 and 7, respectively. So lines are [0:3] are [4:7]. + i := bytes.IndexByte(rw.streamBuf[rw.nextLine:], '\n') + if i < 0 { + break LINES // no newline in stream, next line incomplete + } + + // End of line offset is start (nextLine) + newline offset. Like bufio.Scanner, + // we allow \r\n but strip the \r too by decrementing the offset for that byte. + eol := rw.nextLine + i // "line\n" + if i > 0 && rw.streamBuf[i-1] == '\r' { + eol -= 1 // "line\r\n" + } + + // Send the string + select { + case rw.streamChan <- string(rw.streamBuf[rw.nextLine:eol]): + default: + // Channel blocked, caller isn't listening? Silently drop the string. + } + + // Next line offset is the first byte (+1) after the newline (i) + rw.nextLine += i + 1 + + // If next line offset is the end of the buffer (rw.n), then we've processed + // all lines. Reset everything back to offset 0. + if rw.nextLine == rw.n { + rw.n = 0 + rw.nextLine = 0 + break LINES + } + + // Next line offset is < end of buffer (rw.n), so keep processing lines + } + + // nextLine is reset to zero when a string is complete (has a newline), + // so if > 0 _and_ n is at max capacity, then buffer is full and there's + // an incomplete string. We don't support double-buffering, so send the + // partial string and reset. Note: if nextLine > 0 but n < buff size, + // then it's an incomplete string but we have buffer space to wait for + // the rest of it. + if rw.n == STREAM_BUFFER_SIZE { + select { + case rw.streamChan <- string(rw.streamBuf[rw.nextLine:rw.n]): + default: + } + rw.n = 0 + rw.nextLine = 0 + err = io.ErrShortWrite // on implicit return + } + + return // implicit +} + +// Lines returns the channel to which lines are sent. This is the same channel +// passed to NewOutputStream. +func (rw *OutputStream) Lines() <-chan string { + return rw.streamChan +} diff --git a/cmd_test.go b/cmd_test.go index 6e49e37..4fd0c72 100644 --- a/cmd_test.go +++ b/cmd_test.go @@ -2,6 +2,7 @@ package cmd_test import ( "errors" + "io" "io/ioutil" "os" "os/exec" @@ -300,3 +301,537 @@ func TestCmdLost(t *testing.T) { t.Error(diffs) } } + +func TestCmdBothOutput(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "cmd.TestStreamingOutput") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpfile.Name()) + if err := tmpfile.Close(); err != nil { + t.Fatal(err) + } + if err := os.Remove(tmpfile.Name()); err != nil { + t.Fatal(err) + } + + touchFile := func(file string) { + if err := exec.Command("touch", file).Run(); err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) + } + + // Streams a count to stdout and stderr until given file exists + // Output like: + // stdout 1 + // stderr 1 + // stdout 2 + // stderr 2 + // Where each is printed on stdout and stderr as indicated. + p := cmd.NewCmdOptions(cmd.Options{Buffered: true, Streaming: true}, "./test/stream", tmpfile.Name()) + p.Start() + time.Sleep(250 * time.Millisecond) // give test/stream a moment to print something + + timeout := time.After(10 * time.Second) // test timeout + + // test/stream is spewing output, so we should be able to read it while + // the cmd is running. Try and fetch 3 lines from stdout and stderr. + i := 0 + stdoutPrevLine := "" + stderrPrevLine := "" + readLines := 3 + lines := 0 + for i < readLines { + i++ + + // STDOUT + select { + case curLine := <-p.Stdout: + t.Logf("got line: '%s'", curLine) + if curLine == "" { + // Shouldn't happen because test/stream doesn't print empty lines. + // This indicates a bug in the stream buffer handling. + t.Fatal("got empty line") + } + if stdoutPrevLine != "" && curLine == stdoutPrevLine { + t.Fatal("current line == previous line, expected new output:\ncprev: %s\ncur: %s\n", stdoutPrevLine, curLine) + } + stdoutPrevLine = curLine + lines++ + case <-timeout: + t.Fatal("timeout reading streaming output") + default: + } + + // STDERR + select { + case curLine := <-p.Stderr: + t.Logf("got line: '%s'", curLine) + if curLine == "" { + // Shouldn't happen because test/stream doesn't print empty lines. + // This indicates a bug in the stream buffer handling. + t.Fatal("got empty line") + } + if stderrPrevLine != "" && curLine == stderrPrevLine { + t.Fatal("current line == previous line, expected new output:\ncprev: %s\ncur: %s\n", stderrPrevLine, curLine) + } + stderrPrevLine = curLine + lines++ + case <-timeout: + t.Fatal("timeout reading streaming output") + default: + } + + time.Sleep(200 * time.Millisecond) + } + + // readLines * 2 (stdout and stderr) + if lines != readLines*2 { + t.Fatalf("read %d lines from streaming output, expected 6", lines) + } + + s := p.Status() + if len(s.Stdout) < readLines { + t.Fatalf("read %d lines from buffered STDOUT, expected %d", len(s.Stdout), readLines) + } + if len(s.Stderr) < readLines { + t.Fatalf("read %d lines from buffered STDERR, expected %d", len(s.Stderr), readLines) + } + + // Stop test/stream + touchFile(tmpfile.Name()) + + s = p.Status() + if s.Exit != 0 { + t.Errorf("got exit %d, expected 0", s.Exit) + } + + // Kill the process + if err := p.Stop(); err != nil { + t.Error(err) + } +} + +func TestCmdOnlyStreamingOutput(t *testing.T) { + tmpfile, err := ioutil.TempFile("", "cmd.TestStreamingOutput") + if err != nil { + t.Fatal(err) + } + defer os.Remove(tmpfile.Name()) + if err := tmpfile.Close(); err != nil { + t.Fatal(err) + } + if err := os.Remove(tmpfile.Name()); err != nil { + t.Fatal(err) + } + + touchFile := func(file string) { + if err := exec.Command("touch", file).Run(); err != nil { + t.Fatal(err) + } + time.Sleep(500 * time.Millisecond) + } + + // Streams a count to stdout and stderr until given file exists + // Output like: + // stdout 1 + // stderr 1 + // stdout 2 + // stderr 2 + // Where each is printed on stdout and stderr as indicated. + p := cmd.NewCmdOptions(cmd.Options{Buffered: false, Streaming: true}, "./test/stream", tmpfile.Name()) + p.Start() + time.Sleep(250 * time.Millisecond) // give test/stream a moment to print something + + timeout := time.After(10 * time.Second) // test timeout + + // test/stream is spewing output, so we should be able to read it while + // the cmd is running. Try and fetch 3 lines from stdout and stderr. + i := 0 + stdoutPrevLine := "" + stderrPrevLine := "" + readLines := 3 + lines := 0 + for i < readLines { + i++ + + // STDOUT + select { + case curLine := <-p.Stdout: + t.Logf("got line: '%s'", curLine) + if curLine == "" { + // Shouldn't happen because test/stream doesn't print empty lines. + // This indicates a bug in the stream buffer handling. + t.Fatal("got empty line") + } + if stdoutPrevLine != "" && curLine == stdoutPrevLine { + t.Fatal("current line == previous line, expected new output:\ncprev: %s\ncur: %s\n", stdoutPrevLine, curLine) + } + stdoutPrevLine = curLine + lines++ + case <-timeout: + t.Fatal("timeout reading streaming output") + default: + } + + // STDERR + select { + case curLine := <-p.Stderr: + t.Logf("got line: '%s'", curLine) + if curLine == "" { + // Shouldn't happen because test/stream doesn't print empty lines. + // This indicates a bug in the stream buffer handling. + t.Fatal("got empty line") + } + if stderrPrevLine != "" && curLine == stderrPrevLine { + t.Fatal("current line == previous line, expected new output:\ncprev: %s\ncur: %s\n", stderrPrevLine, curLine) + } + stderrPrevLine = curLine + lines++ + case <-timeout: + t.Fatal("timeout reading streaming output") + default: + } + + time.Sleep(200 * time.Millisecond) + } + + // readLines * 2 (stdout and stderr) + if lines != readLines*2 { + t.Fatalf("read %d lines from streaming output, expected 6", lines) + } + + s := p.Status() + if len(s.Stdout) != 0 { + t.Fatalf("read %d lines from buffered STDOUT, expected 0", len(s.Stdout)) + } + if len(s.Stderr) != 0 { + t.Fatalf("read %d lines from buffered STDERR, expected 0", len(s.Stderr)) + } + + // Stop test/stream + touchFile(tmpfile.Name()) + + s = p.Status() + if s.Exit != 0 { + t.Errorf("got exit %d, expected 0", s.Exit) + } + + // Kill the process + if err := p.Stop(); err != nil { + t.Error(err) + } +} + +func TestStreamingOverflow(t *testing.T) { + // Make a line that will fill up and overflow the steaming buffer by 2 chars: + // "bc", plus newline. The line will be truncated at "bc\n" so we only get back + // the "aaa.." long string. + longLine := make([]byte, cmd.STREAM_BUFFER_SIZE+3) // "a...bc\n" + for i := 0; i < cmd.STREAM_BUFFER_SIZE; i++ { + longLine[i] = 'a' + } + longLine[cmd.STREAM_BUFFER_SIZE] = 'b' + longLine[cmd.STREAM_BUFFER_SIZE+1] = 'c' + longLine[cmd.STREAM_BUFFER_SIZE+2] = '\n' + + // Make new streaming output on our lines chan + lines := make(chan string, 5) + out := cmd.NewOutputStream(lines) + + // Write the long line, it should only write (n) up to cmd.STREAM_BUFFER_SIZE + n, err := out.Write(longLine) + if n != cmd.STREAM_BUFFER_SIZE { + t.Errorf("Write n = %d, expected %d", n, cmd.STREAM_BUFFER_SIZE) + } + if err != io.ErrShortWrite { + t.Errorf("got err '%v', expected io.ErrShortWrite", err) + } + + // Get first, truncated line + var gotLine string + select { + case gotLine = <-lines: + default: + t.Fatal("blocked on <-lines") + } + + // Up to but not include "bc\n" because it should have been truncated + if gotLine != string(longLine[0:cmd.STREAM_BUFFER_SIZE]) { + t.Logf("got line: '%s'", gotLine) + t.Error("did not get expected first line (see log above), expected only \"aaa...\" part") + } + + // Streaming should still work as normal after an overflow; send it a line + n, err = out.Write([]byte("foo\n")) + if n != 4 { + t.Errorf("got n %d, expected 4", n) + } + if err != nil { + t.Errorf("got err '%v', expected nil", err) + } + + select { + case gotLine = <-lines: + default: + t.Fatal("blocked on <-lines") + } + + if gotLine != "foo" { + t.Errorf("got line: '%s', expected 'foo'", gotLine) + } +} + +func TestStreamingMultipleLines(t *testing.T) { + lines := make(chan string, 5) + out := cmd.NewOutputStream(lines) + + // Quick side test: Lines() chan string should be the same chan string + // we created the object with + if out.Lines() != lines { + t.Errorf("Lines() does not return the given string chan") + } + + // Write two short lines + input := "foo\nbar\n" + n, err := out.Write([]byte(input)) + if n != len(input) { + t.Errorf("Write n = %d, expected %d", n, len(input)) + } + if err != nil { + t.Errorf("got err '%v', expected nil", err) + } + + // Get one line + var gotLine string + select { + case gotLine = <-lines: + default: + t.Fatal("blocked on <-lines") + } + + // "foo" should be sent before "bar" because that was the input + if gotLine != "foo" { + t.Errorf("got line: '%s', expected 'foo'", gotLine) + } + + // Get next line + select { + case gotLine = <-lines: + default: + t.Fatal("blocked on <-lines") + } + + if gotLine != "bar" { + t.Errorf("got line: '%s', expected 'bar'", gotLine) + } +} + +func TestStreamingBlankLines(t *testing.T) { + lines := make(chan string, 5) + out := cmd.NewOutputStream(lines) + + // Blank line in the middle + input := "foo\n\nbar\n" + expectLines := []string{"foo", "", "bar"} + gotLines := []string{} + n, err := out.Write([]byte(input)) + if n != len(input) { + t.Errorf("Write n = %d, expected %d", n, len(input)) + } + if err != nil { + t.Errorf("got err '%v', expected nil", err) + } +LINES1: + for { + select { + case line := <-lines: + gotLines = append(gotLines, line) + default: + break LINES1 + } + } + if diffs := deep.Equal(gotLines, expectLines); diffs != nil { + t.Error(diffs) + } + + // All blank lines + input = "\n\n\n" + expectLines = []string{"", "", ""} + gotLines = []string{} + n, err = out.Write([]byte(input)) + if n != len(input) { + t.Errorf("Write n = %d, expected %d", n, len(input)) + } + if err != nil { + t.Errorf("got err '%v', expected nil", err) + } +LINES2: + for { + select { + case line := <-lines: + gotLines = append(gotLines, line) + default: + break LINES2 + } + } + if diffs := deep.Equal(gotLines, expectLines); diffs != nil { + t.Error(diffs) + } + + // Blank lines at end + input = "foo\n\n\n" + expectLines = []string{"foo", "", ""} + gotLines = []string{} + n, err = out.Write([]byte(input)) + if n != len(input) { + t.Errorf("Write n = %d, expected %d", n, len(input)) + } + if err != nil { + t.Errorf("got err '%v', expected nil", err) + } +LINES3: + for { + select { + case line := <-lines: + gotLines = append(gotLines, line) + default: + break LINES3 + } + } + if diffs := deep.Equal(gotLines, expectLines); diffs != nil { + t.Error(diffs) + } +} + +func TestStreamingCarriageReturn(t *testing.T) { + lines := make(chan string, 5) + out := cmd.NewOutputStream(lines) + + input := "foo\r\nbar\r\n" + expectLines := []string{"foo", "bar"} + gotLines := []string{} + n, err := out.Write([]byte(input)) + if n != len(input) { + t.Errorf("Write n = %d, expected %d", n, len(input)) + } + if err != nil { + t.Errorf("got err '%v', expected nil", err) + } +LINES1: + for { + select { + case line := <-lines: + gotLines = append(gotLines, line) + default: + break LINES1 + } + } + if diffs := deep.Equal(gotLines, expectLines); diffs != nil { + t.Error(diffs) + } +} + +func TestStreamingDropsLines(t *testing.T) { + lines := make(chan string, 3) + out := cmd.NewOutputStream(lines) + + // Fill up the chan so Write blocks. We'll receive these instead of... + lines <- "1" + lines <- "2" + lines <- "3" + + // ...new lines that we shouldn't receive because "123" is already in the chan. + input := "A\nB\nC\n" + expectLines := []string{"1", "2", "3"} + gotLines := []string{} + n, err := out.Write([]byte(input)) + if n != len(input) { + t.Errorf("Write n = %d, expected %d", n, len(input)) + } + if err != nil { + t.Errorf("got err '%v', expected nil", err) + } +LINES1: + for { + select { + case line := <-lines: + gotLines = append(gotLines, line) + default: + break LINES1 + } + } + if diffs := deep.Equal(gotLines, expectLines); diffs != nil { + t.Error(diffs) + } + + // Now that chan is clear, we should receive only new lines. + input = "D\nE\nF\n" + expectLines = []string{"D", "E", "F"} + gotLines = []string{} + n, err = out.Write([]byte(input)) + if n != len(input) { + t.Errorf("Write n = %d, expected %d", n, len(input)) + } + if err != nil { + t.Errorf("got err '%v', expected nil", err) + } +LINES2: + for { + select { + case line := <-lines: + gotLines = append(gotLines, line) + default: + break LINES2 + } + } + if diffs := deep.Equal(gotLines, expectLines); diffs != nil { + t.Error(diffs) + } + + if diffs := deep.Equal(gotLines, expectLines); diffs != nil { + t.Error(diffs) + } +} + +func TestStreamingOverflowBlocked(t *testing.T) { + // Make a line that will overflow the steaming buffer; see TestStreamingOverflow + longLine := make([]byte, cmd.STREAM_BUFFER_SIZE+3) + for i := 0; i < cmd.STREAM_BUFFER_SIZE; i++ { + longLine[i] = 'a' + } + longLine[cmd.STREAM_BUFFER_SIZE] = 'b' + longLine[cmd.STREAM_BUFFER_SIZE+1] = 'c' + longLine[cmd.STREAM_BUFFER_SIZE+2] = '\n' + + // Fill up the chan so Write blocks + lines := make(chan string, 3) + out := cmd.NewOutputStream(lines) + lines <- "1" + lines <- "2" + lines <- "3" + + expectLines := []string{"1", "2", "3"} + gotLines := []string{} + n, err := out.Write(longLine) + if n != cmd.STREAM_BUFFER_SIZE { + t.Errorf("Write n = %d, expected %d", n, cmd.STREAM_BUFFER_SIZE) + } + if err != io.ErrShortWrite { + t.Errorf("got err '%v', expected io.ErrShortWrite", err) + } + +LINES1: + for { + select { + case line := <-lines: + gotLines = append(gotLines, line) + default: + break LINES1 + } + } + if diffs := deep.Equal(gotLines, expectLines); diffs != nil { + t.Error(diffs) + } +} diff --git a/examples/blocking-buffered/main.go b/examples/blocking-buffered/main.go new file mode 100644 index 0000000..4b3d5e3 --- /dev/null +++ b/examples/blocking-buffered/main.go @@ -0,0 +1,20 @@ +package main + +import ( + "fmt" + + "github.com/go-cmd/cmd" +) + +func main() { + // Create Cmd, buffered output + envCmd := cmd.NewCmd("env") + + // Run and wait for Cmd to return Status + status := <-envCmd.Start() + + // Print each line of STDOUT from Cmd + for _, line := range status.Stdout { + fmt.Println(line) + } +} diff --git a/examples/blocking-streaming/main.go b/examples/blocking-streaming/main.go new file mode 100644 index 0000000..0a616f5 --- /dev/null +++ b/examples/blocking-streaming/main.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + "time" + + "github.com/go-cmd/cmd" +) + +func main() { + // Disable output buffering, enable streaming + cmdOptions := cmd.Options{ + Buffered: false, + Streaming: true, + } + + // Create Cmd with options + envCmd := cmd.NewCmdOptions(cmdOptions, "env") + + // Print STDOUT lines streaming from Cmd + go func() { + for line := range envCmd.Stdout { + fmt.Println(line) + } + }() + + // Run and wait for Cmd to return, discard Status + <-envCmd.Start() + + // Cmd has finished but wait for goroutine to print all lines + for len(envCmd.Stdout) > 0 { + time.Sleep(10 * time.Millisecond) + } +} diff --git a/test/stream b/test/stream new file mode 100755 index 0000000..6bf0888 --- /dev/null +++ b/test/stream @@ -0,0 +1,10 @@ +#!/bin/bash +x=1 +y=1 +while [[ ! -f "$1" ]]; do + echo "stdout $x" + echo "stderr $y" >&2 + x=$((x + 1)) + y=$((y + 1)) + sleep 0.25 +done