Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Structured events in place of comment strings #3771

Merged
merged 12 commits into from
Apr 14, 2024
2 changes: 0 additions & 2 deletions cmd/cli/docker/docker_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,8 +791,6 @@ func (s *DockerRunSuite) TestRun_InvalidImage() {
s.Require().NoError(err)
s.T().Log(info)

s.Len(info.State.Executions, 1)
s.Equal(model.ExecutionStateAskForBidRejected, info.State.Executions[0].State)
s.Contains(info.State.Executions[0].Status, `Could not inspect image "@" - could be due to repo/image not existing`)
}

Expand Down
61 changes: 60 additions & 1 deletion cmd/cli/job/describe.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
package job

import (
"cmp"
"fmt"
"slices"
"time"

"github.com/bacalhau-project/bacalhau/pkg/lib/collections"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/util/idgen"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/jedib0t/go-pretty/v6/text"
"github.com/samber/lo"
"github.com/spf13/cobra"
"k8s.io/kubectl/pkg/util/i18n"

Expand Down Expand Up @@ -65,7 +70,7 @@ func (o *DescribeOptions) run(cmd *cobra.Command, args []string) error {
jobID := args[0]
response, err := util.GetAPIClientV2(cmd).Jobs().Get(ctx, &apimodels.GetJobRequest{
JobID: jobID,
Include: "executions",
Include: "executions,history",
})

if err != nil {
Expand All @@ -85,12 +90,39 @@ func (o *DescribeOptions) run(cmd *cobra.Command, args []string) error {
// TODO: #520 rename Executions.Executions to Executions.Items
executions = response.Executions.Executions
}
// Show most relevant execution first: sort by time DESC
slices.SortFunc(executions, func(a, b *models.Execution) int {
return cmp.Compare(b.CreateTime, a.CreateTime)
})

var history []*models.JobHistory
if response.History != nil {
history = response.History.History
}

o.printHeaderData(cmd, job)
o.printExecutionsSummary(cmd, executions)

jobHistory := lo.Filter(history, func(entry *models.JobHistory, _ int) bool {
return entry.Type == models.JobHistoryTypeJobLevel
})
if err = o.printHistory(cmd, "Job", jobHistory); err != nil {
util.Fatal(cmd, fmt.Errorf("failed to write job history: %w", err), 1)
}

if err = o.printExecutions(cmd, executions); err != nil {
return fmt.Errorf("failed to write job executions %s: %w", jobID, err)
}

for _, execution := range executions {
executionHistory := lo.Filter(history, func(item *models.JobHistory, _ int) bool {
return item.ExecutionID == execution.ID
})
if err = o.printHistory(cmd, "Execution "+idgen.ShortUUID(execution.ID), executionHistory); err != nil {
util.Fatal(cmd, fmt.Errorf("failed to write execution history for %s: %w", execution.ID, err), 1)
}
}

o.printOutputs(cmd, executions)

return nil
Expand Down Expand Up @@ -156,6 +188,33 @@ func (o *DescribeOptions) printExecutions(cmd *cobra.Command, executions []*mode
return output.Output(cmd, executionCols, tableOptions, executions)
}

func (o *DescribeOptions) printHistory(cmd *cobra.Command, label string, history []*models.JobHistory) error {
if len(history) < 1 {
return nil
}

firstTime := history[0].Occurred()
timeSinceCol := output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: historyTimeCol.ColumnConfig.Name, WidthMax: 10, WidthMaxEnforcer: text.WrapText},
Value: func(h *models.JobHistory) string { return h.Occurred().Sub(firstTime).String() },
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved
}

tableOptions := output.OutputOptions{
Format: output.TableFormat,
NoStyle: true,
}
jobHistoryCols := []output.TableColumn[*models.JobHistory]{
timeSinceCol,
historyRevisionCol,
historyStateCol,
historyTopicCol,
historyEventCol,
historyDetailsCol,
}
wdbaruni marked this conversation as resolved.
Show resolved Hide resolved
output.Bold(cmd, fmt.Sprintf("\n%s History\n", label))
return output.Output(cmd, jobHistoryCols, tableOptions, history)
}

func (o *DescribeOptions) printOutputs(cmd *cobra.Command, executions []*models.Execution) {
outputs := make(map[string]string)
for _, e := range executions {
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/job/executions.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var (
Value: func(e *models.Execution) string { return strconv.FormatUint(e.Revision, 10) },
}
executionColumnState = output.TableColumn[*models.Execution]{
ColumnConfig: table.ColumnConfig{Name: "State", WidthMax: 10, WidthMaxEnforcer: text.WrapText},
ColumnConfig: table.ColumnConfig{Name: "State", WidthMax: 17, WidthMaxEnforcer: text.WrapText},
Value: func(e *models.Execution) string { return e.ComputeState.StateType.String() },
}
executionColumnDesired = output.TableColumn[*models.Execution]{
Expand Down
81 changes: 50 additions & 31 deletions cmd/cli/job/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package job

import (
"fmt"
"slices"
"strconv"
"strings"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"github.com/jedib0t/go-pretty/v6/text"
"github.com/samber/lo"
"github.com/spf13/cobra"
"k8s.io/kubectl/pkg/util/i18n"

Expand Down Expand Up @@ -77,50 +80,66 @@ func NewHistoryCmd() *cobra.Command {
return nodeCmd
}

var historyColumns = []output.TableColumn[*models.JobHistory]{
{
ColumnConfig: table.ColumnConfig{Name: "Time", WidthMax: 8, WidthMaxEnforcer: output.ShortenTime},
Value: func(j *models.JobHistory) string { return j.Time.Format(time.DateTime) },
},
{
var (
historyTimeCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Time", WidthMax: len(time.StampMilli), WidthMaxEnforcer: output.ShortenTime},
Value: func(j *models.JobHistory) string { return j.Occurred().Format(time.StampMilli) },
}
historyLevelCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Level", WidthMax: 15, WidthMaxEnforcer: text.WrapText},
Value: func(jwi *models.JobHistory) string { return jwi.Type.String() },
},
{
}
historyRevisionCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Rev.", WidthMax: 4, WidthMaxEnforcer: text.WrapText},
Value: func(j *models.JobHistory) string { return strconv.FormatUint(j.NewRevision, 10) },
}
historyExecIDCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Exec. ID", WidthMax: 10, WidthMaxEnforcer: text.WrapText},
Value: func(j *models.JobHistory) string { return idgen.ShortUUID(j.ExecutionID) },
},
{
}
historyNodeIDCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Node ID", WidthMax: 10, WidthMaxEnforcer: text.WrapText},
Value: func(j *models.JobHistory) string { return idgen.ShortNodeID(j.NodeID) },
},
{
ColumnConfig: table.ColumnConfig{Name: "Rev.", WidthMax: 4, WidthMaxEnforcer: text.WrapText},
Value: func(j *models.JobHistory) string { return strconv.FormatUint(j.NewRevision, 10) },
},
{
ColumnConfig: table.ColumnConfig{Name: "Previous State", WidthMax: 20, WidthMaxEnforcer: text.WrapText},
Value: func(j *models.JobHistory) string {
if j.Type == models.JobHistoryTypeJobLevel {
return j.JobState.Previous.String()
}
return j.ExecutionState.Previous.String()
},
},
{
ColumnConfig: table.ColumnConfig{Name: "New State", WidthMax: 20, WidthMaxEnforcer: text.WrapText},
}
historyStateCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "State", WidthMax: 20, WidthMaxEnforcer: text.WrapText},
Value: func(j *models.JobHistory) string {
if j.Type == models.JobHistoryTypeJobLevel {
return j.JobState.New.String()
}
return j.ExecutionState.New.String()
},
},
}
historyTopicCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Topic", WidthMax: 15, WidthMaxEnforcer: text.WrapSoft},
Value: func(jh *models.JobHistory) string { return string(jh.Event.Topic) },
}
historyEventCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Event", WidthMax: 60, WidthMaxEnforcer: text.WrapSoft},
Value: func(j *models.JobHistory) string { return j.Event.Message },
}
historyDetailsCol = output.TableColumn[*models.JobHistory]{
ColumnConfig: table.ColumnConfig{Name: "Details", WidthMax: 25, WidthMaxEnforcer: text.WrapText},
Value: func(jh *models.JobHistory) string {
details := lo.MapToSlice(jh.Event.Details, func(key, value string) string {
return fmt.Sprintf("%s: %s", key, value)
})
slices.Sort(details)
return strings.Join(details, "\n")
},
}
)

