Skip to content

Commit

Permalink
test: Fix race condition in LogQL test (grafana#12247)
Browse files Browse the repository at this point in the history
  • Loading branch information
bboreham authored and edsoncelio committed Mar 22, 2024
1 parent b897510 commit 1732648
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 19 deletions.
11 changes: 5 additions & 6 deletions pkg/iter/sample_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package iter
import (
"container/heap"
"context"
"go.uber.org/atomic"
"io"
"sync"

Expand Down Expand Up @@ -522,7 +521,7 @@ func NewSampleQueryResponseIterator(resp *logproto.SampleQueryResponse) SampleIt
}

type seriesIterator struct {
i *atomic.Int32
i int
series logproto.Series
}

Expand Down Expand Up @@ -568,14 +567,14 @@ func NewMultiSeriesIterator(series []logproto.Series) SampleIterator {
// NewSeriesIterator iterates over sample in a series.
func NewSeriesIterator(series logproto.Series) SampleIterator {
return &seriesIterator{
i: atomic.NewInt32(-1),
i: -1,
series: series,
}
}

func (i *seriesIterator) Next() bool {
tmp := i.i.Add(1)
return int(tmp) < len(i.series.Samples)
i.i++
return i.i < len(i.series.Samples)
}

func (i *seriesIterator) Error() error {
Expand All @@ -591,7 +590,7 @@ func (i *seriesIterator) StreamHash() uint64 {
}

func (i *seriesIterator) Sample() logproto.Sample {
return i.series.Samples[i.i.Load()]
return i.series.Samples[i.i]
}

func (i *seriesIterator) Close() error {
Expand Down
32 changes: 19 additions & 13 deletions pkg/logql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2383,16 +2383,16 @@ func TestEngine_LogsInstantQuery_Vector(t *testing.T) {
}

type errorIteratorQuerier struct {
samples []iter.SampleIterator
entries []iter.EntryIterator
samples func() []iter.SampleIterator
entries func() []iter.EntryIterator
}

func (e errorIteratorQuerier) SelectLogs(_ context.Context, p SelectLogParams) (iter.EntryIterator, error) {
return iter.NewSortEntryIterator(e.entries, p.Direction), nil
return iter.NewSortEntryIterator(e.entries(), p.Direction), nil
}

func (e errorIteratorQuerier) SelectSamples(_ context.Context, _ SelectSampleParams) (iter.SampleIterator, error) {
return iter.NewSortSampleIterator(e.samples), nil
return iter.NewSortSampleIterator(e.samples()), nil
}

func TestStepEvaluator_Error(t *testing.T) {
Expand All @@ -2406,9 +2406,11 @@ func TestStepEvaluator_Error(t *testing.T) {
"rangeAggEvaluator",
`count_over_time({app="foo"}[1m])`,
&errorIteratorQuerier{
samples: []iter.SampleIterator{
iter.NewSeriesIterator(newSeries(testSize, identity, `{app="foo"}`)),
NewErrorSampleIterator(),
samples: func() []iter.SampleIterator {
return []iter.SampleIterator{
iter.NewSeriesIterator(newSeries(testSize, identity, `{app="foo"}`)),
NewErrorSampleIterator(),
}
},
},
ErrMock,
Expand All @@ -2417,9 +2419,11 @@ func TestStepEvaluator_Error(t *testing.T) {
"stream",
`{app="foo"}`,
&errorIteratorQuerier{
entries: []iter.EntryIterator{
iter.NewStreamIterator(newStream(testSize, identity, `{app="foo"}`)),
NewErrorEntryIterator(),
entries: func() []iter.EntryIterator {
return []iter.EntryIterator{
iter.NewStreamIterator(newStream(testSize, identity, `{app="foo"}`)),
NewErrorEntryIterator(),
}
},
},
ErrMock,
Expand All @@ -2428,9 +2432,11 @@ func TestStepEvaluator_Error(t *testing.T) {
"binOpStepEvaluator",
`count_over_time({app="foo"}[1m]) / count_over_time({app="foo"}[1m])`,
&errorIteratorQuerier{
samples: []iter.SampleIterator{
iter.NewSeriesIterator(newSeries(testSize, identity, `{app="foo"}`)),
NewErrorSampleIterator(),
samples: func() []iter.SampleIterator {
return []iter.SampleIterator{
iter.NewSeriesIterator(newSeries(testSize, identity, `{app="foo"}`)),
NewErrorSampleIterator(),
}
},
},
ErrMockMultiple,
Expand Down

0 comments on commit 1732648

Please sign in to comment.