Skip to content

Commit

Permalink
fix label acquired with cadvisor (#501)
Browse files Browse the repository at this point in the history
  • Loading branch information
kongfei605 committed May 11, 2023
1 parent ef03677 commit 600a81c
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 20 deletions.
6 changes: 6 additions & 0 deletions inputs/cadvisor/cadvisor.go
Expand Up @@ -35,3 +35,9 @@ func (c *Cadvisor) GetInstances() []inputs.Instance {
}
return ret
}

func (c *Cadvisor) Drop() {
for i := 0; i < len(c.Instances); i++ {
c.Instances[i].Drop()
}
}
57 changes: 37 additions & 20 deletions inputs/cadvisor/instances.go
Expand Up @@ -2,6 +2,7 @@ package cadvisor

import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"log"
Expand Down Expand Up @@ -61,7 +62,7 @@ type (
tls.ClientConfig
client *http.Client

cache.BasicCache
*cache.BasicCache
stop chan struct{}
}
)
Expand All @@ -78,28 +79,27 @@ func (ins *Instance) Init() error {
if ins.Empty() {
return types.ErrInstancesEmpty
}
ins.Type = ContainerType

ins.URL = strings.Replace(ins.URL, "$hostname", config.Config.GetHostname(), -1)
ins.URL = strings.Replace(ins.URL, "$ip", config.Config.Global.IP, -1)
ins.URL = os.Expand(ins.URL, config.GetEnv)

u, err := url.Parse(ins.URL)
if err != nil {
log.Println("E! failed to scrape url:", ins.URL, "error:", err)
return fmt.Errorf("failed to parse scrape url: %s, error: %s", ins.URL, err)
}
ins.u = u
if ins.u.Path == "" {
ins.u.Path = cadvisorPath
ins.Type = NodeType
}

ins.stop = make(chan struct{})
ins.BasicCache = cache.NewBasicCache()
go ins.cache()

if ins.Timeout <= 0 {
ins.Timeout = config.Duration(time.Second * 3)
}

client, err := ins.createHTTPClient()
if err != nil {
return err
Expand Down Expand Up @@ -158,31 +158,35 @@ func (ins *Instance) cache() {
log.Println("E! failed to request for url:", podUrl.String(), "error:", err)
continue
}
resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println("E! failed to read body for url:", podUrl.String(), "error:", err)
continue
}
resp.Body.Close()
pods := kubernetes.PodList{}
err = json.Unmarshal(body, &pods)
if err != nil {
log.Println("E! unmarshal pods info", err)
continue
}
for _, pod := range pods.Items {
ins.BasicCache.Add(pod.Metadata.Namespace+"||"+pod.Metadata.Name, &pod)
ins.BasicCache.Add(cacheKey(pod.Metadata.Namespace, pod.Metadata.Name), pod)
}
timer.Reset(10 * time.Minute)
case <-ins.Done():
timer.Reset(1 * time.Minute)
case <-ins.stop:
return
}
}
}

func (ins *Instance) Done() chan struct{} {
func cacheKey(ns, pod string) string {
return ns + "||" + pod
}

func (ins *Instance) Drop() {
log.Println("I! cadvisor instance stop")
return ins.stop
close(ins.stop)
}

func (ins *Instance) createHTTPClient() (*http.Client, error) {
Expand Down Expand Up @@ -295,7 +299,10 @@ func (ins *Instance) ignoreLabel(label string) bool {
}

func (ins *Instance) makeLabels(m *dto.Metric, defaultLabels map[string]string) map[string]string {
result := map[string]string{}
var (
podName, namespace string
result = map[string]string{}
)

for _, label := range m.Label {
if ins.ignoreLabel(label.GetName()) {
Expand All @@ -304,10 +311,21 @@ func (ins *Instance) makeLabels(m *dto.Metric, defaultLabels map[string]string)
result[label.GetName()] = label.GetValue()

if ins.Type == NodeType {
podName := label.GetName()
if val, ok := ins.BasicCache.Get(podName); ok {
if pod, ok := val.(*kubernetes.Pod); ok {
if label.GetName() != "pod" && label.GetName() != "namespace" {
continue
}
if label.GetName() == "pod" {
podName = label.GetValue()
}
if label.GetName() == "namespace" {
namespace = label.GetValue()
}
if strings.TrimSpace(podName) == "" || strings.TrimSpace(namespace) == "" {
continue
}

if val, ok := ins.BasicCache.Get(cacheKey(namespace, podName)); ok {
if pod, ok := val.(*kubernetes.Pod); ok {
for k, v := range pod.Metadata.Labels {
if ins.ignoreLabel(k) {
continue
Expand All @@ -322,8 +340,11 @@ func (ins *Instance) makeLabels(m *dto.Metric, defaultLabels map[string]string)
result[k] = v
}
}
} else {
if config.Config.DebugMode {
log.Println(cacheKey(namespace, podName), "not in cache")
}
}

}
}

Expand Down Expand Up @@ -359,7 +380,3 @@ func (ins *Instance) setHeaders(req *http.Request) {
req.Header.Set(ins.Headers[i], ins.Headers[i+1])
}
}

func (ins *Instance) Drop() {
close(ins.stop)
}

0 comments on commit 600a81c

Please sign in to comment.