Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add initial execution progress streaming impl #6544

Merged
merged 3 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 25 additions & 8 deletions cli/execute/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,18 +162,35 @@ func execute(cmdArgs []string) error {
return err
}
log.Debugf("Waiting for execution to complete")
response, err := rexec.Wait(stream)
if err != nil {
return err
}
if response.Err != nil {
// We failed to execute.
return response.Err
var rsp *rexec.Response
for {
msg, err := stream.Recv()
if err != nil {
return err
}
if msg.Err != nil {
// We failed to execute.
return msg.Err
}
// Log execution state
progress := &repb.ExecutionProgress{}
ok, _ := rexec.AuxiliaryMetadata(msg.ExecuteOperationMetadata.GetPartialExecutionMetadata(), progress)
if ok && progress.GetExecutionState() != 0 {
log.Debugf(
"Remote: %s @ %s",
repb.ExecutionProgress_ExecutionState_name[int32(progress.GetExecutionState())],
progress.GetTimestamp().AsTime(),
)
}
if msg.Done {
rsp = msg
break
}
}
log.Debugf("Execution completed in %s", time.Since(stageStart))
stageStart = time.Now()
log.Debugf("Downloading result")
res, err := rexec.GetResult(ctx, env, *instanceName, df, response.ExecuteResponse.GetResult())
res, err := rexec.GetResult(ctx, env, *instanceName, df, rsp.ExecuteResponse.GetResult())
if err != nil {
return status.WrapError(err, "execution failed")
}
Expand Down
13 changes: 7 additions & 6 deletions enterprise/server/remote_execution/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func shouldRetry(task *repb.ExecutionTask, taskError error) bool {
return !isClientBazel(task)
}

func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.ScheduledTask, stream operation.StreamLike) (retry bool, err error) {
func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.ScheduledTask, stream *operation.Publisher) (retry bool, err error) {
// From here on in we use these liberally, so check that they are setup properly
// in the environment.
if s.env.GetActionCacheClient() == nil || s.env.GetByteStreamClient() == nil || s.env.GetContentAddressableStorageClient() == nil {
Expand Down Expand Up @@ -218,6 +218,9 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch

log.CtxDebugf(ctx, "Preparing runner for task.")
stage.Set("pull_image")
// TODO: don't publish this progress update if we're not actually pulling
// an image.
_ = stream.SetState(repb.ExecutionProgress_PULLING_CONTAINER_IMAGE)
if err := r.PrepareForTask(ctx); err != nil {
return finishWithErrFn(err)
}
Expand All @@ -226,16 +229,12 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch

log.CtxDebugf(ctx, "Downloading inputs.")
stage.Set("input_fetch")
_ = stream.SetState(repb.ExecutionProgress_DOWNLOADING_INPUTS)
if err := r.DownloadInputs(ctx, md.IoStats); err != nil {
return finishWithErrFn(err)
}

md.InputFetchCompletedTimestamp = timestamppb.Now()

log.CtxDebugf(ctx, "Transitioning to EXECUTING stage")
if err := stateChangeFn(repb.ExecutionStage_EXECUTING, operation.InProgressExecuteResponse()); err != nil {
return true, err
}
md.ExecutionStartTimestamp = timestamppb.Now()
execTimeout, err := parseTimeout(task.GetAction().Timeout)
if err != nil {
Expand All @@ -247,6 +246,7 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch

log.CtxDebugf(ctx, "Executing task.")
stage.Set("execution")
_ = stream.SetState(repb.ExecutionProgress_EXECUTING_COMMAND)
cmdResultChan := make(chan *interfaces.CommandResult, 1)
go func() {
cmdResultChan <- r.Run(ctx)
Expand Down Expand Up @@ -306,6 +306,7 @@ func (s *Executor) ExecuteTaskAndStreamResults(ctx context.Context, st *repb.Sch

log.CtxDebugf(ctx, "Uploading outputs.")
stage.Set("output_upload")
_ = stream.SetState(repb.ExecutionProgress_UPLOADING_OUTPUTS)
if err := r.UploadOutputs(ctx, md.IoStats, executeResponse, cmdResult); err != nil {
return finishWithErrFn(status.UnavailableErrorf("Error uploading outputs: %s", err.Error()))
}
Expand Down
1 change: 1 addition & 0 deletions enterprise/server/remote_execution/operation/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ go_library(
"@org_golang_google_genproto//googleapis/longrunning",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/anypb",
"@org_golang_google_protobuf//types/known/timestamppb",
],
)
112 changes: 94 additions & 18 deletions enterprise/server/remote_execution/operation/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"io"
"net/url"
"sync"
"time"

"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
Expand All @@ -18,13 +19,76 @@ import (

repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
gstatus "google.golang.org/grpc/status"
tspb "google.golang.org/protobuf/types/known/timestamppb"
)

const (
// Timeout for retrying a PublishOperation stream.
reconnectTimeout = 5 * time.Second
)

// Publisher is used to publish state changes for a task. Publisher is intended
// to be used instead of a raw PublishOperation stream because it is safe for
// concurrent use and includes auto-reconnect functionality in case the stream
// is broken.
type Publisher struct {
taskID string
taskResourceName *digest.ResourceName

mu sync.Mutex
stream *retryingClient
}

func newPublisher(stream *retryingClient, taskID string, taskResourceName *digest.ResourceName) *Publisher {
return &Publisher{
stream: stream,
taskID: taskID,
taskResourceName: taskResourceName,
}
}

func (p *Publisher) Context() context.Context {
return p.stream.Context()
}

// Send publishes a message on the stream. It is safe for concurrent use.
func (p *Publisher) Send(op *longrunning.Operation) error {
p.mu.Lock()
defer p.mu.Unlock()
return p.stream.Send(op)
}

// SetStatus sets the current task status and eagerly publishes a progress
// update with the new state.
func (p *Publisher) SetState(state repb.ExecutionProgress_ExecutionState) error {
progress := &repb.ExecutionProgress{
Timestamp: tspb.Now(),
ExecutionState: state,
}
progressAny, err := anypb.New(progress)
if err != nil {
return err
}
md := &repb.ExecuteOperationMetadata{
Stage: repb.ExecutionStage_EXECUTING,
ActionDigest: p.taskResourceName.GetDigest(),
PartialExecutionMetadata: &repb.ExecutedActionMetadata{
AuxiliaryMetadata: []*anypb.Any{progressAny},
},
}
op, err := assemble(p.taskID, md, nil /*=response*/)
if err != nil {
return status.WrapError(err, "assemble operation")
}
return p.Send(op)
}

// CloseAndRecv closes the send direction of the stream and waits for the
// server to ack.
func (p *Publisher) CloseAndRecv() (*repb.PublishOperationResponse, error) {
return p.stream.CloseAndRecv()
}

// retryingClient works like a PublishOperationClient but transparently
// re-connects the stream when disconnected. The retryingClient does not re-dial
// the backend; instead it depends on the client connection being terminated by
Expand All @@ -40,16 +104,21 @@ type retryingClient struct {
// stream if disconnected. After a disconnect (either in Send or CloseAndRecv),
// it will re-publish the last sent message if applicable to ensure that the
// server has acknowledged it.
func Publish(ctx context.Context, client repb.ExecutionClient) (*retryingClient, error) {
func Publish(ctx context.Context, client repb.ExecutionClient, taskID string) (*Publisher, error) {
r, err := digest.ParseUploadResourceName(taskID)
if err != nil {
return nil, status.WrapError(err, "parse task ID")
}
clientStream, err := client.PublishOperation(ctx)
if err != nil {
return nil, err
}
return &retryingClient{
retryingStream := &retryingClient{
ctx: ctx,
client: client,
clientStream: clientStream,
}, nil
}
return newPublisher(retryingStream, taskID, r), nil
}

func (c *retryingClient) Context() context.Context {
Expand Down Expand Up @@ -167,31 +236,38 @@ func (c *retryingClient) CloseAndRecv() (*repb.PublishOperationResponse, error)
return nil, status.UnknownError("CloseAndRecv: unknown error")
}

// TODO: make assemble() the public one and remove this.
func Assemble(stage repb.ExecutionStage_Value, name string, r *digest.ResourceName, er *repb.ExecuteResponse) (*longrunning.Operation, error) {
if r == nil || er == nil {
return nil, status.FailedPreconditionError("digest or execute response are both required to assemble operation")
}
metadata, err := anypb.New(&repb.ExecuteOperationMetadata{
md := &repb.ExecuteOperationMetadata{
Stage: stage,
ActionDigest: r.GetDigest(),
})
if err != nil {
return nil, err
}
operation := &longrunning.Operation{
Name: name,
Metadata: metadata,
return assemble(name, md, er)
}

func assemble(name string, md *repb.ExecuteOperationMetadata, rsp *repb.ExecuteResponse) (*longrunning.Operation, error) {
op := &longrunning.Operation{
Name: name,
Done: md.GetStage() == repb.ExecutionStage_COMPLETED,
}
result, err := anypb.New(er)
if err != nil {
return nil, err
if md != nil {
mdAny, err := anypb.New(md)
if err != nil {
return nil, err
}
op.Metadata = mdAny
}
operation.Result = &longrunning.Operation_Response{Response: result}

if stage == repb.ExecutionStage_COMPLETED {
operation.Done = true
if rsp != nil {
resultAny, err := anypb.New(rsp)
if err != nil {
return nil, err
}
op.Result = &longrunning.Operation_Response{Response: resultAny}
}
return operation, nil
return op, nil
}

func AssembleFailed(stage repb.ExecutionStage_Value, name string, d *digest.ResourceName, status error) (*longrunning.Operation, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func (q *PriorityTaskScheduler) runTask(ctx context.Context, st *repb.ScheduledT

execTask := st.ExecutionTask
ctx = q.propagateExecutionTaskValuesToContext(ctx, execTask)
clientStream, err := operation.Publish(ctx, q.env.GetRemoteExecutionClient())
clientStream, err := operation.Publish(ctx, q.env.GetRemoteExecutionClient(), execTask.GetExecutionId())
if err != nil {
log.CtxWarningf(ctx, "Error opening publish operation stream: %s", err)
return true, status.WrapError(err, "failed to open execution status update stream")
Expand Down
1 change: 1 addition & 0 deletions enterprise/server/test/integration/remote_execution/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ go_test(
"//server/util/grpc_client",
"//server/util/log",
"//server/util/proto",
"//server/util/rexec",
"//server/util/status",
"//server/util/testing/flags",
"@com_github_go_redis_redis_v8//:redis",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"//server/util/log",
"//server/util/retry",
"//server/util/status",
"@org_golang_google_genproto//googleapis/longrunning",
"@org_golang_google_genproto_googleapis_bytestream//:bytestream",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//types/known/durationpb",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/retry"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"google.golang.org/genproto/googleapis/longrunning"
"google.golang.org/protobuf/types/known/durationpb"

repb "github.com/buildbuddy-io/buildbuddy/proto/remote_execution"
Expand Down Expand Up @@ -94,6 +95,9 @@ type Command struct {
beforeExecuteTime time.Time
afterExecuteTime time.Time

// Complete list of operations received on the stream.
operations []*longrunning.Operation

mu sync.Mutex
opName string
}
Expand All @@ -110,6 +114,10 @@ func (c *Command) GetActionResourceName() *digest.ResourceName {
return c.actionResourceName
}

func (c *Command) GetOperations() []*longrunning.Operation {
return c.operations
}

type StartOpts struct {
SkipCacheLookup bool
}
Expand Down Expand Up @@ -220,6 +228,8 @@ func (c *Command) processUpdatesAsync(ctx context.Context, stream repb.Execution
return
}

c.operations = append(c.operations, op)

if taskID == "" {
taskID = op.GetName()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/buildbuddy-io/buildbuddy/server/util/grpc_client"
"github.com/buildbuddy-io/buildbuddy/server/util/log"
"github.com/buildbuddy-io/buildbuddy/server/util/proto"
"github.com/buildbuddy-io/buildbuddy/server/util/rexec"
"github.com/buildbuddy-io/buildbuddy/server/util/status"
"github.com/buildbuddy-io/buildbuddy/server/util/testing/flags"
"github.com/go-redis/redis/v8"
Expand Down Expand Up @@ -64,6 +65,12 @@ func TestSimpleCommandWithNonZeroExitCode(t *testing.T) {
assert.Equal(t, 5, res.ExitCode, "exit code should be propagated")
assert.Equal(t, "hello\n", res.Stdout, "stdout should be propagated")
assert.Equal(t, "bye\n", res.Stderr, "stderr should be propagated")
assert.Equal(t, []repb.ExecutionProgress_ExecutionState{
repb.ExecutionProgress_PULLING_CONTAINER_IMAGE,
repb.ExecutionProgress_DOWNLOADING_INPUTS,
repb.ExecutionProgress_EXECUTING_COMMAND,
repb.ExecutionProgress_UPLOADING_OUTPUTS,
}, getProgressStates(t, cmd), "command progress states")
}

func TestActionResultCacheWithSuccessfulAction(t *testing.T) {
Expand Down Expand Up @@ -1999,3 +2006,18 @@ func randSleepMillis(min, max int) {
r := rand.Int63n(int64(max-min)) + int64(min)
time.Sleep(time.Duration(r))
}

func getProgressStates(t *testing.T, c *rbetest.Command) []repb.ExecutionProgress_ExecutionState {
var states []repb.ExecutionProgress_ExecutionState
for _, op := range c.GetOperations() {
rsp, err := rexec.UnpackOperation(op)
require.NoError(t, err)
var progress repb.ExecutionProgress
ok, err := rexec.AuxiliaryMetadata(rsp.ExecuteOperationMetadata.GetPartialExecutionMetadata(), &progress)
require.NoError(t, err)
if ok {
states = append(states, progress.ExecutionState)
}
}
return states
}
8 changes: 4 additions & 4 deletions proto/remote_execution.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2256,16 +2256,16 @@ message ExecutionProgress {
DOWNLOADING_INPUTS = 2;

// Booting a new VM.
BOOTING_VM = 4;
BOOTING_VM = 3;

// Resuming a VM from snapshot.
RESUMING_VM = 5;
RESUMING_VM = 4;

// Executing the command for the action.
EXECUTING_COMMAND = 6;
EXECUTING_COMMAND = 5;

// Uploading action outputs.
UPLOADING_OUTPUTS = 7;
UPLOADING_OUTPUTS = 6;
}
}

Expand Down
1 change: 1 addition & 0 deletions server/util/rexec/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//server/interfaces",
"//server/remote_cache/cachetools",
"//server/remote_cache/digest",
"//server/util/proto",
"//server/util/retry",
"//server/util/status",
"@org_golang_google_genproto//googleapis/longrunning",
Expand Down
Loading
Loading