Commit

fix: query plan optimisation (#3100)

kolesnikovae committed Mar 15, 2024
1 parent db14878 commit 26b2735

Showing 4 changed files with 87 additions and 32 deletions.
pkg/ingester/retention.go (3 additions, 0 deletions)
@@ -290,6 +290,9 @@ func (dc *diskCleaner) CleanupBlocksWhenHighDiskUtilization(ctx context.Context)
 
 // isBlockDeletable returns true if this block can be deleted.
 func (dc *diskCleaner) isBlockDeletable(block *tenantBlock) bool {
+	// TODO(kolesnikovae):
+	// Expiry defaults to -querier.query-store-after which should be deprecated,
+	// blocks-storage.bucket-store.ignore-blocks-within can be used instead.
 	expiryTs := time.Now().Add(-dc.policy.Expiry)
 	return block.Uploaded && ulid.Time(block.ID.Time()).Before(expiryTs)
 }
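For reference, the expiry check above relies on block IDs being ULIDs, which embed their creation time in the first 48 bits. A minimal standalone sketch of the same comparison, assuming the oklog/ulid package that the `ulid.Time` call appears to come from; the two-hour horizon and the literal ULID are arbitrary illustrative values:

	package main

	import (
		"fmt"
		"time"

		"github.com/oklog/ulid"
	)

	func main() {
		// Block names are ULIDs; the leading 48 bits encode a millisecond
		// timestamp, which ulid.Time converts back to a time.Time.
		id := ulid.MustParse("01HS0000000000000000000000")
		created := ulid.Time(id.Time())

		// A block becomes deletable once its ULID timestamp falls before
		// the expiry horizon (illustrative two hours here, dc.policy.Expiry
		// in the real code).
		expiryTs := time.Now().Add(-2 * time.Hour)
		fmt.Println("deletable:", created.Before(expiryTs))
	}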
pkg/phlaredb/block_querier.go (0 additions, 5 deletions)
@@ -285,11 +285,6 @@ type BlockInfo struct {
 	Series uint64
 }
 
-func (b *BlockQuerier) BlockInfo() []BlockInfo {
-	result := make([]BlockInfo, len(b.queriers))
-	return result
-}
-
 type singleBlockQuerier struct {
 	logger  log.Logger
 	metrics *BlocksMetrics
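Side note: the deleted `BlockQuerier.BlockInfo` allocated a slice sized to the number of queriers but never filled in any entry, so every returned `BlockInfo` was zero-valued; the removal reads as dead-code cleanup rather than a behavior change.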
pkg/querier/replication.go (40 additions, 22 deletions)
@@ -200,26 +200,23 @@ func (r *replicasPerBlockID) removeBlock(ulid string) {
 }
 
 // this step removes sharded blocks that don't have all the shards present for a time window
-func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error {
+func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() (bool, error) {
 	type compactionKey struct {
-		level int32
-		minT  int64
+		level   int32
+		minTime int64
 	}
 	compactions := make(map[compactionKey][]string)
 
 	// group blocks by compaction level
 	for blockID := range r.m {
 		meta, ok := r.meta[blockID]
 		if !ok {
-			return fmt.Errorf("meta missing for block id %s", blockID)
-		}
-		if !ok {
-			continue
+			return false, fmt.Errorf("meta missing for block id %s", blockID)
 		}
 
 		key := compactionKey{
-			level: 0,
-			minT:  meta.MinTime,
+			level:   0,
+			minTime: meta.MinTime,
 		}
 
 		if meta.Compaction != nil {
@@ -230,23 +227,25 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error {
 
 	// now we go through every group and check if we see at least a block for each shard
 	var (
-		shardsSeen    []bool
-		shardedBlocks []string
+		shardsSeen       []bool
+		shardedBlocks    []string
+		hasShardedBlocks bool
 	)
 	for _, blocks := range compactions {
 		shardsSeen = shardsSeen[:0]
 		shardedBlocks = shardedBlocks[:0]
 		for _, block := range blocks {
 			meta, ok := r.meta[block]
 			if !ok {
-				return fmt.Errorf("meta missing for block id %s", block)
+				return false, fmt.Errorf("meta missing for block id %s", block)
 			}
 
 			shardIdx, shards, ok := shardFromBlock(meta)
 			if !ok {
 				// not a sharded block continue
 				continue
 			}
+			hasShardedBlocks = true
 			shardedBlocks = append(shardedBlocks, block)
 
 			if len(shardsSeen) == 0 {
@@ -261,7 +260,7 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error {
 		}
 
 		if len(shardsSeen) != int(shards) {
-			return fmt.Errorf("shard length mismatch, shards seen: %d, shards as per label: %d", len(shardsSeen), shards)
+			return false, fmt.Errorf("shard length mismatch, shards seen: %d, shards as per label: %d", len(shardsSeen), shards)
 		}
 
 		shardsSeen[shardIdx] = true
@@ -285,24 +284,32 @@ func (r *replicasPerBlockID) pruneIncompleteShardedBlocks() error {
 		}
 	}
 
-	return nil
+	return hasShardedBlocks, nil
 }
 
 // prunes blocks that are contained by a higher compaction level block
-func (r *replicasPerBlockID) pruneSupersededBlocks() error {
+func (r *replicasPerBlockID) pruneSupersededBlocks(sharded bool) error {
 	for blockID := range r.m {
 		meta, ok := r.meta[blockID]
 		if !ok {
-			if !ok {
-				return fmt.Errorf("meta missing for block id %s", blockID)
-			}
+			return fmt.Errorf("meta missing for block id %s", blockID)
 		}
 		if meta.Compaction == nil {
 			continue
 		}
 		if meta.Compaction.Level < 2 {
 			continue
 		}
+		// At split phase of compaction, L2 is an intermediate step where we
+		// split each group into split_shards parts, thus there will be up to
+		// groups_num * split_shards blocks, which is typically _significantly_
+		// greater than the number of source blocks. Moreover, these blocks are
+		// not yet deduplicated, therefore we should prefer L1 blocks over them.
+		// As an optimisation, we drop all L2 blocks.
+		if sharded && meta.Compaction.Level == 2 {
+			r.removeBlock(blockID)
+			continue
+		}
 		for _, blockID := range meta.Compaction.Parents {
 			r.removeBlock(blockID)
 		}
@@ -331,11 +338,21 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*ingestv1
 		smallestCompactionLevel = int32(0)
 	)
 
-	if err := r.pruneIncompleteShardedBlocks(); err != nil {
+	sharded, err := r.pruneIncompleteShardedBlocks()
+	if err != nil {
 		level.Warn(r.logger).Log("msg", "block planning failed to prune incomplete sharded blocks", "err", err)
 		return nil
 	}
-	if err := r.pruneSupersededBlocks(); err != nil {
+
+	// Depending on whether split sharding is used, the compaction level at
+	// which the data gets deduplicated differs: if split sharding is enabled,
+	// we deduplicate at level 3, and at level 2 otherwise.
+	var deduplicationLevel int32 = 2
+	if sharded {
+		deduplicationLevel = 3
+	}
+
+	if err := r.pruneSupersededBlocks(sharded); err != nil {
 		level.Warn(r.logger).Log("msg", "block planning failed to prune superseded blocks", "err", err)
 		return nil
 	}
@@ -351,8 +368,9 @@ func (r *replicasPerBlockID) blockPlan(ctx context.Context) map[string]*ingestv1
 		if !ok {
 			continue
 		}
-		// when we see a block with CompactionLevel <=1 or a block without compaction section, we want the queriers to deduplicate
-		if meta.Compaction == nil || meta.Compaction.Level <= 1 {
+		// when we see a block with CompactionLevel less than the level at which data is deduplicated,
+		// or a block without compaction section, we want the queriers to deduplicate
+		if meta.Compaction == nil || meta.Compaction.Level < deduplicationLevel {
 			deduplicate = true
 		}
 
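Taken together, the planner now works in two passes: it first checks whether any split-sharded blocks are present (and that every shard of a compaction group is accounted for), derives the deduplication level from that, and then prunes superseded blocks, dropping intermediate L2 blocks outright when sharding is in use. A condensed, hypothetical sketch of that decision logic; `BlockMeta`, `planDecisions`, and the field names are illustrative stand-ins, not the actual Pyroscope types:

	package main

	import "fmt"

	// BlockMeta is a stand-in for the real block metadata; only the
	// fields the planning rules need are modeled here.
	type BlockMeta struct {
		ID         string
		Level      int32  // compaction level; 0 means no compaction section
		ShardLabel string // e.g. "1_of_2"; empty for unsharded blocks
	}

	// planDecisions applies the two rules introduced by this commit:
	//  1. if any block is split-sharded, deduplication happens at L3,
	//     otherwise at L2;
	//  2. under split sharding, intermediate L2 blocks are dropped,
	//     since they are numerous and not yet deduplicated.
	func planDecisions(blocks []BlockMeta) (deduplicationLevel int32, kept []BlockMeta) {
		sharded := false
		for _, b := range blocks {
			if b.ShardLabel != "" {
				sharded = true
				break
			}
		}
		deduplicationLevel = 2
		if sharded {
			deduplicationLevel = 3
		}
		for _, b := range blocks {
			// Prefer L1 sources over intermediate L2 split blocks.
			if sharded && b.Level == 2 {
				continue
			}
			kept = append(kept, b)
		}
		return deduplicationLevel, kept
	}

	func main() {
		blocks := []BlockMeta{
			{ID: "a", Level: 1},
			{ID: "a-1", Level: 2, ShardLabel: "1_of_2"},
			{ID: "a-2", Level: 2, ShardLabel: "2_of_2"},
			{ID: "a-3", Level: 3, ShardLabel: "1_of_3"},
		}
		level, kept := planDecisions(blocks)
		fmt.Println("deduplicate below level:", level) // 3
		for _, b := range kept {
			fmt.Println("kept:", b.ID) // a, a-3
		}
	}

Any block that survives pruning with a compaction level below the deduplication threshold then forces `deduplicate = true` in `blockPlan`, as the last hunk above shows.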
pkg/querier/replication_test.go (44 additions, 5 deletions)
@@ -190,9 +190,28 @@ func Test_replicasPerBlockID_blockPlan(t *testing.T) {
 					{
 						addr: "store-gateway-0",
 						response: []*typesv1.BlockInfo{
-							newBlockInfo("a-1").withCompactionLevel(2).withCompactionSources("a").withCompactionParents("a").withCompactorShard(1, 2).withMinTime(t1, time.Hour-time.Second).info(),
-							newBlockInfo("b-1").withCompactionLevel(2).withCompactionSources("b").withCompactionParents("b").withCompactorShard(2, 2).withMinTime(t2, time.Hour-(500*time.Millisecond)).info(),
-							newBlockInfo("b-2").withCompactionLevel(2).withCompactionSources("b").withCompactionParents("b").withCompactorShard(2, 2).withMinTime(t2, time.Hour-time.Second).info(),
+							newBlockInfo("a-1").
+								withCompactionLevel(3).
+								withCompactionSources("a").
+								withCompactionParents("a").
+								withCompactorShard(0, 2).
+								withMinTime(t1, time.Hour-time.Second).
+								info(),
+
+							newBlockInfo("b-1").
+								withCompactionLevel(3).
+								withCompactionSources("b").
+								withCompactionParents("b").
+								withCompactorShard(0, 2).
+								withMinTime(t2, time.Hour-(500*time.Millisecond)).info(),
+
+							newBlockInfo("b-2").
+								withCompactionLevel(3).
+								withCompactionSources("b").
+								withCompactionParents("b").
+								withCompactorShard(1, 2).
+								withMinTime(t2, time.Hour-time.Second).
+								info(),
 						},
 					},
 				}, storeGatewayInstance)
@@ -205,8 +224,9 @@ func Test_replicasPerBlockID_blockPlan(t *testing.T) {
 			},
 		},
 		{
-			// Using a split-and-merge compactor, the level 2 will be referencing sources but will not guarantee that all the source is in the
-			// TEST if all splits are there before erasing the level 1 before.
+			// Using a split-and-merge compactor, deduplication happens at level 3;
+			// level 2 is an intermediate step, where series are distributed among
+			// shards but not yet deduplicated.
 			name: "ignore blocks which are sharded and in level 2",
 			inputs: func(r *replicasPerBlockID) {
 				r.add([]ResponseFromReplica[[]*typesv1.BlockInfo]{
@@ -223,6 +243,25 @@ func Test_replicasPerBlockID_blockPlan(t *testing.T) {
 						addr: "store-gateway-0",
 						response: []*typesv1.BlockInfo{
 							newBlockInfo("a").info(),
+							newBlockInfo("a-1").
+								withCompactionLevel(2).
+								withCompactionSources("a").
+								withCompactionParents("a").
+								withCompactorShard(0, 2).
+								info(),
+
+							newBlockInfo("a-2").
+								withCompactionLevel(2).
+								withCompactionSources("a").
+								withCompactionParents("a").
+								withCompactorShard(1, 2).
+								info(),
+
+							newBlockInfo("a-3").
+								withCompactionLevel(3).
+								withCompactionSources("a-2").
+								withCompactorShard(0, 3).
+								info(),
 						},
 					},
 				}, storeGatewayInstance)
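The test cases above lean on a fluent builder (`newBlockInfo(...).withCompactionLevel(...)...info()`) whose definition is outside this diff. Purely as a reading aid, here is a hypothetical sketch of such a builder; the local `BlockInfo` stand-in, the field names, and the parameter types are guesses from the call sites, not Pyroscope's actual helper:

	package main

	import "time"

	// BlockInfo is a local stand-in for *typesv1.BlockInfo with only
	// the fields the builder below touches.
	type BlockInfo struct {
		ULID             string
		MinTime, MaxTime int64
		Level            int32
		Sources, Parents []string
		Shard, ShardsNum uint32
	}

	// blockInfoBuilder mimics the fluent helper used in the tests above.
	type blockInfoBuilder struct{ b BlockInfo }

	func newBlockInfo(id string) *blockInfoBuilder {
		return &blockInfoBuilder{b: BlockInfo{ULID: id}}
	}

	func (p *blockInfoBuilder) withCompactionLevel(l int32) *blockInfoBuilder {
		p.b.Level = l
		return p
	}

	func (p *blockInfoBuilder) withCompactionSources(src ...string) *blockInfoBuilder {
		p.b.Sources = append(p.b.Sources, src...)
		return p
	}

	func (p *blockInfoBuilder) withCompactionParents(parents ...string) *blockInfoBuilder {
		p.b.Parents = append(p.b.Parents, parents...)
		return p
	}

	func (p *blockInfoBuilder) withCompactorShard(shard, of uint32) *blockInfoBuilder {
		p.b.Shard, p.b.ShardsNum = shard, of
		return p
	}

	// withMinTime is assumed to set the block's time window as
	// [minT, minT+d); the real helper may differ.
	func (p *blockInfoBuilder) withMinTime(minT time.Time, d time.Duration) *blockInfoBuilder {
		p.b.MinTime = minT.UnixMilli()
		p.b.MaxTime = minT.Add(d).UnixMilli()
		return p
	}

	func (p *blockInfoBuilder) info() BlockInfo { return p.b }

	func main() {
		b := newBlockInfo("a-1").
			withCompactionLevel(3).
			withCompactionSources("a").
			withCompactorShard(0, 2).
			info()
		_ = b
	}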
