Skip to content

Commit

Permalink
Add --max-concurrent-tasks to exec
Browse files Browse the repository at this point in the history
  • Loading branch information
csweichel committed Nov 22, 2023
1 parent 130b6bd commit 7cc022f
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions cmd/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"time"

"github.com/creack/pty"
"github.com/gookit/color"
"github.com/segmentio/textio"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"

"github.com/gitpod-io/leeway/pkg/leeway"
)
Expand Down Expand Up @@ -60,6 +60,7 @@ Example use:
parallel, _ = cmd.Flags().GetBool("parallel")
rawOutput, _ = cmd.Flags().GetBool("raw-output")
cacheKey, _ = cmd.Flags().GetString("cache-key")
maxConcurrentTasks, _ = cmd.Flags().GetUint("max-concurrent-tasks")
)

ws, err := getWorkspace()
Expand Down Expand Up @@ -162,8 +163,18 @@ Example use:
}
}

var parallelism int
if parallel {
parallelism = int(maxConcurrentTasks)
} else {
parallelism = -1
if maxConcurrentTasks > 0 {
log.Warn("max-concurrent-tasks is ignored when not running in parallel")
}
}

if watch {
err := executeCommandInLocations(args, locs, noExecCache{}, parallel, rawOutput)
err := executeCommandInLocations(args, locs, noExecCache{}, parallelism, rawOutput)
if err != nil {
log.Error(err)
}
Expand All @@ -172,7 +183,7 @@ Example use:
for {
select {
case <-evt:
err := executeCommandInLocations(args, locs, noExecCache{}, parallel, rawOutput)
err := executeCommandInLocations(args, locs, noExecCache{}, parallelism, rawOutput)
if err != nil {
log.Error(err)
}
Expand All @@ -194,7 +205,7 @@ Example use:
cache = filesystemExecCache(loc)
}

err = executeCommandInLocations(args, locs, cache, parallel, rawOutput)
err = executeCommandInLocations(args, locs, cache, parallelism, rawOutput)
if err != nil {
log.WithError(err).Fatal("cannot execut command")
}
Expand All @@ -208,8 +219,12 @@ type commandExecLocation struct {
Name string
}

func executeCommandInLocations(rawExecCmd []string, locs []commandExecLocation, cache execCache, parallel, rawOutput bool) error {
var wg sync.WaitGroup
func executeCommandInLocations(rawExecCmd []string, locs []commandExecLocation, cache execCache, parallelism int, rawOutput bool) error {
var eg errgroup.Group
if parallelism > 0 {
eg.SetLimit(parallelism)
}

for _, loc := range locs {
if ok, _ := cache.NeedsExecution(context.Background(), loc.Package); !ok {
continue
Expand Down Expand Up @@ -247,11 +262,9 @@ func executeCommandInLocations(rawExecCmd []string, locs []commandExecLocation,
go io.Copy(textio.NewPrefixWriter(os.Stdout, prefix), ptmx)
//nolint:errcheck
go io.Copy(ptmx, os.Stdin)
if parallel {
wg.Add(1)
go func(loc commandExecLocation) {
defer wg.Done()

if parallelism > -1 {
loc := loc
eg.Go(func() error {
err := cmd.Wait()
if err == nil {
err = cache.MarkExecuted(context.Background(), loc.Package)
Expand All @@ -261,7 +274,8 @@ func executeCommandInLocations(rawExecCmd []string, locs []commandExecLocation,
} else {
log.Errorf("execution failed in %s (%s): %v", loc.Name, loc.Dir, err)
}
}(loc)
return nil
})
} else {
err = cmd.Wait()
if err == nil {
Expand All @@ -274,8 +288,8 @@ func executeCommandInLocations(rawExecCmd []string, locs []commandExecLocation,
}
}
}
if parallel {
wg.Wait()
if parallelism > -1 {
_ = eg.Wait()
}

return nil
Expand All @@ -295,6 +309,7 @@ func init() {
execCmd.Flags().Bool("parallel", false, "Start all executions in parallel independent of their order")
execCmd.Flags().Bool("raw-output", false, "Produce output without package prefix")
execCmd.Flags().String("cache-key", "", "Specify a cache key to provide package-cache like execution behaviour")
execCmd.Flags().UintP("max-concurrent-tasks", "j", 0, "Maximum number of concurrent tasks - 0 means unlimited")
execCmd.Flags().SetInterspersed(true)
}

Expand Down

0 comments on commit 7cc022f

Please sign in to comment.