diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 1ce79f07b8778..db6bd1cc04140 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -417,6 +417,7 @@ /pkg/util/pdhutil/ @DataDog/windows-agent /pkg/util/winutil/ @DataDog/windows-agent /pkg/util/testutil/flake @DataDog/agent-developer-tools +/pkg/util/trie @DataDog/container-integrations /pkg/languagedetection @DataDog/processes @DataDog/universal-service-monitoring /pkg/logs/ @DataDog/agent-metrics-logs /pkg/logs/launchers/windowsevent @DataDog/agent-metrics-logs @DataDog/windows-agent diff --git a/comp/core/workloadmeta/collectors/internal/containerd/container_builder.go b/comp/core/workloadmeta/collectors/internal/containerd/container_builder.go index 3b09caaaae1c7..5f0152be7f939 100644 --- a/comp/core/workloadmeta/collectors/internal/containerd/container_builder.go +++ b/comp/core/workloadmeta/collectors/internal/containerd/container_builder.go @@ -135,6 +135,9 @@ func buildWorkloadMetaContainer(namespace string, container containerd.Container workloadContainer.EnvVars = envs workloadContainer.Hostname = spec.Hostname + if spec.Linux != nil { + workloadContainer.CgroupPath = extractCgroupPath(spec.Linux.CgroupsPath) + } } else if errors.Is(err, cutil.ErrSpecTooLarge) { log.Warnf("Skipping parsing of container spec for container id: %s, spec is bigger than: %d", info.ID, cutil.DefaultAllowedSpecMaxSize) } else { @@ -144,6 +147,17 @@ func buildWorkloadMetaContainer(namespace string, container containerd.Container return workloadContainer, nil } +// Containerd applies some transformations to the cgroup path, we need to revert them +// https://github.com/containerd/containerd/blob/b168147ca8fccf05003117324f493d40f97b6077/internal/cri/server/podsandbox/helpers_linux.go#L64-L65 +// See https://github.com/opencontainers/runc/blob/main/docs/systemd.md +func extractCgroupPath(path string) string { + res := path + if l := strings.Split(path, ":"); len(l) == 3 { + res = l[0] + "/" + l[1] + "-" + l[2] + ".scope" + } + return res +} + func extractStatus(status containerd.ProcessStatus) workloadmeta.ContainerStatus { switch status { case containerd.Paused, containerd.Pausing: diff --git a/comp/core/workloadmeta/collectors/internal/containerd/container_builder_test.go b/comp/core/workloadmeta/collectors/internal/containerd/container_builder_test.go index a2d590f929dd7..8a106993c0ae8 100644 --- a/comp/core/workloadmeta/collectors/internal/containerd/container_builder_test.go +++ b/comp/core/workloadmeta/collectors/internal/containerd/container_builder_test.go @@ -109,7 +109,9 @@ func TestBuildWorkloadMetaContainer(t *testing.T) { }, nil }, MockSpec: func(namespace string, ctn containers.Container) (*oci.Spec, error) { - return &oci.Spec{Hostname: hostName, Process: &specs.Process{Env: envVarStrs}}, nil + return &oci.Spec{Hostname: hostName, Process: &specs.Process{Env: envVarStrs}, Linux: &specs.Linux{ + CgroupsPath: "kubelet-kubepods-burstable-pod99dcb84d2a34f7e338778606703258c4.slice:cri-containerd:ec9ea0ad54dd0d96142d5dbe11eb3f1509e12ba9af739620c7b5ad377ce94602", + }}, nil }, MockStatus: func(namespace string, ctn containerd.Container) (containerd.ProcessStatus, error) { return containerd.Running, nil @@ -175,10 +177,49 @@ func TestBuildWorkloadMetaContainer(t *testing.T) { NetworkIPs: make(map[string]string), // Not available Hostname: hostName, PID: 0, // Not available + CgroupPath: "kubelet-kubepods-burstable-pod99dcb84d2a34f7e338778606703258c4.slice/cri-containerd-ec9ea0ad54dd0d96142d5dbe11eb3f1509e12ba9af739620c7b5ad377ce94602.scope", } assert.Equal(t, expected, result) } +func TestExtractCgroupPath(t *testing.T) { + tests := []struct { + name string + path string + expected string + }{ + // cgroupfs retrieved using minikube + qemu2 driver + { + name: "cgroupfs + kubernetes pod", + path: "/kubepods/burstable/pod84a7cac1-5690-4935-bffc-4b808e0240e4/0af39253daf5d1054519efdd054023e929785c2813c29f6a0ce887f652e1a997", + expected: "/kubepods/burstable/pod84a7cac1-5690-4935-bffc-4b808e0240e4/0af39253daf5d1054519efdd054023e929785c2813c29f6a0ce887f652e1a997", + }, + // systemd retrieved using kind + { + name: "systemd + kubernetes pod", + path: "kubelet-kubepods-burstable-pod99dcb84d2a34f7e338778606703258c4.slice:cri-containerd:ec9ea0ad54dd0d96142d5dbe11eb3f1509e12ba9af739620c7b5ad377ce94602", + expected: "kubelet-kubepods-burstable-pod99dcb84d2a34f7e338778606703258c4.slice/cri-containerd-ec9ea0ad54dd0d96142d5dbe11eb3f1509e12ba9af739620c7b5ad377ce94602.scope", + }, + // custom retrieved using ctr run + { + name: "systemd/cgroupfs + container", + path: "/default/redis", + expected: "/default/redis", + }, + // garden + { + name: "garden", + path: "/garden/1a65c217-84b2-8d13-3d78-46b14b6c7ea1", + expected: "/garden/1a65c217-84b2-8d13-3d78-46b14b6c7ea1", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equal(t, tt.expected, extractCgroupPath(tt.path)) + }) + } +} + func TestExtractRuntimeFlavor(t *testing.T) { tests := []struct { name string diff --git a/comp/core/workloadmeta/dump_test.go b/comp/core/workloadmeta/dump_test.go index 3e5fb0116c93d..94db86417ccb8 100644 --- a/comp/core/workloadmeta/dump_test.go +++ b/comp/core/workloadmeta/dump_test.go @@ -12,6 +12,7 @@ import ( "github.com/DataDog/datadog-agent/comp/core/config" "github.com/DataDog/datadog-agent/comp/core/log/logimpl" "github.com/DataDog/datadog-agent/pkg/util/fxutil" + "github.com/stretchr/testify/assert" "go.uber.org/fx" ) @@ -59,7 +60,8 @@ func TestDump(t *testing.T) { Image: ContainerImage{ Tag: "latest", }, - PID: 1, + PID: 1, + CgroupPath: "/default/ctr-id", } s.handleEvents([]CollectorEvent{ @@ -134,6 +136,7 @@ Allowed env variables: DD_SERVICE:my-svc DD_ENV:prod DD_VERSION:v1 Hostname: Network IPs: PID: 0 +Cgroup path: `, "source:source2 id: ctr-id": `----------- Entity ID ----------- Kind: container ID: ctr-id @@ -163,6 +166,7 @@ Allowed env variables: Hostname: Network IPs: PID: 1 +Cgroup path: /default/ctr-id `, "sources(merged):[source1 source2] id: ctr-id": `----------- Entity ID ----------- Kind: container ID: ctr-id @@ -192,6 +196,7 @@ Allowed env variables: DD_SERVICE:my-svc DD_ENV:prod DD_VERSION:v1 Hostname: Network IPs: PID: 1 +Cgroup path: /default/ctr-id `, }, }, diff --git a/comp/core/workloadmeta/proto/proto.go b/comp/core/workloadmeta/proto/proto.go index 1646860a85587..a5888bd051c75 100644 --- a/comp/core/workloadmeta/proto/proto.go +++ b/comp/core/workloadmeta/proto/proto.go @@ -146,6 +146,7 @@ func protoContainerFromWorkloadmetaContainer(container *workloadmeta.Container) Runtime: protoRuntime, State: protoContainerState, CollectorTags: container.CollectorTags, + CgroupPath: container.CgroupPath, }, nil } @@ -611,6 +612,7 @@ func toWorkloadmetaContainer(protoContainer *pb.Container) (*workloadmeta.Contai Runtime: runtime, State: state, CollectorTags: protoContainer.CollectorTags, + CgroupPath: protoContainer.CgroupPath, }, nil } diff --git a/comp/core/workloadmeta/types.go b/comp/core/workloadmeta/types.go index b25df2a1198d5..b0a5b6beb5e28 100644 --- a/comp/core/workloadmeta/types.go +++ b/comp/core/workloadmeta/types.go @@ -522,6 +522,10 @@ type Container struct { Owner *EntityID SecurityContext *ContainerSecurityContext Resources ContainerResources + // CgroupPath is a path to the cgroup of the container. + // It can be relative to the cgroup parent. + // Linux only. + CgroupPath string } // GetID implements Entity#GetID. @@ -575,6 +579,7 @@ func (c Container) String(verbose bool) string { _, _ = fmt.Fprintln(&sb, "Hostname:", c.Hostname) _, _ = fmt.Fprintln(&sb, "Network IPs:", mapToString(c.NetworkIPs)) _, _ = fmt.Fprintln(&sb, "PID:", c.PID) + _, _ = fmt.Fprintln(&sb, "Cgroup path:", c.CgroupPath) } if len(c.Ports) > 0 && verbose { diff --git a/comp/dogstatsd/server/convert_bench_test.go b/comp/dogstatsd/server/convert_bench_test.go index 22003455704dd..4725d13cf671e 100644 --- a/comp/dogstatsd/server/convert_bench_test.go +++ b/comp/dogstatsd/server/convert_bench_test.go @@ -78,5 +78,5 @@ type ServerDeps struct { } func newServerDeps(t testing.TB, options ...fx.Option) ServerDeps { - return fxutil.Test[ServerDeps](t, core.MockBundle(), workloadmeta.MockModule(), fx.Supply(workloadmeta.NewParams()), fx.Options(options...)) + return fxutil.Test[ServerDeps](t, core.MockBundle(), workloadmeta.MockModuleV2(), fx.Supply(workloadmeta.NewParams()), fx.Options(options...)) } diff --git a/pkg/proto/datadog/workloadmeta/workloadmeta.proto b/pkg/proto/datadog/workloadmeta/workloadmeta.proto index c691a3187276d..fd3c522ebe8b8 100644 --- a/pkg/proto/datadog/workloadmeta/workloadmeta.proto +++ b/pkg/proto/datadog/workloadmeta/workloadmeta.proto @@ -51,7 +51,7 @@ message ContainerImage { string name = 3; string shortName = 4; string tag = 5; - string repo_digest = 6; + string repo_digest = 6; } message ContainerPort { @@ -106,6 +106,7 @@ message Container { Runtime runtime = 9; ContainerState state = 10; repeated string collectorTags = 11; + string cgroupPath = 12; } message KubernetesPodOwner { diff --git a/pkg/proto/pbgo/core/api.pb.go b/pkg/proto/pbgo/core/api.pb.go index 9efb3a8f6e967..46229348979b6 100644 --- a/pkg/proto/pbgo/core/api.pb.go +++ b/pkg/proto/pbgo/core/api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.25.2 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/api/v1/api.proto package core diff --git a/pkg/proto/pbgo/core/api.pb.gw.go b/pkg/proto/pbgo/core/api.pb.gw.go index 023bf1b866b58..a1db6c3c8fe39 100644 --- a/pkg/proto/pbgo/core/api.pb.gw.go +++ b/pkg/proto/pbgo/core/api.pb.gw.go @@ -21,7 +21,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -32,7 +31,6 @@ var _ status.Status var _ = runtime.String var _ = utilities.NewDoubleArray var _ = descriptor.ForMessage -var _ = metadata.Join func request_Agent_GetHostname_0(ctx context.Context, marshaler runtime.Marshaler, client AgentClient, req *http.Request, pathParams map[string]string) (proto.Message, runtime.ServerMetadata, error) { var protoReq HostnameRequest @@ -343,14 +341,11 @@ func request_AgentSecure_WorkloadmetaStreamEntities_0(ctx context.Context, marsh // RegisterAgentHandlerServer registers the http handlers for service Agent to "mux". // UnaryRPC :call AgentServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. -// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterAgentHandlerFromEndpoint instead. func RegisterAgentHandlerServer(ctx context.Context, mux *runtime.ServeMux, server AgentServer) error { mux.Handle("GET", pattern_Agent_GetHostname_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -358,7 +353,6 @@ func RegisterAgentHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv return } resp, md, err := local_request_Agent_GetHostname_0(rctx, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -375,7 +369,6 @@ func RegisterAgentHandlerServer(ctx context.Context, mux *runtime.ServeMux, serv // RegisterAgentSecureHandlerServer registers the http handlers for service AgentSecure to "mux". // UnaryRPC :call AgentSecureServer directly. // StreamingRPC :currently unsupported pending https://github.com/grpc/grpc-go/issues/906. -// Note that using this registration option will cause many gRPC library features to stop working. Consider using RegisterAgentSecureHandlerFromEndpoint instead. func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux, server AgentSecureServer) error { mux.Handle("POST", pattern_AgentSecure_TaggerStreamEntities_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { @@ -388,8 +381,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux mux.Handle("POST", pattern_AgentSecure_TaggerFetchEntity_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -397,7 +388,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux return } resp, md, err := local_request_AgentSecure_TaggerFetchEntity_0(rctx, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -411,8 +401,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux mux.Handle("POST", pattern_AgentSecure_DogstatsdCaptureTrigger_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -420,7 +408,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux return } resp, md, err := local_request_AgentSecure_DogstatsdCaptureTrigger_0(rctx, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -434,8 +421,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux mux.Handle("POST", pattern_AgentSecure_DogstatsdSetTaggerState_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -443,7 +428,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux return } resp, md, err := local_request_AgentSecure_DogstatsdSetTaggerState_0(rctx, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -457,8 +441,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux mux.Handle("POST", pattern_AgentSecure_ClientGetConfigs_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -466,7 +448,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux return } resp, md, err := local_request_AgentSecure_ClientGetConfigs_0(rctx, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -480,8 +461,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux mux.Handle("POST", pattern_AgentSecure_GetConfigState_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -489,7 +468,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux return } resp, md, err := local_request_AgentSecure_GetConfigState_0(rctx, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -503,8 +481,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux mux.Handle("POST", pattern_AgentSecure_ClientGetConfigsHA_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -512,7 +488,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux return } resp, md, err := local_request_AgentSecure_ClientGetConfigsHA_0(rctx, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) @@ -526,8 +501,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux mux.Handle("POST", pattern_AgentSecure_GetConfigStateHA_0, func(w http.ResponseWriter, req *http.Request, pathParams map[string]string) { ctx, cancel := context.WithCancel(req.Context()) defer cancel() - var stream runtime.ServerTransportStream - ctx = grpc.NewContextWithServerTransportStream(ctx, &stream) inboundMarshaler, outboundMarshaler := runtime.MarshalerForRequest(mux, req) rctx, err := runtime.AnnotateIncomingContext(ctx, mux, req) if err != nil { @@ -535,7 +508,6 @@ func RegisterAgentSecureHandlerServer(ctx context.Context, mux *runtime.ServeMux return } resp, md, err := local_request_AgentSecure_GetConfigStateHA_0(rctx, inboundMarshaler, server, req, pathParams) - md.HeaderMD, md.TrailerMD = metadata.Join(md.HeaderMD, stream.Header()), metadata.Join(md.TrailerMD, stream.Trailer()) ctx = runtime.NewServerMetadataContext(ctx, md) if err != nil { runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err) diff --git a/pkg/proto/pbgo/core/model.pb.go b/pkg/proto/pbgo/core/model.pb.go index 2035c6a0b71c7..7e5838a862438 100644 --- a/pkg/proto/pbgo/core/model.pb.go +++ b/pkg/proto/pbgo/core/model.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.25.2 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/model/v1/model.proto package core diff --git a/pkg/proto/pbgo/core/remoteconfig.pb.go b/pkg/proto/pbgo/core/remoteconfig.pb.go index 1dba8b1a63e91..79fdd888ebaf9 100644 --- a/pkg/proto/pbgo/core/remoteconfig.pb.go +++ b/pkg/proto/pbgo/core/remoteconfig.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.25.3 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/remoteconfig/remoteconfig.proto package core diff --git a/pkg/proto/pbgo/core/workloadmeta.pb.go b/pkg/proto/pbgo/core/workloadmeta.pb.go index 17196683390b1..d59bb72eaed8d 100644 --- a/pkg/proto/pbgo/core/workloadmeta.pb.go +++ b/pkg/proto/pbgo/core/workloadmeta.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.30.0 -// protoc v4.25.2 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/workloadmeta/workloadmeta.proto package core @@ -878,6 +878,7 @@ type Container struct { Runtime Runtime `protobuf:"varint,9,opt,name=runtime,proto3,enum=datadog.workloadmeta.Runtime" json:"runtime,omitempty"` State *ContainerState `protobuf:"bytes,10,opt,name=state,proto3" json:"state,omitempty"` CollectorTags []string `protobuf:"bytes,11,rep,name=collectorTags,proto3" json:"collectorTags,omitempty"` + CgroupPath string `protobuf:"bytes,12,opt,name=cgroupPath,proto3" json:"cgroupPath,omitempty"` } func (x *Container) Reset() { @@ -989,6 +990,13 @@ func (x *Container) GetCollectorTags() []string { return nil } +func (x *Container) GetCgroupPath() string { + if x != nil { + return x.CgroupPath + } + return "" +} + type KubernetesPodOwner struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -1591,7 +1599,7 @@ var file_datadog_workloadmeta_workloadmeta_proto_rawDesc = []byte{ 0x69, 0x73, 0x68, 0x65, 0x64, 0x41, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0a, 0x66, 0x69, 0x6e, 0x69, 0x73, 0x68, 0x65, 0x64, 0x41, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x65, 0x78, 0x69, 0x74, 0x43, 0x6f, 0x64, 0x65, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x65, 0x78, 0x69, - 0x74, 0x43, 0x6f, 0x64, 0x65, 0x22, 0xe9, 0x05, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, + 0x74, 0x43, 0x6f, 0x64, 0x65, 0x22, 0x89, 0x06, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x12, 0x46, 0x0a, 0x08, 0x65, 0x6e, 0x74, 0x69, 0x74, 0x79, 0x49, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x64, 0x6f, 0x67, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x6d, 0x65, 0x74, 0x61, 0x2e, 0x57, 0x6f, 0x72, @@ -1630,7 +1638,9 @@ var file_datadog_workloadmeta_workloadmeta_proto_rawDesc = []byte{ 0x69, 0x6e, 0x65, 0x72, 0x53, 0x74, 0x61, 0x74, 0x65, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x12, 0x24, 0x0a, 0x0d, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x54, 0x61, 0x67, 0x73, 0x18, 0x0b, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, - 0x6f, 0x72, 0x54, 0x61, 0x67, 0x73, 0x1a, 0x3a, 0x0a, 0x0c, 0x45, 0x6e, 0x76, 0x56, 0x61, 0x72, + 0x6f, 0x72, 0x54, 0x61, 0x67, 0x73, 0x12, 0x1e, 0x0a, 0x0a, 0x63, 0x67, 0x72, 0x6f, 0x75, 0x70, + 0x50, 0x61, 0x74, 0x68, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x63, 0x67, 0x72, 0x6f, + 0x75, 0x70, 0x50, 0x61, 0x74, 0x68, 0x1a, 0x3a, 0x0a, 0x0c, 0x45, 0x6e, 0x76, 0x56, 0x61, 0x72, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, diff --git a/pkg/proto/pbgo/languagedetection/api.pb.go b/pkg/proto/pbgo/languagedetection/api.pb.go index 716fd3c1ab878..f2c09323e3822 100644 --- a/pkg/proto/pbgo/languagedetection/api.pb.go +++ b/pkg/proto/pbgo/languagedetection/api.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.6.1 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/languagedetection/api.proto package languagedetection diff --git a/pkg/proto/pbgo/process/process.pb.go b/pkg/proto/pbgo/process/process.pb.go index c0d477424bfb2..f1c158d390062 100644 --- a/pkg/proto/pbgo/process/process.pb.go +++ b/pkg/proto/pbgo/process/process.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.6.1 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/process/process.proto package process diff --git a/pkg/proto/pbgo/process/workloadmeta_process.pb.go b/pkg/proto/pbgo/process/workloadmeta_process.pb.go index 591fb9b56e29c..7fc2e0b03f3a6 100644 --- a/pkg/proto/pbgo/process/workloadmeta_process.pb.go +++ b/pkg/proto/pbgo/process/workloadmeta_process.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.6.1 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/process/workloadmeta_process.proto package process diff --git a/pkg/proto/pbgo/trace/agent_payload.pb.go b/pkg/proto/pbgo/trace/agent_payload.pb.go index 3fd40f66f5723..5d803ea9b4019 100644 --- a/pkg/proto/pbgo/trace/agent_payload.pb.go +++ b/pkg/proto/pbgo/trace/agent_payload.pb.go @@ -2,8 +2,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.6.1 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/trace/agent_payload.proto package trace diff --git a/pkg/proto/pbgo/trace/span.pb.go b/pkg/proto/pbgo/trace/span.pb.go index 02b9558055887..be7ee2bdadb77 100644 --- a/pkg/proto/pbgo/trace/span.pb.go +++ b/pkg/proto/pbgo/trace/span.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v4.25.1 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/trace/span.proto package trace @@ -31,11 +31,11 @@ type SpanLink struct { TraceIDHigh uint64 `protobuf:"varint,2,opt,name=traceID_high,json=traceIDHigh,proto3" json:"trace_id_high" msg:"trace_id_high,omitempty"` // Optional. The high 64 bits of a referenced trace id. // @gotags: json:"span_id" msg:"span_id" SpanID uint64 `protobuf:"varint,3,opt,name=spanID,proto3" json:"span_id" msg:"span_id"` // Required. - // @gotags: json:"attributes,omitempty" msg:"attributes,omitempty" + // @gotags: msg:"attributes,omitempty" Attributes map[string]string `protobuf:"bytes,4,rep,name=attributes,proto3" json:"attributes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3" msg:"attributes,omitempty"` // Optional. Simple mapping of keys to string values. - // @gotags: json:"tracestate,omitempty" msg:"tracestate,omitempty" + // @gotags: msg:"tracestate,omitempty" Tracestate string `protobuf:"bytes,5,opt,name=tracestate,proto3" json:"tracestate,omitempty" msg:"tracestate,omitempty"` // Optional. W3C tracestate. - // @gotags: json:"flags,omitempty" msg:"flags,omitempty" + // @gotags: msg:"flags,omitempty" Flags uint32 `protobuf:"varint,6,opt,name=flags,proto3" json:"flags,omitempty" msg:"flags,omitempty"` // Optional. W3C trace flags. If set, the high bit (bit 31) must be set. } diff --git a/pkg/proto/pbgo/trace/span_gen.go b/pkg/proto/pbgo/trace/span_gen.go index a1591c955dd00..193f623958605 100644 --- a/pkg/proto/pbgo/trace/span_gen.go +++ b/pkg/proto/pbgo/trace/span_gen.go @@ -278,7 +278,7 @@ func (z *Span) UnmarshalMsg(bts []byte) (o []byte, err error) { err = msgp.WrapError(err, "Metrics") return } - if z.Metrics == nil && zb0003 > 0 { + if z.Metrics == nil && zb0003 > 0{ z.Metrics = make(map[string]float64, zb0003) } else if len(z.Metrics) > 0 { for key := range z.Metrics { diff --git a/pkg/proto/pbgo/trace/stats.pb.go b/pkg/proto/pbgo/trace/stats.pb.go index 13dac44441721..52d51fff5f75b 100644 --- a/pkg/proto/pbgo/trace/stats.pb.go +++ b/pkg/proto/pbgo/trace/stats.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 +// protoc-gen-go v1.34.0 // protoc v5.26.1 // source: datadog/trace/stats.proto diff --git a/pkg/proto/pbgo/trace/tracer_payload.pb.go b/pkg/proto/pbgo/trace/tracer_payload.pb.go index d32bb683c35f2..7e7e2b0cc278e 100644 --- a/pkg/proto/pbgo/trace/tracer_payload.pb.go +++ b/pkg/proto/pbgo/trace/tracer_payload.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.28.1 -// protoc v3.6.1 +// protoc-gen-go v1.34.0 +// protoc v5.26.1 // source: datadog/trace/tracer_payload.proto package trace diff --git a/pkg/util/cgroups/reader.go b/pkg/util/cgroups/reader.go index 1778a4a560c39..948dd94c7155f 100644 --- a/pkg/util/cgroups/reader.go +++ b/pkg/util/cgroups/reader.go @@ -49,9 +49,7 @@ type readerImpl interface { type ReaderFilter func(path, name string) (string, error) // DefaultFilter matches all cgroup folders and use folder name as identifier -// -//nolint:revive // TODO(CINT) Fix revive linter -func DefaultFilter(path, name string) (string, error) { +func DefaultFilter(path, _ string) (string, error) { return path, nil } @@ -61,9 +59,7 @@ func DefaultFilter(path, name string) (string, error) { var ContainerRegexp = regexp.MustCompile(ContainerRegexpStr) // ContainerFilter returns a filter that will match cgroup folders containing a container id -// -//nolint:revive // TODO(CINT) Fix revive linter -func ContainerFilter(path, name string) (string, error) { +func ContainerFilter(_, name string) (string, error) { match := ContainerRegexp.FindString(name) // With systemd cgroup driver, there may be a `.mount` cgroup on top of the normal one diff --git a/pkg/util/containers/metrics/system/collector_linux.go b/pkg/util/containers/metrics/system/collector_linux.go index ca530e9478880..96b40291a308b 100644 --- a/pkg/util/containers/metrics/system/collector_linux.go +++ b/pkg/util/containers/metrics/system/collector_linux.go @@ -36,8 +36,8 @@ const ( func init() { provider.RegisterCollector(provider.CollectorFactory{ ID: systemCollectorID, - Constructor: func(cache *provider.Cache, _ optional.Option[workloadmeta.Component]) (provider.CollectorMetadata, error) { - return newSystemCollector(cache) + Constructor: func(cache *provider.Cache, wlm optional.Option[workloadmeta.Component]) (provider.CollectorMetadata, error) { + return newSystemCollector(cache, wlm) }, }) } @@ -51,7 +51,7 @@ type systemCollector struct { hostCgroupNamespace bool } -func newSystemCollector(cache *provider.Cache) (provider.CollectorMetadata, error) { +func newSystemCollector(cache *provider.Cache, wlm optional.Option[workloadmeta.Component]) (provider.CollectorMetadata, error) { var err error var hostPrefix string var collectorMetadata provider.CollectorMetadata @@ -61,11 +61,18 @@ func newSystemCollector(cache *provider.Cache) (provider.CollectorMetadata, erro hostPrefix = "/host" } + var w workloadmeta.Component + unwrapped, ok := wlm.Get() + if ok { + w = unwrapped + } + cf := newContainerFilter(w) + go cf.start() reader, err := cgroups.NewReader( cgroups.WithCgroupV1BaseController(cgroupV1BaseController), cgroups.WithProcPath(procPath), cgroups.WithHostPrefix(hostPrefix), - cgroups.WithReaderFilter(cgroups.ContainerFilter), + cgroups.WithReaderFilter(cf.ContainerFilter), ) if err != nil { // Cgroup provider is pretty static. Except not having required mounts, it should always work. @@ -174,7 +181,7 @@ func newSystemCollector(cache *provider.Cache) (provider.CollectorMetadata, erro return metadata, nil } -func (c *systemCollector) GetContainerStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) { //nolint:revive // TODO fix revive unused-parameter +func (c *systemCollector) GetContainerStats(_, containerID string, cacheValidity time.Duration) (*provider.ContainerStats, error) { cg, err := c.getCgroup(containerID, cacheValidity) if err != nil { return nil, err @@ -183,8 +190,7 @@ func (c *systemCollector) GetContainerStats(containerNS, containerID string, cac return c.buildContainerMetrics(cg, cacheValidity) } -//nolint:revive // TODO(CINT) Fix revive linter -func (c *systemCollector) GetContainerOpenFilesCount(containerNS, containerID string, cacheValidity time.Duration) (*uint64, error) { +func (c *systemCollector) GetContainerOpenFilesCount(_, containerID string, cacheValidity time.Duration) (*uint64, error) { pids, err := c.getPIDs(containerID, cacheValidity) if err != nil { return nil, fmt.Errorf("unable to get PIDs for cgroup id: %s. Unable to get count of open files", containerID) @@ -198,8 +204,7 @@ func (c *systemCollector) GetContainerOpenFilesCount(containerNS, containerID st return &ofCount, nil } -//nolint:revive // TODO(CINT) Fix revive linter -func (c *systemCollector) GetContainerNetworkStats(containerNS, containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) { +func (c *systemCollector) GetContainerNetworkStats(_, containerID string, cacheValidity time.Duration) (*provider.ContainerNetworkStats, error) { pids, err := c.getPIDs(containerID, cacheValidity) if err != nil { return nil, err @@ -212,8 +217,7 @@ func (c *systemCollector) GetPIDs(_, containerID string, cacheValidity time.Dura return c.getPIDs(containerID, cacheValidity) } -//nolint:revive // TODO(CINT) Fix revive linter -func (c *systemCollector) GetContainerIDForPID(pid int, cacheValidity time.Duration) (string, error) { +func (c *systemCollector) GetContainerIDForPID(pid int, _ time.Duration) (string, error) { containerID, err := cgroups.IdentiferFromCgroupReferences(c.procPath, strconv.Itoa(pid), c.baseController, cgroups.ContainerFilter) return containerID, err } @@ -289,8 +293,7 @@ func (c *systemCollector) getPIDs(containerID string, cacheValidity time.Duratio return c.pidMapper.GetPIDs(containerID, cacheValidity), nil } -//nolint:revive // TODO(CINT) Fix revive linter -func (c *systemCollector) buildContainerMetrics(cg cgroups.Cgroup, cacheValidity time.Duration) (*provider.ContainerStats, error) { +func (c *systemCollector) buildContainerMetrics(cg cgroups.Cgroup, _ time.Duration) (*provider.ContainerStats, error) { stats := &cgroups.Stats{} allFailed, errs := cgroups.GetStats(cg, stats) if allFailed { diff --git a/pkg/util/containers/metrics/system/filter_container.go b/pkg/util/containers/metrics/system/filter_container.go new file mode 100644 index 0000000000000..6e05f86b6cc95 --- /dev/null +++ b/pkg/util/containers/metrics/system/filter_container.go @@ -0,0 +1,93 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build linux + +package system + +import ( + "sync" + + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/util/cgroups" + "github.com/DataDog/datadog-agent/pkg/util/log" + "github.com/DataDog/datadog-agent/pkg/util/trie" +) + +// containerFilter is a filter that will first try to retrieve a container-id from the cgroup path +// using regex matching. If it fails, it will perform suffix-matching on the cgroup path to find a container-id. +// Containerd currently exposes either the full cgroup path or only a suffix, this is why we use a `trie` to store +// the metadata retrieved from workloadmeta. +type containerFilter struct { + wlm workloadmeta.Component + + mutex sync.RWMutex + trie *trie.SuffixTrie[string] +} + +// newContainerFilter returns a new container filter +func newContainerFilter(wlm workloadmeta.Component) *containerFilter { + cf := &containerFilter{ + trie: trie.NewSuffixTrie[string](), + wlm: wlm, + } + return cf +} + +func (cf *containerFilter) start() { + if cf.wlm == nil { + return + } + evBundle := cf.wlm.Subscribe("cid-mapper", workloadmeta.NormalPriority, workloadmeta.NewFilter( + &workloadmeta.FilterParams{ + Kinds: []workloadmeta.Kind{workloadmeta.KindContainer}, + Source: workloadmeta.SourceAll, + EventType: workloadmeta.EventTypeAll, + }, + )) + for evs := range evBundle { + evs.Acknowledge() + cf.mutex.Lock() + for _, ev := range evs.Events { + cf.handleEvent(ev) + } + cf.mutex.Unlock() + } +} + +func (cf *containerFilter) handleEvent(ev workloadmeta.Event) { + cont, ok := ev.Entity.(*workloadmeta.Container) + if !ok { + log.Errorf("unexpected event type: %T", ev) + return + } + switch ev.Type { + case workloadmeta.EventTypeSet: + // As a memory optimization, we only store the container id in the trie + // if the cgroup path is not already matched by the cgroup filter. + if res, _ := cgroups.ContainerFilter("", cont.CgroupPath); res == "" { + cid := cont.ID + cf.trie.Insert(cont.CgroupPath, &cid) + } + case workloadmeta.EventTypeUnset: + cf.trie.Delete(cont.CgroupPath) + default: + log.Errorf("unexpected event type: %v", ev.Type) + } +} + +// ContainerFilter returns a filter that will match cgroup folders containing a container id +func (cf *containerFilter) ContainerFilter(fullPath, name string) (string, error) { + if res, _ := cgroups.ContainerFilter(fullPath, name); res != "" { + return res, nil + } + cf.mutex.RLock() + res, ok := cf.trie.Get(fullPath) + cf.mutex.RUnlock() + if !ok || res == nil { + return "", nil + } + return *res, nil +} diff --git a/pkg/util/containers/metrics/system/filter_container_test.go b/pkg/util/containers/metrics/system/filter_container_test.go new file mode 100644 index 0000000000000..8b1444bcee7bb --- /dev/null +++ b/pkg/util/containers/metrics/system/filter_container_test.go @@ -0,0 +1,114 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build linux && test + +package system + +import ( + "context" + "testing" + "time" + + "github.com/DataDog/datadog-agent/comp/core/config" + "github.com/DataDog/datadog-agent/comp/core/log/logimpl" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/util/fxutil" + + "github.com/stretchr/testify/assert" + "go.uber.org/fx" +) + +func TestHandleSetEvent(t *testing.T) { + for _, tt := range []struct { + name string + cid string + cgroupPath string + prefixedCgroupPath string + cgroupName string + }{ + { + name: "redis", + cid: "redis", + cgroupPath: "/default/redis", + prefixedCgroupPath: "/host/sys/fs/cgroup/default/redis", + cgroupName: "redis", + }, + { + name: "kubelet container", + cid: "022c4ffba65e5031285fd427553e56c3fd6cc85a3a49f3fa2825d0a258d8a5d6", + cgroupPath: "kubelet-kubepods-pod1715d361_61cf_4060_8673_38ab3ca88e66.slice/cri-containerd/022c4ffba65e5031285fd427553e56c3fd6cc85a3a49f3fa2825d0a258d8a5d6", + prefixedCgroupPath: "/host/sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-burstable.slice/kubelet-kubepods-burstable-pod99dcb84d2a34f7e338778606703258c4.slice/cri-containerd-022c4ffba65e5031285fd427553e56c3fd6cc85a3a49f3fa2825d0a258d8a5d6.scope", + cgroupName: "cri-containerd-022c4ffba65e5031285fd427553e56c3fd6cc85a3a49f3fa2825d0a258d8a5d6.scope", + }, + } { + t.Run(tt.name, func(t *testing.T) { + cf := newContainerFilter(nil) + cont := &workloadmeta.Container{ + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: tt.cid, + }, + CgroupPath: tt.cgroupPath, + } + event := workloadmeta.Event{ + Type: workloadmeta.EventTypeSet, + Entity: cont, + } + cf.handleEvent(event) + id, err := cf.ContainerFilter(tt.prefixedCgroupPath, tt.cgroupName) + assert.NoError(t, err) + assert.Equal(t, tt.cid, id) + }) + } +} + +func TestHandleUnsetEvent(t *testing.T) { + cf := newContainerFilter(nil) + cont := &workloadmeta.Container{ + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: "redis", + }, + CgroupPath: "/default/redis", + } + event := workloadmeta.Event{ + Type: workloadmeta.EventTypeSet, + Entity: cont, + } + cf.handleEvent(event) + event.Type = workloadmeta.EventTypeUnset + cf.handleEvent(event) + + id, err := cf.ContainerFilter("/host/sys/fs/cgroup/default/redis", "redis") + assert.NoError(t, err) + assert.Equal(t, "", id) +} + +func TestListenWorkloadmeta(t *testing.T) { + wlm := fxutil.Test[workloadmeta.Mock](t, fx.Options( + logimpl.MockModule(), + config.MockModule(), + fx.Supply(context.Background()), + fx.Supply(workloadmeta.NewParams()), + workloadmeta.MockModuleV2(), + )) + cf := newContainerFilter(wlm) + go cf.start() + cont := &workloadmeta.Container{ + EntityID: workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: "redis", + }, + CgroupPath: "/default/redis", + } + + wlm.Set(cont) + + assert.Eventuallyf(t, func() bool { + cid, _ := cf.ContainerFilter("/host/sys/fs/cgroup/default/redis", "redis") + return cid == "redis" + }, 5*time.Second, 200*time.Millisecond, "expected cid to be added to the container filter") +} diff --git a/pkg/util/trie/trie.go b/pkg/util/trie/trie.go new file mode 100644 index 0000000000000..2c77d429982cb --- /dev/null +++ b/pkg/util/trie/trie.go @@ -0,0 +1,109 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024-present Datadog, Inc. + +// Package trie provides a SuffixTrie data structure +// that can be used to index data by suffixes of strings. +package trie + +// node represents a node in the trie +type node[T any] struct { + children map[rune]*node[T] + value *T +} + +// newTrieNode creates a new node +func newTrieNode[T any]() *node[T] { + return &node[T]{ + children: make(map[rune]*node[T]), + } +} + +// SuffixTrie represents a trie data structure for suffixes +type SuffixTrie[T any] struct { + root *node[T] +} + +// NewSuffixTrie creates a new SuffixTrie +func NewSuffixTrie[T any]() *SuffixTrie[T] { + return &SuffixTrie[T]{ + root: newTrieNode[T](), + } +} + +// reverse reverses a string +func reverse(s string) string { + runes := []rune(s) + for i, j := 0, len(runes)-1; i < j; i, j = i+1, j-1 { + runes[i], runes[j] = runes[j], runes[i] + } + return string(runes) +} + +// Delete deletes a suffix from the SuffixTrie +func (t *SuffixTrie[T]) Delete(suffix string) { + reversedSuffix := reverse(suffix) + t.delete(t.root, reversedSuffix, 0) +} + +// delete deletes a suffix from the SuffixTrie +func (t *SuffixTrie[T]) delete(node *node[T], suffix string, depth int) bool { + if node == nil { + return false + } + + if len(suffix) == depth { + node.value = nil + return len(node.children) == 0 + } + + char := rune(suffix[depth]) + if nextNode, ok := node.children[char]; ok { + if t.delete(nextNode, suffix, depth+1) { + delete(node.children, char) + return len(node.children) == 0 && node.value == nil + } + } + return false +} + +// Insert stores the value for a given suffix +func (t *SuffixTrie[T]) Insert(suffix string, value *T) { + reversedSuffix := reverse(suffix) + n := t.root + + for _, char := range reversedSuffix { + if _, ok := n.children[char]; !ok { + n.children[char] = newTrieNode[T]() + } + n = n.children[char] + } + + n.value = value +} + +// Get returns the value for the first suffix that matches the given key +// Example: if the trie contains the suffixes "foo" and "foobar" and the key is "foobarbaz", +// the value for "foo" will be returned +func (t *SuffixTrie[T]) Get(key string) (*T, bool) { + reversedText := reverse(key) + n := t.root + + for _, char := range reversedText { + if n.value != nil { + return n.value, true + } + if next, ok := n.children[char]; ok { + n = next + } else { + return nil, false + } + } + + if n.value == nil { + return nil, false + } + + return n.value, true +} diff --git a/pkg/util/trie/trie_test.go b/pkg/util/trie/trie_test.go new file mode 100644 index 0000000000000..7775e92d64be5 --- /dev/null +++ b/pkg/util/trie/trie_test.go @@ -0,0 +1,71 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build test + +package trie + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSuffixTrieInsertAndGet(t *testing.T) { + trie := NewSuffixTrie[string]() + cid := "kubelet-kubepods-burstable-pod99dcb84d2a34f7e338778606703258c4.slice/cri-containerd-ec9ea0ad54dd0d96142d5dbe11eb3f1509e12ba9af739620c7b5ad377ce94602" + cgroupPath := "/host/sys/fs/cgroup/kubelet.slice/kubelet-kubepods.slice/kubelet-kubepods-burstable.slice/kubelet-kubepods-burstable-pod99dcb84d2a34f7e338778606703258c4.slice/cri-containerd-ec9ea0ad54dd0d96142d5dbe11eb3f1509e12ba9af739620c7b5ad377ce94602.scope" + trie.Insert(cgroupPath, &cid) + storedCid, ok := trie.Get(cgroupPath) + assert.Equal(t, &cid, storedCid, "should return correct container id") + assert.True(t, ok, "should return true if path is known") + + _, ok = trie.Get("unknown/path") + assert.False(t, ok, "should return false if path is unknown") +} + +func TestSuffixTrieDelete_EmptyKey(t *testing.T) { + trie := NewSuffixTrie[string]() + trie.Delete("") + val := "value" + trie.Insert("abc", &val) + storedVal, ok := trie.Get("abc") + assert.Equal(t, storedVal, &val, "Deleting empty key should allow to insert keys") + assert.True(t, ok, "Deleting empty key should allow to insert keys") +} + +func TestSuffixTrieDelete_NonExistentKey(t *testing.T) { + trie := NewSuffixTrie[string]() + val := "container123" + trie.Insert("path/to/container", &val) + trie.Delete("nonexistent/key") + storedVal, ok := trie.Get("path/to/container") + assert.True(t, ok, "Deleting nonexistent key should not affect existing keys") + assert.Equal(t, &val, storedVal, "Deleting nonexistent key should not affect existing keys") +} + +func TestSuffixTrieDelete_RootNode(t *testing.T) { + trie := NewSuffixTrie[string]() + val := "rootValue" + trie.Insert("", &val) + trie.Delete("") + _, ok := trie.Get("") + assert.False(t, ok, "Deleting root node should remove the value") +} + +func TestSuffixTrieDelete_PartiallyMatchingKey(t *testing.T) { + trie := NewSuffixTrie[string]() + vals := []string{"value1", "value2", "value3"} + trie.Insert("path/to/one", &vals[0]) + trie.Insert("path/to/one/two", &vals[1]) + trie.Insert("path/to/one/two/three", &vals[2]) + trie.Delete("path/to/one/two") + storedVal1, ok1 := trie.Get("path/to/one") + storedVal3, ok3 := trie.Get("path/to/one/two/three") + assert.True(t, ok1, "Existing shorter keys should be unaffected") + assert.True(t, ok3, "Existing longer keys should be unaffected") + assert.Equal(t, &vals[0], storedVal1, "Existing shorter keys should be unaffected") + assert.Equal(t, &vals[2], storedVal3, "Existing longer keys should be unaffected") +} diff --git a/releasenotes/notes/support-arbitrary-container-id-cc0efdf7c156b7ad.yaml b/releasenotes/notes/support-arbitrary-container-id-cc0efdf7c156b7ad.yaml new file mode 100644 index 0000000000000..c47b999b7b4fe --- /dev/null +++ b/releasenotes/notes/support-arbitrary-container-id-cc0efdf7c156b7ad.yaml @@ -0,0 +1,4 @@ +fixes: + - | + Fix a bug where containerd container metrics and container tags were not being + collected for containers with arbitrary container IDs.