Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,284 changes: 642 additions & 642 deletions NOTICE-fips.txt

Large diffs are not rendered by default.

1,016 changes: 508 additions & 508 deletions NOTICE.txt

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,14 @@ require (
go.elastic.co/ecszap v1.0.3
go.elastic.co/go-licence-detector v0.7.0
go.opentelemetry.io/collector/component/componentstatus v0.132.0
go.opentelemetry.io/collector/component/componenttest v0.132.0
go.opentelemetry.io/collector/connector/forwardconnector v0.132.0
go.opentelemetry.io/collector/extension/extensiontest v0.132.0
go.opentelemetry.io/collector/pipeline v1.38.0
go.opentelemetry.io/collector/processor/memorylimiterprocessor v0.132.0
go.opentelemetry.io/collector/receiver/nopreceiver v0.132.0
go.uber.org/zap v1.27.0
go.yaml.in/yaml/v3 v3.0.4
golang.org/x/crypto v0.41.0
golang.org/x/exp v0.0.0-20250215185904-eff6e970281f
golang.org/x/mod v0.27.0
Expand Down Expand Up @@ -637,7 +640,6 @@ require (
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector v0.132.0 // indirect
go.opentelemetry.io/collector/client v1.38.0 // indirect
go.opentelemetry.io/collector/component/componenttest v0.132.0 // indirect
go.opentelemetry.io/collector/config/configauth v0.132.0 // indirect
go.opentelemetry.io/collector/config/configcompression v1.38.0 // indirect
go.opentelemetry.io/collector/config/configgrpc v0.132.0 // indirect
Expand All @@ -663,7 +665,6 @@ require (
go.opentelemetry.io/collector/extension/extensionauth v1.38.0 // indirect
go.opentelemetry.io/collector/extension/extensioncapabilities v0.132.0 // indirect
go.opentelemetry.io/collector/extension/extensionmiddleware v0.132.0 // indirect
go.opentelemetry.io/collector/extension/extensiontest v0.132.0 // indirect
go.opentelemetry.io/collector/extension/xextension v0.132.0 // indirect
go.opentelemetry.io/collector/filter v0.132.0 // indirect
go.opentelemetry.io/collector/internal/fanoutconsumer v0.132.0 // indirect
Expand Down Expand Up @@ -721,7 +722,6 @@ require (
go.uber.org/ratelimit v0.3.1 // indirect
go.uber.org/zap/exp v0.3.0 // indirect
go.yaml.in/yaml/v2 v2.4.2 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
gonum.org/v1/gonum v0.16.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions internal/pkg/otel/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ This section provides a summary of components included in the Elastic Distributi
| [apmconfigextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/apmconfigextension/v0.6.0/extension/apmconfigextension/README.md) | v0.6.0 |
| [bearertokenauthextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/bearertokenauthextension/v0.132.0/extension/bearertokenauthextension/README.md) | v0.132.0 |
| [beatsauthextension](https://github.com/elastic/opentelemetry-collector-components/blob/extension/beatsauthextension/v0.2.0/extension/beatsauthextension/README.md) | v0.2.0 |
| [extensiontest](https://github.com/open-telemetry/opentelemetry-collector/blob/extension/extensiontest/v0.132.0/extension/extensiontest/README.md) | v0.132.0 |
| [filestorage](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/storage/filestorage/v0.132.0/extension/storage/filestorage/README.md) | v0.132.0 |
| [headerssetterextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/headerssetterextension/v0.132.0/extension/headerssetterextension/README.md) | v0.132.0 |
| [healthcheckextension](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/extension/healthcheckextension/v0.132.0/extension/healthcheckextension/README.md) | v0.132.0 |
Expand Down
64 changes: 64 additions & 0 deletions internal/pkg/otel/extension/elasticdiagnostics/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# elasticdiagnosticsextension

`elasticdiagnosticsextension` is an internal package for performing diagnostics and is used in conjunction with EDOT.
The extension is designed to return diagnostics in a format compatible with the [ActionDiagnosticUnitResult](https://github.com/elastic/elastic-agent-client/blob/888026ef85e1c9190fe76eb158cf21d9c9c02920/elastic-agent-client.proto#L424-L437) type defined in the control protocol.


## Configuration

The extension accepts `endpoint` as its sole parameter. The endpoint must begin with a valid protocol; the currently supported protocols are `unix` and `npipe`. Here are a few examples:

- `unix:///tmp/elastic-agent/xyz.soc`
- `npipe:///elastic-agent`

## Features

- Acts as a registrar and keeps track of common diagnostic hooks.
- Collects profiles using `runtime/pprof`.
- Collects internal telemetry exposed by the OTel Collector.
- Implements the `extensioncapabilities.ConfigWatcher` interface and stores the latest configuration of the running collector.
- Listens for diagnostic requests and provides diagnostic data.

## Design

### Diagnostic hooks:
- Individual beats register custom diagnostic hooks and these hooks are called when we run the elastic-agent diagnostics command.
- Our extension stores these hooks and executes them every time it gets a "diagnostics" request.

### Request/Response format:
- This extension runs an HTTP server and listens to new requests on `/diagnostics` path.
- The following query parameters are optional:
- `cpu`
- If `true`, the extension will also collect a CPU profile of EDOT.
- By default, the extension doesn't collect the CPU profile unless explicitly specified.
- `cpuduration`:
- Specifies the time duration over which the CPU profile should be collected.
- Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`
- Default: `30s`.
- The response format is defined in [response.go](./response.go).
- `GlobalDiagnostics`: Data related to the overall process:
1. Profiles.
2. Internal telemetry.
3. Latest collector configuration.
- `ComponentDiagnostics`: Data from individual receivers, collected via registered diagnostic hooks.

### Interaction with Elastic-Agent service in hybrid mode.

- When the user triggers the diagnostic request, EDOT diagnostics are injected at two levels:
1. At top-Level:
- When `DiagnosticAgent()` is called in [server.go](https://github.com/elastic/elastic-agent/blob/710c49f45433e2f136a6e41cae980c1aa37dabdd/pkg/control/v2/server/server.go#L197).
- Diagnostics are captured at the global level and stored under the `edot/*` directory in the resulting ZIP archive.
2. At component-level:
- When `otelMgr.PerformComponentsDiagnostics()` is called in [coordinator.go](https://github.com/elastic/elastic-agent/blob/710c49f45433e2f136a6e41cae980c1aa37dabdd/internal/pkg/agent/application/coordinator/coordinator.go#L863).
- Diagnostics are added per component and stored under the `components/{comp}/*` directory in the resulting ZIP archive.

#### Diagram

```mermaid
graph LR
A[elastic-agent install ...] --> B[Runs the service in hybrid mode<br/>and we inject elasticdiagnosticsextension]
B -->|listens to| D[edot-diagnostics-extension.sock]

C[elastic-agent diagnostics] --> E[Extracts diagnostics socket path via the binary]
E -->|requests OTeL diagnostics| D
```
26 changes: 26 additions & 0 deletions internal/pkg/otel/extension/elasticdiagnostics/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package elasticdiagnostics

import (
"errors"

"go.opentelemetry.io/collector/component"
)

// Config holds the user-facing settings of the diagnostics extension.
type Config struct {
	// Endpoint is the address the extension listens on for diagnostics
	// requests. It is decoded from the "endpoint" configuration key and
	// must be non-empty (see Validate).
	Endpoint string `mapstructure:"endpoint"`
}

// createDefaultConfig returns a pointer to a zero-valued Config to be used as
// the extension's default configuration. Endpoint is left empty, so the
// returned value does not pass Validate until an endpoint is supplied.
func createDefaultConfig() component.Config {
	return new(Config)
}

// Validate reports whether the configuration is usable. The only rule is
// that Endpoint must be set; an empty Endpoint yields an error, anything
// else is accepted as-is.
func (c *Config) Validate() error {
	if len(c.Endpoint) > 0 {
		return nil
	}
	return errors.New("endpoint is a required field")
}
245 changes: 245 additions & 0 deletions internal/pkg/otel/extension/elasticdiagnostics/extension.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package elasticdiagnostics

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"net"
"net/http"
"runtime/pprof"
"sync"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap"
"go.uber.org/zap"
"go.yaml.in/yaml/v3"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/diagnostics"
"github.com/elastic/elastic-agent/pkg/ipc"
)

// Compile-time check that *diagnosticsExtension satisfies component.Component.
var _ component.Component = (*diagnosticsExtension)(nil)

// diagHook describes a single diagnostic artifact and how to produce it.
type diagHook struct {
	// description is the human-readable summary reported with the result.
	description string
	// filename is the name reported for the produced content.
	filename string
	// contentType is the MIME type reported for the produced content.
	contentType string
	// hook generates the diagnostic content each time it is invoked.
	hook func() []byte
}

// diagnosticsExtension serves diagnostics over HTTP. It keeps a registry of
// diagnostic hooks (global ones registered in Start, per-component ones
// registered through RegisterDiagnosticHook) and remembers the most recent
// collector configuration passed to NotifyConfig.
type diagnosticsExtension struct {
	// listener is the listener created in Start for the configured endpoint.
	listener net.Listener
	// server is the HTTP server created in Start; nil until Start has run.
	server *http.Server
	// logger is the zap logger used for all log output of the extension.
	logger *zap.Logger
	// logp wraps logger for APIs that expect a *logp.Logger; set in Start.
	logp *logp.Logger

	// diagnosticsConfig is the extension's own configuration (endpoint).
	diagnosticsConfig *Config
	// collectorConfig is the last configuration received via NotifyConfig.
	collectorConfig *confmap.Conf
	// componentHooks maps a component name to the hooks registered for it.
	componentHooks map[string][]*diagHook
	// globalHooks maps a diagnostic name to its process-wide hook.
	globalHooks map[string]*diagHook

	// mx serializes Start and Shutdown.
	mx sync.Mutex
	// hooksMtx is held while registering hooks and while serving a request.
	hooksMtx sync.Mutex
	// configMtx guards collectorConfig.
	configMtx sync.Mutex
}

// Start initializes the extension and begins serving diagnostics requests.
//
// It derives a logp logger from the zap logger, registers the global
// diagnostic hooks, creates a listener for the configured endpoint and
// serves the "/diagnostics" route on it from a background goroutine.
// ctx and host are not used. An error is returned if the logger conversion
// or the listener creation fails.
func (d *diagnosticsExtension) Start(ctx context.Context, host component.Host) error {
	d.mx.Lock()
	defer d.mx.Unlock()

	wrapped, err := logp.NewZapLogger(d.logger)
	d.logp = wrapped
	if err != nil {
		// NewZapLogger always returns nil error, so this shouldn't happen.
		return fmt.Errorf("failed to create logp.Logger from zap logger: %w", err)
	}

	d.registerGlobalDiagnostics()

	ln, err := ipc.CreateListener(d.logp, d.diagnosticsConfig.Endpoint)
	d.listener = ln
	if err != nil {
		return fmt.Errorf("error creating listener: %w", err)
	}

	router := http.NewServeMux()
	router.Handle("/diagnostics", d)
	srv := &http.Server{
		Handler:           router,
		ReadHeaderTimeout: 30 * time.Second,
	}
	d.server = srv

	// Serve blocks until the server is shut down, so it runs in the
	// background; the expected "server closed" outcome is not logged.
	go func() {
		serveErr := srv.Serve(ln)
		if serveErr == nil || errors.Is(serveErr, http.ErrServerClosed) {
			return
		}
		d.logger.Error("HTTP server error", zap.Error(serveErr))
	}()

	d.logger.Info("Diagnostics extension started", zap.String("address", ln.Addr().String()))
	return nil
}

// Shutdown gracefully stops the HTTP server and cleans up the listener
// endpoint. It is a no-op when Start never created a server.
//
// The listener cleanup runs even when the graceful shutdown fails (for
// example because ctx expired), so that a failed shutdown does not leave the
// endpoint's resources behind. The error from the HTTP server shutdown, if
// any, is returned unchanged.
func (d *diagnosticsExtension) Shutdown(ctx context.Context) error {
	d.mx.Lock()
	defer d.mx.Unlock()

	if d.server == nil {
		return nil
	}

	// Deferred so it also runs on the error path; http.Server.Shutdown closes
	// its listeners before waiting on connections, so cleanup is safe here.
	defer ipc.CleanupListener(d.logp, d.diagnosticsConfig.Endpoint)

	return d.server.Shutdown(ctx)
}

// registerGlobalDiagnostics populates d.globalHooks with the process-wide
// diagnostics: the current collector configuration rendered as YAML, and one
// runtime/pprof profile per built-in profile name listed below.
//
// Hooks never fail; on error they return a short "error: ..." message as the
// content instead.
func (d *diagnosticsExtension) registerGlobalDiagnostics() {
	// renderConfig serializes the last configuration seen by NotifyConfig.
	renderConfig := func() []byte {
		d.configMtx.Lock()
		defer d.configMtx.Unlock()

		if d.collectorConfig == nil {
			return []byte("no active OTel Configuration")
		}
		out, err := yaml.Marshal(d.collectorConfig.ToStringMap())
		if err != nil {
			return fmt.Appendf(nil, "error: failed to convert to yaml: %v", err)
		}
		return out
	}
	d.globalHooks["collector_config"] = &diagHook{
		description: "full collector configuration",
		filename:    "edot/otel-merged-actual.yaml",
		contentType: "application/yaml",
		hook:        renderConfig,
	}

	// newProfileHook builds the hook for a single named pprof profile.
	newProfileHook := func(name string) *diagHook {
		return &diagHook{
			description: fmt.Sprintf("%s profile of the collector", name),
			filename:    fmt.Sprintf("edot/%s.profile.gz", name),
			contentType: "application/octet-stream",
			hook: func() []byte {
				var out bytes.Buffer
				if err := pprof.Lookup(name).WriteTo(&out, 0); err != nil {
					return fmt.Appendf(nil, "error: failed to get %s profile: %v", name, err)
				}
				return out.Bytes()
			},
		}
	}

	// register basic profiles.
	for _, name := range []string{"goroutine", "heap", "allocs", "mutex", "threadcreate", "block"} {
		d.globalHooks[name] = newProfileHook(name)
	}
}

// NotifyConfig records conf as the collector's current configuration so the
// "collector_config" diagnostic can report it later. ctx is not used and the
// returned error is always nil.
func (d *diagnosticsExtension) NotifyConfig(ctx context.Context, conf *confmap.Conf) error {
	d.configMtx.Lock()
	d.collectorConfig = conf
	d.configMtx.Unlock()
	return nil
}

// RegisterDiagnosticHook lets beat receivers register a diagnostic hook for
// the component identified by componentName. Hooks accumulate per component
// in registration order and are invoked on every diagnostics request.
//
// description, filename and contentType are reported alongside the bytes
// returned by hook.
//
// NOTE: Changing the function signature will require changes to libbeat and beatreceivers. Proceed with caution.
func (d *diagnosticsExtension) RegisterDiagnosticHook(componentName string, description string, filename string, contentType string, hook func() []byte) {
	entry := &diagHook{
		description: description,
		filename:    filename,
		contentType: contentType,
		hook:        hook,
	}

	d.hooksMtx.Lock()
	defer d.hooksMtx.Unlock()

	// append on a missing (nil) entry creates the slice, so first and
	// subsequent registrations share one code path.
	d.componentHooks[componentName] = append(d.componentHooks[componentName], entry)
}

// ServeHTTP handles a diagnostics request. It runs every registered
// component hook and global hook, optionally collects a CPU profile, and
// writes the combined result as a JSON-encoded Response.
//
// Supported query parameters:
//   - cpu: when exactly "true", a CPU profile is collected as well.
//   - cpuduration: duration of the CPU profile in time.ParseDuration syntax;
//     when missing or unparsable, diagnostics.DiagCPUDuration is used.
//
// If the response cannot be marshaled, a 500 status with a JSON body of the
// form {"error":"..."} is written instead.
//
// NOTE(review): hooksMtx is held for the whole request, including the CPU
// profile collection, so RegisterDiagnosticHook and concurrent requests block
// until the current request has been answered.
func (d *diagnosticsExtension) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	d.hooksMtx.Lock()
	defer d.hooksMtx.Unlock()

	// Non-nil empty slices are used on purpose: they encode as [] rather
	// than null when no hooks are registered.
	componentResults := make([]*proto.ActionDiagnosticUnitResult, 0)
	for name, hooks := range d.componentHooks {
		for _, hook := range hooks {
			componentResults = append(componentResults, &proto.ActionDiagnosticUnitResult{
				Name:        name,
				Filename:    hook.filename,
				ContentType: hook.contentType,
				Description: hook.description,
				Content:     hook.hook(),
				Generated:   timestamppb.Now(),
			})
		}
	}

	globalResults := make([]*proto.ActionDiagnosticUnitResult, 0)
	for name, hook := range d.globalHooks {
		globalResults = append(globalResults, &proto.ActionDiagnosticUnitResult{
			Name:        name,
			Filename:    hook.filename,
			ContentType: hook.contentType,
			Description: hook.description,
			Content:     hook.hook(),
			Generated:   timestamppb.Now(),
		})
	}

	// Parse the query string once instead of on every lookup.
	query := req.URL.Query()

	// only add a CPU profile if requested via query parameter.
	if query.Get("cpu") == "true" {
		cpuDuration := diagnostics.DiagCPUDuration

		// An explicit cpuduration overrides the default; if it cannot be
		// parsed, log the error and keep the default duration.
		if raw := query.Get("cpuduration"); raw != "" {
			parsed, err := time.ParseDuration(raw)
			if err != nil {
				d.logger.Error("Failed parsing cpuduration parameter, using default", zap.String("cpuduration", raw), zap.Error(err))
			} else {
				cpuDuration = parsed
			}
		}

		cpuProfile, err := diagnostics.CreateCPUProfile(req.Context(), cpuDuration)
		if err != nil {
			d.logger.Error("Failed creating CPU profile", zap.Error(err))
		}
		// The entry is reported even when profiling failed, with whatever
		// content CreateCPUProfile returned.
		globalResults = append(globalResults, &proto.ActionDiagnosticUnitResult{
			Name:        "cpu",
			Filename:    "edot/cpu.profile.gz",
			ContentType: "application/octet-stream",
			Description: "CPU profile of the collector",
			Content:     cpuProfile,
			Generated:   timestamppb.Now(),
		})
	}

	b, err := json.Marshal(Response{
		GlobalDiagnostics:    globalResults,
		ComponentDiagnostics: componentResults,
	})
	w.Header().Add("content-type", "application/json")
	if err != nil {
		d.logger.Error("Failed marshaling response", zap.Error(err))
		w.WriteHeader(http.StatusInternalServerError)

		// Encode the error with encoding/json so the body is valid JSON and
		// the message is properly escaped.
		errBody, marshalErr := json.Marshal(map[string]string{"error": err.Error()})
		if marshalErr != nil {
			// Not expected for a map of strings; keep the body well-formed anyway.
			errBody = []byte(`{"error":"internal error"}`)
		}
		if _, writeErr := w.Write(errBody); writeErr != nil {
			d.logger.Error("Failed writing response to client.", zap.Error(writeErr))
		}
		return
	}
	if _, err := w.Write(b); err != nil {
		d.logger.Error("Failed writing response to client.", zap.Error(err))
	}
}
Loading