{
ColumnConfig: table.ColumnConfig{Name: "Comment", WidthMax: 40, WidthMaxEnforcer: text.WrapText},
Value: func(j *models.JobHistory) string { return j.Comment },
},
var historyColumns = []output.TableColumn[*models.JobHistory]{
historyTimeCol,
historyLevelCol,
historyRevisionCol,
historyExecIDCol,
historyNodeIDCol,
historyStateCol,
historyTopicCol,
historyEventCol,
historyDetailsCol,
}

func (o *HistoryOptions) run(cmd *cobra.Command, args []string) error {
Expand Down
112 changes: 102 additions & 10 deletions cmd/util/printer/print.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,26 @@ import (
"context"
"fmt"
"io"
"math"
"os"
"os/signal"
"slices"
"strings"
"time"

"github.com/fatih/color"
"github.com/mitchellh/go-wordwrap"
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/samber/lo"
"github.com/spf13/cobra"
"golang.org/x/exp/maps"
"golang.org/x/term"

"github.com/bacalhau-project/bacalhau/cmd/util"
"github.com/bacalhau-project/bacalhau/cmd/util/flags/cliflags"
"github.com/bacalhau-project/bacalhau/pkg/bacerrors"
"github.com/bacalhau-project/bacalhau/pkg/lib/math"
libmath "github.com/bacalhau-project/bacalhau/pkg/lib/math"
"github.com/bacalhau-project/bacalhau/pkg/models"
"github.com/bacalhau-project/bacalhau/pkg/publicapi/apimodels"
clientv2 "github.com/bacalhau-project/bacalhau/pkg/publicapi/client/v2"
Expand Down Expand Up @@ -72,12 +79,27 @@ func PrintJobExecution(
if jobErr != nil {
if jobErr.Error() == PrintoutCanceledButRunningNormally {
return nil
}

history, err := client.Jobs().History(ctx, &apimodels.ListJobHistoryRequest{
JobID: jobID,
EventType: "execution",
})
if err != nil {
return fmt.Errorf("failed getting job history: %w", err)
}

historySummary := summariseHistoryEvents(history.History)
if len(historySummary) > 0 {
for _, event := range historySummary {
printEvent(cmd, event)
}
} else {
cmd.PrintErrf("\nError submitting job: %s", jobErr)
printError(cmd, jobErr)
}
}

if runtimeSettings.PrintNodeDetails || jobErr != nil {
if runtimeSettings.PrintNodeDetails {
executions, err := client.Jobs().Executions(ctx, &apimodels.ListJobExecutionsRequest{
JobID: jobID,
})
Expand All @@ -87,13 +109,13 @@ func PrintJobExecution(
summary := summariseExecutions(executions.Executions)
if len(summary) > 0 {
cmd.Println("\nJob Results By Node:")
for message, nodes := range summary {
cmd.Printf("• Node %s: ", strings.Join(nodes, ", "))
if strings.ContainsRune(message, '\n') {
cmd.Printf("\n\t%s\n", strings.Join(strings.Split(message, "\n"), "\n\t"))
} else {
cmd.Println(message)
for message, runs := range summary {
nodes := len(lo.Uniq(runs))
prefix := fmt.Sprintf("• Node %s: ", runs[0])
if len(runs) > 1 {
prefix = fmt.Sprintf("• %d runs on %d nodes: ", len(runs), nodes)
}
printIndentedString(cmd, prefix, strings.Trim(message, "\n"), none, 0)
}
} else {
cmd.Println()
Expand Down Expand Up @@ -179,7 +201,7 @@ To cancel the job, run:

widestString := len(startMessage)
for _, v := range eventsWorthPrinting {
widestString = math.Max(widestString, len(v.Message))
widestString = libmath.Max(widestString, len(v.Message))
}

spinner, err := NewSpinner(ctx, writer, widestString, false)
Expand Down Expand Up @@ -297,6 +319,58 @@ To cancel the job, run:
return returnError
}

var (
none = color.New(color.Reset)
red = color.New(color.FgRed)
green = color.New(color.FgGreen)
)

const (
errorPrefix = "Error: "
hintPrefix = "Hint: "
)

var terminalWidth int

func getTerminalWidth(cmd *cobra.Command) uint {
if terminalWidth == 0 {
var err error
terminalWidth, _, err = term.GetSize(int(os.Stderr.Fd()))
if err != nil || terminalWidth <= 0 {
log.Ctx(cmd.Context()).Debug().Err(err).Msg("Failed to get terminal size")
terminalWidth = math.MaxInt8
}
}
return uint(terminalWidth)
}

func printEvent(cmd *cobra.Command, event models.Event) {
printIndentedString(cmd, errorPrefix, event.Message, red, 0)
if event.Details != nil && event.Details[models.DetailsKeyHint] != "" {
printIndentedString(cmd, hintPrefix, event.Details[models.DetailsKeyHint], green, uint(len(errorPrefix)))
}
}

func printError(cmd *cobra.Command, err error) {
printIndentedString(cmd, errorPrefix, err.Error(), red, 0)
}

func printIndentedString(cmd *cobra.Command, prefix, msg string, prefixColor *color.Color, startIndent uint) {
maxWidth := getTerminalWidth(cmd)
blockIndent := int(startIndent) + len(prefix)
blockTextWidth := maxWidth - startIndent - uint(len(prefix))

cmd.PrintErrln()
cmd.PrintErr(strings.Repeat(" ", int(startIndent)))
prefixColor.Fprintf(cmd.ErrOrStderr(), prefix)
for i, line := range strings.Split(wordwrap.WrapString(msg, blockTextWidth), "\n") {
if i > 0 {
cmd.PrintErr(strings.Repeat(" ", blockIndent))
}
cmd.PrintErrln(line)
}
}

// Groups the executions in the job state, returning a map of printable messages
// to node(s) that generated that message.
func summariseExecutions(executions []*models.Execution) map[string][]string {
Expand All @@ -321,3 +395,21 @@ func summariseExecutions(executions []*models.Execution) map[string][]string {
}
return results
}

func summariseHistoryEvents(history []*models.JobHistory) []models.Event {
slices.SortFunc(history, func(a, b *models.JobHistory) int {
return a.Occurred().Compare(b.Occurred())
})

events := make(map[string]models.Event, len(history))
for _, entry := range history {
hasDetails := entry.Event.Details != nil
failsExecution := hasDetails && entry.Event.Details[models.DetailsKeyFailsExecution] == "true"
terminalState := entry.ExecutionState.New.IsTermainl()
if (failsExecution || terminalState) && entry.Event.Message != "" {
events[entry.Event.Message] = entry.Event
}
}

return maps.Values(events)
}
Loading
Loading