Skip to content

Commit

Permalink
Merge pull request #34 from grafana/thampiotr/expose-disabling-stalen…
Browse files Browse the repository at this point in the history
…ess-markers

scrape: expose disabling staleness markers
  • Loading branch information
thampiotr committed May 13, 2024
2 parents 51b39f2 + 53f3689 commit 793c8c9
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 3 deletions.
11 changes: 11 additions & 0 deletions scrape/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,3 +379,14 @@ func (m *Manager) TargetsDroppedCounts() map[string]int {
}
return counts
}

// DisableEndOfRunStalenessMarkers disables the end-of-run staleness markers for the provided targets in the given
// targetSet. When the end-of-run staleness is disabled for a target, when it goes away, there will be no staleness
// markers written for its series.
func (m *Manager) DisableEndOfRunStalenessMarkers(targetSet string, targets []*Target) {
m.mtxScrape.Lock()
defer m.mtxScrape.Unlock()
if pool, ok := m.scrapePools[targetSet]; ok {
pool.disableEndOfRunStalenessMarkers(targets)
}
}
49 changes: 49 additions & 0 deletions scrape/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"golang.org/x/exp/slices"
"gopkg.in/yaml.v2"

"github.com/prometheus/prometheus/config"
Expand Down Expand Up @@ -703,3 +704,51 @@ scrape_configs:
reload(scrapeManager, cfg2)
require.ElementsMatch(t, []string{"job1", "job3"}, scrapeManager.ScrapePools())
}

func TestManagerDisableEndOfRunStalenessMarkers(t *testing.T) {
configForJob := func(jobName string) *config.ScrapeConfig {
return &config.ScrapeConfig{
JobName: jobName,
ScrapeInterval: model.Duration(1 * time.Minute),
ScrapeTimeout: model.Duration(1 * time.Minute),
}
}

cfg := &config.Config{ScrapeConfigs: []*config.ScrapeConfig{
configForJob("one"),
configForJob("two"),
}}

m := NewManager(&Options{}, nil, &nopAppendable{})
defer m.Stop()
require.NoError(t, m.ApplyConfig(cfg))

// Pass targets to the manager.
tgs := map[string][]*targetgroup.Group{
"one": {{Targets: []model.LabelSet{{"__address__": "h1"}, {"__address__": "h2"}, {"__address__": "h3"}}}},
"two": {{Targets: []model.LabelSet{{"__address__": "h4"}}}},
}
m.updateTsets(tgs)
m.reload()

activeTargets := m.TargetsActive()
targetsToDisable := []*Target{
activeTargets["one"][0],
activeTargets["one"][2],
NewTarget(labels.FromStrings("__address__", "h4"), labels.EmptyLabels(), nil), // non-existent target.
}

// Disable end of run staleness markers for some targets.
m.DisableEndOfRunStalenessMarkers("one", targetsToDisable)
// This should be a no-op
m.DisableEndOfRunStalenessMarkers("non-existent-job", targetsToDisable)

// Check that the end of run staleness markers are disabled for the correct targets.
for _, group := range []string{"one", "two"} {
for _, tg := range activeTargets[group] {
loop := m.scrapePools[group].loops[tg.hash()].(*scrapeLoop)
expectedDisabled := slices.Contains(targetsToDisable, tg)
require.Equal(t, expectedDisabled, loop.disabledEndOfRunStalenessMarkers.Load())
}
}
}
22 changes: 19 additions & 3 deletions scrape/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
config_util "github.com/prometheus/common/config"
"github.com/prometheus/common/model"
"github.com/prometheus/common/version"
"go.uber.org/atomic"
"golang.org/x/exp/slices"

"github.com/prometheus/prometheus/config"
Expand Down Expand Up @@ -668,6 +669,16 @@ func (sp *scrapePool) refreshTargetLimitErr() error {
return nil
}

func (sp *scrapePool) disableEndOfRunStalenessMarkers(targets []*Target) {
sp.mtx.Lock()
defer sp.mtx.Unlock()
for i := range targets {
if l, ok := sp.loops[targets[i].hash()]; ok {
l.disableEndOfRunStalenessMarkers()
}
}
}

func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
if limits == nil {
return nil
Expand Down Expand Up @@ -933,7 +944,7 @@ type scrapeLoop struct {
cancel func()
stopped chan struct{}

disabledEndOfRunStalenessMarkers bool
disabledEndOfRunStalenessMarkers atomic.Bool

reportExtraMetrics bool
appendMetadataToWAL bool
Expand Down Expand Up @@ -1317,7 +1328,7 @@ mainLoop:

close(sl.stopped)

if !sl.disabledEndOfRunStalenessMarkers {
if !sl.disabledEndOfRunStalenessMarkers.Load() {
sl.endOfRunStaleness(last, ticker, sl.interval)
}
}
Expand Down Expand Up @@ -1478,6 +1489,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
case <-time.After(interval / 10):
}

// Check if end-of-run staleness markers have been disabled while we were waiting.
if sl.disabledEndOfRunStalenessMarkers.Load() {
return
}

// Call sl.append again with an empty scrape to trigger stale markers.
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
Expand Down Expand Up @@ -1512,7 +1528,7 @@ func (sl *scrapeLoop) stop() {
}

func (sl *scrapeLoop) disableEndOfRunStalenessMarkers() {
sl.disabledEndOfRunStalenessMarkers = true
sl.disabledEndOfRunStalenessMarkers.Store(true)
}

func (sl *scrapeLoop) getCache() *scrapeCache {
Expand Down
65 changes: 65 additions & 0 deletions scrape/scrape_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3817,3 +3817,68 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *
require.True(t, value.IsStaleNaN(appender.resultFloats[6].f),
"Appended second sample not as expected. Wanted: stale NaN Got: %x", math.Float64bits(appender.resultFloats[6].f))
}

func TestScrapeLoopDisableStalenessMarkerInjection(t *testing.T) {
var (
loopDone = make(chan struct{}, 1)
appender = &collectResultAppender{}
scraper = &testScraper{}
app = func(ctx context.Context) storage.Appender { return appender }
)

sl := newScrapeLoop(context.Background(),
scraper,
nil, nil,
nopMutator,
nopMutator,
app,
nil,
0,
true,
false,
0, 0,
nil,
10*time.Millisecond,
time.Hour,
false,
false,
false,
nil,
false,
)

// Count scrapes and terminate loop after 2 scrapes.
numScrapes, maxScrapes := 0, 2
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
if numScrapes >= maxScrapes {
go sl.stop()
<-sl.ctx.Done()
}
if _, err := w.Write([]byte("metric_a 42\n")); err != nil {
return err
}
return ctx.Err()
}

go func() {
sl.run(nil)
loopDone <- struct{}{}
}()

// Disable end of run staleness markers.
sl.disableEndOfRunStalenessMarkers()

select {
case <-loopDone:
case <-time.After(5 * time.Second):
t.Fatalf("Scrape loop didn't stop.")
}

// No stale markers should be appended, since they were disabled.
for _, s := range appender.resultFloats {
if value.IsStaleNaN(s.f) {
t.Fatalf("Got stale NaN samples while end of run staleness is disabled: %x", math.Float64bits(s.f))
}
}
}

0 comments on commit 793c8c9

Please sign in to comment.