diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 11f916ddfce..e0e5e6912ab 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -62,6 +62,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di *Metricbeat* - Migrate docker autodiscover to ECS. {issue}10757[10757] {pull}10862[10862] +- Fix issue in kubernetes module preventing usage percentages from being properly calculated. {pull}10946[10946] *Packetbeat* diff --git a/metricbeat/module/kubernetes/util/metrics_cache.go b/metricbeat/module/kubernetes/util/metrics_cache.go index ffa8f235ed5..7ffff06edfe 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache.go +++ b/metricbeat/module/kubernetes/util/metrics_cache.go @@ -18,17 +18,19 @@ package util import ( - "sync" "time" + + "github.com/elastic/beats/libbeat/common" ) // PerfMetrics stores known metrics from Kubernetes nodes and containers var PerfMetrics = NewPerfMetricsCache() -const defaultTimeout = 120 * time.Second +func init() { + PerfMetrics.Start() +} -var now = time.Now -var sleep = time.Sleep +const defaultTimeout = 120 * time.Second // NewPerfMetricsCache initializes and returns a new PerfMetricsCache func NewPerfMetricsCache() *PerfMetricsCache { @@ -43,7 +45,6 @@ func NewPerfMetricsCache() *PerfMetricsCache { // PerfMetricsCache stores known metrics from Kubernetes nodes and containers type PerfMetricsCache struct { - mutex sync.RWMutex NodeMemAllocatable *valueMap NodeCoresAllocatable *valueMap @@ -51,72 +52,64 @@ type PerfMetricsCache struct { ContainerCoresLimit *valueMap } -func newValueMap(timeout time.Duration) *valueMap { - return &valueMap{ - values: map[string]value{}, - timeout: timeout, - } +// Start cache workers +func (c *PerfMetricsCache) Start() { + c.NodeMemAllocatable.Start() + c.NodeCoresAllocatable.Start() + c.ContainerMemLimit.Start() + c.ContainerCoresLimit.Start() } -type valueMap struct { - sync.RWMutex - running bool - timeout time.Duration - values 
map[string]value +// Stop cache workers +func (c *PerfMetricsCache) Stop() { + c.NodeMemAllocatable.Stop() + c.NodeCoresAllocatable.Stop() + c.ContainerMemLimit.Stop() + c.ContainerCoresLimit.Stop() } -type value struct { - value float64 - expires int64 +type valueMap struct { + cache *common.Cache + timeout time.Duration } -// ContainerUID creates an unique ID for from namespace, pod name and container name -func ContainerUID(namespace, pod, container string) string { - return namespace + "-" + pod + "-" + container +func newValueMap(timeout time.Duration) *valueMap { + return &valueMap{ + cache: common.NewCache(timeout, 0), + timeout: timeout, + } } // Get value func (m *valueMap) Get(name string) float64 { - m.RLock() - defer m.RUnlock() - return m.values[name].value + return m.GetWithDefault(name, 0.0) } // Get value func (m *valueMap) GetWithDefault(name string, def float64) float64 { - m.RLock() - defer m.RUnlock() - val, ok := m.values[name] - if ok { - return val.value + v := m.cache.Get(name) + if v, ok := v.(float64); ok { + return v } return def } // Set value func (m *valueMap) Set(name string, val float64) { - m.Lock() - defer m.Unlock() - m.ensureCleanupWorker() - m.values[name] = value{val, now().Add(m.timeout).Unix()} + m.cache.PutWithTimeout(name, val, m.timeout) } -func (m *valueMap) ensureCleanupWorker() { - if !m.running { - // Run worker to cleanup expired entries - m.running = true - go func() { - for { - sleep(m.timeout) - m.Lock() - now := now().Unix() - for name, val := range m.values { - if now > val.expires { - delete(m.values, name) - } - } - m.Unlock() - } - }() - } +// Start cache workers +func (m *valueMap) Start() { + m.cache.StartJanitor(m.timeout) +} + +// Stop cache workers +func (m *valueMap) Stop() { + m.cache.StopJanitor() +} + +// ContainerUID creates an unique ID for from namespace, pod name and container name +func ContainerUID(namespace, pod, container string) string { + return namespace + "/" + pod + "/" + container } diff 
--git a/metricbeat/module/kubernetes/util/metrics_cache_test.go b/metricbeat/module/kubernetes/util/metrics_cache_test.go index d5ce7bd2bb8..649c1f5fb86 100644 --- a/metricbeat/module/kubernetes/util/metrics_cache_test.go +++ b/metricbeat/module/kubernetes/util/metrics_cache_test.go @@ -19,45 +19,10 @@ package util import ( "testing" - "time" "github.com/stretchr/testify/assert" ) -func TestTimeout(t *testing.T) { - // Mock monotonic time: - fakeTimeCh := make(chan int64) - go func() { - fakeTime := time.Now().Unix() - for { - fakeTime++ - fakeTimeCh <- fakeTime - } - }() - - now = func() time.Time { - return time.Unix(<-fakeTimeCh, 0) - } - - // Blocking sleep: - sleepCh := make(chan struct{}) - sleep = func(time.Duration) { - <-sleepCh - } - - test := newValueMap(1 * time.Second) - - test.Set("foo", 3.14) - - // Let cleanup do its job - sleepCh <- struct{}{} - sleepCh <- struct{}{} - sleepCh <- struct{}{} - - // Check it expired - assert.Equal(t, 0.0, test.Get("foo")) -} - func TestValueMap(t *testing.T) { test := newValueMap(defaultTimeout) @@ -82,5 +47,5 @@ func TestGetWithDefault(t *testing.T) { } func TestContainerUID(t *testing.T) { - assert.Equal(t, "a-b-c", ContainerUID("a", "b", "c")) + assert.Equal(t, "a/b/c", ContainerUID("a", "b", "c")) }