Skip to content

Commit

Permalink
Fix issues (1), (2) and (3) described in #328.
Browse files Browse the repository at this point in the history
Signed-off-by: Alin Sinpalean <alin.sinpalean@gmail.com>
  • Loading branch information
free committed May 9, 2018
1 parent d298c5a commit 2c86185
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 38 deletions.
60 changes: 38 additions & 22 deletions querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,7 @@ func (s *chunkSeries) Iterator() SeriesIterator {
type SeriesIterator interface {
// Seek advances the iterator forward to the given timestamp.
// If there's no value exactly at t, it advances to the first value
// after t.
// after t. Has no effect if already past t.
Seek(t int64) bool
// At returns the current timestamp/value pair.
At() (t int64, v float64)
Expand Down Expand Up @@ -783,6 +783,9 @@ func newChunkSeriesIterator(cs []chunks.Meta, dranges Intervals, mint, maxt int6

func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
if t > it.maxt {
it.i = len(it.chunks)
}
if it.cur.Err() != nil || it.i >= len(it.chunks) {
return false
}

Expand All @@ -791,31 +794,48 @@ func (it *chunkSeriesIterator) Seek(t int64) (ok bool) {
t = it.mint
}

for ; it.chunks[it.i].MaxTime < t; it.i++ {
if it.i == len(it.chunks)-1 {
return false
}
// Return early if already past t.
if t0, _ := it.cur.At(); t0 >= t {
return t0 <= it.maxt
}

it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
}
iCur := it.i
for ; it.i < len(it.chunks); it.i++ {
// Skip chunks with MaxTime < t.
if it.chunks[it.i].MaxTime >= t {
// Don't reset the iterator unless we've moved on to a different chunk.
if it.i != iCur {
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
}
}

for it.cur.Next() {
t0, _ := it.cur.At()
if t0 >= t {
return true
for it.cur.Next() {
t0, _ := it.cur.At()
if t0 > it.maxt || it.cur.Err() != nil {
return false
}
if t0 >= t {
return true
}
}
}
}
return false
}

func (it *chunkSeriesIterator) At() (t int64, v float64) {
// Should it panic if it.cur.Err() != nil || it.i >= len(it.chunks)?
// What about before Next() or Seek() were called?
return it.cur.At()
}

func (it *chunkSeriesIterator) Next() bool {
if it.cur.Err() != nil || it.i >= len(it.chunks) {
return false
}

if it.cur.Next() {
t, _ := it.cur.At()

Expand All @@ -824,22 +844,18 @@ func (it *chunkSeriesIterator) Next() bool {
return false
}
t, _ = it.At()

return t <= it.maxt
}
if t > it.maxt {
return false
}
return true

return t <= it.maxt
}
if err := it.cur.Err(); err != nil {
return false
}
if it.i == len(it.chunks)-1 {
return false
}

it.i++
if it.i == len(it.chunks) {
return false
}
it.cur = it.chunks[it.i].Chunk.Iterator()
if len(it.intervals) > 0 {
it.cur = &deletedIterator{it: it.cur, intervals: it.intervals}
Expand Down
103 changes: 87 additions & 16 deletions querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,16 @@ func TestBlockQuerier(t *testing.T) {
},
},
},
{
lset: map[string]string{
"s": "s",
},
chunks: [][]sample{
{
{1, 2}, {10, 11},
},
},
},
},

queries: []query{
Expand Down Expand Up @@ -448,6 +458,18 @@ func TestBlockQuerier(t *testing.T) {
),
}),
},
{
mint: 2,
maxt: 9,
ms: []labels.Matcher{labels.NewEqualMatcher("s", "s")},
exp: newListSeriesSet([]Series{
newSeries(map[string]string{
"s": "s",
},
[]sample{},
),
}),
},
},
}

Expand Down Expand Up @@ -558,8 +580,8 @@ func TestBlockQuerierDelete(t *testing.T) {
},
},
tombstones: memTombstones{
1: Intervals{{1, 3}},
2: Intervals{{1, 3}, {6, 10}},
1: Intervals{{1, 2}},
2: Intervals{{2, 3}, {6, 10}},
3: Intervals{{6, 10}},
},

