Skip to content

Commit

Permalink
feat(blooms): Dedupe download queue items to reduce queue size (#12222)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Mar 19, 2024
1 parent ebdf8fe commit cdb934c
Show file tree
Hide file tree
Showing 17 changed files with 1,174 additions and 13 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ require (
github.com/IBM/ibm-cos-sdk-go v1.10.0
github.com/axiomhq/hyperloglog v0.0.0-20240124082744-24bca3a5b39b
github.com/d4l3k/messagediff v1.2.1
github.com/dolthub/swiss v0.2.1
github.com/efficientgo/core v1.0.0-rc.2
github.com/fsnotify/fsnotify v1.6.0
github.com/gogo/googleapis v1.4.0
Expand Down Expand Up @@ -199,6 +200,7 @@ require (
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-metrics v0.0.1 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/dolthub/maphash v0.1.0 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,10 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/libnetwork v0.8.0-dev.2.0.20181012153825-d7b61745d166/go.mod h1:93m0aTqz6z+g32wla4l4WxTrdtvBRmVzYRkYvasA5Z8=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/dolthub/maphash v0.1.0 h1:bsQ7JsF4FkkWyrP3oCnFJgrCUAFbFf3kOl4L/QxPDyQ=
github.com/dolthub/maphash v0.1.0/go.mod h1:gkg4Ch4CdCDu5h6PMriVLawB7koZ+5ijb9puGMV50a4=
github.com/dolthub/swiss v0.2.1 h1:gs2osYs5SJkAaH5/ggVJqXQxRXtWshF6uE0lgR/Y3Gw=
github.com/dolthub/swiss v0.2.1/go.mod h1:8AhKZZ1HK7g18j7v7k6c5cYIGEZJcPn0ARsai8cUrh0=
github.com/drone/envsubst v1.0.3 h1:PCIBwNDYjs50AsLZPYdfhSATKaRg/FJmDc2D6+C2x8g=
github.com/drone/envsubst v1.0.3/go.mod h1:N2jZmlMufstn1KEqvbHjw40h1KyTmnVzHcSc9bFiJ2g=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
Expand Down
53 changes: 40 additions & 13 deletions pkg/storage/stores/shipper/bloomshipper/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/dolthub/swiss"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
Expand All @@ -20,7 +21,7 @@ import (
"github.com/grafana/loki/pkg/util/constants"
)

var downloadQueueCapacity = 100000
var downloadQueueCapacity = 10000

type options struct {
ignoreNotFound bool // ignore 404s from object storage; default=true
Expand Down Expand Up @@ -205,6 +206,7 @@ func (f *Fetcher) FetchBlocks(ctx context.Context, refs []BlockRef, opts ...Fetc
item: refs[i],
key: key,
idx: i,
async: cfg.fetchAsync,
results: responses,
errors: errors,
})
Expand Down Expand Up @@ -397,6 +399,7 @@ type downloadRequest[T any, R any] struct {
item T
key string
idx int
async bool
results chan<- downloadResponse[R]
errors chan<- error
}
Expand All @@ -408,12 +411,14 @@ type downloadResponse[R any] struct {
}

// downloadQueue is a channel-backed queue of download requests that are
// handed to the process function by worker goroutines.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so two
// field lists appear below. The first six fields look like the removed
// (pre-change) side and the following eight like the added (post-change)
// side; confirm against the commit before treating this as compilable Go.
type downloadQueue[T any, R any] struct {
queue chan downloadRequest[T, R]
mu keymutex.KeyMutex
wg sync.WaitGroup
done chan struct{}
process processFunc[T, R]
logger log.Logger
// queue is the buffered channel of pending requests; the constructor
// allocates it with the configured size.
queue chan downloadRequest[T, R]
// enqueued holds the keys of async requests that enqueue has placed on
// the channel; do removes a key once an async task has been handled.
enqueued *swiss.Map[string, struct{}]
// enqueuedMutex guards every access to enqueued.
enqueuedMutex sync.Mutex
// mu is a keyed mutex created with keymutex.NewHashed(workers);
// presumably it serializes work on the same task key — confirm in do.
mu keymutex.KeyMutex
// wg is incremented once per worker started by the constructor.
wg sync.WaitGroup
// done is created by the constructor; presumably a shutdown signal for
// the workers — TODO confirm in runWorker.
done chan struct{}
// process is the function invoked for each dequeued task.
process processFunc[T, R]
// logger receives warnings and errors emitted by the queue.
logger log.Logger
}

func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R], logger log.Logger) (*downloadQueue[T, R], error) {
Expand All @@ -424,11 +429,12 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R]
return nil, errors.New("queue requires at least 1 worker")
}
q := &downloadQueue[T, R]{
queue: make(chan downloadRequest[T, R], size),
mu: keymutex.NewHashed(workers),
done: make(chan struct{}),
process: process,
logger: logger,
queue: make(chan downloadRequest[T, R], size),
enqueued: swiss.NewMap[string, struct{}](uint32(size)),
mu: keymutex.NewHashed(workers),
done: make(chan struct{}),
process: process,
logger: logger,
}
for i := 0; i < workers; i++ {
q.wg.Add(1)
Expand All @@ -438,7 +444,23 @@ func newDownloadQueue[T any, R any](size, workers int, process processFunc[T, R]
}

// enqueue submits a download request to the queue.
//
// Synchronous requests (t.async == false) are sent with a blocking channel
// send and are never deduplicated: the caller waits for the result of its own
// request, so it must be queued exactly once.
//
// Asynchronous requests are deduplicated by t.key. If a request with the same
// key is already tracked in q.enqueued the new one is discarded. Otherwise a
// non-blocking send is attempted; when the queue is full the request is
// dropped and a warning is logged instead of blocking the caller.
func (q *downloadQueue[T, R]) enqueue(t downloadRequest[T, R]) {
	if !t.async {
		q.queue <- t
		// Return here: falling through would send the request a second time
		// and record its key in q.enqueued, where it would never be removed
		// because keys are only deleted for async tasks.
		return
	}

	// For async tasks we attempt to dedupe tasks already in progress.
	q.enqueuedMutex.Lock()
	defer q.enqueuedMutex.Unlock()

	if q.enqueued.Has(t.key) {
		return
	}

	select {
	case q.queue <- t:
		// Only track the key once the request is actually on the channel.
		q.enqueued.Put(t.key, struct{}{})
	default:
		// TODO: we probably want a metric on dropped items.
		level.Warn(q.logger).Log("msg", "download queue is full, dropping item", "key", t.key)
	}
}

func (q *downloadQueue[T, R]) runWorker() {
Expand All @@ -464,6 +486,11 @@ func (q *downloadQueue[T, R]) do(ctx context.Context, task downloadRequest[T, R]
if err != nil {
level.Error(q.logger).Log("msg", "failed to unlock key in block lock", "key", task.key, "err", err)
}
if task.async {
q.enqueuedMutex.Lock()
_ = q.enqueued.Delete(task.key)
q.enqueuedMutex.Unlock()
}
}()

q.process(ctx, task)
Expand Down
2 changes: 2 additions & 0 deletions vendor/github.com/dolthub/maphash/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

201 changes: 201 additions & 0 deletions vendor/github.com/dolthub/maphash/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions vendor/github.com/dolthub/maphash/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 48 additions & 0 deletions vendor/github.com/dolthub/maphash/hasher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit cdb934c

Please sign in to comment.