Skip to content

Commit

Permalink
lib/promscrape/discovery/kubernetes: stop all the url watchers, which…
Browse files Browse the repository at this point in the history
… belong to a particular groupWatcher, at once

Previously url watchers for pod, service and node objects could be mistakenly closed
when service discovery was set up only for endpoints and endpointslice roles,
since watchers for these roles may start pod, service and node url watchers
with nil apiWatcher passed to groupWatcher.startWatchersForRole().

Now all the url watchers, which belong to a particular groupWatcher, are stopped at once
when this groupWatcher has no apiWatcher subscribers.

Updates #5216

The issue has been introduced in v1.93.5 when addressing #4850
  • Loading branch information
valyala committed Oct 27, 2023
1 parent 163bcc9 commit 682d9da
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 39 deletions.
1 change: 1 addition & 0 deletions docs/CHANGELOG.md
Expand Up @@ -11,6 +11,7 @@ The following `tip` changes can be tested by building VictoriaMetrics components

## v1.93.x long-time support release (LTS)

* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly discover Kubernetes targets via [kubernetes_sd_configs](https://docs.victoriametrics.com/sd_configs.html#kubernetes_sd_configs). Previously some targets and some labels could be skipped during service discovery because of the bug introduced in [v1.93.5](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.5) when implementing [this feature](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4850). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5216) for more details.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly parse `ca`, `cert` and `key` options at `tls_config` section inside [http client settings](https://docs.victoriametrics.com/sd_configs.html#http-api-client-options). Previously string values couldn't be parsed for these options, since the parser was mistakenly expecting a list of `uint8` values instead.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly drop samples if `-streamAggr.dropInput` command-line flag is set and `-remoteWrite.streamAggr.config` contains an empty file. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5207).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): do not print redundant error logs when failed to scrape consul or nomad target. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5239).
Expand Down
94 changes: 55 additions & 39 deletions lib/promscrape/discovery/kubernetes/api_watcher.go
Expand Up @@ -216,6 +216,16 @@ type groupWatcher struct {

mu sync.Mutex
m map[string]*urlWatcher

// cancel is used for stopping all the urlWatcher instances inside m,
// which must check for ctx.Done() when performing their background watch work.
ctx context.Context
cancel context.CancelFunc

// noAPIWatchers is set to true when there are no API watchers for the given groupWatcher.
// This field is used for determining when it is safe to stop all the urlWatcher instances
// for the given groupWatcher.
noAPIWatchers bool
}

func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string, selectors []Selector, attachNodeMetadata bool, proxyURL *url.URL) *groupWatcher {
Expand All @@ -233,15 +243,21 @@ func newGroupWatcher(apiServer string, ac *promauth.Config, namespaces []string,
},
Timeout: *apiServerTimeout,
}
ctx, cancel := context.WithCancel(context.Background())
return &groupWatcher{
apiServer: apiServer,
namespaces: namespaces,
selectors: selectors,
attachNodeMetadata: attachNodeMetadata,

setHeaders: func(req *http.Request) { ac.SetHeaders(req, true) },
client: client,
m: make(map[string]*urlWatcher),
setHeaders: func(req *http.Request) {
ac.SetHeaders(req, true)
},
client: client,
m: make(map[string]*urlWatcher),

ctx: ctx,
cancel: cancel,
}
}

