/
runcommand.go
163 lines (147 loc) · 5.44 KB
/
runcommand.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package commands
import (
"fmt"
"github.com/UpCloudLtd/upcloud-cli/v2/internal/config"
"github.com/UpCloudLtd/upcloud-cli/v2/internal/output"
"github.com/UpCloudLtd/upcloud-cli/v2/internal/resolver"
internal "github.com/UpCloudLtd/upcloud-cli/v2/internal/service"
"github.com/gemalto/flume"
)
// logger is the package-level flume logger registered under the name
// "runcommand"; commandRunE derives a per-command logger from it.
var logger = flume.New("runcommand")
// commandRunE dispatches a command to the execution strategy matching the
// interface it implements and renders the collected outputs.
//
//   - NoArgumentCommand is executed exactly once, ignoring args.
//   - SingleArgumentCommand requires exactly one non-empty positional argument.
//   - MultipleArgumentCommand requires at least one positional argument and is
//     run with up to MaximumExecutions() parallel workers.
//   - Any other command has nothing to execute, so its usage text is shown.
//
// It returns the validation error, the error reported by execute, or the
// result of output.Render / Usage, whichever applies.
func commandRunE(command Command, service internal.AllServices, config *config.Config, args []string) error {
	// Reaching this point means cobra's own validation passed, so any error
	// from here on should not be followed by the usage text.
	command.Cobra().SilenceUsage = true

	cmdLogger := logger.With("command", command.Cobra().CommandPath())
	executor := NewExecutor(config, service, cmdLogger)
	w := command.Cobra().OutOrStdout()

	var (
		results []output.Output
		err     error
	)

	switch typedCommand := command.(type) {
	case NoArgumentCommand:
		cmdLogger.Debug("executing without arguments", "arguments", args)
		// execute only runs once per argument, so hand it a single
		// placeholder argument to trigger exactly one execution
		// FIXME: this bit panics go-critic unlambda check, figure out why and report upstream
		runOnce := func(exec Executor, _ string) (output.Output, error) {
			return typedCommand.ExecuteWithoutArguments(exec)
		}
		results, err = execute(typedCommand, executor, []string{""}, 1, runOnce)

	case SingleArgumentCommand:
		cmdLogger.Debug("executing single argument", "arguments", args)
		// exactly one, non-empty argument is mandatory
		if len(args) != 1 || args[0] == "" {
			return fmt.Errorf("exactly one positional argument is required")
		}
		results, err = execute(typedCommand, executor, args, 1, typedCommand.ExecuteSingleArgument)

	case MultipleArgumentCommand:
		cmdLogger.Debug("executing multi argument", "arguments", args)
		// at least one argument is mandatory
		if len(args) == 0 {
			return fmt.Errorf("at least one positional argument is required")
		}
		results, err = execute(typedCommand, executor, args, typedCommand.MaximumExecutions(), typedCommand.Execute)

	default:
		// nothing executable on this command (most likely a purely
		// 'organizational' parent command), so just print its usage
		cmdLogger.Debug("no execution found", "arguments", args)
		return command.Cobra().Usage()
	}

	if err != nil {
		return err
	}
	return output.Render(w, config.Output(), results...)
}
// resolvedArgument is the outcome of resolving one positional command line
// argument, as produced by resolveArguments and consumed by execute.
type resolvedArgument struct {
	// Resolved is the value handed to the command's execute function. It is
	// the resolver's result, or the original argument when the command does
	// not provide a resolver.
	Resolved string
	// Error is the error returned by the resolver for this argument, if any.
	Error error
	// Original is the argument exactly as it was passed in.
	Original string
}
// resolveArguments turns every entry of args into a resolvedArgument.
//
// When nc implements resolver.ResolutionProvider, the resolver it provides is
// applied to each argument; a per-argument resolution failure is stored in the
// entry's Error field rather than aborting. Otherwise each argument is passed
// through unchanged. The only error returned is a failure to obtain the
// resolver itself.
func resolveArguments(nc Command, exec Executor, args []string) (out []resolvedArgument, err error) {
	provider, canResolve := nc.(resolver.ResolutionProvider)
	if !canResolve {
		// no resolver available: arguments are used as-is
		for i := range args {
			out = append(out, resolvedArgument{Resolved: args[i], Original: args[i]})
		}
		return out, nil
	}

	resolveOne, getErr := provider.Get(exec.Context(), exec.All())
	if getErr != nil {
		return nil, fmt.Errorf("cannot get resolver: %w", getErr)
	}

	for i := range args {
		value, resolveErr := resolveOne(args[i])
		// keep the per-argument error; the caller decides how to report it
		out = append(out, resolvedArgument{Resolved: value, Error: resolveErr, Original: args[i]})
	}
	return out, nil
}
// execute resolves args for command and runs executeCommand once per
// argument, using at most parallelRuns concurrent workers.
//
// Parameters:
//   - command: used to look up an optional argument resolver.
//   - executor: shared executor; each run gets a logger-decorated copy.
//   - args: raw positional arguments, one execution per entry.
//   - parallelRuns: upper bound for concurrently running executions. Values
//     below 1 are treated as 1, because a non-positive value would leave the
//     worker queue empty (or panic in make) and the call would never finish.
//   - executeCommand: the function run for every resolved argument.
//
// It returns one output per argument, in completion order (not argument
// order). Failures of individual executions are reported as output.Error
// entries, not through the error return value; the returned error is non-nil
// only when the arguments could not be resolved at all. With no arguments an
// empty result is returned immediately.
func execute(command Command, executor Executor, args []string, parallelRuns int, executeCommand func(exec Executor, arg string) (output.Output, error)) ([]output.Output, error) {
	resolvedArgs, err := resolveArguments(command, executor, args)
	if err != nil {
		return nil, fmt.Errorf("cannot resolve command line arguments: %w", err)
	}

	// the number of results to wait for is the number of jobs actually queued
	jobCount := len(resolvedArgs)
	outputs := make([]output.Output, 0, jobCount)
	if jobCount == 0 {
		// Nothing to run. Without this guard the loop below would wait
		// forever for a result that no worker is ever going to send.
		executor.StopProgressLog()
		return outputs, nil
	}

	workerCount := parallelRuns
	if workerCount < 1 {
		// guard against a stalled (0) or panicking (negative) worker queue
		workerCount = 1
	}

	returnChan := make(chan executeResult)
	// buffered so that workers can always hand their ID back without blocking,
	// even after this function has returned
	workerQueue := make(chan int, workerCount)

	// push initial workers into the worker queue
	for n := 0; n < workerCount; n++ {
		workerQueue <- n
	}

	// remaining arguments to hand out to the workers
	argQueue := resolvedArgs

	executor.Debug("starting work", "workers", workerCount)
	for {
		select {
		case workerID := <-workerQueue:
			// got an idle worker
			if len(argQueue) == 0 {
				// we are out of arguments to process, just let the worker
				// retire and keep waiting for outstanding results
				continue
			}
			arg := argQueue[0]
			argQueue = argQueue[1:]
			// trigger execution in a goroutine
			go func(index int, argument resolvedArgument) {
				defer func() {
					// return worker to queue when exiting
					executor.Debug("worker exiting", "worker", index)
					workerQueue <- workerID
				}()
				if argument.Error != nil {
					// argument wasn't parsed correctly, pass the error on
					executor.Debug("worker got invalid argument", "worker", index, "error", argument.Error)
					err := fmt.Errorf("cannot resolve argument: %w", argument.Error)
					outputError(argument.Original, err, executor)
					// NOTE(review): ResolvedArgument is left unset here, so the
					// resulting output.Error has empty Resolved/Original -
					// confirm this is intended.
					returnChan <- executeResult{Job: index, Error: err}
				} else {
					// otherwise, execute and return results
					executor.Debug("worker starting", "worker", index, "argument", argument.Resolved)
					res, err := executeCommand(executor.WithLogger("worker", index, "argument", argument.Resolved), argument.Resolved)
					outputError(argument.Original, err, executor)
					returnChan <- executeResult{Job: index, Result: res, Error: err, ResolvedArgument: argument}
				}
			}(workerID, arg)
		case res := <-returnChan:
			// got a result from a worker
			if res.Error != nil {
				outputs = append(outputs, output.Error{
					Value:    res.Error,
					Resolved: res.ResolvedArgument.Resolved,
					Original: res.ResolvedArgument.Original,
				})
			} else {
				outputs = append(outputs, res.Result)
			}
			if len(outputs) >= jobCount {
				executor.Debug("execute done")
				// We're done, update ui for the last time and render the results
				executor.StopProgressLog()
				return outputs, nil
			}
		}
	}
}