Skip to content

Commit

Permalink
trace-agent: get rid of the statsd singleton (#22729)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmed-mez committed Feb 13, 2024
1 parent 395a26b commit 8b6bddc
Show file tree
Hide file tree
Showing 69 changed files with 527 additions and 635 deletions.
8 changes: 4 additions & 4 deletions cmd/trace-agent/config/remote/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import (
pbgo "github.com/DataDog/datadog-agent/pkg/proto/pbgo/core"
"github.com/DataDog/datadog-agent/pkg/trace/api"
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/metrics"
"github.com/DataDog/datadog-agent/pkg/trace/metrics/timing"
"github.com/DataDog/datadog-agent/pkg/trace/timing"
"github.com/DataDog/datadog-agent/pkg/trace/traceutil"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-go/v5/statsd"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand All @@ -44,15 +44,15 @@ func putBuffer(buffer *bytes.Buffer) {
}

// ConfigHandler is the HTTP handler for configs
func ConfigHandler(r *api.HTTPReceiver, client rcclient.ConfigUpdater, cfg *config.AgentConfig) http.Handler {
func ConfigHandler(r *api.HTTPReceiver, client rcclient.ConfigUpdater, cfg *config.AgentConfig, statsd statsd.ClientInterface) http.Handler {
cidProvider := api.NewIDProvider(cfg.ContainerProcRoot)
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
defer timing.Since("datadog.trace_agent.receiver.config_process_ms", time.Now())
tags := r.TagStats(api.V07, req.Header).AsTags()
statusCode := http.StatusOK
defer func() {
tags = append(tags, fmt.Sprintf("status_code:%d", statusCode))
metrics.Count("datadog.trace_agent.receiver.config_request", 1, tags, 1)
_ = statsd.Count("datadog.trace_agent.receiver.config_request", 1, tags, 1)
}()

buf := getBuffer()
Expand Down
21 changes: 11 additions & 10 deletions cmd/trace-agent/config/remote/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/sampler"
"github.com/DataDog/datadog-agent/pkg/trace/telemetry"
"github.com/DataDog/datadog-go/v5/statsd"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -56,10 +57,10 @@ func TestConfigEndpoint(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
assert := assert.New(t)
grpc := agentGRPCConfigFetcher{}
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector())
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector(), &statsd.NoOpClient{})
mux := http.NewServeMux()
cfg := &config.AgentConfig{}
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, cfg))
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, cfg, &statsd.NoOpClient{}))
server := httptest.NewServer(mux)
if tc.valid {
var request pbgo.ClientGetConfigsRequest
Expand Down Expand Up @@ -131,15 +132,15 @@ func TestUpstreamRequest(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
assert := assert.New(t)
grpc := agentGRPCConfigFetcher{}
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector())
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector(), &statsd.NoOpClient{})

var request pbgo.ClientGetConfigsRequest
err := json.Unmarshal([]byte(tc.expectedUpstreamRequest), &request)
assert.NoError(err)
grpc.On("ClientGetConfigs", mock.Anything, &request, mock.Anything).Return(&pbgo.ClientGetConfigsResponse{Targets: []byte("test")}, nil)

mux := http.NewServeMux()
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, tc.cfg))
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, tc.cfg, &statsd.NoOpClient{}))
server := httptest.NewServer(mux)

req, _ := http.NewRequest("POST", server.URL+"/v0.7/config", strings.NewReader(tc.tracerReq))
Expand All @@ -159,13 +160,13 @@ func TestUpstreamRequest(t *testing.T) {
func TestForwardErrors(t *testing.T) {
assert := assert.New(t)
grpc := agentGRPCConfigFetcher{}
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector())
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector(), &statsd.NoOpClient{})

grpc.On("ClientGetConfigs", mock.Anything, mock.Anything, mock.Anything).
Return(nil, status.Error(codes.Unimplemented, "not implemented"))

mux := http.NewServeMux()
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, &config.AgentConfig{}))
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, &config.AgentConfig{}, &statsd.NoOpClient{}))
server := httptest.NewServer(mux)

req, _ := http.NewRequest("POST", server.URL+"/v0.7/config", strings.NewReader(`{"client":{"id":"test_client","is_tracer":true,"client_tracer":{"service":"test","tags":["foo:bar"]}}}`))
Expand All @@ -178,13 +179,13 @@ func TestForwardErrors(t *testing.T) {
func TestUnexpectedError(t *testing.T) {
assert := assert.New(t)
grpc := agentGRPCConfigFetcher{}
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector())
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector(), &statsd.NoOpClient{})

grpc.On("ClientGetConfigs", mock.Anything, mock.Anything, mock.Anything).
Return(nil, status.Error(codes.Unavailable, "unavailable"))

mux := http.NewServeMux()
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, &config.AgentConfig{}))
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, &config.AgentConfig{}, &statsd.NoOpClient{}))
server := httptest.NewServer(mux)

