Skip to content

Commit

Permalink
Merge pull request prometheus#115 from prometheus/compactionfix
Browse files Browse the repository at this point in the history
Fix compactions
  • Loading branch information
fabxc committed Aug 4, 2017
2 parents 15baaa5 + 3951d8c commit 1875d05
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 51 deletions.
2 changes: 1 addition & 1 deletion cmd/tsdb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (b *writeBenchmark) run(cmd *cobra.Command, args []string) {
st, err := tsdb.Open(dir, nil, nil, &tsdb.Options{
WALFlushInterval: 200 * time.Millisecond,
RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
BlockRanges: tsdb.ExponentialBlockRanges(3*60*60*1000, 3, 5),
BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3),
})
if err != nil {
exitWithError(err)
Expand Down
63 changes: 28 additions & 35 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,57 +180,50 @@ func (c *compactor) Plan() ([][]string, error) {
return nil, nil
}

// selectDirs returns the dir metas that should be compacted into a single new block.
// If only a single block range is configured, the result is always nil.
func (c *compactor) selectDirs(ds []dirMeta) []dirMeta {
// The way to skip compaction is to not have blockRanges.
if len(c.opts.blockRanges) == 1 {
if len(c.opts.blockRanges) < 2 || len(ds) < 1 {
return nil
}

return selectRecurse(ds, c.opts.blockRanges)
}

func selectRecurse(dms []dirMeta, intervals []int64) []dirMeta {
if len(intervals) == 0 {
return dms
}
highTime := ds[len(ds)-1].meta.MinTime

// Get the blocks by the max interval
blocks := splitByRange(dms, intervals[len(intervals)-1])
dirs := []dirMeta{}
for i := len(blocks) - 1; i >= 0; i-- {
// We need to choose the oldest blocks to compact. If there are a couple of blocks in
// the largest interval, we should compact those first.
if len(blocks[i]) > 1 {
dirs = blocks[i]
break
for _, iv := range c.opts.blockRanges[1:] {
parts := splitByRange(ds, iv)
if len(parts) == 0 {
continue
}
}

// If there are too many blocks, see if a smaller interval will catch them.
// i.e, if we have 0-20, 60-80, 80-100; all fall under 0-240, but we'd rather compact 60-100
// than all at once.
// Again if have 0-1d, 1d-2d, 3-6d we compact 0-1d, 1d-2d to compact it into the 0-3d block instead of compacting all three
// This is to honor the boundaries as much as possible.
if len(dirs) > 2 {
smallerDirs := selectRecurse(dirs, intervals[:len(intervals)-1])
if len(smallerDirs) > 1 {
return smallerDirs
for _, p := range parts {
mint := p[0].meta.MinTime
maxt := p[len(p)-1].meta.MaxTime
// Pick the range of blocks if it spans the full range (potentially with gaps)
// or is before the most recent block.
// This ensures we don't compact blocks prematurely when another one of the same
// size still fits in the range.
if (maxt-mint == iv || maxt <= highTime) && len(p) > 1 {
return p
}
}
}

return dirs
return nil
}

// splitByRange splits the directories by the time range.
// for example if we have blocks 0-10, 10-20, 50-60, 90-100 and want to split them into 30 interval ranges
// splitByRange returns [0-10, 10-20], [50-60], [90-100].
// splitByRange splits the directories by the time range. The range sequence starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
// it returns [0-10, 10-20], [50-60], [90-100].
func splitByRange(ds []dirMeta, tr int64) [][]dirMeta {
var splitDirs [][]dirMeta

for i := 0; i < len(ds); {
var group []dirMeta
var t0 int64
m := ds[i].meta
var (
group []dirMeta
t0 int64
m = ds[i].meta
)
// Compute start of aligned time range of size tr closest to the current block's start.
if m.MinTime >= 0 {
t0 = tr * (m.MinTime / tr)
Expand Down
57 changes: 44 additions & 13 deletions compact_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,22 @@ func TestCompactionSelect(t *testing.T) {
planned: nil,
},
{
// We should wait for a third block of size 20 to appear before compacting
// the existing ones.
blocks: []dirMetaSimple{
{
dir: "1",
tr: []int64{0, 20},
},
{
dir: "2",
tr: []int64{20, 40},
},
},
planned: nil,
},
{
// Block to fill the entire parent range appeared – should be compacted.
blocks: []dirMetaSimple{
{
dir: "1",
Expand All @@ -65,6 +81,25 @@ func TestCompactionSelect(t *testing.T) {
},
planned: [][]string{{"1", "2", "3"}},
},
{
// Block for the next parent range appeared. Nothing will happen in the first one
// anymore and we should compact it.
blocks: []dirMetaSimple{
{
dir: "1",
tr: []int64{0, 20},
},
{
dir: "2",
tr: []int64{20, 40},
},
{
dir: "3",
tr: []int64{60, 80},
},
},
planned: [][]string{{"1", "2"}},
},
{
blocks: []dirMetaSimple{
{
Expand Down Expand Up @@ -92,18 +127,10 @@ func TestCompactionSelect(t *testing.T) {
},
{
blocks: []dirMetaSimple{
{
dir: "1",
tr: []int64{0, 20},
},
{
dir: "2",
tr: []int64{20, 40},
},
{
dir: "3",
tr: []int64{40, 60},
},
{
dir: "4",
tr: []int64{60, 120},
Expand All @@ -121,24 +148,28 @@ func TestCompactionSelect(t *testing.T) {
tr: []int64{1200, 1440},
},
},
planned: [][]string{{"6", "7"}},
planned: [][]string{{"2", "4", "5"}},
},
{
blocks: []dirMetaSimple{
{
dir: "1",
tr: []int64{0, 20},
tr: []int64{0, 60},
},
{
dir: "2",
dir: "4",
tr: []int64{60, 80},
},
{
dir: "3",
dir: "5",
tr: []int64{80, 100},
},
{
dir: "6",
tr: []int64{100, 120},
},
},
planned: [][]string{{"2", "3"}},
planned: [][]string{{"4", "5", "6"}},
},
}

Expand Down
7 changes: 5 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,11 @@ func (db *DB) completedHeads() (r []headBlock) {
// Add the 2nd last head if the last head is more than 50% filled.
// Compacting it early allows us to free its memory before allocating
// more for the next block and thus reduces spikes.
if h2 := db.heads[len(db.heads)-2]; headFullness(h2) >= 0.5 && h2.ActiveWriters() == 0 {
r = append(r, h2)
h0 := db.heads[len(db.heads)-1]
h1 := db.heads[len(db.heads)-2]

if headFullness(h0) >= 0.5 && h1.ActiveWriters() == 0 {
r = append(r, h1)
}
return r
}
Expand Down

0 comments on commit 1875d05

Please sign in to comment.