Skip to content

Commit

Permalink
Ensure kubernetes caches don't expire if they are being read (#10946) (
Browse files Browse the repository at this point in the history
…#11057)

Some metrics in metricbeat kubernetes module are cached during a time,
if they are not updated they are removed. But it is usual to have pods or
containers that are not updated during more time that the expiration cache.
Current implementation was not renovating expiration times for cache
entries so all were eventually removed if updates for them are not received.
Replace it with the cache implementation available in libbeat, but keeping
the existing interface.

Also, use slashes instead of dashes to generate unique container uids.
Dashes can be used by kubernetes names, what could lead to ambiguous
keys for the caches.

Fix #10658

(cherry picked from commit 106df3d)
  • Loading branch information
jsoriano committed Mar 5, 2019
1 parent f70a408 commit 73e65c9
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 87 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -62,6 +62,7 @@ https://github.com/elastic/beats/compare/v7.0.0-beta1...master[Check the HEAD di
*Metricbeat*

- Migrate docker autodiscover to ECS. {issue}10757[10757] {pull}10862[10862]
- Fix issue in kubernetes module preventing usage percentages to be properly calculated. {pull}10946[10946]

*Packetbeat*

Expand Down
95 changes: 44 additions & 51 deletions metricbeat/module/kubernetes/util/metrics_cache.go
Expand Up @@ -18,17 +18,19 @@
package util

import (
"sync"
"time"

"github.com/elastic/beats/libbeat/common"
)

// PerfMetrics stores known metrics from Kubernetes nodes and containers
var PerfMetrics = NewPerfMetricsCache()

const defaultTimeout = 120 * time.Second
func init() {
PerfMetrics.Start()
}

var now = time.Now
var sleep = time.Sleep
const defaultTimeout = 120 * time.Second

// NewPerfMetricsCache initializes and returns a new PerfMetricsCache
func NewPerfMetricsCache() *PerfMetricsCache {
Expand All @@ -43,80 +45,71 @@ func NewPerfMetricsCache() *PerfMetricsCache {

// PerfMetricsCache stores known metrics from Kubernetes nodes and containers
type PerfMetricsCache struct {
mutex sync.RWMutex
NodeMemAllocatable *valueMap
NodeCoresAllocatable *valueMap

ContainerMemLimit *valueMap
ContainerCoresLimit *valueMap
}

func newValueMap(timeout time.Duration) *valueMap {
return &valueMap{
values: map[string]value{},
timeout: timeout,
}
// Start cache workers
func (c *PerfMetricsCache) Start() {
c.NodeMemAllocatable.Start()
c.NodeCoresAllocatable.Start()
c.ContainerMemLimit.Start()
c.ContainerCoresLimit.Start()
}

type valueMap struct {
sync.RWMutex
running bool
timeout time.Duration
values map[string]value
// Stop cache workers
func (c *PerfMetricsCache) Stop() {
c.NodeMemAllocatable.Stop()
c.NodeCoresAllocatable.Stop()
c.ContainerMemLimit.Stop()
c.ContainerCoresLimit.Stop()
}

type value struct {
value float64
expires int64
type valueMap struct {
cache *common.Cache
timeout time.Duration
}

// ContainerUID creates an unique ID for from namespace, pod name and container name
func ContainerUID(namespace, pod, container string) string {
return namespace + "-" + pod + "-" + container
func newValueMap(timeout time.Duration) *valueMap {
return &valueMap{
cache: common.NewCache(timeout, 0),
timeout: timeout,
}
}

// Get value
func (m *valueMap) Get(name string) float64 {
m.RLock()
defer m.RUnlock()
return m.values[name].value
return m.GetWithDefault(name, 0.0)
}

// Get value
func (m *valueMap) GetWithDefault(name string, def float64) float64 {
m.RLock()
defer m.RUnlock()
val, ok := m.values[name]
if ok {
return val.value
v := m.cache.Get(name)
if v, ok := v.(float64); ok {
return v
}
return def
}

// Set value
func (m *valueMap) Set(name string, val float64) {
m.Lock()
defer m.Unlock()
m.ensureCleanupWorker()
m.values[name] = value{val, now().Add(m.timeout).Unix()}
m.cache.PutWithTimeout(name, val, m.timeout)
}

func (m *valueMap) ensureCleanupWorker() {
if !m.running {
// Run worker to cleanup expired entries
m.running = true
go func() {
for {
sleep(m.timeout)
m.Lock()
now := now().Unix()
for name, val := range m.values {
if now > val.expires {
delete(m.values, name)
}
}
m.Unlock()
}
}()
}
// Start cache workers
func (m *valueMap) Start() {
m.cache.StartJanitor(m.timeout)
}

// Stop cache workers
func (m *valueMap) Stop() {
m.cache.StopJanitor()
}

// ContainerUID creates an unique ID for from namespace, pod name and container name
func ContainerUID(namespace, pod, container string) string {
return namespace + "/" + pod + "/" + container
}
37 changes: 1 addition & 36 deletions metricbeat/module/kubernetes/util/metrics_cache_test.go
Expand Up @@ -19,45 +19,10 @@ package util

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTimeout(t *testing.T) {
// Mock monotonic time:
fakeTimeCh := make(chan int64)
go func() {
fakeTime := time.Now().Unix()
for {
fakeTime++
fakeTimeCh <- fakeTime
}
}()

now = func() time.Time {
return time.Unix(<-fakeTimeCh, 0)
}

// Blocking sleep:
sleepCh := make(chan struct{})
sleep = func(time.Duration) {
<-sleepCh
}

test := newValueMap(1 * time.Second)

test.Set("foo", 3.14)

// Let cleanup do its job
sleepCh <- struct{}{}
sleepCh <- struct{}{}
sleepCh <- struct{}{}

// Check it expired
assert.Equal(t, 0.0, test.Get("foo"))
}

func TestValueMap(t *testing.T) {
test := newValueMap(defaultTimeout)

Expand All @@ -82,5 +47,5 @@ func TestGetWithDefault(t *testing.T) {
}

func TestContainerUID(t *testing.T) {
assert.Equal(t, "a-b-c", ContainerUID("a", "b", "c"))
assert.Equal(t, "a/b/c", ContainerUID("a", "b", "c"))
}

0 comments on commit 73e65c9

Please sign in to comment.