From 284a1d97ff5740edb060a47302ae0f0a11dca849 Mon Sep 17 00:00:00 2001 From: mknapphrt <39998367+mknapphrt@users.noreply.github.com> Date: Wed, 16 Jan 2019 05:03:52 -0500 Subject: [PATCH] Added storage size based retention method and new metrics (#343) Added methods needed to retain data based on a byte limitation rather than time. Limitation is only applied if the flag is set (defaults to 0). Both blocks that are older than the retention period and the blocks that make the size of the storage too large are removed. 2 new metrics for keeping track of the size of the local storage folder and the amount of times data has been deleted because the size restriction was exceeded. Signed-off-by: Mark Knapp --- CHANGELOG.md | 4 + block.go | 38 +++++- block_test.go | 4 +- chunks/chunks.go | 24 ++-- compact.go | 2 +- db.go | 280 ++++++++++++++++++++++++++++----------------- db_test.go | 126 +++++++++++++------- index/index.go | 5 + querier_test.go | 4 +- tombstones.go | 36 ++++-- tombstones_test.go | 2 +- 11 files changed, 349 insertions(+), 176 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index af44615e..a4447168 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## master / unreleased - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. + - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: + - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` + - new public interface `SizeReader: Size() int64` + - `OpenBlock` signature changed to take a logger. - [REMOVED] `PrefixMatcher` is considered unused so was removed. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. diff --git a/block.go b/block.go index e5a66bd9..837f4a76 100644 --- a/block.go +++ b/block.go @@ -21,6 +21,8 @@ import ( "path/filepath" "sync" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/tsdb/chunkenc" @@ -140,6 +142,12 @@ type Appendable interface { Appender() Appender } +// SizeReader returns the size of the object in bytes. +type SizeReader interface { + // Size returns the size in bytes. + Size() int64 +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. @@ -166,6 +174,7 @@ type BlockStats struct { NumSeries uint64 `json:"numSeries,omitempty"` NumChunks uint64 `json:"numChunks,omitempty"` NumTombstones uint64 `json:"numTombstones,omitempty"` + NumBytes int64 `json:"numBytes,omitempty"` } // BlockDesc describes a block by ULID and time range. @@ -257,7 +266,10 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. -func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { + if logger == nil { + logger = log.NewNopLogger() + } meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -272,11 +284,20 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return nil, err } - tr, err := readTombstones(dir) + tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + // TODO refactor to set this at block creation time as + // that would be the logical place for a block size to be calculated. + bs := blockSize(cr, ir, tsr) + meta.Stats.NumBytes = bs + err = writeMetaFile(dir, meta) + if err != nil { + level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) + } + pb := &Block{ dir: dir, meta: *meta, @@ -288,6 +309,16 @@ func OpenBlock(dir string, pool chunkenc.Pool) (*Block, error) { return pb, nil } +func blockSize(rr ...SizeReader) int64 { + var total int64 + for _, r := range rr { + if r != nil { + total += r.Size() + } + } + return total +} + // Close closes the on-disk block. It blocks as long as there are readers reading from the block. func (pb *Block) Close() error { pb.mtx.Lock() @@ -315,6 +346,9 @@ func (pb *Block) Dir() string { return pb.dir } // Meta returns meta information about the block. func (pb *Block) Meta() BlockMeta { return pb.meta } +// Size returns the number of bytes that the block takes up. +func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } + // ErrClosing is returned when a block is in the process of being closed. var ErrClosing = errors.New("block is closing") diff --git a/block_test.go b/block_test.go index cf871606..789aebaa 100644 --- a/block_test.go +++ b/block_test.go @@ -46,14 +46,14 @@ func TestSetCompactionFailed(t *testing.T) { defer os.RemoveAll(tmpdir) blockDir := createBlock(t, tmpdir, 0, 0, 0) - b, err := OpenBlock(blockDir, nil) + b, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, false, b.meta.Compaction.Failed) testutil.Ok(t, b.setCompactionFailed()) testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Ok(t, b.Close()) - b, err = OpenBlock(blockDir, nil) + b, err = OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) testutil.Equals(t, true, b.meta.Compaction.Failed) testutil.Ok(t, b.Close()) diff --git a/chunks/chunks.go b/chunks/chunks.go index 569aeddc..8fb28838 100644 --- a/chunks/chunks.go +++ b/chunks/chunks.go @@ -285,17 +285,15 @@ func (b realByteSlice) Sub(start, end int) ByteSlice { // Reader implements a SeriesReader for a serialized byte stream // of series data. type Reader struct { - // The underlying bytes holding the encoded series data. - bs []ByteSlice - - // Closers for resources behind the byte slices. - cs []io.Closer - + bs []ByteSlice // The underlying bytes holding the encoded series data. + cs []io.Closer // Closers for resources behind the byte slices. + size int64 // The total size of bytes in the reader. pool chunkenc.Pool } func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, error) { cr := Reader{pool: pool, bs: bs, cs: cs} + var totalSize int64 for i, b := range cr.bs { if b.Len() < 4 { @@ -305,7 +303,9 @@ func newReader(bs []ByteSlice, cs []io.Closer, pool chunkenc.Pool) (*Reader, err if m := binary.BigEndian.Uint32(b.Range(0, 4)); m != MagicChunks { return nil, errors.Errorf("invalid magic number %x", m) } + totalSize += int64(b.Len()) } + cr.size = totalSize return &cr, nil } @@ -328,9 +328,10 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { pool = chunkenc.NewPool() } - var bs []ByteSlice - var cs []io.Closer - + var ( + bs []ByteSlice + cs []io.Closer + ) for _, fn := range files { f, err := fileutil.OpenMmapFile(fn) if err != nil { @@ -346,6 +347,11 @@ func (s *Reader) Close() error { return closeAll(s.cs...) } +// Size returns the size of the chunks. +func (s *Reader) Size() int64 { + return s.size +} + func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( seq = int(ref >> 32) diff --git a/compact.go b/compact.go index e11a83ec..cae825e1 100644 --- a/compact.go +++ b/compact.go @@ -347,7 +347,7 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u if b == nil { var err error - b, err = OpenBlock(d, c.chunkPool) + b, err = OpenBlock(c.logger, d, c.chunkPool) if err != nil { return uid, err } diff --git a/db.go b/db.go index 43cfadd4..63070043 100644 --- a/db.go +++ b/db.go @@ -58,6 +58,13 @@ type Options struct { // Duration of persisted data to keep. RetentionDuration uint64 + // Maximum number of bytes in blocks to be retained. + // 0 or less means disabled. + // NOTE: For proper storage calculations need to consider + // the size of the WAL folder which is not added when calculating + // the current size of the database. + MaxBytes int64 + // The sizes of the Blocks. BlockRanges []int64 @@ -127,11 +134,12 @@ type dbMetrics struct { reloads prometheus.Counter reloadsFailed prometheus.Counter compactionsTriggered prometheus.Counter + timeRetentionCount prometheus.Counter compactionsSkipped prometheus.Counter - cutoffs prometheus.Counter - cutoffsFailed prometheus.Counter startTime prometheus.GaugeFunc tombCleanTimer prometheus.Histogram + blocksBytes prometheus.Gauge + sizeRetentionCount prometheus.Counter } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -170,18 +178,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_compactions_triggered_total", Help: "Total number of triggered compactions for the partition.", }) + m.timeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_time_retentions_total", + Help: "The number of times that blocks were deleted because the maximum time limit was exceeded.", + }) m.compactionsSkipped = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_skipped_total", Help: "Total number of skipped compactions due to disabled auto compaction.", }) - m.cutoffs = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_total", - Help: "Number of times the database cut off block data from disk.", - }) - m.cutoffsFailed = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_retention_cutoffs_failures_total", - Help: "Number of times the database failed to cut off block data from disk.", - }) m.startTime = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_lowest_timestamp", Help: "Lowest timestamp value stored in the database. The unit is decided by the library consumer.", @@ -197,6 +201,14 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_tombstone_cleanup_seconds", Help: "The time taken to recompact blocks to remove tombstones.", }) + m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_storage_blocks_bytes_total", + Help: "The number of bytes that are currently used for local storage by all blocks.", + }) + m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_size_retentions_total", + Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", + }) if r != nil { r.MustRegister( @@ -204,11 +216,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { m.symbolTableSize, m.reloads, m.reloadsFailed, - m.cutoffs, - m.cutoffsFailed, + m.timeRetentionCount, m.compactionsTriggered, m.startTime, m.tombCleanTimer, + m.blocksBytes, + m.sizeRetentionCount, ) } return m @@ -340,25 +353,6 @@ func (db *DB) run() { } } -func (db *DB) beyondRetention(meta *BlockMeta) bool { - if db.opts.RetentionDuration == 0 { - return false - } - - db.mtx.RLock() - blocks := db.blocks[:] - db.mtx.RUnlock() - - if len(blocks) == 0 { - return false - } - - last := blocks[len(db.blocks)-1] - mint := last.Meta().MaxTime - int64(db.opts.RetentionDuration) - - return meta.MaxTime < mint -} - // Appender opens a new appender against the database. func (db *DB) Appender() Appender { return dbAppender{db: db, Appender: db.head.Appender()} @@ -474,8 +468,7 @@ func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { return nil, false } -// reload on-disk blocks and trigger head truncation if new blocks appeared. It takes -// a list of block directories which should be deleted during reload. +// reload blocks and trigger head truncation if new blocks appeared. // Blocks that are obsolete due to replacement or retention will be deleted. func (db *DB) reload() (err error) { defer func() { @@ -485,110 +478,185 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - dirs, err := blockDirs(db.dir) + loadable, corrupted, err := db.openBlocks() if err != nil { - return errors.Wrap(err, "find blocks") - } - // We delete old blocks that have been superseded by new ones by gathering all parents - // from existing blocks. Those parents all have newer replacements and can be safely deleted - // after we loaded the other blocks. - // This makes us resilient against the process crashing towards the end of a compaction. - // Creation of a new block and deletion of its parents cannot happen atomically. By creating - // blocks with their parents, we can pick up the deletion where it left off during a crash. - var ( - blocks []*Block - corrupted = map[ulid.ULID]error{} - opened = map[ulid.ULID]struct{}{} - deleteable = map[ulid.ULID]struct{}{} - ) - for _, dir := range dirs { - meta, err := readMetaFile(dir) - if err != nil { - // The block was potentially in the middle of being deleted during a crash. - // Skip it since we may delete it properly further down again. - level.Warn(db.logger).Log("msg", "read meta information", "err", err, "dir", dir) + return err + } - ulid, err2 := ulid.Parse(filepath.Base(dir)) - if err2 != nil { - level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) - continue - } - corrupted[ulid] = err - continue + deletable := db.deletableBlocks(loadable) + + // Corrupted blocks that have been replaced by parents can be safely ignored and deleted. + // This makes it resilient against the process crashing towards the end of a compaction. + // Creation of a new block and deletion of its parents cannot happen atomically. + // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. + for _, block := range loadable { + for _, b := range block.Meta().Compaction.Parents { + delete(corrupted, b.ULID) + deletable[b.ULID] = nil } - if db.beyondRetention(meta) { - deleteable[meta.ULID] = struct{}{} + } + if len(corrupted) > 0 { + return errors.Wrap(err, "unexpected corrupted block") + } + + // All deletable blocks should not be loaded. + var ( + bb []*Block + blocksSize int64 + ) + for _, block := range loadable { + if _, ok := deletable[block.Meta().ULID]; ok { + deletable[block.Meta().ULID] = block continue } - for _, b := range meta.Compaction.Parents { - deleteable[b.ULID] = struct{}{} - } + bb = append(bb, block) + blocksSize += block.Size() + + } + loadable = bb + db.metrics.blocksBytes.Set(float64(blocksSize)) + + sort.Slice(loadable, func(i, j int) bool { + return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime + }) + if err := validateBlockSequence(loadable); err != nil { + return errors.Wrap(err, "invalid block sequence") } - // Blocks we failed to open should all be those we are want to delete anyway. - for c, err := range corrupted { - if _, ok := deleteable[c]; !ok { - return errors.Wrapf(err, "unexpected corrupted block %s", c) + + // Swap new blocks first for subsequently created readers to be seen. + db.mtx.Lock() + oldBlocks := db.blocks + db.blocks = loadable + db.mtx.Unlock() + + for _, b := range oldBlocks { + if _, ok := deletable[b.Meta().ULID]; ok { + deletable[b.Meta().ULID] = b } } - // Load new blocks into memory. + + if err := db.deleteBlocks(deletable); err != nil { + return err + } + + // Garbage collect data in the head if the most recent persisted block + // covers data of its current time range. + if len(loadable) == 0 { + return nil + } + + maxt := loadable[len(loadable)-1].Meta().MaxTime + + return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") +} + +func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { + dirs, err := blockDirs(db.dir) + if err != nil { + return nil, nil, errors.Wrap(err, "find blocks") + } + + corrupted = make(map[ulid.ULID]error) for _, dir := range dirs { meta, err := readMetaFile(dir) if err != nil { - return errors.Wrapf(err, "read meta information %s", dir) - } - // Don't load blocks that are scheduled for deletion. - if _, ok := deleteable[meta.ULID]; ok { + level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) continue } + // See if we already have the block in memory or open it otherwise. - b, ok := db.getBlock(meta.ULID) + block, ok := db.getBlock(meta.ULID) if !ok { - b, err = OpenBlock(dir, db.chunkPool) + block, err = OpenBlock(db.logger, dir, db.chunkPool) if err != nil { - return errors.Wrapf(err, "open block %s", dir) + corrupted[meta.ULID] = err + continue } } - blocks = append(blocks, b) - opened[meta.ULID] = struct{}{} + blocks = append(blocks, block) } + return blocks, corrupted, nil +} + +// deletableBlocks returns all blocks past retention policy. +func (db *DB) deletableBlocks(blocks []*Block) map[ulid.ULID]*Block { + deletable := make(map[ulid.ULID]*Block) + + // Sort the blocks by time - newest to oldest (largest to smallest timestamp). + // This ensures that the retentions will remove the oldest blocks. sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Meta().MinTime < blocks[j].Meta().MinTime + return blocks[i].Meta().MaxTime > blocks[j].Meta().MaxTime }) - if err := validateBlockSequence(blocks); err != nil { - return errors.Wrap(err, "invalid block sequence") + + for ulid, block := range db.beyondTimeRetention(blocks) { + deletable[ulid] = block } - // Swap in new blocks first for subsequently created readers to be seen. - // Then close previous blocks, which may block for pending readers to complete. - db.mtx.Lock() - oldBlocks := db.blocks - db.blocks = blocks - db.mtx.Unlock() + for ulid, block := range db.beyondSizeRetention(blocks) { + deletable[ulid] = block + } - // Drop old blocks from memory. - for _, b := range oldBlocks { - if _, ok := opened[b.Meta().ULID]; ok { - continue + return deletable +} + +func (db *DB) beyondTimeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Time retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.RetentionDuration == 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + for i, block := range blocks { + // The difference between the first block and this block is larger than + // the retention period so any blocks after that are added as deleteable. + if i > 0 && blocks[0].Meta().MaxTime-block.Meta().MaxTime > int64(db.opts.RetentionDuration) { + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.timeRetentionCount.Inc() + break } - if err := b.Close(); err != nil { - level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + return deleteable +} + +func (db *DB) beyondSizeRetention(blocks []*Block) (deleteable map[ulid.ULID]*Block) { + // Size retention is disabled or no blocks to work with. + if len(db.blocks) == 0 || db.opts.MaxBytes <= 0 { + return + } + + deleteable = make(map[ulid.ULID]*Block) + blocksSize := int64(0) + for i, block := range blocks { + blocksSize += block.Size() + if blocksSize > db.opts.MaxBytes { + // Add this and all following blocks for deletion. + for _, b := range blocks[i:] { + deleteable[b.meta.ULID] = b + } + db.metrics.sizeRetentionCount.Inc() + break } } - // Delete all obsolete blocks. None of them are opened any longer. - for ulid := range deleteable { + return deleteable +} + +// deleteBlocks closes and deletes blocks from the disk. +// When the map contains a non nil block object it means it is loaded in memory +// so needs to be closed first as it might need to wait for pending readers to complete. +func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { + for ulid, block := range blocks { + if block != nil { + if err := block.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing block failed", "err", err) + } + } if err := os.RemoveAll(filepath.Join(db.dir, ulid.String())); err != nil { return errors.Wrapf(err, "delete obsolete block %s", ulid) } } - - // Garbage collect data in the head if the most recent persisted block - // covers data of its current time range. - if len(blocks) == 0 { - return nil - } - maxt := blocks[len(blocks)-1].Meta().MaxTime - - return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") + return nil } // validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. diff --git a/db_test.go b/db_test.go index 92d487ee..2beef0c5 100644 --- a/db_test.go +++ b/db_test.go @@ -92,6 +92,9 @@ func TestDB_reloadOrder(t *testing.T) { testutil.Ok(t, db.reload()) blocks := db.Blocks() + for _, b := range blocks { + b.meta.Stats.NumBytes = 0 + } testutil.Equals(t, 3, len(blocks)) testutil.Equals(t, metas[1].MinTime, blocks[0].Meta().MinTime) testutil.Equals(t, metas[1].MaxTime, blocks[0].Meta().MaxTime) @@ -834,7 +837,7 @@ func TestTombstoneCleanFail(t *testing.T) { totalBlocks := 2 for i := 0; i < totalBlocks; i++ { blockDir := createBlock(t, db.Dir(), 0, 0, 0) - block, err := OpenBlock(blockDir, nil) + block, err := OpenBlock(nil, blockDir, nil) testutil.Ok(t, err) // Add some some fake tombstones to trigger the compaction. tomb := newMemTombstones() @@ -877,7 +880,7 @@ func (c *mockCompactorFailing) Write(dest string, b BlockReader, mint, maxt int6 return ulid.ULID{}, fmt.Errorf("the compactor already did the maximum allowed blocks so it is time to fail") } - block, err := OpenBlock(createBlock(c.t, dest, 0, 0, 0), nil) + block, err := OpenBlock(nil, createBlock(c.t, dest, 0, 0, 0), nil) testutil.Ok(c.t, err) testutil.Ok(c.t, block.Close()) // Close block as we won't be using anywhere. c.blocks = append(c.blocks, block) @@ -901,59 +904,98 @@ func (*mockCompactorFailing) Compact(dest string, dirs []string, open []*Block) } -func TestDB_Retention(t *testing.T) { - db, close := openTestDB(t, nil) +func TestTimeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{1000}, + }) defer close() + defer db.Close() - lbls := labels.Labels{labels.Label{Name: "labelname", Value: "labelvalue"}} + blocks := []*BlockMeta{ + {MinTime: 500, MaxTime: 900}, // Oldest block + {MinTime: 1000, MaxTime: 1500}, + {MinTime: 1500, MaxTime: 2000}, // Newest Block + } - app := db.Appender() - _, err := app.Add(lbls, 0, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) + for _, m := range blocks { + createBlock(t, db.Dir(), 10, m.MinTime, m.MaxTime) + } - // create snapshot to make it create a block. - // TODO(gouthamve): Add a method to compact headblock. - snap, err := ioutil.TempDir("", "snap") - testutil.Ok(t, err) + testutil.Ok(t, db.reload()) // Reload the db to register the new blocks. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. - defer os.RemoveAll(snap) - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) + db.opts.RetentionDuration = uint64(blocks[2].MaxTime - blocks[1].MinTime) + testutil.Ok(t, db.reload()) - // reopen DB from snapshot - db, err = Open(snap, nil, nil, nil) - testutil.Ok(t, err) + expBlocks := blocks[1:] + actBlocks := db.Blocks() - testutil.Equals(t, 1, len(db.blocks)) + testutil.Equals(t, 1, int(prom_testutil.ToFloat64(db.metrics.timeRetentionCount)), "metric retention count mismatch") + testutil.Equals(t, len(expBlocks), len(actBlocks)) + testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime) + testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime) +} - app = db.Appender() - _, err = app.Add(lbls, 100, 1) - testutil.Ok(t, err) - testutil.Ok(t, app.Commit()) +func TestSizeRetention(t *testing.T) { + db, close := openTestDB(t, &Options{ + BlockRanges: []int64{100}, + }) + defer close() + defer db.Close() - // Snapshot again to create another block. - snap, err = ioutil.TempDir("", "snap") - testutil.Ok(t, err) - defer os.RemoveAll(snap) + blocks := []*BlockMeta{ + {MinTime: 100, MaxTime: 200}, // Oldest block + {MinTime: 200, MaxTime: 300}, + {MinTime: 300, MaxTime: 400}, + {MinTime: 400, MaxTime: 500}, + {MinTime: 500, MaxTime: 600}, // Newest Block + } - testutil.Ok(t, db.Snapshot(snap, true)) - testutil.Ok(t, db.Close()) + for _, m := range blocks { + createBlock(t, db.Dir(), 100, m.MinTime, m.MaxTime) + } - // reopen DB from snapshot - db, err = Open(snap, nil, nil, &Options{ - RetentionDuration: 10, - BlockRanges: []int64{50}, - }) - testutil.Ok(t, err) - defer db.Close() + // Test that registered size matches the actual disk size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. + expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics. + actSize := dbDiskSize(db.Dir()) + testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") + + // Decrease the max bytes limit so that a delete is triggered. + // Check total size, total count and check that the oldest block was deleted. + firstBlockSize := db.Blocks()[0].Size() + sizeLimit := actSize - firstBlockSize + db.opts.MaxBytes = sizeLimit // Set the new db size limit one block smaller that the actual size. + testutil.Ok(t, db.reload()) // Reload the db to register the new db size. + + expBlocks := blocks[1:] + actBlocks := db.Blocks() + expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) + actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount)) + actSize = dbDiskSize(db.Dir()) + + testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") + testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size") + testutil.Assert(t, expSize <= sizeLimit, "actual size (%v) is expected to be less than or equal to limit (%v)", expSize, sizeLimit) + testutil.Equals(t, len(blocks)-1, len(actBlocks), "new block count should be decreased from:%v to:%v", len(blocks), len(blocks)-1) + testutil.Equals(t, expBlocks[0].MaxTime, actBlocks[0].meta.MaxTime, "maxT mismatch of the first block") + testutil.Equals(t, expBlocks[len(expBlocks)-1].MaxTime, actBlocks[len(actBlocks)-1].meta.MaxTime, "maxT mismatch of the last block") - testutil.Equals(t, 2, len(db.blocks)) +} - // Reload blocks, which should drop blocks beyond the retention boundary. - testutil.Ok(t, db.reload()) - testutil.Equals(t, 1, len(db.blocks)) - testutil.Equals(t, int64(100), db.blocks[0].meta.MaxTime) // To verify its the right block. +func dbDiskSize(dir string) int64 { + var statSize int64 + filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + // Include only index,tombstone and chunks. + if filepath.Dir(path) == chunkDir(filepath.Dir(filepath.Dir(path))) || + info.Name() == indexFilename || + info.Name() == tombstoneFilename { + statSize += info.Size() + } + return nil + }) + return statSize } func TestNotMatcherSelectsLabelsUnsetSeries(t *testing.T) { diff --git a/index/index.go b/index/index.go index cce29d9b..74e08d46 100644 --- a/index/index.go +++ b/index/index.go @@ -914,6 +914,11 @@ func (r *Reader) SortedPostings(p Postings) Postings { return p } +// Size returns the size of an index file. +func (r *Reader) Size() int64 { + return int64(r.b.Len()) +} + // LabelNames returns all the unique label names present in the index. func (r *Reader) LabelNames() ([]string, error) { labelNamesMap := make(map[string]struct{}, len(r.labels)) diff --git a/querier_test.go b/querier_test.go index 79dfbff7..69ca8460 100644 --- a/querier_test.go +++ b/querier_test.go @@ -1227,12 +1227,12 @@ func BenchmarkMergedSeriesSet(b *testing.B) { func BenchmarkPersistedQueries(b *testing.B) { for _, nSeries := range []int{10, 100} { - for _, nSamples := range []int{1000, 10000, 100000} { + for _, nSamples := range []int64{1000, 10000, 100000} { b.Run(fmt.Sprintf("series=%d,samplesPerSeries=%d", nSeries, nSamples), func(b *testing.B) { dir, err := ioutil.TempDir("", "bench_persisted") testutil.Ok(b, err) defer os.RemoveAll(dir) - block, err := OpenBlock(createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) + block, err := OpenBlock(nil, createBlock(b, dir, nSeries, 1, int64(nSamples)), nil) testutil.Ok(b, err) defer block.Close() diff --git a/tombstones.go b/tombstones.go index a1f30b59..07814040 100644 --- a/tombstones.go +++ b/tombstones.go @@ -113,37 +113,41 @@ type Stone struct { intervals Intervals } -func readTombstones(dir string) (TombstoneReader, error) { +func readTombstones(dir string) (TombstoneReader, SizeReader, error) { b, err := ioutil.ReadFile(filepath.Join(dir, tombstoneFilename)) if os.IsNotExist(err) { - return newMemTombstones(), nil + return newMemTombstones(), nil, nil } else if err != nil { - return nil, err + return nil, nil, err + } + + sr := &TombstoneFile{ + size: int64(len(b)), } if len(b) < 5 { - return nil, errors.Wrap(errInvalidSize, "tombstones header") + return nil, sr, errors.Wrap(errInvalidSize, "tombstones header") } d := &decbuf{b: b[:len(b)-4]} // 4 for the checksum. if mg := d.be32(); mg != MagicTombstone { - return nil, fmt.Errorf("invalid magic number %x", mg) + return nil, sr, fmt.Errorf("invalid magic number %x", mg) } if flag := d.byte(); flag != tombstoneFormatV1 { - return nil, fmt.Errorf("invalid tombstone format %x", flag) + return nil, sr, fmt.Errorf("invalid tombstone format %x", flag) } if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } // Verify checksum. hash := newCRC32() if _, err := hash.Write(d.get()); err != nil { - return nil, errors.Wrap(err, "write to hash") + return nil, sr, errors.Wrap(err, "write to hash") } if binary.BigEndian.Uint32(b[len(b)-4:]) != hash.Sum32() { - return nil, errors.New("checksum did not match") + return nil, sr, errors.New("checksum did not match") } stonesMap := newMemTombstones() @@ -153,13 +157,13 @@ func readTombstones(dir string) (TombstoneReader, error) { mint := d.varint64() maxt := d.varint64() if d.err() != nil { - return nil, d.err() + return nil, sr, d.err() } stonesMap.addInterval(k, Interval{mint, maxt}) } - return stonesMap, nil + return stonesMap, sr, nil } type memTombstones struct { @@ -210,6 +214,16 @@ func (t *memTombstones) addInterval(ref uint64, itvs ...Interval) { } } +// TombstoneFile holds information about the tombstone file. +type TombstoneFile struct { + size int64 +} + +// Size returns the tombstone file size. +func (t *TombstoneFile) Size() int64 { + return t.size +} + func (*memTombstones) Close() error { return nil } diff --git a/tombstones_test.go b/tombstones_test.go index e12574f1..2a106d70 100644 --- a/tombstones_test.go +++ b/tombstones_test.go @@ -46,7 +46,7 @@ func TestWriteAndReadbackTombStones(t *testing.T) { testutil.Ok(t, writeTombstoneFile(tmpdir, stones)) - restr, err := readTombstones(tmpdir) + restr, _, err := readTombstones(tmpdir) testutil.Ok(t, err) // Compare the two readers.