Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Embed fingerprints in TSDB #5591

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions pkg/storage/tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, chunks ...
}

w.buf2.Reset()
w.buf2.PutBE64(lset.Hash())
w.buf2.PutUvarint(len(lset))

for _, l := range lset {
Expand Down Expand Up @@ -904,6 +905,7 @@ func (w *Writer) writePostingsToTmpFiles() error {
l := d.Uvarint() // Length of this series in bytes.
startLen := d.Len()

_ = d.Be64() // skip fingerprint
// See if label names we want are in the series.
numLabels := d.Uvarint()
for i := 0; i < numLabels; i++ {
Expand Down Expand Up @@ -1625,7 +1627,7 @@ func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, erro
}

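// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
// It returns the fingerprint decoded from the series entry, or 0 together with a non-nil error on failure.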
func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]ChunkMeta) error {
func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
Expand All @@ -1634,9 +1636,14 @@ func (r *Reader) Series(id storage.SeriesRef, lbls *labels.Labels, chks *[]Chunk
}
d := encoding.DecWrap(tsdb_enc.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable))
if d.Err() != nil {
return d.Err()
return 0, d.Err()
}

fprint, err := r.dec.Series(d.Get(), lbls, chks)
if err != nil {
return 0, errors.Wrap(err, "read series")
}
return errors.Wrap(r.dec.Series(d.Get(), lbls, chks), "read series")
return fprint, nil
}

func (r *Reader) Postings(name string, values ...string) (Postings, error) {
Expand Down Expand Up @@ -1817,6 +1824,7 @@ func (dec *Decoder) Postings(b []byte) (int, Postings, error) {
// They are returned in the same order they're stored, which should be sorted lexicographically.
func (dec *Decoder) LabelNamesOffsetsFor(b []byte) ([]uint32, error) {
d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})
_ = d.Be64() // skip fingerprint
k := d.Uvarint()

offsets := make([]uint32, k)
Expand All @@ -1835,6 +1843,7 @@ func (dec *Decoder) LabelNamesOffsetsFor(b []byte) ([]uint32, error) {
// LabelValueFor decodes a label for a given series.
func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})
_ = d.Be64() // skip fingerprint
k := d.Uvarint()

for i := 0; i < k; i++ {
Expand Down Expand Up @@ -1864,29 +1873,30 @@ func (dec *Decoder) LabelValueFor(b []byte, label string) (string, error) {
}

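// Series decodes a series entry from the given byte slice into lbls and chks.
// It returns the 8-byte big-endian fingerprint read from the start of the entry.
// NOTE(review): the early return for an entry with zero chunks yields 0 rather than
// the decoded fingerprint — confirm whether that is intended.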
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) error {
func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]

d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})

fprint := d.Be64()
k := d.Uvarint()

for i := 0; i < k; i++ {
lno := uint32(d.Uvarint())
lvo := uint32(d.Uvarint())

if d.Err() != nil {
return errors.Wrap(d.Err(), "read series label offsets")
return 0, errors.Wrap(d.Err(), "read series label offsets")
}

ln, err := dec.LookupSymbol(lno)
if err != nil {
return errors.Wrap(err, "lookup label name")
return 0, errors.Wrap(err, "lookup label name")
}
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return errors.Wrap(err, "lookup label value")
return 0, errors.Wrap(err, "lookup label value")
}

*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
Expand All @@ -1896,7 +1906,7 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) err
k = d.Uvarint()

if k == 0 {
return d.Err()
return 0, d.Err()
}

t0 := d.Varint64()
Expand Down Expand Up @@ -1925,7 +1935,7 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) err
t0 = maxt

if d.Err() != nil {
return errors.Wrapf(d.Err(), "read meta for chunk %d", i)
return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", i)
}

*chks = append(*chks, ChunkMeta{
Expand All @@ -1936,7 +1946,7 @@ func (dec *Decoder) Series(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) err
Entries: entries,
})
}
return d.Err()
return fprint, d.Err()
}

func yoloString(b []byte) string {
Expand Down
7 changes: 4 additions & 3 deletions pkg/storage/tsdb/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestIndexRW_Postings(t *testing.T) {
var c []ChunkMeta

for i := 0; p.Next(); i++ {
err := ir.Series(p.At(), &l, &c)
_, err := ir.Series(p.At(), &l, &c)

require.NoError(t, err)
require.Equal(t, 0, len(c))
Expand Down Expand Up @@ -312,7 +312,8 @@ func TestPostingsMany(t *testing.T) {
var lbls labels.Labels
var metas []ChunkMeta
for it.Next() {
require.NoError(t, ir.Series(it.At(), &lbls, &metas))
_, err := ir.Series(it.At(), &lbls, &metas)
require.NoError(t, err)
got = append(got, lbls.Get("i"))
}
require.NoError(t, it.Err())
Expand Down Expand Up @@ -419,7 +420,7 @@ func TestPersistence_index_e2e(t *testing.T) {

ref := gotp.At()

err := ir.Series(ref, &lset, &chks)
_, err := ir.Series(ref, &lset, &chks)
require.NoError(t, err)

err = mi.Series(expp.At(), &explset, &expchks)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type IndexReader interface {
// Series populates the given labels and chunk metas for the series identified
// by the reference and returns the series' fingerprint.
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]index.ChunkMeta) error
Series(ref storage.SeriesRef, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)

// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)
Expand Down
6 changes: 4 additions & 2 deletions pkg/storage/tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,15 @@ func TestQueryIndex(t *testing.T) {
)

require.True(t, p.Next())
require.Nil(t, reader.Series(p.At(), &ls, &chks))
_, err = reader.Series(p.At(), &ls, &chks)
require.Nil(t, err)
// the second series should be the first returned as it's lexicographically sorted
// and bazz < foo
require.Equal(t, cases[1].labels.String(), ls.String())
require.Equal(t, cases[1].chunks, chks)
require.True(t, p.Next())
require.Nil(t, reader.Series(p.At(), &ls, &chks))
_, err = reader.Series(p.At(), &ls, &chks)
require.Nil(t, err)
// Now we should encounter the series "added" first.
require.Equal(t, cases[0].labels.String(), ls.String())
require.Equal(t, cases[0].chunks, chks)
Expand Down
15 changes: 4 additions & 11 deletions pkg/storage/tsdb/single_file_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,11 @@ func (i *TSDBIndex) GetChunkRefs(_ context.Context, userID string, from, through
)

for p.Next() {
if err := i.reader.Series(p.At(), &ls, &chks); err != nil {
fprint, err := i.reader.Series(p.At(), &ls, &chks)
if err != nil {
return nil, err
}

// cache hash calculation across chunks of the same series
// TODO(owen-d): Should we store this in the index? in an in-mem cache?
var hash uint64

// TODO(owen-d): use logarithmic approach
for _, chk := range chks {

Expand All @@ -50,13 +47,9 @@ func (i *TSDBIndex) GetChunkRefs(_ context.Context, userID string, from, through
continue
}

if hash == 0 {
hash = ls.Hash()
}

res = append(res, ChunkRef{
User: userID, // assumed to be the same, will be enforced by caller.
Fingerprint: model.Fingerprint(hash),
Fingerprint: model.Fingerprint(fprint),
Start: chk.From(),
End: chk.Through(),
Checksum: chk.Checksum,
Expand All @@ -79,7 +72,7 @@ func (i *TSDBIndex) Series(_ context.Context, _ string, from, through model.Time
)

for p.Next() {
if err := i.reader.Series(p.At(), &ls, &chks); err != nil {
if _, err := i.reader.Series(p.At(), &ls, &chks); err != nil {
return nil, err
}

Expand Down