req, _ := http.NewRequest("POST", server.URL+"/v0.7/config", strings.NewReader(`{"client":{"id":"test_client","is_tracer":true,"client_tracer":{"service":"test","tags":["foo:bar"]}}}`))
Expand All @@ -197,13 +198,13 @@ func TestUnexpectedError(t *testing.T) {
func TestEmptyResponse(t *testing.T) {
assert := assert.New(t)
grpc := agentGRPCConfigFetcher{}
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector())
rcv := api.NewHTTPReceiver(config.New(), sampler.NewDynamicConfig(), make(chan *api.Payload, 5000), nil, telemetry.NewNoopCollector(), &statsd.NoOpClient{})

grpc.On("ClientGetConfigs", mock.Anything, mock.Anything, mock.Anything).
Return(nil, nil)

mux := http.NewServeMux()
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, &config.AgentConfig{}))
mux.Handle("/v0.7/config", ConfigHandler(rcv, &grpc, &config.AgentConfig{}, &statsd.NoOpClient{}))
server := httptest.NewServer(mux)

req, _ := http.NewRequest("POST", server.URL+"/v0.7/config", strings.NewReader(`{"client":{"id":"test_client","is_tracer":true,"client_tracer":{"service":"test","tags":["foo:bar"]}}}`))
Expand Down
77 changes: 47 additions & 30 deletions comp/trace/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"context"
"errors"
"fmt"
"net"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"sync"
"syscall"

Expand All @@ -23,14 +25,15 @@ import (
"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
"github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
"github.com/DataDog/datadog-agent/comp/trace/config"
apiutil "github.com/DataDog/datadog-agent/pkg/api/util"
"github.com/DataDog/datadog-agent/pkg/pidfile"
pkgagent "github.com/DataDog/datadog-agent/pkg/trace/agent"
"github.com/DataDog/datadog-agent/pkg/trace/metrics"
tracecfg "github.com/DataDog/datadog-agent/pkg/trace/config"
"github.com/DataDog/datadog-agent/pkg/trace/telemetry"
"github.com/DataDog/datadog-agent/pkg/trace/watchdog"
"github.com/DataDog/datadog-agent/pkg/util/log"
"github.com/DataDog/datadog-agent/pkg/version"

ddgostatsd "github.com/DataDog/datadog-go/v5/statsd"
)