Expand Down Expand Up @@ -292,8 +308,25 @@ func groupWatchersCleaner(gws map[string]*groupWatcher) {
groupWatchersLock.Lock()
for key, gw := range gws {
gw.mu.Lock()
if len(gw.m) == 0 {
delete(gws, key)
// Calculate the number of apiWatcher instances subscribed to gw.
awsTotal := 0
for _, uw := range gw.m {
awsTotal += len(uw.aws) + len(uw.awsPending)
}

if awsTotal == 0 {
// There are no API watchers subscribed to gw.
// Stop all the urlWatcher instances at gw and drop gw from gws in this case,
// but do it only on the second iteration in order to reduce urlWatcher churn
// during scrape config reloads.
if gw.noAPIWatchers {
gw.cancel()
delete(gws, key)
} else {
gw.noAPIWatchers = true
}
} else {
gw.noAPIWatchers = false
}
gw.mu.Unlock()
}
Expand Down Expand Up @@ -440,21 +473,18 @@ func (gw *groupWatcher) doRequest(ctx context.Context, requestURL string) (*http

func (gw *groupWatcher) registerPendingAPIWatchers() {
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
uw.registerPendingAPIWatchersLocked()
}
gw.mu.Unlock()
}

func (gw *groupWatcher) unsubscribeAPIWatcher(aw *apiWatcher) {
gw.mu.Lock()
defer gw.mu.Unlock()
for _, uw := range gw.m {
uw.unsubscribeAPIWatcherLocked(aw)
if len(uw.aws)+len(uw.awsPending) == 0 {
time.AfterFunc(10*time.Second, uw.stopIfNoUsers)
}
}
gw.mu.Unlock()
}

// urlWatcher watches for an apiURL and updates object states in objectsByKey.
Expand All @@ -466,9 +496,6 @@ type urlWatcher struct {
apiURL string
gw *groupWatcher

ctx context.Context
cancel context.CancelFunc

parseObject parseObjectFunc
parseObjectList parseObjectListFunc

Expand Down Expand Up @@ -500,15 +527,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
parseObject, parseObjectList := getObjectParsersForRole(role)
metrics.GetOrCreateCounter(fmt.Sprintf(`vm_promscrape_discovery_kubernetes_url_watchers{role=%q}`, role)).Inc()

ctx, cancel := context.WithCancel(context.Background())
uw := &urlWatcher{
role: role,
apiURL: apiURL,
gw: gw,

ctx: ctx,
cancel: cancel,

parseObject: parseObject,
parseObjectList: parseObjectList,

Expand All @@ -526,21 +549,11 @@ func newURLWatcher(role, apiURL string, gw *groupWatcher) *urlWatcher {
return uw
}

func (uw *urlWatcher) stopIfNoUsers() {
gw := uw.gw
gw.mu.Lock()
if len(uw.aws)+len(uw.awsPending) == 0 {
uw.cancel()
delete(gw.m, uw.apiURL)
}
gw.mu.Unlock()
}

func (uw *urlWatcher) recreateScrapeWorks() {
const minSleepTime = 5 * time.Second
sleepTime := minSleepTime
gw := uw.gw
stopCh := uw.ctx.Done()
stopCh := gw.ctx.Done()
for {
t := timerpool.Get(sleepTime)
select {
Expand Down Expand Up @@ -633,6 +646,7 @@ func (uw *urlWatcher) reloadObjects() string {
return uw.resourceVersion
}

gw := uw.gw
startTime := time.Now()
apiURL := uw.apiURL

Expand All @@ -641,7 +655,7 @@ func (uw *urlWatcher) reloadObjects() string {
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4855 .
delimiter := getQueryArgsDelimiter(apiURL)
requestURL := apiURL + delimiter + "resourceVersion=0&resourceVersionMatch=NotOlderThan"
resp, err := uw.gw.doRequest(uw.ctx, requestURL)
resp, err := gw.doRequest(gw.ctx, requestURL)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
Expand All @@ -661,7 +675,7 @@ func (uw *urlWatcher) reloadObjects() string {
return ""
}

uw.gw.mu.Lock()
gw.mu.Lock()
objectsAdded := make(map[string]object)
objectsUpdated := make(map[string]object)
var objectsRemoved []string
Expand Down Expand Up @@ -692,7 +706,7 @@ func (uw *urlWatcher) reloadObjects() string {
if len(objectsRemoved) > 0 || len(objectsUpdated) > 0 || len(objectsAdded) > 0 {
uw.maybeUpdateDependedScrapeWorksLocked()
}
uw.gw.mu.Unlock()
gw.mu.Unlock()

uw.objectsUpdated.Add(len(objectsUpdated))
uw.objectsRemoved.Add(len(objectsRemoved))
Expand All @@ -709,7 +723,8 @@ func (uw *urlWatcher) reloadObjects() string {
//
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes
func (uw *urlWatcher) watchForUpdates() {
stopCh := uw.ctx.Done()
gw := uw.gw
stopCh := gw.ctx.Done()
backoffDelay := time.Second
maxBackoffDelay := 30 * time.Second
backoffSleep := func() {
Expand All @@ -728,7 +743,7 @@ func (uw *urlWatcher) watchForUpdates() {
}
apiURL := uw.apiURL
delimiter := getQueryArgsDelimiter(apiURL)
timeoutSeconds := time.Duration(0.9 * float64(uw.gw.client.Timeout)).Seconds()
timeoutSeconds := time.Duration(0.9 * float64(gw.client.Timeout)).Seconds()
apiURL += delimiter + "watch=1&allowWatchBookmarks=true&timeoutSeconds=" + strconv.Itoa(int(timeoutSeconds))
for {
select {
Expand All @@ -745,7 +760,7 @@ func (uw *urlWatcher) watchForUpdates() {
continue
}
requestURL := apiURL + "&resourceVersion=" + url.QueryEscape(resourceVersion)
resp, err := uw.gw.doRequest(uw.ctx, requestURL)
resp, err := gw.doRequest(gw.ctx, requestURL)
if err != nil {
if !errors.Is(err, context.Canceled) {
logger.Errorf("cannot perform request to %q: %s", requestURL, err)
Expand All @@ -771,7 +786,7 @@ func (uw *urlWatcher) watchForUpdates() {
err = uw.readObjectUpdateStream(resp.Body)
_ = resp.Body.Close()
if err != nil {
if !(errors.Is(err, io.EOF) || errors.Is(err, context.Canceled)) {
if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
logger.Errorf("error when reading WatchEvent stream from %q: %s", requestURL, err)
uw.resourceVersion = ""
}
Expand All @@ -783,6 +798,7 @@ func (uw *urlWatcher) watchForUpdates() {

// readObjectUpdateStream reads Kubernetes watch events from r and updates locally cached objects according to the received events.
func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
gw := uw.gw
d := json.NewDecoder(r)
var we WatchEvent
for {
Expand All @@ -796,18 +812,18 @@ func (uw *urlWatcher) readObjectUpdateStream(r io.Reader) error {
return fmt.Errorf("cannot parse %s object: %w", we.Type, err)
}
key := o.key()
uw.gw.mu.Lock()
gw.mu.Lock()
uw.updateObjectLocked(key, o)
uw.gw.mu.Unlock()
gw.mu.Unlock()
case "DELETED":
o, err := uw.parseObject(we.Object)
if err != nil {
return fmt.Errorf("cannot parse %s object: %w", we.Type, err)
}
key := o.key()
uw.gw.mu.Lock()
gw.mu.Lock()
uw.removeObjectLocked(key)
uw.gw.mu.Unlock()
gw.mu.Unlock()
case "BOOKMARK":
// See https://kubernetes.io/docs/reference/using-api/api-concepts/#watch-bookmarks
bm, err := parseBookmark(we.Object)
Expand Down

0 comments on commit 682d9da

Please sign in to comment.