Skip to content

Commit

Permalink
Bloom sharding (#192)
Browse files Browse the repository at this point in the history
* Checkpoint: Initial commit of bloom package

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Checkpoint: Migrate more pkgs to tempo/pkg/bloom

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Checkpoint: add WriteTo

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Checkpoint: migrated more backends to new interface

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Checkpoint: migrated all backends and interfaces

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Checkpoint: stuff compiles

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Checkpoint: docker-compose works

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Small additions

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* lint

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Checkpoint: compile

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Add a real test, fix lint

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Fix disk cache for sharded bloom filter

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* fix append block test failure

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* make vendor-check

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Remove check on numTenants, add CHANGELOG entry

Signed-off-by: Annanay <annanayagarwal@gmail.com>
  • Loading branch information
annanay25 committed Oct 29, 2020
1 parent 03d976a commit d91c415
Show file tree
Hide file tree
Showing 27 changed files with 363 additions and 153 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,3 +1,4 @@
## master / unreleased

* [CHANGE] Bloom filters are now sharded to reduce size and improve caching, as blocks grow. This is a **breaking change** and all data stored before this change will **not** be queryable. [#192](https://github.com/grafana/tempo/pull/192)
* [ENHANCEMENT] CI checks for vendored dependencies using `make vendor-check`. Update CONTRIBUTING.md to reflect the same before checking in files in a PR. [#274](https://github.com/grafana/tempo/pull/274)
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -43,7 +43,7 @@ require (
go.uber.org/zap v1.15.0
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
google.golang.org/api v0.29.0
google.golang.org/genproto v0.0.0-20201026171402-d4b8fe4fd877 // indirect
google.golang.org/genproto v0.0.0-20201028140639-c77dae4b0522 // indirect
google.golang.org/grpc v1.33.1
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/yaml.v2 v2.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -2311,8 +2311,8 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY
google.golang.org/genproto v0.0.0-20200603110839-e855014d5736/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200710124503-20a17af7bd0e h1:k+p/u26/lVeNEpdxSeUrm7rTvoFckBKaf7gTzgmHyDA=
google.golang.org/genproto v0.0.0-20200710124503-20a17af7bd0e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201026171402-d4b8fe4fd877 h1:d4k3uIU763E31Rk4UZPA47oOoBymMsDImV3U4mGhX9E=
google.golang.org/genproto v0.0.0-20201026171402-d4b8fe4fd877/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201028140639-c77dae4b0522 h1:7RoRaOmOAXwqnurgQ5g5/d0yCi9ha2UxuTZULXudK7A=
google.golang.org/genproto v0.0.0-20201028140639-c77dae4b0522/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
2 changes: 1 addition & 1 deletion modules/ingester/instance_test.go
Expand Up @@ -57,7 +57,7 @@ func TestInstance(t *testing.T) {
block = i.GetBlockToBeFlushed()
}
assert.NotNil(t, block)
assert.Nil(t, i.completingBlock, 1)
assert.Nil(t, i.completingBlock)
assert.Len(t, i.completeBlocks, 1)

err = ingester.store.WriteBlock(context.Background(), block)
Expand Down
6 changes: 3 additions & 3 deletions tempodb/backend/backend.go
Expand Up @@ -17,17 +17,17 @@ var (
// AppendTracker is an opaque, backend-specific handle threaded through
// successive AppendObject calls to track an in-progress block append.
type AppendTracker interface{}

// Writer handles writes of block data to the backend. Bloom filters are
// sharded, so each shard is passed as a separate element of bBloom.
type Writer interface {
	// Write stores the bloom filter shards, the index and the object file
	// found at objectFilePath for the block described by meta.
	Write(ctx context.Context, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte, objectFilePath string) error

	// WriteBlockMeta finalizes an appended block, writing its meta along with
	// the bloom filter shards and index.
	WriteBlockMeta(ctx context.Context, tracker AppendTracker, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte) error
	// AppendObject adds one object to the block being built and returns the
	// tracker to pass to the next append.
	AppendObject(ctx context.Context, tracker AppendTracker, meta *encoding.BlockMeta, bObject []byte) (AppendTracker, error)
}

type Reader interface {
Tenants(ctx context.Context) ([]string, error)
Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error)
BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*encoding.BlockMeta, error)
Bloom(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error)
Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
Object(ctx context.Context, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error

Expand Down
14 changes: 10 additions & 4 deletions tempodb/backend/diskcache/cache.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strconv"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
Expand All @@ -15,7 +16,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

type missFunc func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
// bloomMissFunc fetches a single bloom filter shard from the backing store
// when it is absent from the disk cache.
type bloomMissFunc func(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error)

// indexMissFunc fetches a block's index from the backing store when it is
// absent from the disk cache.
type indexMissFunc func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)

const (
typeBloom = "bloom"
Expand Down Expand Up @@ -95,8 +97,8 @@ func (r *reader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID stri
return r.next.BlockMeta(ctx, blockID, tenantID)
}

func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
b, skippableErr, err := r.readOrCacheKeyToDisk(ctx, blockID, tenantID, typeBloom, r.next.Bloom)
func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error) {
b, skippableErr, err := r.readOrCacheBloom(ctx, blockID, tenantID, bloomShard, r.next.Bloom)

if skippableErr != nil {
metricDiskCache.WithLabelValues(typeBloom, "error").Inc()
Expand All @@ -109,7 +111,7 @@ func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string)
}

func (r *reader) Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
b, skippableErr, err := r.readOrCacheKeyToDisk(ctx, blockID, tenantID, typeIndex, r.next.Index)
b, skippableErr, err := r.readOrCacheIndex(ctx, blockID, tenantID, r.next.Index)

if skippableErr != nil {
metricDiskCache.WithLabelValues(typeIndex, "error").Inc()
Expand All @@ -134,3 +136,7 @@ func (r *reader) Shutdown() {
// key returns the disk-cache filename for a block artifact of type t,
// in the form "<blockID>:<tenantID>:<t>".
func key(blockID uuid.UUID, tenantID string, t string) string {
	return fmt.Sprintf("%s:%s:%s", blockID.String(), tenantID, t)
}

// bloomKey returns the disk-cache filename for one bloom filter shard,
// in the form "<blockID>:<tenantID>:<t>:<shardNum>".
func bloomKey(blockID uuid.UUID, tenantID string, t string, shardNum int) string {
	return fmt.Sprintf("%s:%s:%s:%d", blockID.String(), tenantID, t, shardNum)
}
39 changes: 36 additions & 3 deletions tempodb/backend/diskcache/disk_cache.go
Expand Up @@ -14,10 +14,11 @@ import (
"github.com/karrick/godirwalk"
)

func (r *reader) readOrCacheKeyToDisk(ctx context.Context, blockID uuid.UUID, tenantID string, t string, miss missFunc) ([]byte, error, error) {
// TODO: factor out common code with readOrCacheIndexToDisk into separate function
func (r *reader) readOrCacheBloom(ctx context.Context, blockID uuid.UUID, tenantID string, shardNum int, miss bloomMissFunc) ([]byte, error, error) {
var skippableError error

k := key(blockID, tenantID, t)
k := bloomKey(blockID, tenantID, typeBloom, shardNum)
filename := path.Join(r.cfg.Path, k)

bytes, err := ioutil.ReadFile(filename)
Expand All @@ -30,7 +31,39 @@ func (r *reader) readOrCacheKeyToDisk(ctx context.Context, blockID uuid.UUID, te
return bytes, nil, nil
}

metricDiskCacheMiss.WithLabelValues(t).Inc()
metricDiskCacheMiss.WithLabelValues(typeBloom).Inc()
bytes, err = miss(ctx, blockID, tenantID, shardNum)
if err != nil {
return nil, nil, err // backend store error. need to bubble this up
}

if bytes != nil {
err = r.writeKeyToDisk(filename, bytes)
if err != nil {
skippableError = err
}
}

return bytes, skippableError, nil
}

func (r *reader) readOrCacheIndex(ctx context.Context, blockID uuid.UUID, tenantID string, miss indexMissFunc) ([]byte, error, error) {
var skippableError error

k := key(blockID, tenantID, typeIndex)
filename := path.Join(r.cfg.Path, k)

bytes, err := ioutil.ReadFile(filename)

if err != nil && !os.IsNotExist(err) {
skippableError = err
}

if bytes != nil {
return bytes, nil, nil
}

metricDiskCacheMiss.WithLabelValues(typeIndex).Inc()
bytes, err = miss(ctx, blockID, tenantID)
if err != nil {
return nil, nil, err // backend store error. need to bubble this up
Expand Down
43 changes: 34 additions & 9 deletions tempodb/backend/diskcache/disk_cache_test.go
Expand Up @@ -21,12 +21,21 @@ func TestReadOrCache(t *testing.T) {
assert.NoError(t, err, "unexpected error creating temp dir")

missBytes := []byte{0x01}
missCalled := 0
missFunc := func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
missCalled++
indexMissCalled := 0
bloomMissCalled := 0

// indexMiss function to be called when the key is not cached
indexMiss := func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
indexMissCalled++
return missBytes, nil
}
// bloomMiss function to be called when the bloomKey is not cached
bloomMiss := func(ctx context.Context, blockID uuid.UUID, tenantID string, shardNum int) ([]byte, error) {
bloomMissCalled++
return missBytes, nil
}

// create new cache
cache, err := New(nil, &Config{
Path: tempDir,
MaxDiskMBs: 1024,
Expand All @@ -38,17 +47,33 @@ func TestReadOrCache(t *testing.T) {
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
// get key from cache
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, indexMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, missCalled)
assert.Equal(t, 1, indexMissCalled)

bytes, skippableErr, err = cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
// make sure the missFunc is not called again since the key is cached already
bytes, skippableErr, err = cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, indexMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, missCalled)
assert.Equal(t, 1, indexMissCalled)

// get key from cache
bytes, skippableErr, err = cache.(*reader).readOrCacheBloom(context.Background(), blockID, tenantID, 0, bloomMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, bloomMissCalled)

// make sure the missFunc is not called again since the key is cached already
bytes, skippableErr, err = cache.(*reader).readOrCacheBloom(context.Background(), blockID, tenantID, 0, bloomMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, bloomMissCalled)
}

func TestJanitor(t *testing.T) {
Expand All @@ -75,7 +100,7 @@ func TestJanitor(t *testing.T) {
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, missFunc)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
Expand All @@ -96,7 +121,7 @@ func TestJanitor(t *testing.T) {
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, missFunc)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
Expand Down
14 changes: 5 additions & 9 deletions tempodb/backend/gcs/compactor.go
Expand Up @@ -4,19 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"path"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/util"
"github.com/grafana/tempo/tempodb/encoding"
"google.golang.org/api/iterator"
)

func (rw *readerWriter) MarkBlockCompacted(blockID uuid.UUID, tenantID string) error {
// move meta file to a new location
metaFilename := rw.metaFileName(blockID, tenantID)
compactedMetaFilename := rw.compactedMetaFileName(blockID, tenantID)
metaFilename := util.MetaFileName(blockID, tenantID)
compactedMetaFilename := util.CompactedMetaFileName(blockID, tenantID)

src := rw.bucket.Object(metaFilename)
dst := rw.bucket.Object(compactedMetaFilename)
Expand All @@ -41,7 +41,7 @@ func (rw *readerWriter) ClearBlock(blockID uuid.UUID, tenantID string) error {

ctx := context.TODO()
iter := rw.bucket.Objects(ctx, &storage.Query{
Prefix: rw.rootPath(blockID, tenantID),
Prefix: util.RootPath(blockID, tenantID),
Versions: false,
})

Expand All @@ -65,7 +65,7 @@ func (rw *readerWriter) ClearBlock(blockID uuid.UUID, tenantID string) error {
}

func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*encoding.CompactedBlockMeta, error) {
name := rw.compactedMetaFileName(blockID, tenantID)
name := util.CompactedMetaFileName(blockID, tenantID)

bytes, modTime, err := rw.readAllWithModTime(context.Background(), name)
if err == storage.ErrObjectNotExist {
Expand All @@ -84,7 +84,3 @@ func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (

return out, err
}

// compactedMetaFileName returns the object name of the compacted-block meta
// file ("meta.compacted.json") under the block's root path.
func (rw *readerWriter) compactedMetaFileName(blockID uuid.UUID, tenantID string) string {
	return path.Join(rw.rootPath(blockID, tenantID), "meta.compacted.json")
}

0 comments on commit d91c415

Please sign in to comment.