const messageAgentDisabled = `trace-agent not enabled. Set the environment variable
Expand Down Expand Up @@ -84,11 +87,6 @@ func newAgent(deps dependencies) Component {
}
ctx, cancel := context.WithCancel(deps.Context) // Several related non-components require a shared context to gracefully stop.
ag := &agent{
Agent: pkgagent.NewAgent(
ctx,
deps.Config.Object(),
deps.TelemetryCollector,
),
cancel: cancel,
config: deps.Config,
statsd: deps.Statsd,
Expand All @@ -101,16 +99,6 @@ func newAgent(deps dependencies) Component {
wg: sync.WaitGroup{},
}

// We're adding the /config endpoint from the comp side of the trace agent to avoid linking with pkg/config from
// the trace agent.
// pkg/config is not a go-module yet and pulls a large chunk of Agent code base with it. Using it within the
// trace-agent would largely increase the number of module pulled by OTEL when using the pkg/trace go-module.
if err := apiutil.CreateAndSetAuthToken(); err != nil {
log.Errorf("could not set auth token: %s", err)
} else {
ag.Agent.DebugServer.AddRoute("/config", deps.Config.GetConfigHandler())
}

deps.Lc.Append(fx.Hook{
// Provided contexts have a timeout, so it can't be used for gracefully stopping long-running components.
// These contexts are cancelled on a deadline, so they would have side effects on the agent.
Expand All @@ -120,7 +108,6 @@ func newAgent(deps dependencies) Component {
}

func start(ag *agent) error {
setupShutdown(ag.ctx, ag.shutdowner)
if ag.params.CPUProfile != "" {
f, err := os.Create(ag.params.CPUProfile)
if err != nil {
Expand All @@ -140,11 +127,18 @@ func start(ag *agent) error {
log.Infof("PID '%d' written to PID file '%s'", os.Getpid(), ag.params.PIDFilePath)
}

if err := setupMetrics(ag.statsd, ag.config, ag.telemetryCollector); err != nil {
statsdCl, err := setupMetrics(ag.statsd, ag.config, ag.telemetryCollector)
if err != nil {
return err
}

if err := runAgentSidekicks(ag.ctx, ag.config, ag.telemetryCollector); err != nil {
setupShutdown(ag.ctx, ag.shutdowner, statsdCl)
ag.Agent = pkgagent.NewAgent(
ag.ctx,
ag.config.Object(),
ag.telemetryCollector,
statsdCl,
)
if err := runAgentSidekicks(ag); err != nil {
return err
}
ag.wg.Add(1)
Expand All @@ -155,24 +149,30 @@ func start(ag *agent) error {
return nil
}

func setupMetrics(statsd statsd.Component, cfg config.Component, telemetryCollector telemetry.TelemetryCollector) error {
tracecfg := cfg.Object()
func setupMetrics(statsd statsd.Component, cfg config.Component, telemetryCollector telemetry.TelemetryCollector) (ddgostatsd.ClientInterface, error) {
addr, err := findAddr(cfg.Object())
if err != nil {
return nil, err
}

// TODO: Try to use statsd.Get() everywhere instead in the long run.
err := metrics.Configure(tracecfg, []string{"version:" + version.AgentVersion}, statsd.CreateForAddr)
client, err := statsd.CreateForAddr(addr, ddgostatsd.WithTags([]string{"version:" + version.AgentVersion}))
if err != nil {
telemetryCollector.SendStartupError(telemetry.CantConfigureDogstatsd, err)
return fmt.Errorf("cannot configure dogstatsd: %v", err)
return nil, fmt.Errorf("cannot configure dogstatsd: %v", err)
}

metrics.Count("datadog.trace_agent.started", 1, nil, 1)
return nil
_ = client.Count("datadog.trace_agent.started", 1, nil, 1)
return client, nil
}

func stop(ag *agent) error {
ag.cancel()
ag.wg.Wait()
stopAgentSidekicks(ag.config)
if err := ag.Statsd.Flush(); err != nil {
log.Error("Could not flush statsd: ", err)
}
stopAgentSidekicks(ag.config, ag.Statsd)
if ag.params.CPUProfile != "" {
pprof.StopCPUProfile()
}
Expand Down Expand Up @@ -200,8 +200,8 @@ func stop(ag *agent) error {
}

// handleSignal closes a channel to exit cleanly from routines
func handleSignal(shutdowner fx.Shutdowner) {
defer watchdog.LogOnPanic()
func handleSignal(shutdowner fx.Shutdowner, statsd ddgostatsd.ClientInterface) {
defer watchdog.LogOnPanic(statsd)

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGPIPE)
Expand All @@ -220,3 +220,20 @@ func handleSignal(shutdowner fx.Shutdowner) {
}
}
}

// findAddr resolves the address used to reach the Dogstatsd server from the
// trace-agent configuration. Transports are considered in a fixed order of
// precedence, and the first one that is configured wins:
//
//  1. UDP, when StatsdPort is positive (StatsdHost joined with the port);
//  2. a Windows named pipe, when StatsdPipeName is non-empty;
//  3. a Unix socket, when StatsdSocket is non-empty.
//
// An error is returned when none of the above is configured.
func findAddr(conf *tracecfg.AgentConfig) (string, error) {
	switch {
	case conf.StatsdPort > 0:
		// UDP enabled
		port := strconv.Itoa(conf.StatsdPort)
		return net.JoinHostPort(conf.StatsdHost, port), nil
	case conf.StatsdPipeName != "":
		// Windows Pipes can be used
		return `\\.\pipe\` + conf.StatsdPipeName, nil
	case conf.StatsdSocket != "":
		// Unix sockets can be used
		return `unix://` + conf.StatsdSocket, nil
	default:
		return "", errors.New("dogstatsd_port is set to 0 and no alternative is available")
	}
}
2 changes: 2 additions & 0 deletions comp/trace/agent/agent_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/trace/stats"
"github.com/DataDog/datadog-agent/pkg/trace/telemetry"
"github.com/DataDog/datadog-agent/pkg/trace/writer"
"github.com/DataDog/datadog-go/v5/statsd"
)

func newMock(deps dependencies, t testing.TB) Component { //nolint:revive // TODO fix revive unused-parameter
Expand All @@ -28,6 +29,7 @@ func newMock(deps dependencies, t testing.TB) Component { //nolint:revive // TOD
ctx,
deps.Config.Object(),
telemetryCollector,
&statsd.NoOpClient{},
),
cancel: cancel,
config: deps.Config,
Expand Down
5 changes: 3 additions & 2 deletions comp/trace/agent/agent_nix.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ package agent
import (
"context"

"github.com/DataDog/datadog-go/v5/statsd"
"go.uber.org/fx"
)

func setupShutdown(_ context.Context, shutdowner fx.Shutdowner) {
func setupShutdown(_ context.Context, shutdowner fx.Shutdowner, statsd statsd.ClientInterface) {
// Handle stops properly
go handleSignal(shutdowner)
go handleSignal(shutdowner, statsd)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package metrics
// Package agent defines the tracer agent.
package agent

import (
"testing"
Expand Down
8 changes: 5 additions & 3 deletions comp/trace/agent/agent_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ import (
"go.uber.org/fx"

"github.com/DataDog/datadog-agent/pkg/trace/watchdog"

"github.com/DataDog/datadog-go/v5/statsd"
)

func setupShutdown(ctx context.Context, shutdowner fx.Shutdowner) {
func setupShutdown(ctx context.Context, shutdowner fx.Shutdowner, statsd statsd.ClientInterface) {
// Handle stops properly
go handleSignal(shutdowner)
go handleSignal(shutdowner, statsd)

// Support context cancellation approach (required for Windows service, as it doesn't use signals)
go func() {
defer watchdog.LogOnPanic()
defer watchdog.LogOnPanic(statsd)
<-ctx.Done()
_ = shutdowner.Shutdown()
}()
Expand Down

0 comments on commit 8b6bddc

Please sign in to comment.