Skip to content

Commit

Permalink
Backport of [HCP Telemetry] Periodic Refresh for Dynamic Telemetry Co…
Browse files Browse the repository at this point in the history
…nfiguration into release/1.16.x (#18359)

[HCP Telemetry] Periodic Refresh for Dynamic Telemetry Configuration (#18168)

* OTElExporter now uses an EndpointProvider to discover the endpoint

* OTELSink uses a ConfigProvider to obtain filters and labels configuration

* improve tests for otel_sink

* Regex logic is moved into client for a method on the TelemetryConfig object

* Create a telemetry_config_provider and update deps to use it

* Fix conversion

* fix import newline

* Add logger to hcp client and move telemetry_config out of the client.go file

* Add a telemetry_config.go to refactor client.go

* Update deps

* update hcp deps test

* Modify telemetry_config_providers

* Check for nil filters

* PR review updates

* Fix comments and move around pieces

* Fix comments

* Remove context from client struct

* Moved ctx out of sink struct and fixed filters, added a test

* Remove named imports, use errors.New if not fformatting

* Remove HCP dependencies in telemetry package

* Add success metric and move lock only to grab the t.cfgHahs

* Update hash

* fix nits

* Create an equals method and add tests

* Improve telemetry_config_provider.go tests

* Add race test

* Add missing godoc

* Remove mock for MetricsClient

* Avoid goroutine test panics

* trying to kick CI lint issues by upgrading mod

* imprve test code and add hasher for testing

* Use structure logging for filters, fix error constants, and default to allow all regex

* removed hashin and modify logic to simplify

* Improve race test and fix PR feedback by removing hash equals and avoid testing the timer.Ticker logic, and instead unit test

* Ran make go-mod-tidy

* Use errtypes in the test

* Add changelog

* add safety check for exporter endpoint

* remove require.Contains by using error types, fix structure logging, and fix success metric typo in exporter

* Fixed race test to have changing config values

* Send success metric before modifying config

* Avoid the defer and move the success metric under
  • Loading branch information
Achooo committed Aug 2, 2023
1 parent 61cf766 commit c56781a
Show file tree
Hide file tree
Showing 21 changed files with 1,575 additions and 517 deletions.
3 changes: 3 additions & 0 deletions .changelog/18168.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
hcp: Add dynamic configuration support for the export of server metrics to HCP.
```
80 changes: 6 additions & 74 deletions agent/hcp/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,6 @@ type Client interface {
DiscoverServers(ctx context.Context) ([]string, error)
}

// MetricsConfig holds metrics specific configuration for the TelemetryConfig.
// The endpoint field overrides the TelemetryConfig endpoint.
type MetricsConfig struct {
Filters []string
Endpoint string
}

// TelemetryConfig contains configuration for telemetry data forwarded by Consul servers
// to the HCP Telemetry gateway.
type TelemetryConfig struct {
Endpoint string
Labels map[string]string
MetricsConfig *MetricsConfig
}

type BootstrapConfig struct {
Name string
BootstrapExpect int
Expand Down Expand Up @@ -112,10 +97,14 @@ func (c *hcpClient) FetchTelemetryConfig(ctx context.Context) (*TelemetryConfig,

resp, err := c.tgw.AgentTelemetryConfig(params, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to fetch from HCP: %w", err)
}

if err := validateAgentTelemetryConfigPayload(resp); err != nil {
return nil, fmt.Errorf("invalid response payload: %w", err)
}

return convertTelemetryConfig(resp)
return convertAgentTelemetryResponse(ctx, resp, c.cfg)
}

func (c *hcpClient) FetchBootstrap(ctx context.Context) (*BootstrapConfig, error) {
Expand Down Expand Up @@ -272,60 +261,3 @@ func (c *hcpClient) DiscoverServers(ctx context.Context) ([]string, error) {

return servers, nil
}

// convertTelemetryConfig validates the AgentTelemetryConfig payload and converts it into a TelemetryConfig object.
func convertTelemetryConfig(resp *hcptelemetry.AgentTelemetryConfigOK) (*TelemetryConfig, error) {
if resp.Payload == nil {
return nil, fmt.Errorf("missing payload")
}

if resp.Payload.TelemetryConfig == nil {
return nil, fmt.Errorf("missing telemetry config")
}

payloadConfig := resp.Payload.TelemetryConfig
var metricsConfig MetricsConfig
if payloadConfig.Metrics != nil {
metricsConfig.Endpoint = payloadConfig.Metrics.Endpoint
metricsConfig.Filters = payloadConfig.Metrics.IncludeList
}
return &TelemetryConfig{
Endpoint: payloadConfig.Endpoint,
Labels: payloadConfig.Labels,
MetricsConfig: &metricsConfig,
}, nil
}

// Enabled verifies if telemetry is enabled by ensuring a valid endpoint has been retrieved.
// It returns full metrics endpoint and true if a valid endpoint was obtained.
func (t *TelemetryConfig) Enabled() (string, bool) {
endpoint := t.Endpoint
if override := t.MetricsConfig.Endpoint; override != "" {
endpoint = override
}

if endpoint == "" {
return "", false
}

// The endpoint from Telemetry Gateway is a domain without scheme, and without the metrics path, so they must be added.
return endpoint + metricsGatewayPath, true
}

// DefaultLabels returns a set of <key, value> string pairs that must be added as attributes to all exported telemetry data.
func (t *TelemetryConfig) DefaultLabels(cfg config.CloudConfig) map[string]string {
labels := make(map[string]string)
nodeID := string(cfg.NodeID)
if nodeID != "" {
labels["node_id"] = nodeID
}
if cfg.NodeName != "" {
labels["node_name"] = cfg.NodeName
}

for k, v := range t.Labels {
labels[k] = v
}

return labels
}
240 changes: 81 additions & 159 deletions agent/hcp/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,200 +2,122 @@ package client

import (
"context"
"fmt"
"net/url"
"regexp"
"testing"
"time"

"github.com/hashicorp/consul/agent/hcp/config"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
"github.com/go-openapi/runtime"
hcptelemetry "github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/client/consul_telemetry_service"
"github.com/hashicorp/hcp-sdk-go/clients/cloud-consul-telemetry-gateway/preview/2023-04-14/models"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestFetchTelemetryConfig(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
metricsEndpoint string
expect func(*MockClient)
disabled bool
}{
"success": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"overrideMetricsEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "https://test.com",
MetricsConfig: &MetricsConfig{
Endpoint: "https://test.com",
},
}, nil)
},
metricsEndpoint: "https://test.com/v1/metrics",
},
"disabledWithEmptyEndpoint": {
expect: func(mockClient *MockClient) {
mockClient.EXPECT().FetchTelemetryConfig(mock.Anything).Return(&TelemetryConfig{
Endpoint: "",
MetricsConfig: &MetricsConfig{
Endpoint: "",
},
}, nil)
},
disabled: true,
},
} {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()

mock := NewMockClient(t)
test.expect(mock)

telemetryCfg, err := mock.FetchTelemetryConfig(context.Background())
require.NoError(t, err)

if test.disabled {
endpoint, ok := telemetryCfg.Enabled()
require.False(t, ok)
require.Empty(t, endpoint)
return
}
type mockTGW struct {
mockResponse *hcptelemetry.AgentTelemetryConfigOK
mockError error
}

endpoint, ok := telemetryCfg.Enabled()
func (m *mockTGW) AgentTelemetryConfig(params *hcptelemetry.AgentTelemetryConfigParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.AgentTelemetryConfigOK, error) {
return m.mockResponse, m.mockError
}
func (m *mockTGW) GetLabelValues(params *hcptelemetry.GetLabelValuesParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.GetLabelValuesOK, error) {
return hcptelemetry.NewGetLabelValuesOK(), nil
}
func (m *mockTGW) QueryRangeBatch(params *hcptelemetry.QueryRangeBatchParams, authInfo runtime.ClientAuthInfoWriter, opts ...hcptelemetry.ClientOption) (*hcptelemetry.QueryRangeBatchOK, error) {
return hcptelemetry.NewQueryRangeBatchOK(), nil
}
func (m *mockTGW) SetTransport(transport runtime.ClientTransport) {}

require.True(t, ok)
require.Equal(t, test.metricsEndpoint, endpoint)
})
}
type expectedTelemetryCfg struct {
endpoint string
labels map[string]string
filters string
refreshInterval time.Duration
}

func TestConvertTelemetryConfig(t *testing.T) {
func TestFetchTelemetryConfig(t *testing.T) {
t.Parallel()
for name, test := range map[string]struct {
resp *consul_telemetry_service.AgentTelemetryConfigOK
expectedTelemetryCfg *TelemetryConfig
wantErr string
for name, tc := range map[string]struct {
mockResponse *hcptelemetry.AgentTelemetryConfigOK
mockError error
wantErr string
expected *expectedTelemetryCfg
}{
"success": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
MetricsConfig: &MetricsConfig{},
"errorsWithFetchFailure": {
mockError: fmt.Errorf("failed to fetch from HCP"),
mockResponse: nil,
wantErr: "failed to fetch from HCP",
},
"errorsWithInvalidPayload": {
mockResponse: &hcptelemetry.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{},
},
mockError: nil,
wantErr: "invalid response payload",
},
"successWithMetricsConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
"success:": {
mockResponse: &hcptelemetry.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{
RefreshConfig: &models.HashicorpCloudConsulTelemetry20230414RefreshConfig{
RefreshInterval: "1s",
},
TelemetryConfig: &models.HashicorpCloudConsulTelemetry20230414TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
Labels: map[string]string{"test": "123"},
Metrics: &models.HashicorpCloudConsulTelemetry20230414TelemetryMetricsConfig{
Endpoint: "https://metrics-test.com",
IncludeList: []string{"consul.raft.apply"},
IncludeList: []string{"consul", "test"},
},
},
},
},
expectedTelemetryCfg: &TelemetryConfig{
Endpoint: "https://test.com",
Labels: map[string]string{"test": "test"},
MetricsConfig: &MetricsConfig{
Endpoint: "https://metrics-test.com",
Filters: []string{"consul.raft.apply"},
},
},
},
"errorsWithNilPayload": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{},
wantErr: "missing payload",
},
"errorsWithNilTelemetryConfig": {
resp: &consul_telemetry_service.AgentTelemetryConfigOK{
Payload: &models.HashicorpCloudConsulTelemetry20230414AgentTelemetryConfigResponse{},
expected: &expectedTelemetryCfg{
endpoint: "https://test.com/v1/metrics",
labels: map[string]string{"test": "123"},
filters: "consul|test",
refreshInterval: 1 * time.Second,
},
wantErr: "missing telemetry config",
},
} {
test := test
tc := tc
t.Run(name, func(t *testing.T) {
t.Parallel()
telemetryCfg, err := convertTelemetryConfig(test.resp)
if test.wantErr != "" {
c := &hcpClient{
tgw: &mockTGW{
mockError: tc.mockError,
mockResponse: tc.mockResponse,
},
}

telemetryCfg, err := c.FetchTelemetryConfig(context.Background())

if tc.wantErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.wantErr)
require.Nil(t, telemetryCfg)
require.Contains(t, err.Error(), test.wantErr)
return
}

urlEndpoint, err := url.Parse(tc.expected.endpoint)
require.NoError(t, err)
require.Equal(t, test.expectedTelemetryCfg, telemetryCfg)
})
}
}

func Test_DefaultLabels(t *testing.T) {
for name, tc := range map[string]struct {
cfg config.CloudConfig
expectedLabels map[string]string
}{
"Success": {
cfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "nodey",
},
expectedLabels: map[string]string{
"node_id": "nodeyid",
"node_name": "nodey",
},
},
regexFilters, err := regexp.Compile(tc.expected.filters)
require.NoError(t, err)

"NoNodeID": {
cfg: config.CloudConfig{
NodeID: types.NodeID(""),
NodeName: "nodey",
},
expectedLabels: map[string]string{
"node_name": "nodey",
},
},
"NoNodeName": {
cfg: config.CloudConfig{
NodeID: types.NodeID("nodeyid"),
NodeName: "",
},
expectedLabels: map[string]string{
"node_id": "nodeyid",
},
},
"Empty": {
cfg: config.CloudConfig{
NodeID: "",
NodeName: "",
},
expectedLabels: map[string]string{},
},
} {
t.Run(name, func(t *testing.T) {
tCfg := &TelemetryConfig{}
labels := tCfg.DefaultLabels(tc.cfg)
require.Equal(t, labels, tc.expectedLabels)
expectedCfg := &TelemetryConfig{
MetricsConfig: &MetricsConfig{
Endpoint: urlEndpoint,
Filters: regexFilters,
Labels: tc.expected.labels,
},
RefreshConfig: &RefreshConfig{
RefreshInterval: tc.expected.refreshInterval,
},
}

require.NoError(t, err)
require.Equal(t, expectedCfg, telemetryCfg)
})
}
}
Loading

0 comments on commit c56781a

Please sign in to comment.