client: add OTEL_RESOURCE_ATTRIBUTES env var. #14556

Draft
wants to merge 2 commits into
base: main
3 changes: 3 additions & 0 deletions .changelog/14556.txt
@@ -0,0 +1,3 @@
```release-note:improvement
client: expose Nomad attributes to allocations using the `OTEL_RESOURCE_ATTRIBUTES` environment variable
```
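For context, a task could read the variable roughly as follows. This is a minimal sketch using only the standard library, assuming the comma-separated `key=value` format with percent-encoded values that the OpenTelemetry resource specification describes. `parseResourceAttrs` is a hypothetical helper, not part of Nomad or the OTel SDK.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strings"
)

// parseResourceAttrs is a hypothetical helper (not from this PR). It splits a
// comma-separated list of key=value pairs into a map and skips any entry
// that has no "=" or an empty key.
func parseResourceAttrs(raw string) map[string]string {
	attrs := make(map[string]string)
	for _, pair := range strings.Split(raw, ",") {
		key, value, ok := strings.Cut(pair, "=")
		key = strings.TrimSpace(key)
		value = strings.TrimSpace(value)
		if !ok || key == "" {
			continue
		}
		// Percent-decode the value; keep the raw text if decoding fails.
		if decoded, err := url.PathUnescape(value); err == nil {
			value = decoded
		}
		attrs[key] = value
	}
	return attrs
}

func main() {
	attrs := parseResourceAttrs(os.Getenv("OTEL_RESOURCE_ATTRIBUTES"))
	fmt.Println(attrs["nomad.alloc.id"], attrs["nomad.job.name"])
}
```

In practice an OTel SDK would normally do this parsing itself; the sketch only shows what the task process sees.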
144 changes: 144 additions & 0 deletions client/allocrunner/taskrunner/otel_hook.go
@@ -0,0 +1,144 @@
package taskrunner

import (
"context"
"fmt"
"net/url"

log "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/nomad/structs"
"go.opentelemetry.io/otel/baggage"
Member

They went with the name "baggage" for this concept? 😦

Contributor Author

Ah yeah, that's another spec 😅
https://www.w3.org/TR/baggage/

)

const envKeyOtelResourceAttrs = "OTEL_RESOURCE_ATTRIBUTES"

type otelHookConfig struct {
logger log.Logger
alloc *structs.Allocation
node *structs.Node
}

type otelHook struct {
alloc *structs.Allocation
node *structs.Node
logger log.Logger
}

func newOtelHook(config *otelHookConfig) *otelHook {
hook := &otelHook{
alloc: config.alloc,
node: config.node,
}
hook.logger = config.logger.Named(hook.Name()).
With("alloc_id", config.alloc.ID)

return hook
}

func (h *otelHook) Name() string {
return "otel"
}

func (h *otelHook) Prestart(_ context.Context, req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
logger := h.logger.With("task", req.Task.Name)

resourceAttrsEnv, ok := req.TaskEnv.EnvMap[envKeyOtelResourceAttrs]
if ok && resourceAttrsEnv == "" {
logger.Debug("skipping OTEL_RESOURCE_ATTRIBUTES environment variable")
return nil
}

resourceAttrs, err := generateBaggage(h.alloc, req.Task, h.node)
if err != nil {
logger.Warn("failed to generate OTEL_RESOURCE_ATTRIBUTES environment variable", "error", err)
return nil
}

if resourceAttrsEnv != "" {
logger.Debug("merging existing OTEL_RESOURCE_ATTRIBUTES environment variable values", "attrs", resourceAttrsEnv)

taskBaggage, err := baggage.Parse(resourceAttrsEnv)
if err != nil {
logger.Warn("failed to parse task environment variable OTEL_RESOURCE_ATTRIBUTES as baggage",
"otel_resource_attributes", resourceAttrsEnv, "error", err)
} else {
for _, m := range taskBaggage.Members() {
k, v := m.Key(), m.Value()
logger.Trace("found member", "key", k, "value", v)

// TODO(luiz): don't create new member once baggage.Members()
// returns values with `hasData` set to `true`.
// https://github.com/open-telemetry/opentelemetry-go/issues/3164
member, err := baggage.NewMember(k, v)
if err != nil {
logger.Warn("failed to create new baggage member", "key", k, "value", v, "error", err)
Member

The ridiculous name "baggage" aside, which isn't your fault, I'm not sure we should expose that inside-baseball terminology to end-users?

Contributor Author

Ah right, good point 👍

continue
}

resourceAttrs, err = resourceAttrs.SetMember(member)
if err != nil {
logger.Warn("failed to set new baggage member", "key", k, "value", v, "error", err)
continue
}
}
}
}

// TODO(luiz): remove decode step once the Otel SDK handles it internally.
// https://github.com/open-telemetry/opentelemetry-go/pull/2963
attrs, err := url.QueryUnescape(resourceAttrs.String())
if err != nil {
attrs = resourceAttrs.String()
}
Comment on lines +89 to +94
Member

I may be misunderstanding this PR, but this looks like it impacts the read side and not the write side. If the read side of the SDK gets fixed, don't we still need to encode on the write side so that older versions of the SDK aren't broken? (And are there lots of read-side SDKs for different languages? If there's only Go, then it doesn't seem sensible for us to bake in support for a single language in Nomad.)

Member

Reading a bit more, this looks like we could end up double-encoding in the case where the user has something we're merging onto.

Contributor Author

The problem was a mismatch between how the baggage spec and the collector handled encoding values. Only the baggage required encoding/decoding so the more general "fix" was an update to the spec. Other languages will need to be updated to handle this as well, but good point on supporting older versions. I will make sure to test it.

Attempting to double-encode would result in an error, which is handled by using the original string.

resp.Env = map[string]string{
envKeyOtelResourceAttrs: attrs,
}
return nil
}
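To make the decode step in `Prestart` concrete, here is a small standard-library sketch of the same fallback pattern. `decodeOrKeep` is a hypothetical name used only for illustration. The behavior shown is that of `url.QueryUnescape`, which returns an error on a malformed percent escape and also turns `+` into a space, something to keep in mind for attribute values that contain a literal `+`.

```go
package main

import (
	"fmt"
	"net/url"
)

// decodeOrKeep is a hypothetical helper mirroring the hook's fallback:
// try to percent-decode the string and keep the original if decoding fails.
func decodeOrKeep(s string) string {
	decoded, err := url.QueryUnescape(s)
	if err != nil {
		return s
	}
	return decoded
}

func main() {
	// A well-formed escape is decoded.
	fmt.Println(decodeOrKeep("nomad.job.name=my%20job"))
	// A dangling "%" is a malformed escape, so the input is kept as-is.
	fmt.Println(decodeOrKeep("nomad.job.name=100%"))
	// QueryUnescape also maps "+" to a space.
	fmt.Println(decodeOrKeep("nomad.job.name=a+b"))
}
```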

func generateBaggage(alloc *structs.Allocation, task *structs.Task, node *structs.Node) (baggage.Baggage, error) {
tgross marked this conversation as resolved.
var mErr *multierror.Error
job := alloc.Job
members := []baggage.Member{
newMember("nomad.alloc.createTime", fmt.Sprintf("%v", alloc.CreateTime), mErr),
newMember("nomad.alloc.id", alloc.ID, mErr),
newMember("nomad.alloc.name", alloc.Name, mErr),
newMember("nomad.eval.id", alloc.EvalID, mErr),
newMember("nomad.group.name", alloc.TaskGroup, mErr),
newMember("nomad.job.id", job.ID, mErr),
newMember("nomad.job.name", job.Name, mErr),
newMember("nomad.job.region", job.Region, mErr),
newMember("nomad.job.type", job.Type, mErr),
newMember("nomad.namespace", alloc.Namespace, mErr),
newMember("nomad.node.id", node.ID, mErr),
newMember("nomad.node.name", node.Name, mErr),
newMember("nomad.node.datacenter", node.Datacenter, mErr),
newMember("nomad.task.name", task.Name, mErr),
newMember("nomad.task.driver", task.Driver, mErr),
}
Comment on lines +104 to +120
Member

I'm not sure I understand the value to operators in providing these specific attributes to task processes. Ex. why do we want to expose the eval ID for every running process on the cluster?

From a design standpoint I'm not sure it makes sense to have Nomad itself define a hard-coded set of attributes, rather than making this something the operators define (as either client configuration or in the jobspec). Could this whole thing be done via clever templating of an env block, and if it can be but only with a lot of work, could we implement something to make it easier instead of tying ourselves to the extremely fast-moving OTEL project?

Contributor Author

Ah, I see. I considered some kind of configuration (either jobspec or client-side), but that would be even more of a commitment to keep updated. I also considered meta values, but they are not really well defined and should probably remain opaque to Nomad.

An external project may be better for now, then. I will think more about it, thanks!
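
As a rough illustration of the "external project" direction discussed in this thread, a task (or a small wrapper around it) could assemble the variable itself from the runtime environment variables Nomad already sets, such as `NOMAD_ALLOC_ID` and `NOMAD_JOB_NAME`. The sketch below is an assumption about how that might look, not code from this PR; `buildResourceAttrs`, `attrSources`, and the choice of attribute keys are hypothetical.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
	"strings"
)

// attrSources maps illustrative attribute keys to Nomad runtime environment
// variables. The selection of keys here is an assumption for the example.
var attrSources = []struct{ key, env string }{
	{"nomad.alloc.id", "NOMAD_ALLOC_ID"},
	{"nomad.alloc.name", "NOMAD_ALLOC_NAME"},
	{"nomad.group.name", "NOMAD_GROUP_NAME"},
	{"nomad.job.name", "NOMAD_JOB_NAME"},
	{"nomad.namespace", "NOMAD_NAMESPACE"},
	{"nomad.task.name", "NOMAD_TASK_NAME"},
}

// buildResourceAttrs is a hypothetical helper. It looks up each variable,
// skips unset or empty ones, percent-encodes the value, and joins the
// pairs with commas in the fixed order of attrSources.
func buildResourceAttrs(lookup func(string) (string, bool)) string {
	var pairs []string
	for _, src := range attrSources {
		value, ok := lookup(src.env)
		if !ok || value == "" {
			continue
		}
		pairs = append(pairs, src.key+"="+url.PathEscape(value))
	}
	return strings.Join(pairs, ",")
}

func main() {
	fmt.Println(buildResourceAttrs(os.LookupEnv))
}
```

Passing the lookup function in keeps the helper testable without touching the real process environment.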

if job.ParentID != "" {
members = append(members, newMember("nomad.job.parentId", job.ParentID, mErr))
}
if node.NodeClass != "" {
members = append(members, newMember("nomad.node.class", node.NodeClass, mErr))
}
if err := mErr.ErrorOrNil(); err != nil {
return baggage.Baggage{}, err
}

b, err := baggage.New(members...)
if err != nil {
_ = multierror.Append(mErr, err)
}
return b, mErr.ErrorOrNil()
}

func newMember(key, value string, mErr *multierror.Error) baggage.Member {
m, err := baggage.NewMember(key, value)
if err != nil {
_ = multierror.Append(mErr, err)
}
return m
}
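
One thing worth double-checking in `generateBaggage` and `newMember`: `mErr` starts out as a nil pointer and is passed by value, and the result of `multierror.Append` is discarded, so as far as I can tell the collected errors never reach the caller. Below is a minimal sketch of the assign-the-result pattern, using the standard library's `errors.Join` (Go 1.20+) as a stand-in for go-multierror. `checkKey` and `collect` are hypothetical names for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// checkKey is a hypothetical validation step standing in for
// baggage.NewMember; it fails on an empty key.
func checkKey(key string) error {
	if key == "" {
		return errors.New("empty key")
	}
	return nil
}

// collect validates every key and accumulates the failures.
func collect(keys []string) error {
	var errs error
	for _, k := range keys {
		// Assign the joined result back. Discarding the return value
		// would silently drop the error. errors.Join ignores nil values.
		errs = errors.Join(errs, checkKey(k))
	}
	return errs
}

func main() {
	fmt.Println(collect([]string{"nomad.alloc.id", "nomad.job.id"}) == nil)
	fmt.Println(collect([]string{"nomad.alloc.id", ""}) != nil)
}
```

With go-multierror the equivalent would be `mErr = multierror.Append(mErr, err)`, with helpers returning the error to the caller rather than receiving `mErr` as a parameter.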
140 changes: 140 additions & 0 deletions client/allocrunner/taskrunner/otel_hook_test.go
@@ -0,0 +1,140 @@
package taskrunner

import (
"context"
"fmt"
"os"
"testing"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/nomad/mock"
"go.opentelemetry.io/otel/baggage"

"github.com/shoenig/test/must"
)

// Statically assert the otel hook implements the expected interfaces
var _ interfaces.TaskPrestartHook = &otelHook{}

func TestTaskRunner_OtelHook(t *testing.T) {
ci.Parallel(t)

testCases := []struct {
name string
taskEnv map[string]string
expectNomadAttrs bool
expectAdditionalAttrs map[string]string
}{
{
name: "tasks have otel resource attributes env var",
expectNomadAttrs: true,
},
{
name: "disable otel resource attributes env var",
taskEnv: map[string]string{
envKeyOtelResourceAttrs: "",
},
expectNomadAttrs: false,
},
{
name: "merge otel resource attributes env var",
taskEnv: map[string]string{
envKeyOtelResourceAttrs: "test=true",
},
expectNomadAttrs: true,
expectAdditionalAttrs: map[string]string{
"test": "true",
},
},
{
name: "invalid values are ignored",
taskEnv: map[string]string{
envKeyOtelResourceAttrs: "not-valid",
},
expectNomadAttrs: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
alloc := mock.Alloc()
node := mock.Node()
task := mock.Job().TaskGroups[0].Tasks[0]

otelHook := newOtelHook(&otelHookConfig{
logger: hclog.NewNullLogger(),
alloc: alloc,
node: node,
})

// Set up task environment with additional test values.
builder := taskenv.NewBuilder(node, alloc, task, "global")
taskEnv := builder.Build()
for k, v := range tc.taskEnv {
taskEnv.EnvMap[k] = v
}

// Run hook.
req := &interfaces.TaskPrestartRequest{
TaskEnv: taskEnv,
TaskDir: &allocdir.TaskDir{Dir: os.TempDir()},
Task: task,
}
resp := interfaces.TaskPrestartResponse{}
err := otelHook.Prestart(context.Background(), req, &resp)
must.NoError(t, err)

// Read and parse resulting OTEL_RESOURCE_ATTRIBUTES env var.
got := resp.Env[envKeyOtelResourceAttrs]
b, err := baggage.Parse(got)
must.NoError(t, err)

if tc.expectNomadAttrs {
must.Eq(t, b.Member("nomad.alloc.id").Value(), alloc.ID)
must.Eq(t, b.Member("nomad.alloc.name").Value(), alloc.Name)
must.Eq(t, b.Member("nomad.alloc.createTime").Value(), fmt.Sprintf("%v", alloc.CreateTime))
must.Eq(t, b.Member("nomad.eval.id").Value(), alloc.EvalID)
must.Eq(t, b.Member("nomad.job.id").Value(), alloc.Job.ID)
must.Eq(t, b.Member("nomad.job.name").Value(), alloc.Job.Name)
must.Eq(t, b.Member("nomad.job.region").Value(), alloc.Job.Region)
must.Eq(t, b.Member("nomad.job.type").Value(), alloc.Job.Type)
must.Eq(t, b.Member("nomad.namespace").Value(), alloc.Namespace)
must.Eq(t, b.Member("nomad.node.id").Value(), node.ID)
must.Eq(t, b.Member("nomad.node.name").Value(), node.Name)
must.Eq(t, b.Member("nomad.node.datacenter").Value(), node.Datacenter)
must.Eq(t, b.Member("nomad.task.name").Value(), task.Name)
must.Eq(t, b.Member("nomad.task.driver").Value(), task.Driver)

if alloc.Job.ParentID != "" {
must.Eq(t, b.Member("nomad.job.parentId").Value(), alloc.Job.ParentID)
} else {
must.Eq(t, b.Member("nomad.job.parentId"), baggage.Member{})
}

if node.NodeClass != "" {
must.Eq(t, b.Member("nomad.node.class").Value(), node.NodeClass)
} else {
must.Eq(t, b.Member("nomad.node.class"), baggage.Member{})
}
} else {
must.Eq(t, got, "")
}

if len(tc.expectAdditionalAttrs) > 0 {
for k, v := range tc.expectAdditionalAttrs {
must.Eq(t, b.Member(k).Value(), v)
}
} else {
for _, m := range b.Members() {
// If no additional values are expected, all attributes
// must be related to Nomad.
must.StrContains(t, m.Key(), "nomad")
}
}
})
}
}
5 changes: 5 additions & 0 deletions client/allocrunner/taskrunner/task_runner_hooks.go
@@ -68,6 +68,11 @@ func (tr *TaskRunner) initHooks() {
newArtifactHook(tr, tr.getter, hookLogger),
newStatsHook(tr, tr.clientConfig.StatsCollectionInterval, hookLogger),
newDeviceHook(tr.devicemanager, hookLogger),
newOtelHook(&otelHookConfig{
logger: hookLogger,
alloc: tr.Alloc(),
node: tr.clientConfig.Node,
}),
}

// If the task has a CSI stanza, add the hook.
1 change: 1 addition & 0 deletions go.mod
@@ -117,6 +117,7 @@ require (
github.com/zclconf/go-cty v1.8.0
github.com/zclconf/go-cty-yaml v1.0.2
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/otel v1.9.0
go.uber.org/goleak v1.1.12
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/exp v0.0.0-20220609121020-a51bd0440498
2 changes: 2 additions & 0 deletions go.sum
@@ -1294,6 +1294,8 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw=
go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=