New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
vm-agent 1.94.0 fails to discover targets #5216
Comments
Hey @mindw.
If I replace |
@Amper yes, there is an unfortunate typo there. |
@mindw, sorry, I can't fully understand the problem. Could you elaborate a bit more?
Could you provide more info about that? |
@mindw , could you also share logs generated by It would be great also to compare the output of |
Update: it looks like Probably, you mean
diff --git a/lib/promscrape/discovery/kubernetes/api_watcher.go b/lib/promscrape/discovery/kubernetes/api_watcher.go
index 78bb7ddd7..b677bd241 100644
--- a/lib/promscrape/discovery/kubernetes/api_watcher.go
+++ b/lib/promscrape/discovery/kubernetes/api_watcher.go
@@ -1,6 +1,7 @@
package kubernetes
import (
+ "context"
"encoding/json"
"errors"
"flag"
@@ -16,11 +17,13 @@ import (
"sync/atomic"
"time"
+ "github.com/VictoriaMetrics/metrics"
+
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
- "github.com/VictoriaMetrics/metrics"
+ "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
)
var apiServerTimeout = flag.Duration("promscrape.kubernetes.apiServerTimeout", 30*time.Minute, "How frequently to reload the full state from Kubernetes API server")
@@ -269,7 +272,11 @@ func selectorsKey(selectors []Selector) string {
var (
groupWatchersLock sync.Mutex
- groupWatchers = make(map[string]*groupWatcher)
+ groupWatchers = func() map[string]*groupWatcher {
+ gws := make(map[string]*groupWatcher)
+ go groupWatchersCleaner(gws)
+ return gws
+ }()
_ = metrics.NewGauge(`vm_promscrape_discovery_kubernetes_group_watchers`, func() float64 {
groupWatchersLock.Lock()
@@ -279,6 +286,21 @@ var (
})
)
+func groupWatchersCleaner(gws map[string]*groupWatcher) {
+ for {
+ time.Sleep(7 * time.Second)
+ groupWatchersLock.Lock()
+ for key, gw := range gws {
+ gw.mu.Lock()
+ if len(gw.m) == 0 {
+ delete(gws, key)
+ }
+ gw.mu.Unlock()
+ }
+ groupWatchersLock.Unlock()
+ }
+}
+
type swosByKeyWithLock struct {
mu sync.Mutex
swosByKey map[string][]interface{}
@@ -378,31 +400,14 @@ func (gw *groupWatcher) startWatchersForRole(role string, aw *apiWatcher) {
// This should guarantee that the ScrapeWork objects for these objects are properly updated
// as soon as the objects they depend on are updated.
// This should fix https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240 .
- go func() {
- const minSleepTime = 5 * time.Second
- sleepTime := minSleepTime
- for {
- time.Sleep(sleepTime)
- startTime := time.Now()
- gw.mu.Lock()
- if uw.needRecreateScrapeWorks {
- uw.needRecreateScrapeWorks = false
- uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
- sleepTime = time.Since(startTime)
- if sleepTime < minSleepTime {
- sleepTime = minSleepTime
- }
- }
- gw.mu.Unlock()
- }
- }()
+ go uw.recreateScrapeWorks()
}
}
}
}
// doRequest performs http request to the given requestURL.
-func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
+func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http.Response, error) {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 1 {
// Update networking URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/networking.k8s.io/v1/", "/apis/networking.k8s.io/v1beta1/", 1)
@@ -411,7 +416,7 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
// Update discovery URL for old Kubernetes API, which supports only v1beta1 path.
requestURL = strings.Replace(requestURL, "/apis/discovery.k8s.io/v1/", "/apis/discovery.k8s.io/v1beta1/", 1)
}
- req, err := http.NewRequest(http.MethodGet, requestURL, nil)
+ req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL, nil)
if err != nil {
logger.Fatalf("cannot create a request for %q: %s", requestURL, err)
}
@@ -423,11 +428,11 @@ func (gw *groupWatcher) doRequest(requestURL string) (*http.Response, error) {
if resp.StatusCode == http.StatusNotFound {
if strings.Contains(requestURL, "/apis/networking.k8s.io/v1/") && atomic.LoadUint32(&gw.useNetworkingV1Beta1) == 0 {
atomic.StoreUint32(&gw.useNetworkingV1Beta1, 1)
- return gw.doRequest(requestURL)
+ return gw.doRequest(ctx, requestURL)
}
if strings.Contains(requestURL, "/apis/discovery.k8s.io/v1/") && atomic.LoadUint32(&gw.useDiscoveryV1Beta1) == 0 {
atomic.StoreUint32(&gw.useDiscoveryV1Beta1, 1)
- return gw.doRequest(requestURL)
+ return gw.doRequest(ctx, requestURL)
}
}
return resp, nil
@@ -446,6 +451,9 @@ func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
defer gw.mu.Unlock()
for _, uw := range gw.m {
uw.unsubscribeAPIWatcherLocked(aw)
+ if len(uw.aws)+len(uw.awsPending) == 0 {
+ time.AfterFunc(10*time.Second, uw.stopIfNoUsers)
+ }
}
}
@@ -458,6 +466,9 @@ type urlWatcher struct {
apiURL string
gw *groupWatcher
+ ctx context.Context
+ cancel context.CancelFunc
+
parseObject parseObjectFunc
parseObjectList parseObjectListFunc
@@ -488,11 +499,16 @@ type urlWatcher struct {
func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()
+
+ ctx, cancel := context.WithCancel(context.Background())
uw := &urlWatcher{
role: role,
apiURL: apiURL,
gw: gw,
+ ctx: ctx,
+ cancel: cancel,
+
parseObject: parseObject,
parseObjectList: parseObjectList,
@@ -510,6 +526,44 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
return uw
}
+func (uw *urlWatcher) stopIfNoUsers() {
+ gw := uw.gw
+ gw.mu.Lock()
+ if len(uw.aws)+len(uw.awsPending) == 0 {
+ uw.cancel()
+ delete(gw.m, uw.apiURL)
+ }
+ gw.mu.Unlock()
+}
+
+func (uw *urlWatcher) recreateScrapeWorks() {
+ const minSleepTime = 5 * time.Second
+ sleepTime := minSleepTime
+ gw := uw.gw
+ stopCh := uw.ctx.Done()
+ for {
+ t := timerpool.Get(sleepTime)
+ select {
+ case <-stopCh:
+ timerpool.Put(t)
+ return
+ case <-t.C:
+ timerpool.Put(t)
+ }
+ startTime := time.Now()
+ gw.mu.Lock()
+ if uw.needRecreateScrapeWorks {
+ uw.needRecreateScrapeWorks = false
+ uw.recreateScrapeWorksLocked(uw.objectsByKey, uw.aws)
+ sleepTime = time.Since(startTime)
+ if sleepTime < minSleepTime {
+ sleepTime = minSleepTime
+ }
+ }
+ gw.mu.Unlock()
+ }
+}
+
func (uw *urlWatcher) subscribeAPIWatcherLocked(aw *apiWatcher) {
if _, ok := uw.aws[aw]; !ok {
if _, ok := uw.awsPending[aw]; !ok {
@@ -587,9 +641,11 @@ func (uw *urlWatcher) reloadObjects() string {
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 .
delimiter := getQueryArgsDelimiter(apiURL)
requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan"
- resp, err := uw.gw.doRequest(requestURL)
+ resp, err := uw.gw.doRequest(uw.ctx, requestURL)
if err != nil {
- logger.Errorf("cannot perform request to %q: %s", requestURL, err)
+ if !errors.Is(err, context.Canceled) {
+ logger.Errorf("cannot perform request to %q: %s", requestURL, err)
+ }
return ""
}
if resp.StatusCode != http.StatusOK {
@@ -653,10 +709,18 @@ func (uw *urlWatcher) reloadObjects() string {
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
func (uw *urlWatcher) watchForUpdates() {
+ stopCh := uw.ctx.Done()
backoffDelay := time.Second
maxBackoffDelay := 30 * time.Second
backoffSleep := func() {
- time.Sleep(backoffDelay)
+ t := timerpool.Get(backoffDelay)
+ select {
+ case <-stopCh:
+ timerpool.Put(t)
+ return
+ case <-t.C:
+ timerpool.Put(t)
+ }
backoffDelay *= 2
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
@@ -667,16 +731,26 @@ func (uw *urlWatcher) watchForUpdates() {
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for {
+ select {
+ case <-stopCh:
+ metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, uw.role)).Dec()
+ logger.Infof("stopped %s watcher for %q", uw.role, uw.apiURL)
+ return
+ default:
+ }
+
resourceVersion := uw.reloadObjects()
if resourceVersion == "" {
backoffSleep()
continue
}
requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion)
- resp, err := uw.gw.doRequest(requestURL)
+ resp, err := uw.gw.doRequest(uw.ctx, requestURL)
if err != nil {
- logger.Errorf("cannot perform request to %q: %s", requestURL, err)
- backoffSleep()
+ if !errors.Is(err, context.Canceled) {
+ logger.Errorf("cannot perform request to %q: %s", requestURL, err)
+ backoffSleep()
+ }
continue
}
if resp.StatusCode != http.StatusOK {
@@ -697,7 +771,7 @@ func (uw *urlWatcher) watchForUpdates() {
err = uw.readObjectUpdateStream(resp.Body)
_ = resp.Body.Close()
if err != nil {
- if !errors.Is(err, io.EOF) {
+ if !(errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)) {
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
uw.resourceVersion = ""
}
Note that changes related to #4855 are already included in |
… belong to a particular groupWatcher, at once Previously url watchers for pod, service and node objects could be mistakenly closed when service discovery was set up only for endpoints and endpointslice roles, since watchers for these roles may start pod, service and node url watchers with nil apiWatcher passed to groupWatcher.startWatchersForRole(). Now all the url watchers, which belong to a particular groupWatcher, are stopped at once when this groupWatcher has no apiWatcher subscribers. Updates #5216 The issue was introduced in v1.93.5 when addressing #4850
@mindw , the commit 632d788 should fix the issue. Could you build You can also use Docker tag |
… belong to a particular groupWatcher, at once Previously url watchers for pod, service and node objects could be mistakenly closed when service discovery was set up only for endpoints and endpointslice roles, since watchers for these roles may start pod, service and node url watchers with nil apiWatcher passed to groupWatcher.startWatchersForRole(). Now all the url watchers, which belong to a particular groupWatcher, are stopped at once when this groupWatcher has no apiWatcher subscribers. Updates #5216 The issue was introduced in v1.93.5 when addressing #4850
… belong to a particular groupWatcher, at once Previously url watchers for pod, service and node objects could be mistakenly closed when service discovery was set up only for endpoints and endpointslice roles, since watchers for these roles may start pod, service and node url watchers with nil apiWatcher passed to groupWatcher.startWatchersForRole(). Now all the url watchers, which belong to a particular groupWatcher, are stopped at once when this groupWatcher has no apiWatcher subscribers. Updates #5216 The issue was introduced in v1.93.5 when addressing #4850
… belong to a particular groupWatcher, at once Previously url watchers for pod, service and node objects could be mistakenly closed when service discovery was set up only for endpoints and endpointslice roles, since watchers for these roles may start pod, service and node url watchers with nil apiWatcher passed to groupWatcher.startWatchersForRole(). Now all the url watchers, which belong to a particular groupWatcher, are stopped at once when this groupWatcher has no apiWatcher subscribers. Updates #5216 The issue was introduced in v1.93.5 when addressing #4850
FYI, Closing the issue as fixed. @mindw , feel free to re-open the issue if you still see inconsistent behavior in |
… belong to a particular groupWatcher, at once Previously url watchers for pod, service and node objects could be mistakenly closed when service discovery was set up only for endpoints and endpointslice roles, since watchers for these roles may start pod, service and node url watchers with nil apiWatcher passed to groupWatcher.startWatchersForRole(). Now all the url watchers, which belong to a particular groupWatcher, are stopped at once when this groupWatcher has no apiWatcher subscribers. Updates VictoriaMetrics#5216 The issue was introduced in v1.93.5 when addressing VictoriaMetrics#4850
@valyala my apologies for not getting back sooner! Was kept away by other matters. I've deployed 1.95.1 and so far so good. All targets are detected as expected. |
Describe the bug
Valid targets previously discovered by 1.93.4 aren't discovered by 1.94.0.
Also,
kubernetes_sd_configs
role = endpoint fails to add service labels (very likely to be related).
To Reproduce
The target should have a history of not being discovered. For example, it was missing annotations or some labels.
Deploy kube-state-metrics chart into metrics namespace
add a static discovery rule
The kube-state-metrics target should list 0 discovered targets.
add a custom label:
Discovered targets should still be 0
Downgrade to 1.93.6.
Discovered targets should now be 1
Version
docker.io/victoriametrics/victoria-metrics:v1.94.0
Logs
didn't see anything relevant.
Screenshots
1.94.0
1.93.6
Used command-line flags
Additional information
Possible cause of trouble - 00685b6
and
#4855
The text was updated successfully, but these errors were encountered: