-
Notifications
You must be signed in to change notification settings - Fork 63
/
tracing_build_executor.go
68 lines (59 loc) · 2.66 KB
/
tracing_build_executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package builder
import (
"context"
remoteexecution "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
re_filesystem "github.com/buildbarn/bb-remote-execution/pkg/filesystem"
"github.com/buildbarn/bb-remote-execution/pkg/filesystem/access"
"github.com/buildbarn/bb-remote-execution/pkg/proto/remoteworker"
"github.com/buildbarn/bb-storage/pkg/digest"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
type tracingBuildExecutor struct {
BuildExecutor
tracer trace.Tracer
}
// NewTracingBuildExecutor is a decorator for BuildExecutor that creates
// an OpenTelemetry trace span for every action that is executed. At the
// start of every execution state, an event is added to the span that
// indicates which state is entered.
func NewTracingBuildExecutor(buildExecutor BuildExecutor, tracerProvider trace.TracerProvider) BuildExecutor {
return &tracingBuildExecutor{
BuildExecutor: buildExecutor,
tracer: tracerProvider.Tracer("github.com/buildbarn/bb-remote-execution/pkg/builder"),
}
}
func (be *tracingBuildExecutor) Execute(ctx context.Context, filePool re_filesystem.FilePool, monitor access.UnreadDirectoryMonitor, digestFunction digest.Function, request *remoteworker.DesiredState_Executing, executionStateUpdates chan<- *remoteworker.CurrentState_Executing) *remoteexecution.ExecuteResponse {
actionDigest := request.ActionDigest
action := request.Action
ctxWithTracing, span := be.tracer.Start(ctx, "BuildExecutor.Execute", trace.WithAttributes(
attribute.String("action_digest.hash", actionDigest.GetHash()),
attribute.Int64("action_digest.size_bytes", actionDigest.GetSizeBytes()),
attribute.String("digest_function", digestFunction.GetEnumValue().String()),
attribute.Bool("do_not_cache", action.GetDoNotCache()),
attribute.String("instance_name", digestFunction.GetInstanceName().String()),
attribute.Float64("timeout", action.GetTimeout().AsDuration().Seconds()),
))
defer span.End()
baseUpdates := make(chan *remoteworker.CurrentState_Executing)
baseCompletion := make(chan *remoteexecution.ExecuteResponse)
go func() {
baseCompletion <- be.BuildExecutor.Execute(ctxWithTracing, filePool, monitor, digestFunction, request, baseUpdates)
}()
for {
select {
case update := <-baseUpdates:
switch update.ExecutionState.(type) {
case *remoteworker.CurrentState_Executing_FetchingInputs:
span.AddEvent("FetchingInputs")
case *remoteworker.CurrentState_Executing_Running:
span.AddEvent("Running")
case *remoteworker.CurrentState_Executing_UploadingOutputs:
span.AddEvent("UploadingOutputs")
}
executionStateUpdates <- update
case response := <-baseCompletion:
return response
}
}
}