tsdb: Fix (almost) all tests
brancz committed Nov 27, 2020
1 parent d202624 commit 173f6e8
Showing 14 changed files with 70 additions and 56 deletions.
34 changes: 18 additions & 16 deletions tsdb/block_test.go
@@ -148,7 +148,7 @@ func TestCorruptedChunk(t *testing.T) {
                 testutil.Ok(t, err)
                 testutil.Ok(t, f.Truncate(fi.Size()-1))
             },
-            iterErr: errors.New("cannot populate chunk 8: segment doesn't include enough bytes to read the chunk - required:26, available:25"),
+            iterErr: errors.New("cannot populate chunk 8: segment doesn't include enough bytes to read the chunk - required:19, available:18"),
         },
         {
             name: "checksum mismatch",
@@ -166,7 +166,7 @@ func TestCorruptedChunk(t *testing.T) {
                 testutil.Ok(t, err)
                 testutil.Equals(t, n, 1)
             },
-            iterErr: errors.New("cannot populate chunk 8: checksum mismatch expected:cfc0526c, actual:34815eae"),
+            iterErr: errors.New("cannot populate chunk 8: checksum mismatch expected:11347d90, actual:28fdbbe0"),
         },
     } {
         t.Run(tc.name, func(t *testing.T) {
@@ -267,19 +267,21 @@ func TestBlockSize(t *testing.T) {
 
 func TestReadIndexFormatV1(t *testing.T) {
     /* The block here was produced at the commit
-    706602daed1487f7849990678b4ece4599745905 used in 2.0.0 with:
-    db, _ := Open("v1db", nil, nil, nil)
-    app := db.Appender()
-    app.Add(labels.FromStrings("foo", "bar"), 1, 2)
-    app.Add(labels.FromStrings("foo", "baz"), 3, 4)
-    app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block.
-    // Make sure we've enough values for the lack of sorting of postings offsets to show up.
-    for i := 0; i < 100; i++ {
-        app.Add(labels.FromStrings("bar", strconv.FormatInt(int64(i), 10)), 0, 0)
-    }
-    app.Commit()
-    db.compact()
-    db.Close()
+    github.com/conprof/db @ d202624dc72c95bfeb2a97d711709cfb7e4424cd:
+    {
+        db, _ := Open(filepath.Join("testdata", "index_format_v1"), nil, nil, nil)
+        app := db.Appender(context.Background())
+        app.Add(labels.FromStrings("foo", "bar"), 1, []byte("2"))
+        app.Add(labels.FromStrings("foo", "baz"), 3, []byte("4"))
+        app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, []byte("4")) // Not in the block.
+        // Make sure we've enough values for the lack of sorting of postings offsets to show up.
+        for i := 0; i < 100; i++ {
+            app.Add(labels.FromStrings("bar", strconv.FormatInt(int64(i), 10)), 0, []byte("0"))
+        }
+        app.Commit()
+        db.Compact()
+        db.Close()
+    }
     */
 
     blockDir := filepath.Join("testdata", "index_format_v1")
@@ -368,8 +368,8 @@ func genSeries(totalSeries, labelCount int, mint, maxt int64) []storage.Series {
             lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j)
         }
         samples := make([]tsdbutil.Sample, 0, maxt-mint+1)
-        val := make([]byte, 4)
         for t := mint; t < maxt; t++ {
+            val := make([]byte, 4)
             rand.Read(val)
             samples = append(samples, sample{t: t, v: val})
         }
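
Note: the genSeries hunk above is a slice-aliasing fix. With val allocated once outside the loop, every generated sample shared the same 4-byte backing array, so each rand.Read overwrote the values of all previously built samples. A minimal, self-contained sketch of the hazard (illustrative code, not from the repository):

    package main

    import (
        "fmt"
        "math/rand"
    )

    type sample struct {
        t int64
        v []byte
    }

    func main() {
        // Buggy variant: one buffer shared by every sample.
        shared := make([]byte, 4)
        var buggy []sample
        for t := int64(0); t < 3; t++ {
            rand.Read(shared)
            buggy = append(buggy, sample{t: t, v: shared}) // every v aliases shared
        }
        fmt.Println(buggy[0].v, buggy[1].v, buggy[2].v) // three identical values

        // Fixed variant, as in the hunk above: a fresh buffer per iteration.
        var fixed []sample
        for t := int64(0); t < 3; t++ {
            val := make([]byte, 4)
            rand.Read(val)
            fixed = append(fixed, sample{t: t, v: val})
        }
        fmt.Println(fixed[0].v, fixed[1].v, fixed[2].v) // three independent values
    }
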
22 changes: 12 additions & 10 deletions tsdb/chunkenc/byte.go
@@ -94,19 +94,21 @@ func (c *BytesChunk) Compact() {
 
 // Appender implements the Chunk interface.
 func (c *BytesChunk) Appender() (Appender, error) {
-    // it := c.iterator()
+    it := c.iterator(nil)
 
-    // // To get an appender we must know the state it would have if we had
-    // // appended all existing data from scratch.
-    // // We iterate through the end and populate via the iterator's state.
-    // for it.Next() {
-    // }
-    // if err := it.Err(); err != nil {
-    //     return nil, err
-    // }
+    // To get an appender we must know the state it would have if we had
+    // appended all existing data from scratch.
+    // We iterate through the end and populate via the iterator's state.
+    for it.Next() {
+    }
+    if err := it.Err(); err != nil {
+        return nil, err
+    }
 
     a := &bytesAppender{
-        b: c,
+        b:      c,
+        t:      it.t,
+        tDelta: it.tDelta,
     }
     return a, nil
 }
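
Note: re-enabling the replay loop is what makes the appender correct. bytesAppender continues delta-encoding timestamps from wherever the chunk left off, and the only way to recover the last timestamp and delta is to decode everything already in the chunk, the same trick the upstream XOR appender uses. A rough sketch of the pattern with simplified, hypothetical types (the real iterator and its t/tDelta fields live in tsdb/chunkenc/byte.go):

    // stateIterator is a stand-in for the chunk iterator; the real type
    // also tracks the read offset and the value bytes.
    type stateIterator interface {
        Next() bool
        Err() error
        State() (t, tDelta int64)
    }

    // appenderState drains the iterator so a new appender can continue
    // encoding exactly where the existing data stops.
    func appenderState(it stateIterator) (t, tDelta int64, err error) {
        for it.Next() {
            // Each step advances the iterator's internal t/tDelta.
        }
        if err := it.Err(); err != nil {
            return 0, 0, err
        }
        t, tDelta = it.State()
        return t, tDelta, nil
    }
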
11 changes: 7 additions & 4 deletions tsdb/chunkenc/chunk.go
@@ -39,8 +39,8 @@ func (e Encoding) String() string {
 // The different available chunk encodings.
 const (
     EncNone Encoding = iota
-    EncXOR
     EncBytes
+    EncXOR
 )
 
 // Chunk holds a sequence of sample pairs that can be iterated over and appended to.
@@ -129,7 +129,8 @@ func NewPool() Pool {
 
 func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
     switch e {
-    //case EncXOR:
+    case EncXOR:
+        panic("nothing should be using XOR encoding")
     // c := p.xor.Get().(*XORChunk)
     // c.b.stream = b
     // c.b.count = 0
@@ -144,7 +145,8 @@ func (p *pool) Get(e Encoding, b []byte) (Chunk, error) {
 
 func (p *pool) Put(c Chunk) error {
     switch c.Encoding() {
-    //case EncXOR:
+    case EncXOR:
+        panic("nothing should be using XOR encoding")
     // xc, ok := c.(*XORChunk)
     // // This may happen often with wrapped chunks. Nothing we can really do about
     // // it but returning an error would cause a lot of allocations again. Thus,
@@ -176,7 +178,8 @@ func (p *pool) Put(c Chunk) error {
 // bytes.
 func FromData(e Encoding, d []byte) (Chunk, error) {
     switch e {
-    //case EncXOR:
+    case EncXOR:
+        panic("nothing should be using XOR encoding")
     // return &XORChunk{b: bstream{count: 0, stream: d}}, nil
     case EncBytes:
         return &BytesChunk{b: d}, nil
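
Note: two things happen in this file. First, the const block is reordered; since the values come from iota, this appears to swap the numeric encodings so that EncBytes becomes 1, presumably one reason the index_format_v1 fixture is regenerated later in this commit. Second, the commented-out EncXOR cases become explicit panics, a fail-fast guard: in this fork every chunk should be a BytesChunk, so an XOR chunk reaching the pool or FromData indicates a programming error rather than a condition worth returning. A hedged sketch of how that behaviour could be pinned down in a test inside package chunkenc (not part of this commit):

    func TestXORChunksRejected(t *testing.T) {
        defer func() {
            if recover() == nil {
                t.Fatal("expected FromData to panic for EncXOR")
            }
        }()
        FromData(EncXOR, nil) // should panic: XOR is unsupported in this fork
    }
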
10 changes: 5 additions & 5 deletions tsdb/db_test.go
@@ -496,7 +496,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
     ssMap := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
 
     testutil.Equals(t, map[string][]tsdbutil.Sample{
-        labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, []byte("0")}},
+        labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, []byte("1")}},
     }, ssMap)
 
     // Append Out of Order Value.
@@ -513,7 +513,7 @@ func TestSkippingInvalidValuesInSameTxn(t *testing.T) {
     ssMap = query(t, q, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))
 
     testutil.Equals(t, map[string][]tsdbutil.Sample{
-        labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, []byte("0")}, sample{10, []byte("10")}},
+        labels.New(labels.Label{Name: "a", Value: "b"}).String(): {sample{0, []byte("1")}, sample{10, []byte("3")}},
     }, ssMap)
 }
 
@@ -562,7 +562,7 @@ func TestDB_Snapshot(t *testing.T) {
     }
     testutil.Ok(t, seriesSet.Err())
     testutil.Equals(t, 0, len(seriesSet.Warnings()))
-    testutil.Equals(t, strings.Repeat("1", 1000), sum)
+    testutil.Equals(t, []byte(strings.Repeat("1", 1000)), sum)
 }
 
 // TestDB_Snapshot_ChunksOutsideOfCompactedRange ensures that a snapshot removes chunks samples
@@ -616,7 +616,7 @@ func TestDB_Snapshot_ChunksOutsideOfCompactedRange(t *testing.T) {
     testutil.Equals(t, 0, len(seriesSet.Warnings()))
 
     // Since we snapshotted with MaxTime - 10, so expect 10 less samples.
-    testutil.Equals(t, strings.Repeat("1", 1000-10), sum)
+    testutil.Equals(t, []byte(strings.Repeat("1", 1000-10)), sum)
 }
 
 func TestDB_SnapshotWithDelete(t *testing.T) {
@@ -2238,7 +2238,7 @@ func TestDBReadOnly_FlushWAL(t *testing.T) {
     }
     testutil.Ok(t, seriesSet.Err())
     testutil.Equals(t, 0, len(seriesSet.Warnings()))
-    testutil.Equals(t, 1000.0, sum)
+    testutil.Equals(t, []byte(strings.Repeat("1.0", 1000)), sum)
 }
 
 func TestDBCannotSeePartialCommits(t *testing.T) {
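
Note: these four assertions all follow from the sample value type changing from float64 to []byte: a test that previously summed a thousand 1.0 samples into the float 1000.0 now concatenates the raw byte values instead, e.g. strings.Repeat("1", 1000) for a thousand samples of []byte("1"). The accumulation loop itself sits outside the visible hunks; a hedged reconstruction of its likely shape, reusing names from the surrounding context:

    var sum []byte
    for seriesSet.Next() {
        it := seriesSet.At().Iterator()
        for it.Next() {
            _, v := it.At()
            sum = append(sum, v...) // byte concatenation replaces float addition
        }
    }
    testutil.Equals(t, []byte(strings.Repeat("1", 1000)), sum)
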
4 changes: 2 additions & 2 deletions tsdb/head.go
@@ -632,7 +632,7 @@ Outer:
     }
 
     if unknownRefs.Load() > 0 {
-        level.Warn(h.logger).Log("msg", "Unknown series references", "count", unknownRefs)
+        level.Warn(h.logger).Log("msg", "Unknown series references", "count", unknownRefs.Load())
     }
     return nil
 }
@@ -2098,7 +2098,7 @@ func (s *memSeries) append(t int64, v []byte, appendID uint64, chunkDiskMapper *
     // Based on Gorilla white papers this offers near-optimal compression ratio
     // so anything bigger that this has diminishing returns and increases
     // the time range within which we have to decompress all samples.
-    const samplesPerChunk = 10
+    const samplesPerChunk = 12
 
     c := s.head()
 
12 changes: 6 additions & 6 deletions tsdb/head_test.go
@@ -312,10 +312,10 @@ func TestHead_WALMultiRef(t *testing.T) {
     testutil.Ok(t, err)
     series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
     testutil.Equals(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: {
-        sample{100, []byte("100")},
-        sample{1500, []byte("1500")},
-        sample{1700, []byte("1700")},
-        sample{2000, []byte("2000")},
+        sample{100, []byte("1")},
+        sample{1500, []byte("2")},
+        sample{1700, []byte("3")},
+        sample{2000, []byte("4")},
     }}, series)
 }
 
@@ -669,7 +669,7 @@ func TestDeleteUntilCurMax(t *testing.T) {
     it = exps.Iterator()
     resSamples, err := storage.ExpandSamples(it, newSample)
     testutil.Ok(t, err)
-    testutil.Equals(t, []tsdbutil.Sample{sample{11, []byte("11")}}, resSamples)
+    testutil.Equals(t, []tsdbutil.Sample{sample{11, []byte("1")}}, resSamples)
     for res.Next() {
     }
     testutil.Ok(t, res.Err())
@@ -1002,7 +1002,7 @@ func TestMemSeries_append(t *testing.T) {
     for i, c := range s.mmappedChunks[1:] {
         chk, err := chunkDiskMapper.Chunk(c.ref)
         testutil.Ok(t, err)
-        testutil.Assert(t, chk.NumSamples() > 100, "unexpected small chunk %d of length %d", i, chk.NumSamples())
+        testutil.Assert(t, chk.NumSamples() > 10, "unexpected small chunk %d of length %d", i, chk.NumSamples())
     }
 }
 
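
Note: the relaxed NumSamples() > 10 threshold pairs with the samplesPerChunk = 12 change in head.go above: if the head cuts chunks after roughly a dozen samples, no m-mapped chunk can plausibly exceed 100 samples and the old > 100 assertion could never pass, while > 10 still verifies that chunks are actually being filled.
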
2 changes: 1 addition & 1 deletion tsdb/index/index_test.go
@@ -373,7 +373,7 @@ func TestPersistence_index_e2e(t *testing.T) {
                 MinTime: int64(j * 10000),
                 MaxTime: int64((j + 1) * 10000),
                 Ref:     rand.Uint64(),
-                Chunk:   chunkenc.NewXORChunk(),
+                Chunk:   chunkenc.NewBytesChunk(),
             })
         }
         input = append(input, &indexWriterSeries{
8 changes: 4 additions & 4 deletions tsdb/querier_test.go
@@ -864,7 +864,7 @@ func TestPopulateWithDelSeriesIterator_DoubleSeek(t *testing.T) {
     testutil.Assert(t, it.Seek(2), "")
     ts, v := it.At()
     testutil.Equals(t, int64(2), ts)
-    testutil.Equals(t, float64(2), v)
+    testutil.Equals(t, []byte("2"), v)
 }
 
 // Regression when seeked chunks were still found via binary search and we always
@@ -880,12 +880,12 @@ func TestPopulateWithDelSeriesIterator_SeekInCurrentChunk(t *testing.T) {
     testutil.Assert(t, it.Next(), "")
     ts, v := it.At()
     testutil.Equals(t, int64(1), ts)
-    testutil.Equals(t, float64(2), v)
+    testutil.Equals(t, []byte("1"), v)
 
     testutil.Assert(t, it.Seek(4), "")
     ts, v = it.At()
     testutil.Equals(t, int64(5), ts)
-    testutil.Equals(t, float64(6), v)
+    testutil.Equals(t, []byte("5"), v)
 }
 
 func TestPopulateWithDelSeriesIterator_SeekWithMinTime(t *testing.T) {
@@ -983,9 +983,9 @@ func TestDeletedIterator(t *testing.T) {
     testutil.Ok(t, err)
     // Insert random stuff from (0, 1000).
     act := make([]sample, 1000)
-    val := make([]byte, 4)
     for i := 0; i < 1000; i++ {
         act[i].t = int64(i)
+        val := make([]byte, 4)
         rand.Read(val)
         act[i].v = val
         app.Append(act[i].t, act[i].v)
5 changes: 4 additions & 1 deletion tsdb/record/record.go
@@ -122,7 +122,10 @@ func (d *Decoder) Samples(rec []byte, samples []RefSample) ([]RefSample, error)
     for len(dec.B) > 0 && dec.Err() == nil {
         dref := dec.Varint64()
         dtime := dec.Varint64()
-        val := dec.UvarintBytes()
+
+        v := dec.UvarintBytes()
+        val := make([]byte, len(v))
+        copy(val, v)
 
         samples = append(samples, RefSample{
             Ref: uint64(int64(baseRef) + dref),
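
Note: dec.UvarintBytes() returns a subslice of rec rather than a copy, so before this change every decoded RefSample value aliased the record buffer, which callers reuse for the next record. The added make+copy takes ownership of the bytes. A tiny self-contained demonstration of the failure mode (illustrative code only):

    package main

    import "fmt"

    func main() {
        // rec plays the role of a reused WAL record buffer.
        rec := []byte("sample-value-A")

        // Without copying, the "decoded" value is just a view into rec.
        aliased := rec[7:]

        // With the fix, the value owns its bytes.
        owned := make([]byte, len(rec[7:]))
        copy(owned, rec[7:])

        // The buffer is overwritten when the next record is read...
        copy(rec, []byte("sample-value-B"))

        fmt.Printf("aliased: %s\n", aliased) // "value-B": silently corrupted
        fmt.Printf("owned:   %s\n", owned)   // "value-A": still correct
    }
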
Binary file modified tsdb/testdata/index_format_v1/chunks/000001
Binary file modified tsdb/testdata/index_format_v1/index
10 changes: 5 additions & 5 deletions tsdb/testdata/index_format_v1/meta.json
@@ -1,6 +1,5 @@
 {
-  "version": 1,
-  "ulid": "01DXXFZDYD1MQW6079WK0K6EDQ",
+  "ulid": "01ER4JHEPBGK730T6V3PSASGMD",
   "minTime": 0,
   "maxTime": 7200000,
   "stats": {
@@ -11,7 +10,8 @@
   "compaction": {
     "level": 1,
     "sources": [
-      "01DXXFZDYD1MQW6079WK0K6EDQ"
+      "01ER4JHEPBGK730T6V3PSASGMD"
     ]
-  }
-}
+  },
+  "version": 1
+}
4 changes: 3 additions & 1 deletion tsdb/wal.go
@@ -1157,7 +1157,9 @@ func (r *walReader) decodeSamples(flag byte, b []byte, res *[]record.RefSample)
     for len(dec.B) > 0 && dec.Err() == nil {
         dref := dec.Varint64()
         dtime := dec.Varint64()
-        val := dec.UvarintBytes()
+        v := dec.UvarintBytes()
+        val := make([]byte, len(v))
+        copy(val, v)
 
         *res = append(*res, record.RefSample{
             Ref: uint64(int64(baseRef) + dref),
4 changes: 3 additions & 1 deletion tsdb/wal/reader.go
@@ -180,7 +180,9 @@ func (r *Reader) Err() error {
 // Record returns the current record. The returned byte slice is only
 // valid until the next call to Next.
 func (r *Reader) Record() []byte {
-    return r.rec
+    rec := make([]byte, len(r.rec))
+    copy(rec, r.rec)
+    return rec
 }
 
 // Segment returns the current segment being read.
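
Note: the Record change is the same ownership fix one layer down: callers that retain the returned slice across Next() calls no longer race with the reader's internal buffer, at the cost of one allocation per record. (The doc comment above it, "only valid until the next call to Next", is left untouched by this commit even though the copy arguably makes it obsolete.) A sketch of the retaining-caller pattern this protects, assuming this fork's module path for the import:

    package example

    import "github.com/conprof/db/tsdb/wal" // assumed import path

    // collectRecords reads to the end of a WAL and retains every record.
    // Before this commit, each Record() return value aliased the reader's
    // internal buffer, so the slices kept below would have been clobbered
    // by subsequent reads.
    func collectRecords(r *wal.Reader) ([][]byte, error) {
        var records [][]byte
        for r.Next() {
            records = append(records, r.Record())
        }
        if err := r.Err(); err != nil {
            return nil, err
        }
        return records, nil
    }
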