Expand All @@ -572,7 +594,7 @@ func TestBlockQuerierDelete(t *testing.T) {
newSeries(map[string]string{
"a": "a",
},
[]sample{{5, 2}, {6, 3}, {7, 4}},
[]sample{{3, 4}, {5, 2}, {6, 3}, {7, 4}},
),
newSeries(map[string]string{
"a": "a",
Expand Down Expand Up @@ -605,19 +627,36 @@ func TestBlockQuerierDelete(t *testing.T) {
maxt: 4,
ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")},
exp: newListSeriesSet([]Series{
newSeries(map[string]string{
"a": "a",
},
[]sample{{3, 4}},
),
newSeries(map[string]string{
"a": "a",
"b": "b",
},
[]sample{{4, 15}},
[]sample{{1, 1}, {4, 15}},
),
}),
},
{
mint: 1,
maxt: 3,
maxt: 2,
ms: []labels.Matcher{labels.NewEqualMatcher("a", "a")},
exp: newListSeriesSet([]Series{}),
exp: newListSeriesSet([]Series{
newSeries(map[string]string{
"a": "a",
},
[]sample{},
),
newSeries(map[string]string{
"a": "a",
"b": "b",
},
[]sample{{1, 1}},
),
}),
},
},
}
Expand Down Expand Up @@ -866,7 +905,7 @@ func TestSeriesIterator(t *testing.T) {
b: []sample{},
c: []sample{},

seek: 0,
seek: 1,
success: false,
exp: nil,
},
Expand Down Expand Up @@ -987,7 +1026,7 @@ func TestSeriesIterator(t *testing.T) {
{10, 22}, {203, 3493},
},

seek: 203,
seek: 101,
success: false,
exp: nil,
mint: 2,
Expand Down Expand Up @@ -1038,10 +1077,10 @@ func TestSeriesIterator(t *testing.T) {
testutil.Assert(t, remaining == true, "")

for remaining {
sExp, eExp := exp.At()
sRes, eRes := res.At()
testutil.Equals(t, eExp, eRes)
testutil.Equals(t, sExp, sRes)
tExp, vExp := exp.At()
tRes, vRes := res.At()
testutil.Equals(t, tExp, tRes)
testutil.Equals(t, vExp, vRes)

remaining = exp.Next()
testutil.Equals(t, remaining, res.Next())
Expand Down Expand Up @@ -1084,10 +1123,10 @@ func TestSeriesIterator(t *testing.T) {
testutil.Assert(t, remaining == true, "")

for remaining {
sExp, eExp := exp.At()
sRes, eRes := res.At()
testutil.Equals(t, eExp, eRes)
testutil.Equals(t, sExp, sRes)
tExp, vExp := exp.At()
tRes, vRes := res.At()
testutil.Equals(t, tExp, tRes)
testutil.Equals(t, vExp, vRes)

remaining = exp.Next()
testutil.Equals(t, remaining, res.Next())
Expand Down Expand Up @@ -1116,6 +1155,24 @@ func TestChunkSeriesIterator_DoubleSeek(t *testing.T) {
testutil.Equals(t, float64(2), v)
}

// Regression for: https://github.com/prometheus/tsdb/pull/327
// Seek would go back within the same chunk.
func TestChunkSeriesIterator_DoubleSeekBackwards(t *testing.T) {
chkMetas := []chunks.Meta{
chunkFromSamples([]sample{}),
chunkFromSamples([]sample{{1, 1}, {2, 2}, {3, 3}}),
chunkFromSamples([]sample{{4, 4}, {5, 5}}),
}

// Seek for 3, then 2. Iterator should remain positioned on 3.
res := newChunkSeriesIterator(chkMetas, nil, 2, 8)
testutil.Assert(t, res.Seek(3) == true, "")
testutil.Assert(t, res.Seek(2) == true, "")
ts, v := res.At()
testutil.Equals(t, int64(3), ts)
testutil.Equals(t, float64(3), v)
}

// Regression when seeked chunks were still found via binary search and we always
// skipped to the end when seeking a value in the current chunk.
func TestChunkSeriesIterator_SeekInCurrentChunk(t *testing.T) {
Expand Down Expand Up @@ -1149,6 +1206,20 @@ func TestChunkSeriesIterator_NextWithMinTime(t *testing.T) {
testutil.Assert(t, it.Next() == false, "")
}

// Regression for: https://github.com/prometheus/tsdb/pull/327
// Calling Seek() with a time between [mint, maxt] after the iterator had
// already passed the end would incorrectly return true.
func TestChunkSeriesIterator_SeekWithMinTime(t *testing.T) {
metas := []chunks.Meta{
chunkFromSamples([]sample{{1, 6}, {5, 6}, {7, 8}}),
}

it := newChunkSeriesIterator(metas, nil, 2, 5)
testutil.Assert(t, it.Seek(6) == false, "")
// A second, within bounds Seek() used to succeed. Make sure it also returns false.
testutil.Assert(t, it.Seek(3) == false, "")
}

func TestPopulatedCSReturnsValidChunkSlice(t *testing.T) {
lbls := []labels.Labels{labels.New(labels.Label{"a", "b"})}
chunkMetas := [][]chunks.Meta{
Expand Down

0 comments on commit 2c86185

Please sign in to comment.