From 7db53d2929fa1c7c8542f461dcbb3bb3a18bc8aa Mon Sep 17 00:00:00 2001 From: Katie Hockman Date: Wed, 21 Sep 2022 11:50:40 -0400 Subject: [PATCH 1/2] internal/telemetry: support DD_TELEMETRY_HEARTBEAT_INTERVAL This change decouples the heartbeat signal and flushing data to the backend, and adds support for a configurable DD_TELEMETRY_HEARTBEAT_INTERVAL environment variable. See https://github.com/DataDog/instrumentation-telemetry-api-docs/blob/main/GeneratedDocumentation/ApiDocs/v1/producing-telemetry.md#configurable-interval for more details. --- internal/telemetry/client.go | 41 +++++++++++++++++++++++++------ internal/telemetry/client_test.go | 12 ++++++++- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go index 0bc49a5831..82c975d67a 100644 --- a/internal/telemetry/client.go +++ b/internal/telemetry/client.go @@ -48,6 +48,8 @@ var ( } // TODO: Default telemetry URL? hostname string + + defaultHeartbeatInterval = 60 // seconds ) func init() { @@ -73,6 +75,10 @@ type Client struct { // How often to send batched requests. Defaults to 60s SubmissionInterval time.Duration + // The interval for sending a heartbeat signal to the backend. + // Configurable with DD_TELEMETRY_HEARTBEAT_INTERVAL. Default 60s. + heartbeatInterval time.Duration + // e.g. "tracers", "profilers", "appsec" Namespace Namespace @@ -114,8 +120,10 @@ type Client struct { // seqID is a sequence number used to order telemetry messages by // the back end. seqID int64 - // t is used to schedule flushing outstanding messages - t *time.Timer + // flushT is used to schedule flushing outstanding messages + flushT *time.Timer + // heartbeatT is used to schedule heartbeat messages + heartbeatT *time.Timer // requests hold all messages which don't need to be immediately sent requests []*Request // metrics holds un-sent metrics that will be aggregated the next time @@ -195,7 +203,14 @@ func (c *Client) Start(integrations []Integration, configuration []Configuration if c.SubmissionInterval == 0 { c.SubmissionInterval = 60 * time.Second } - c.t = time.AfterFunc(c.SubmissionInterval, c.backgroundFlush) + c.flushT = time.AfterFunc(c.SubmissionInterval, c.backgroundFlush) + + heartbeat := internal.IntEnv("DD_TELEMETRY_HEARTBEAT_INTERVAL", defaultHeartbeatInterval) + if heartbeat < 1 || heartbeat > 3600 { + heartbeat = defaultHeartbeatInterval + } + c.heartbeatInterval = time.Duration(heartbeat) * time.Second + c.heartbeatT = time.AfterFunc(c.heartbeatInterval, c.backgroundHeartbeat) } // Stop notifies the telemetry endpoint that the app is closing. All outstanding @@ -208,7 +223,8 @@ func (c *Client) Stop() { return } c.started = false - c.t.Stop() + c.flushT.Stop() + c.heartbeatT.Stop() // close request types have no body r := c.newRequest(RequestTypeAppClosing) c.scheduleSubmit(r) @@ -425,14 +441,25 @@ func (c *Client) scheduleSubmit(r *Request) { c.requests = append(c.requests, r) } +func (c *Client) backgroundHeartbeat() { + c.mu.Lock() + defer c.mu.Unlock() + if !c.started { + return + } + err := c.submit(c.newRequest(RequestTypeAppHeartbeat)) + if err != nil { + c.log("heartbeat failed: %s", err) + } + c.heartbeatT.Reset(c.heartbeatInterval) +} + func (c *Client) backgroundFlush() { c.mu.Lock() defer c.mu.Unlock() if !c.started { return } - r := c.newRequest(RequestTypeAppHeartbeat) - c.scheduleSubmit(r) c.flush() - c.t.Reset(c.SubmissionInterval) + c.flushT.Reset(c.SubmissionInterval) } diff --git a/internal/telemetry/client_test.go b/internal/telemetry/client_test.go index 1e796dc3fb..e3864d7597 100644 --- a/internal/telemetry/client_test.go +++ b/internal/telemetry/client_test.go @@ -9,6 +9,7 @@ import ( "encoding/json" "net/http" "net/http/httptest" + "os" "reflect" "sort" "sync" @@ -19,6 +20,9 @@ import ( ) func TestClient(t *testing.T) { + os.Setenv("DD_TELEMETRY_HEARTBEAT_INTERVAL", "1") + defer os.Unsetenv("DD_TELEMETRY_HEARTBEAT_INTERVAL") + heartbeat := make(chan struct{}) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -43,7 +47,13 @@ func TestClient(t *testing.T) { client.Start(nil, nil) // test idempotence defer client.Stop() - <-heartbeat + timeout := time.After(30 * time.Second) + select { + case <-timeout: + t.Fatal("Heartbeat took more than 30 seconds. Should have been ~1 second") + case <-heartbeat: + } + } func TestMetrics(t *testing.T) { From 45a3b8ac528ef920e3897ae0cb0152cb4439acc2 Mon Sep 17 00:00:00 2001 From: Katie Hockman Date: Thu, 22 Sep 2022 13:51:28 -0400 Subject: [PATCH 2/2] warn if outside range --- internal/telemetry/client.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/telemetry/client.go b/internal/telemetry/client.go index 82c975d67a..ccd699041b 100644 --- a/internal/telemetry/client.go +++ b/internal/telemetry/client.go @@ -22,6 +22,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/globalconfig" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" "gopkg.in/DataDog/dd-trace-go.v1/internal/osinfo" "gopkg.in/DataDog/dd-trace-go.v1/internal/version" ) @@ -207,6 +208,7 @@ func (c *Client) Start(integrations []Integration, configuration []Configuration heartbeat := internal.IntEnv("DD_TELEMETRY_HEARTBEAT_INTERVAL", defaultHeartbeatInterval) if heartbeat < 1 || heartbeat > 3600 { + log.Warn("DD_TELEMETRY_HEARTBEAT_INTERVAL=%d not in [1,3600] range, setting to default of %d", heartbeat, defaultHeartbeatInterval) heartbeat = defaultHeartbeatInterval } c.heartbeatInterval = time.Duration(heartbeat) * time.Second