Skip to content

Commit

Permalink
Fix another case of compaction job state inconsistency
Browse files Browse the repository at this point in the history
  • Loading branch information
aleks-p committed Jul 17, 2024
1 parent d4f7916 commit 32a5391
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pkg/metastore/metastore_compaction_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (m *metastoreState) tryCreateJob(block *metastorev1.BlockMeta) *compactionp
return job
}

func (m *metastoreState) addCompactionJob(job *compactionpb.CompactionJob, compactionLevel uint32) {
func (m *metastoreState) addCompactionJob(job *compactionpb.CompactionJob) {
key := tenantShard{
tenant: job.TenantId,
shard: job.Shard,
Expand All @@ -171,7 +171,7 @@ func (m *metastoreState) addCompactionJob(job *compactionpb.CompactionJob, compa
defer plan.jobsMutex.Unlock()

plan.jobsByName[job.Name] = job
plan.queuedBlocksByLevel[compactionLevel] = plan.queuedBlocksByLevel[compactionLevel][:0]
plan.queuedBlocksByLevel[job.CompactionLevel] = plan.queuedBlocksByLevel[job.CompactionLevel][:0]
}

func (m *metastoreState) addBlockToCompactionJobQueue(block *metastorev1.BlockMeta) {
Expand Down
14 changes: 12 additions & 2 deletions pkg/metastore/metastore_state_add_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.etcd.io/bbolt"

metastorev1 "github.com/grafana/pyroscope/api/gen/proto/go/metastore/v1"
"github.com/grafana/pyroscope/pkg/metastore/compactionpb"
)

func (m *Metastore) AddBlock(_ context.Context, req *metastorev1.AddBlockRequest) (*metastorev1.AddBlockResponse, error) {
Expand All @@ -22,6 +23,10 @@ func (m *metastoreState) applyAddBlock(request *metastorev1.AddBlockRequest) (*m
if err != nil {
return nil, err
}

var jobToAdd *compactionpb.CompactionJob
var blockToAddToQueue *metastorev1.BlockMeta

err = m.db.boltdb.Update(func(tx *bbolt.Tx) error {
err := updateBlockMetadataBucket(tx, name, func(bucket *bbolt.Bucket) error {
return bucket.Put(key, value)
Expand All @@ -40,9 +45,9 @@ func (m *metastoreState) applyAddBlock(request *metastorev1.AddBlockRequest) (*m
if err != nil {
return err
}
m.addCompactionJob(job, request.Block.CompactionLevel)
jobToAdd = job
} else {
m.addBlockToCompactionJobQueue(request.Block)
blockToAddToQueue = request.Block
}
return nil
})
Expand All @@ -55,5 +60,10 @@ func (m *metastoreState) applyAddBlock(request *metastorev1.AddBlockRequest) (*m
return nil, err
}
m.getOrCreateShard(request.Block.Shard).putSegment(request.Block)
if jobToAdd != nil {
m.addCompactionJob(jobToAdd)
} else if blockToAddToQueue != nil {
m.addBlockToCompactionJobQueue(blockToAddToQueue)
}
return &metastorev1.AddBlockResponse{}, nil
}
2 changes: 1 addition & 1 deletion pkg/metastore/metastore_state_poll_compaction_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (m *metastoreState) applyPollCompactionJobsStatus(request *compactorv1.Poll
}

for _, j := range jResult.newJobs {
m.addCompactionJob(j, j.CompactionLevel)
m.addCompactionJob(j)
}

for _, b := range jResult.newQueuedBlocks {
Expand Down

0 comments on commit 32a5391

Please sign in to comment.