Skip to content

Commit

Permalink
Adding parallel runners
Browse files Browse the repository at this point in the history
Closes issue #48

The snag.yml now accepts a 'Runners' tag. This tag represents a group
of commands that are meant to be run in parallel after the build stage.

Each process will be given a chance to start and will then get backgrounded
to allow other processes to run. The output of each command is streamed to
stdout prefix with the pid of the logging command
  • Loading branch information
zabawaba99 committed Jan 26, 2016
1 parent b3671a7 commit d466e97
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 12 deletions.
38 changes: 28 additions & 10 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type Bob struct {
watchDir string

depWarning string
cmds [][]string
buildCmds [][]string
runCmds [][]string
ignoredItems []string

verbose bool
Expand All @@ -40,21 +41,32 @@ func NewBuilder(c config) (*Bob, error) {
return nil, err
}

cmds := make([][]string, len(c.Build))
for i, s := range c.Build {
cmds[i] = strings.Split(s, " ")
parseCmd := func(cmd string) []string {
c := strings.Split(cmd, " ")

// check for environment variables inside script
if strings.Contains(s, "$$") {
replaceEnv(cmds[i])
if strings.Contains(cmd, "$$") {
replaceEnv(c)
}
return c
}

buildCmds := make([][]string, len(c.Build))
for i, s := range c.Build {
buildCmds[i] = parseCmd(s)
}

runCmds := make([][]string, len(c.Run))
for i, s := range c.Run {
runCmds[i] = parseCmd(s)
}

return &Bob{
w: w,
done: make(chan struct{}),
watching: map[string]struct{}{},
cmds: cmds,
buildCmds: buildCmds,
runCmds: runCmds,
depWarning: c.DepWarnning,
ignoredItems: c.IgnoredItems,
verbose: c.Verbose,
Expand Down Expand Up @@ -153,14 +165,20 @@ func (b *Bob) execute() {
}

// setup the first command
firstCmd := b.cmds[0]
firstCmd := b.buildCmds[0]
b.curVow = vow.To(firstCmd[0], firstCmd[1:]...)

// setup the remaining commands
for i := 1; i < len(b.cmds); i++ {
cmd := b.cmds[i]
for i := 1; i < len(b.buildCmds); i++ {
cmd := b.buildCmds[i]
b.curVow = b.curVow.Then(cmd[0], cmd[1:]...)
}

// setup all parallel commands
for i := 0; i < len(b.runCmds); i++ {
cmd := b.runCmds[i]
b.curVow = b.curVow.ThenAsync(cmd[0], cmd[1:]...)
}
b.curVow.Verbose = b.verbose
go b.curVow.Exec(ansicolor.NewAnsiColorWriter(os.Stdout))

Expand Down
4 changes: 2 additions & 2 deletions builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ func TestNewBuilder_EnvScript(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, b)

require.Len(t, b.cmds, 1)
assert.Equal(t, testEnv, b.cmds[0][1])
require.Len(t, b.buildCmds, 1)
assert.Equal(t, testEnv, b.buildCmds[0][1])
}

func TestClose(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type config struct {
DepWarnning string
Script []string `yaml:"script"`
Build []string `yaml:"build"`
Run []string `yaml:"run"`
IgnoredItems []string `yaml:"ignore"`
Verbose bool `yaml:"verbose"`
}
Expand Down
27 changes: 27 additions & 0 deletions vow/promise.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync"
"sync/atomic"
"syscall"
"time"
)

var errKilled = errors.New("promise has already been killed")
Expand All @@ -23,6 +24,7 @@ var (
type promise struct {
cmdMtx sync.Mutex
cmd *exec.Cmd
async bool
killed *int32
}

Expand All @@ -33,6 +35,12 @@ func newPromise(name string, args ...string) *promise {
}
}

func newAsyncPromise(name string, args ...string) *promise {
p := newPromise(name, args...)
p.async = true
return p
}

func (p *promise) Run(w io.Writer, verbose bool) (err error) {
if p.isKilled() {
return errKilled
Expand All @@ -58,6 +66,13 @@ func (p *promise) Run(w io.Writer, verbose bool) (err error) {
}
p.cmdMtx.Unlock()

// if the process is async we don't need to do anything else
if p.async {
fmt.Println(" process id: ", p.cmd.Process.Pid)
go p.fowardOutput(p.cmd.Process.Pid, w, &buf)
return nil
}

err = p.cmd.Wait()

status := statusPassed
Expand All @@ -72,6 +87,18 @@ func (p *promise) Run(w io.Writer, verbose bool) (err error) {
return err
}

func (p *promise) fowardOutput(pid int, w io.Writer, buf *bytes.Buffer) {
prefix := []byte(fmt.Sprintf("pid %d : ", pid))
for t := time.Tick(time.Second); !p.isKilled(); <-t {
b := buf.Next(1024)
if len(b) == 0 {
continue
}

p.writeIfAlive(w, append(prefix, b...))
}
}

func (p *promise) writeIfAlive(w io.Writer, b []byte) {
if p.isKilled() {
return
Expand Down
5 changes: 5 additions & 0 deletions vow/to.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ func (vow *Vow) Then(name string, args ...string) *Vow {
return vow
}

func (vow *Vow) ThenAsync(name string, args ...string) *Vow {
vow.cmds = append(vow.cmds, newAsyncPromise(name, args...))
return vow
}

// Stop terminates the active command and stops the execution of any future commands
func (vow *Vow) Stop() {
atomic.StoreInt32(vow.canceled, 1)
Expand Down

0 comments on commit d466e97

Please sign in to comment.