diff --git a/agentcfg/elasticsearch.go b/agentcfg/elasticsearch.go new file mode 100644 index 0000000..f8f8a90 --- /dev/null +++ b/agentcfg/elasticsearch.go @@ -0,0 +1,250 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package agentcfg // import "github.com/elastic/opentelemetry-collector-components/internal/agentcfg" + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "sync" + "sync/atomic" + "time" + + "go.uber.org/zap" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" +) + +const ElasticsearchIndexName = ".apm-agent-configuration" + +const ( + // ErrInfrastructureNotReady is returned when a fetch request comes in while + // the infrastructure is not ready to serve the request. + // This may happen when the local cache is not initialized and no fallback fetcher is configured. + ErrInfrastructureNotReady = "agentcfg infrastructure is not ready" + + // ErrNoValidElasticsearchConfig is an error where the server is + // not properly configured to fetch agent configuration. + ErrNoValidElasticsearchConfig = "no valid elasticsearch config to fetch agent config" +) + +const ( + refreshCacheTimeout = 5 * time.Second + loggerRateLimit = time.Minute +) + +// TODO: +// - Add Otel tracer +// - Collection metrics +type ElasticsearchFetcher struct { + last time.Time + client *elasticsearch.Client + logger *zap.Logger + cache []AgentConfig + cacheDuration time.Duration + searchSize int + mu sync.RWMutex + invalidESCfg atomic.Bool + cacheInitialized atomic.Bool +} + +func NewElasticsearchFetcher( + client *elasticsearch.Client, + cacheDuration time.Duration, + logger *zap.Logger, +) *ElasticsearchFetcher { + return &ElasticsearchFetcher{ + client: client, + cacheDuration: cacheDuration, + searchSize: 100, + logger: logger, + } +} + +// Fetch finds a matching agent config based on the received query. +func (f *ElasticsearchFetcher) Fetch(ctx context.Context, query Query) (Result, error) { + if f.cacheInitialized.Load() { + // Happy path: serve fetch requests using an initialized cache. + f.mu.RLock() + defer f.mu.RUnlock() + return matchAgentConfig(query, f.cache), nil + } + + if f.invalidESCfg.Load() { + return Result{}, errors.New(ErrNoValidElasticsearchConfig) + } + + return Result{}, errors.New(ErrInfrastructureNotReady) +} + +// Run refreshes the fetcher cache by querying Elasticsearch periodically. +func (f *ElasticsearchFetcher) Run(ctx context.Context) error { + refresh := func() bool { + // refresh returns a bool that indicates whether Run should return + // immediately without error, e.g. due to invalid Elasticsearch config. + if err := f.refreshCache(ctx); err != nil { + + f.logger.Error(fmt.Sprintf("refresh cache error: %s", err)) + if f.invalidESCfg.Load() { + f.logger.Warn("stopping refresh cache background job: elasticsearch config is invalid") + return true + } + } else { + f.logger.Debug("refresh cache success") + } + return false + } + + // Trigger initial run. + select { + case <-ctx.Done(): + return ctx.Err() + default: + if stop := refresh(); stop { + return nil + } + } + + // Then schedule subsequent runs. + t := time.NewTicker(f.cacheDuration) + defer t.Stop() + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-t.C: + if stop := refresh(); stop { + return nil + } + } + } +} + +type cacheResult struct { + ScrollID string `json:"_scroll_id"` + Hits struct { + Hits []struct { + Source struct { + Settings map[string]string `json:"settings"` + Service struct { + Name string `json:"name"` + Environment string `json:"environment"` + } `json:"service"` + AgentName string `json:"agent_name"` + ETag string `json:"etag"` + } `json:"_source"` + } `json:"hits"` + } `json:"hits"` +} + +func (f *ElasticsearchFetcher) refreshCache(ctx context.Context) (err error) { + scrollID := "" + buffer := make([]AgentConfig, 0, len(f.cache)) + + // The refresh cache operation should complete within refreshCacheTimeout. + ctx, cancel := context.WithTimeout(ctx, refreshCacheTimeout) + defer cancel() + + for { + result, err := f.singlePageRefresh(ctx, scrollID) + if err != nil { + f.clearScroll(ctx, scrollID) + return err + } + + for _, hit := range result.Hits.Hits { + buffer = append(buffer, AgentConfig{ + ServiceName: hit.Source.Service.Name, + ServiceEnvironment: hit.Source.Service.Environment, + AgentName: hit.Source.AgentName, + Etag: hit.Source.ETag, + Config: hit.Source.Settings, + }) + } + scrollID = result.ScrollID + if len(result.Hits.Hits) == 0 { + break + } + } + + f.clearScroll(ctx, scrollID) + + f.mu.Lock() + f.cache = buffer + f.mu.Unlock() + f.cacheInitialized.Store(true) + f.last = time.Now() + return nil +} + +func (f *ElasticsearchFetcher) clearScroll(ctx context.Context, scrollID string) { + resp, err := esapi.ClearScrollRequest{ + ScrollID: []string{scrollID}, + }.Do(ctx, f.client) + if err != nil { + f.logger.Warn(fmt.Sprintf("failed to clear scroll: %v", err)) + return + } + + if resp.IsError() { + f.logger.Warn(fmt.Sprintf("clearscroll request returned error: %s", resp.Status())) + } + + resp.Body.Close() +} + +func (f *ElasticsearchFetcher) singlePageRefresh(ctx context.Context, scrollID string) (cacheResult, error) { + var result cacheResult + var err error + var resp *esapi.Response + + switch scrollID { + case "": + resp, err = esapi.SearchRequest{ + Index: []string{ElasticsearchIndexName}, + Size: &f.searchSize, + Scroll: f.cacheDuration, + }.Do(ctx, f.client) + default: + resp, err = esapi.ScrollRequest{ + ScrollID: scrollID, + Scroll: f.cacheDuration, + }.Do(ctx, f.client) + } + if err != nil { + return result, err + } + defer resp.Body.Close() + + if resp.StatusCode >= http.StatusBadRequest { + // Elasticsearch returns 401 on unauthorized requests and 403 on insufficient permission + if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden { + f.invalidESCfg.Store(true) + } + bodyBytes, err := io.ReadAll(resp.Body) + if err == nil { + f.logger.Debug(fmt.Sprintf("refresh cache elasticsearch returned status %d: %s", resp.StatusCode, string(bodyBytes))) + } + return result, fmt.Errorf("refresh cache elasticsearch returned status %d", resp.StatusCode) + } + return result, json.NewDecoder(resp.Body).Decode(&result) +} diff --git a/agentcfg/elasticsearch_test.go b/agentcfg/elasticsearch_test.go new file mode 100644 index 0000000..ee3a335 --- /dev/null +++ b/agentcfg/elasticsearch_test.go @@ -0,0 +1,160 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package agentcfg + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "github.com/elastic/go-elasticsearch/v8" +) + +var sampleHits = []map[string]interface{}{ + {"_id": "h_KmzYQBfJ4l0GgqXgKA", "_index": ".apm-agent-configuration", "_score": 1, "_source": map[string]interface{}{"@timestamp": 1.669897543296e+12, "applied_by_agent": false, "etag": "ef12bf5e879c38e931d2894a9c90b2cb1b5fa190", "service": map[string]interface{}{"name": "first"}, "settings": map[string]interface{}{"sanitize_field_names": "foo,bar,baz", "transaction_sample_rate": "0.1"}}}, + {"_id": "hvKmzYQBfJ4l0GgqXgJt", "_index": ".apm-agent-configuration", "_score": 1, "_source": map[string]interface{}{"@timestamp": 1.669897543277e+12, "applied_by_agent": false, "etag": "2da2f86251165ccced5c5e41100a216b0c880db4", "service": map[string]interface{}{"name": "second"}, "settings": map[string]interface{}{"sanitize_field_names": "foo,bar,baz", "transaction_sample_rate": "0.1"}}}, +} + +func newMockElasticsearchClient(t testing.TB, handler func(http.ResponseWriter, *http.Request)) *elasticsearch.Client { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("X-Elastic-Product", "Elasticsearch") + handler(w, r) + })) + t.Cleanup(srv.Close) + config := elasticsearch.Config{} + config.Addresses = []string{srv.URL} + client, err := elasticsearch.NewClient(config) + require.NoError(t, err) + return client +} + +func newElasticsearchFetcher( + t testing.TB, + hits []map[string]interface{}, + searchSize int, +) *ElasticsearchFetcher { + maxScore := func(hits []map[string]interface{}) interface{} { + if len(hits) == 0 { + return nil + } + return 1 + } + respTmpl := map[string]interface{}{ + "_scroll_id": "FGluY2x1ZGVfY29udGV4dF91dWlkDXF1ZXJ5QW5kRmV0Y2gBFkJUT0Z5bFUtUXRXM3NTYno0dkM2MlEAAAAAAABnRBY5OUxYalAwUFFoS1NfLV9lWjlSYTRn", + "_shards": map[string]interface{}{"failed": 0, "skipped": 0, "successful": 1, "total": 1}, + "hits": map[string]interface{}{ + "hits": []map[string]interface{}{}, + "max_score": maxScore(hits), + "total": map[string]interface{}{"relation": "eq", "value": len(hits)}, + }, + "timed_out": false, + "took": 1, + } + + i := 0 + + fetcher := NewElasticsearchFetcher(newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodDelete && strings.HasPrefix(r.URL.Path, "/_search/scroll") { + scrollID := strings.TrimPrefix(r.URL.Path, "/_search/scroll/") + assert.Equal(t, respTmpl["_scroll_id"], scrollID) + return + } + switch r.URL.Path { + case "/_search/scroll": + scrollID := r.URL.Query().Get("scroll_id") + assert.Equal(t, respTmpl["_scroll_id"], scrollID) + case "/.apm-agent-configuration/_search": + default: + assert.Failf(t, "unexpected path", "path: %s", r.URL.Path) + } + if i < len(hits) { + respTmpl["hits"].(map[string]interface{})["hits"] = hits[i : i+searchSize] + } else { + respTmpl["hits"].(map[string]interface{})["hits"] = []map[string]interface{}{} + } + + b, err := json.Marshal(respTmpl) + require.NoError(t, err) + w.WriteHeader(200) + _, err = w.Write(b) + require.NoError(t, err) + i += searchSize + }), time.Second, zap.NewNop()) + fetcher.searchSize = searchSize + return fetcher +} + +func TestFetch(t *testing.T) { + fetcher := newElasticsearchFetcher(t, sampleHits, 2) + err := fetcher.refreshCache(context.Background()) + require.NoError(t, err) + require.Len(t, fetcher.cache, 2) + + result, err := fetcher.Fetch(context.Background(), Query{Service: Service{Name: "first"}, Etag: ""}) + require.NoError(t, err) + require.Equal(t, Result{Source: Source{ + Settings: map[string]string{"sanitize_field_names": "foo,bar,baz", "transaction_sample_rate": "0.1"}, + Etag: "ef12bf5e879c38e931d2894a9c90b2cb1b5fa190", + Agent: "", + }}, result) +} + +func TestRefreshCacheScroll(t *testing.T) { + fetcher := newElasticsearchFetcher(t, sampleHits, 1) + err := fetcher.refreshCache(context.Background()) + require.NoError(t, err) + require.Len(t, fetcher.cache, 2) + require.Equal(t, "first", fetcher.cache[0].ServiceName) + require.Equal(t, "second", fetcher.cache[1].ServiceName) +} + +func TestFetchOnCacheNotReady(t *testing.T) { + fetcher := newElasticsearchFetcher(t, []map[string]interface{}{}, 1) + + _, err := fetcher.Fetch(context.Background(), Query{Service: Service{Name: ""}, Etag: ""}) + require.EqualError(t, err, ErrInfrastructureNotReady) + + err = fetcher.refreshCache(context.Background()) + require.NoError(t, err) + + _, err = fetcher.Fetch(context.Background(), Query{Service: Service{Name: ""}, Etag: ""}) + require.NoError(t, err) +} + +func TestFetchNoFallback(t *testing.T) { + fetcher := NewElasticsearchFetcher( + newMockElasticsearchClient(t, func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(500) + }), + time.Second, + zap.NewNop(), + ) + + err := fetcher.refreshCache(context.Background()) + require.EqualError(t, err, "refresh cache elasticsearch returned status 500") + _, err = fetcher.Fetch(context.Background(), Query{Service: Service{Name: ""}, Etag: ""}) + require.EqualError(t, err, ErrInfrastructureNotReady) +} diff --git a/agentcfg/fetch.go b/agentcfg/fetch.go new file mode 100644 index 0000000..a88f194 --- /dev/null +++ b/agentcfg/fetch.go @@ -0,0 +1,100 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package agentcfg // import "github.com/elastic/opentelemetry-collector-components/internal/agentcfg" + +import ( + "context" +) + +// TransactionSamplingRateKey is the agent configuration key for the +// sampling rate. This is used by the Jaeger handler to adapt our agent +// configuration to the Jaeger remote sampler protocol. +const TransactionSamplingRateKey = "transaction_sample_rate" + +// Fetcher defines a common interface to retrieving agent config. +type Fetcher interface { + Fetch(context.Context, Query) (Result, error) +} + +// AgentConfig holds an agent configuration definition. +type AgentConfig struct { + // Config holds configuration settings that should be sent to + // agents matching the above constraints. + Config map[string]string + // ServiceName holds the service name to which this agent configuration + // applies. This is optional. + ServiceName string + // ServiceEnvironment holds the service environment to which this agent + // configuration applies. This is optional. + ServiceEnvironment string + // AgentName holds the agent name to which this agent configuration + // applies. This is optional, and is used for filtering configuration + // settings for unauthenticated agents. + AgentName string + // Etag holds a unique ID for the configuration, which agents + // will send along with their queries. The server uses this to + // determine whether agent configuration has been applied. + Etag string +} + +// matchAgentConfig finds a matching AgentConfig based on the received Query. +// Order of precedence: +// - service.name and service.environment match an AgentConfig +// - service.name matches an AgentConfig, service.environment == "" +// - service.environment matches an AgentConfig, service.name == "" +// - an AgentConfig without a name or environment set +// Return an empty result if no matching result is found. +func matchAgentConfig(query Query, cfgs []AgentConfig) Result { + name, env := query.Service.Name, query.Service.Environment + result := zeroResult() + var nameConf, envConf, defaultConf *AgentConfig + + for i, cfg := range cfgs { + if cfg.ServiceName == name && cfg.ServiceEnvironment == env { + nameConf = &cfgs[i] + break + } else if cfg.ServiceName == name && cfg.ServiceEnvironment == "" { + nameConf = &cfgs[i] + } else if cfg.ServiceName == "" && cfg.ServiceEnvironment == env { + envConf = &cfgs[i] + } else if cfg.ServiceName == "" && cfg.ServiceEnvironment == "" { + defaultConf = &cfgs[i] + } + } + + if nameConf != nil { + result = Result{Source{ + Settings: nameConf.Config, + Etag: nameConf.Etag, + Agent: nameConf.AgentName, + }} + } else if envConf != nil { + result = Result{Source{ + Settings: envConf.Config, + Etag: envConf.Etag, + Agent: envConf.AgentName, + }} + } else if defaultConf != nil { + result = Result{Source{ + Settings: defaultConf.Config, + Etag: defaultConf.Etag, + Agent: defaultConf.AgentName, + }} + } + return result +} diff --git a/agentcfg/fetch_test.go b/agentcfg/fetch_test.go new file mode 100644 index 0000000..f49cbbb --- /dev/null +++ b/agentcfg/fetch_test.go @@ -0,0 +1,259 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package agentcfg + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCustomJSON(t *testing.T) { + expected := Result{Source: Source{ + Etag: "123", + Settings: map[string]string{"transaction_sampling_rate": "0.3"}, + }} + input := `{"_id": "1", "_source":{"etag":"123", "settings":{"transaction_sampling_rate": 0.3}}}` + actual, _ := newResult([]byte(input), nil) + assert.Equal(t, expected, actual) +} + +type fetcherMock struct { + fetchFn func(context.Context, Query) (Result, error) +} + +func (f *fetcherMock) Fetch(ctx context.Context, query Query) (Result, error) { + return f.fetchFn(ctx, query) +} + +func TestDirectConfigurationPrecedence(t *testing.T) { + for _, tc := range []struct { + query Query + agentConfigs []AgentConfig + expectedSettings map[string]string + }{ + { + query: Query{ + Service: Service{ + Name: "service1", + Environment: "production", + }, + }, + agentConfigs: []AgentConfig{ + { + ServiceEnvironment: "production", + Config: map[string]string{"key1": "val2", "key2": "val2"}, + Etag: "def456", + }, + { + ServiceName: "service1", + Config: map[string]string{"key3": "val3"}, + Etag: "abc123", + }, + { + ServiceName: "service1", + ServiceEnvironment: "production", + Config: map[string]string{"key1": "val1"}, + Etag: "abc123", + }, + }, + expectedSettings: map[string]string{ + "key1": "val1", + }, + }, + { + query: Query{ + Service: Service{ + Name: "service1", + Environment: "production", + }, + }, + agentConfigs: []AgentConfig{ + { + ServiceEnvironment: "production", + Config: map[string]string{"key3": "val3"}, + Etag: "def456", + }, + { + ServiceName: "service1", + Config: map[string]string{"key1": "val1", "key2": "val2"}, + Etag: "abc123", + }, + }, + expectedSettings: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + }, + { + query: Query{ + InsecureAgents: []string{"Jaeger"}, + Service: Service{ + Name: "service1", + Environment: "production", + }, + }, + agentConfigs: []AgentConfig{ + { + ServiceEnvironment: "production", + Config: map[string]string{"key3": "val3"}, + Etag: "def456", + }, + { + ServiceName: "service1", + Config: map[string]string{"key1": "val1", "key2": "val2"}, + Etag: "abc123", + }, + }, + expectedSettings: map[string]string{ + "key1": "val1", + "key2": "val2", + }, + }, + { + query: Query{ + InsecureAgents: []string{"Jaeger"}, + Service: Service{ + Name: "service1", + Environment: "production", + }, + }, + agentConfigs: []AgentConfig{ + { + ServiceEnvironment: "production", + Config: map[string]string{"key3": "val3"}, + Etag: "def456", + }, + { + ServiceName: "service1", + AgentName: "Jaeger/Python", + Config: map[string]string{"key1": "val1", "key2": "val2", "transaction_sample_rate": "0.1"}, + Etag: "abc123", + }, + }, + expectedSettings: map[string]string{ + "key1": "val1", + "key2": "val2", + "transaction_sample_rate": "0.1", + }, + }, + { + query: Query{ + Service: Service{ + Name: "service1", + Environment: "production", + }, + }, + agentConfigs: []AgentConfig{ + { + ServiceName: "service2", + Config: map[string]string{"key1": "val1", "key2": "val2"}, + Etag: "abc123", + }, + { + ServiceEnvironment: "production", + Config: map[string]string{"key3": "val3"}, + Etag: "def456", + }, + }, + expectedSettings: map[string]string{ + "key3": "val3", + }, + }, + { + query: Query{ + Service: Service{ + Name: "service1", + Environment: "production", + }, + }, + agentConfigs: []AgentConfig{ + { + ServiceName: "not-found", + Config: map[string]string{"key1": "val1"}, + Etag: "abc123", + }, + }, + expectedSettings: map[string]string{}, + }, + { + query: Query{ + Service: Service{ + Name: "service2", + Environment: "production", + }, + }, + agentConfigs: []AgentConfig{ + { + ServiceName: "service1", + Config: map[string]string{"key1": "val1", "key2": "val2"}, + Etag: "abc123", + }, + { + ServiceName: "service2", + Config: map[string]string{"key1": "val4", "key2": "val5"}, + Etag: "abc123", + }, + }, + expectedSettings: map[string]string{ + "key1": "val4", + "key2": "val5", + }, + }, + { + query: Query{ + Service: Service{ + Name: "service2", + Environment: "staging", + }, + }, + agentConfigs: []AgentConfig{ + { + ServiceName: "service1", + Config: map[string]string{"key1": "val1", "key2": "val2"}, + Etag: "abc123", + }, + { + ServiceEnvironment: "production", + Config: map[string]string{"key1": "val4", "key2": "val5"}, + Etag: "abc123", + }, + { + Config: map[string]string{"key3": "val5", "key4": "val6"}, + Etag: "abc123", + }, + }, + expectedSettings: map[string]string{ + "key3": "val5", + "key4": "val6", + }, + }, + } { + f := fetcherMock{ + fetchFn: func(_ctx context.Context, query Query) (Result, error) { + return matchAgentConfig(query, tc.agentConfigs), nil + }, + } + result, err := f.Fetch(context.Background(), tc.query) + require.NoError(t, err) + + assert.Equal(t, Settings(tc.expectedSettings), result.Source.Settings) + } +} diff --git a/agentcfg/model.go b/agentcfg/model.go new file mode 100644 index 0000000..e59915c --- /dev/null +++ b/agentcfg/model.go @@ -0,0 +1,105 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package agentcfg // import "github.com/elastic/opentelemetry-collector-components/internal/agentcfg" + +import ( + "encoding/json" + "fmt" +) + +const ( + // ServiceName keyword + ServiceName = "service.name" + // ServiceEnv keyword + ServiceEnv = "service.environment" + // Etag / If-None-Match keyword + Etag = "ifnonematch" + // EtagSentinel is a value to return back to agents when Kibana doesn't have any configuration + EtagSentinel = "-" +) + +// UnrestrictedSettings are settings considered safe to be returned to all requesters, +// including unauthenticated ones such as RUM. +var UnrestrictedSettings = map[string]bool{"transaction_sample_rate": true} + +// Result models a Kibana response +type Result struct { + Source Source `json:"_source"` +} + +// Source is the Elasticsearch _source +type Source struct { + Settings Settings `json:"settings"` + Etag string `json:"etag"` + Agent string `json:"agent_name"` +} + +// Query represents an URL body or query params for agent configuration +type Query struct { + Service Service `json:"service"` + // Etag should be set to the Etag of a previous agent config query result. + // When the query is processed by the receiver a new Etag is calculated + // for the query result. If Etags from the query and the query result match, + // it indicates that the exact same query response has already been delivered. + Etag string `json:"etag"` + // InsecureAgents holds a set of prefixes for restricting results to those whose + // agent name matches any of the specified prefixes. + // + // If InsecureAgents is non-empty, and any of the prefixes matches the result, + // then the resulting settings will be filtered down to the subset of settings + // identified by UnrestrictedSettings. Otherwise, if InsecureAgents is empty, + // the agent name is ignored and no restrictions are applied. + InsecureAgents []string `json:"-"` + // MarkAsAppliedByAgent can be used to signal to the receiver that the response to this + // query can be considered to have been applied immediately. When building queries for Elastic APM + // agent requests the Etag should be set, instead of the AppliedByAgent setting. + // Use this flag when building queries for third party integrations, + // such as Jaeger, that do not send an Etag in their request. + MarkAsAppliedByAgent bool `json:"mark_as_applied_by_agent,omitempty"` +} + +// Service holds supported attributes for querying configuration +type Service struct { + Name string `json:"name"` + Environment string `json:"environment,omitempty"` +} + +// Settings hold agent configuration +type Settings map[string]string + +// UnmarshalJSON overrides default method to convert any JSON type to string +func (s Settings) UnmarshalJSON(b []byte) error { + in := make(map[string]interface{}) + err := json.Unmarshal(b, &in) + for k, v := range in { + s[k] = fmt.Sprintf("%v", v) + } + return err +} + +func zeroResult() Result { + return Result{Source: Source{Settings: Settings{}, Etag: EtagSentinel}} +} + +func newResult(b []byte, err error) (Result, error) { + r := zeroResult() + if err == nil && len(b) > 0 { + err = json.Unmarshal(b, &r) + } + return r, err +} diff --git a/agentcfg/model_test.go b/agentcfg/model_test.go new file mode 100644 index 0000000..b46f72d --- /dev/null +++ b/agentcfg/model_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package agentcfg + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewDoc(t *testing.T) { + t.Run("InvalidInput", func(t *testing.T) { + _, err := newResult([]byte("some string"), nil) + assert.Error(t, err) + }) + + t.Run("EmptyInput", func(t *testing.T) { + d, err := newResult([]byte{}, nil) + require.NoError(t, err) + assert.Equal(t, zeroResult(), d) + }) + + t.Run("ValidInput", func(t *testing.T) { + inp := []byte(`{"_id": "1234", "_source": {"etag":"123", "settings":{"sample_rate":0.5}}}`) + + d, err := newResult(inp, nil) + require.NoError(t, err) + assert.Equal(t, Result{Source{Etag: "123", Settings: Settings{"sample_rate": "0.5"}}}, d) + }) +} + +func TestQueryMarshaling(t *testing.T) { + for _, tc := range []struct { + name string + input string + out string + }{ + {name: "third_party", + input: `{"service":{"name":"auth-service","environment":"production"},"mark_as_applied_by_agent":true}`, + out: `{"service":{"name":"auth-service","environment":"production"},"mark_as_applied_by_agent":true,"etag":""}`}, + {name: "elastic_apm", + input: `{"service":{"name":"auth-service","environment":"production"},"etag":"1234"}`, + out: `{"service":{"name":"auth-service","environment":"production"},"etag":"1234"}`}, + } { + t.Run(tc.name, func(t *testing.T) { + var query Query + require.NoError(t, json.Unmarshal([]byte(tc.input), &query)) + out, err := json.Marshal(query) + require.NoError(t, err) + assert.JSONEq(t, tc.out, string(out)) + }) + } +} diff --git a/go.mod b/go.mod index 024f9a9..cd7e117 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/elastic/opentelemetry-lib go 1.22.7 require ( + github.com/elastic/go-elasticsearch/v8 v8.17.0 github.com/google/go-cmp v0.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.117.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.117.0 @@ -17,12 +18,18 @@ require ( require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.117.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel v1.31.0 // indirect + go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/otel/trace v1.31.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/sys v0.29.0 // indirect diff --git a/go.sum b/go.sum index 18ec34a..42281c5 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,11 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/elastic-transport-go/v8 v8.6.0 h1:Y2S/FBjx1LlCv5m6pWAF2kDJAHoSjSRSJCApolgfthA= +github.com/elastic/elastic-transport-go/v8 v8.6.0/go.mod h1:YLHer5cj0csTzNFXoNQ8qhtGY1GTvSqPnKWKaqQE3Hk= +github.com/elastic/go-elasticsearch/v8 v8.17.0 h1:e9cWksE/Fr7urDRmGPGp47Nsp4/mvNOrU8As1l2HQQ0= +github.com/elastic/go-elasticsearch/v8 v8.17.0/go.mod h1:lGMlgKIbYoRvay3xWBeKahAiJOgmFDsjZC39nmO3H64= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=