Skip to content

Commit

Permalink
Making the number of CPU cores used for sorting postings lists editab…
Browse files Browse the repository at this point in the history
…le (#12247)

Signed-off-by: Yuri Nikolic <durica.nikolic@grafana.com>
  • Loading branch information
duricanikolic committed Apr 18, 2023
1 parent bb217dd commit b028112
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
2 changes: 1 addition & 1 deletion tsdb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ const cardinalityCacheExpirationTime = time.Duration(30) * time.Second
func (h *Head) Init(minValidTime int64) error {
h.minValidTime.Store(minValidTime)
defer func() {
h.postings.EnsureOrder()
h.postings.EnsureOrder(h.opts.WALReplayConcurrency)
}()
defer h.gc() // After loading the wal remove the obsolete data from the head.
defer func() {
Expand Down
14 changes: 10 additions & 4 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,21 +224,27 @@ func (p *MemPostings) All() Postings {

// EnsureOrder ensures that all postings lists are sorted. After it returns all further
// calls to add and addFor will insert new IDs in a sorted manner.
func (p *MemPostings) EnsureOrder() {
// Parameter numberOfConcurrentProcesses is used to specify the maximal number of
// CPU cores used for this operation. If it is <= 0, GOMAXPROCS is used.
// GOMAXPROCS was the default before introducing this parameter.
func (p *MemPostings) EnsureOrder(numberOfConcurrentProcesses int) {
p.mtx.Lock()
defer p.mtx.Unlock()

if p.ordered {
return
}

n := runtime.GOMAXPROCS(0)
concurrency := numberOfConcurrentProcesses
if concurrency <= 0 {
concurrency = runtime.GOMAXPROCS(0)
}
workc := make(chan *[][]storage.SeriesRef)

var wg sync.WaitGroup
wg.Add(n)
wg.Add(concurrency)

for i := 0; i < n; i++ {
for i := 0; i < concurrency; i++ {
go func() {
for job := range workc {
for _, l := range *job {
Expand Down
4 changes: 2 additions & 2 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMemPostings_ensureOrder(t *testing.T) {
p.m["a"][v] = l
}

p.EnsureOrder()
p.EnsureOrder(0)

for _, e := range p.m {
for _, l := range e {
Expand Down Expand Up @@ -114,7 +114,7 @@ func BenchmarkMemPostings_ensureOrder(b *testing.B) {
b.ResetTimer()

for n := 0; n < b.N; n++ {
p.EnsureOrder()
p.EnsureOrder(0)
p.ordered = false
}
})
Expand Down

0 comments on commit b028112

Please sign in to comment.