From 715674c40d00fbd42748eda3ebcfe7f04a2d290f Mon Sep 17 00:00:00 2001 From: Huang Youliang Date: Fri, 24 May 2024 12:53:48 +0800 Subject: [PATCH] Improve filtering performance of Stream (#440) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Improve filtering performance of Stream --------- Co-authored-by: 吴晟 Wu Sheng --- banyand/cmd/server/main.go | 2 + banyand/measure/query.go | 39 +- banyand/stream/benchmark_test.go | 65 +-- banyand/stream/block.go | 71 +++- banyand/stream/index.go | 3 + banyand/stream/iter_builder.go | 22 +- banyand/stream/query.go | 376 +++++++++++------- banyand/stream/stream.go | 4 +- pkg/pb/v1/metadata.go | 26 -- pkg/query/executor/interface.go | 4 +- .../stream/stream_plan_indexscan_local.go | 14 +- pkg/timestamp/range.go | 23 ++ test/cases/stream/data/input/order_asc.yaml | 27 ++ test/cases/stream/data/input/order_desc.yaml | 27 ++ test/cases/stream/data/want/order_asc.yaml | 103 +++++ test/cases/stream/data/want/order_desc.yaml | 103 +++++ test/cases/stream/stream.go | 2 + 17 files changed, 630 insertions(+), 281 deletions(-) create mode 100644 test/cases/stream/data/input/order_asc.yaml create mode 100644 test/cases/stream/data/input/order_desc.yaml create mode 100644 test/cases/stream/data/want/order_asc.yaml create mode 100644 test/cases/stream/data/want/order_desc.yaml diff --git a/banyand/cmd/server/main.go b/banyand/cmd/server/main.go index 99827d130..3d7b6d779 100644 --- a/banyand/cmd/server/main.go +++ b/banyand/cmd/server/main.go @@ -21,12 +21,14 @@ package main import ( "fmt" "os" + "runtime" "github.com/apache/skywalking-banyandb/pkg/cmdsetup" "github.com/apache/skywalking-banyandb/pkg/signal" ) func main() { + runtime.GOMAXPROCS(runtime.NumCPU() * 2) if err := cmdsetup.NewRoot(new(signal.Handler)).Execute(); err != nil { _, _ = fmt.Fprintln(os.Stderr, err) os.Exit(1) diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 57bd381dc..876bcb5a7 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -373,21 +373,36 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult { if len(qr.data) == 0 { return nil } - // TODO:// Parallel load - tmpBlock := generateBlock() - defer releaseBlock(tmpBlock) + + cursorChan := make(chan int, len(qr.data)) for i := 0; i < len(qr.data); i++ { - if !qr.data[i].loadData(tmpBlock) { - qr.data = append(qr.data[:i], qr.data[i+1:]...) - i-- - } - if i < 0 { - continue - } - if qr.orderByTimestampDesc() { - qr.data[i].idx = len(qr.data[i].timestamps) - 1 + go func(i int) { + tmpBlock := generateBlock() + defer releaseBlock(tmpBlock) + if !qr.data[i].loadData(tmpBlock) { + cursorChan <- i + return + } + if qr.orderByTimestampDesc() { + qr.data[i].idx = len(qr.data[i].timestamps) - 1 + } + cursorChan <- -1 + }(i) + } + + blankCursorList := []int{} + for completed := 0; completed < len(qr.data); completed++ { + result := <-cursorChan + if result != -1 { + blankCursorList = append(blankCursorList, result) } } + sort.Slice(blankCursorList, func(i, j int) bool { + return blankCursorList[i] > blankCursorList[j] + }) + for _, index := range blankCursorList { + qr.data = append(qr.data[:index], qr.data[index+1:]...) + } qr.loaded = true heap.Init(qr) } diff --git a/banyand/stream/benchmark_test.go b/banyand/stream/benchmark_test.go index c9e8d868f..4b0b4cbce 100644 --- a/banyand/stream/benchmark_test.go +++ b/banyand/stream/benchmark_test.go @@ -42,6 +42,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/posting/roaring" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" "github.com/apache/skywalking-banyandb/pkg/test" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -54,6 +55,7 @@ const ( ) type parameter struct { + scenario string batchCount int timestampCount int seriesCount int @@ -63,9 +65,9 @@ type parameter struct { } var pList = [3]parameter{ - {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 1, endTimestamp: 1000}, - {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 900, endTimestamp: 1000}, - {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 300, endTimestamp: 400}, + {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 1, endTimestamp: 1000, scenario: "large-scale"}, + {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 900, endTimestamp: 1000, scenario: "latest"}, + {batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 300, endTimestamp: 400, scenario: "historical"}, } type mockIndex map[string]map[common.SeriesID]posting.List @@ -287,47 +289,7 @@ func generateStream(db storage.TSDB[*tsTable, option]) *stream { } } -func generateStreamFilterOptions(p parameter, index mockIndex) pbv1.StreamFilterOptions { - timeRange := timestamp.TimeRange{ - Start: time.Unix(int64(p.startTimestamp), 0), - End: time.Unix(int64(p.endTimestamp), 0), - IncludeStart: true, - IncludeEnd: true, - } - entities := make([][]*modelv1.TagValue, 0) - for i := 1; i <= p.seriesCount; i++ { - entity := []*modelv1.TagValue{ - { - Value: &modelv1.TagValue_Str{ - Str: &modelv1.Str{ - Value: entityTagValuePrefix + strconv.Itoa(i), - }, - }, - }, - } - entities = append(entities, entity) - } - num := generateRandomNumber(int64(p.tagCardinality)) - value := filterTagValuePrefix + strconv.Itoa(num) - filter := mockFilter{ - index: index, - value: value, - } - tagProjection := pbv1.TagProjection{ - Family: "benchmark-family", - Names: []string{"entity-tag", "filter-tag"}, - } - return pbv1.StreamFilterOptions{ - Name: "benchmark", - TimeRange: &timeRange, - Entities: entities, - Filter: filter, - TagProjection: []pbv1.TagProjection{tagProjection}, - MaxElementSize: math.MaxInt32, - } -} - -func generateStreamSortOptions(p parameter, index mockIndex) pbv1.StreamSortOptions { +func generateStreamQueryOptions(p parameter, index mockIndex) pbv1.StreamQueryOptions { timeRange := timestamp.TimeRange{ Start: time.Unix(int64(p.startTimestamp), 0), End: time.Unix(int64(p.endTimestamp), 0), @@ -368,7 +330,7 @@ func generateStreamSortOptions(p parameter, index mockIndex) pbv1.StreamSortOpti Family: "benchmark-family", Names: []string{"entity-tag", "filter-tag"}, } - return pbv1.StreamSortOptions{ + return pbv1.StreamQueryOptions{ Name: "benchmark", TimeRange: &timeRange, Entities: entities, @@ -385,10 +347,11 @@ func BenchmarkFilter(b *testing.B) { esList, docsList, idx := generateData(p) db := write(b, p, esList, docsList) s := generateStream(db) - sfo := generateStreamFilterOptions(p, idx) - b.Run("filter", func(b *testing.B) { - _, err := s.Filter(context.TODO(), sfo) + sqo := generateStreamQueryOptions(p, idx) + b.Run("filter-"+p.scenario, func(b *testing.B) { + res, err := s.Filter(context.TODO(), sqo) require.NoError(b, err) + logicalstream.BuildElementsFromStreamResult(res) }) } } @@ -399,9 +362,9 @@ func BenchmarkSort(b *testing.B) { esList, docsList, idx := generateData(p) db := write(b, p, esList, docsList) s := generateStream(db) - sso := generateStreamSortOptions(p, idx) - b.Run("sort", func(b *testing.B) { - _, err := s.Sort(context.TODO(), sso) + sqo := generateStreamQueryOptions(p, idx) + b.Run("sort-"+p.scenario, func(b *testing.B) { + _, err := s.Sort(context.TODO(), sqo) require.NoError(b, err) }) } diff --git a/banyand/stream/block.go b/banyand/stream/block.go index 63342b11f..84b73b018 100644 --- a/banyand/stream/block.go +++ b/banyand/stream/block.go @@ -408,16 +408,17 @@ func releaseBlock(b *block) { var blockPool sync.Pool type blockCursor struct { - p *part - timestamps []int64 - elementIDs []string - tagFamilies []tagFamily - tagValuesDecoder encoding.BytesBlockDecoder - tagProjection []pbv1.TagProjection - bm blockMetadata - idx int - minTimestamp int64 - maxTimestamp int64 + p *part + timestamps []int64 + expectedTimestamps []int64 + elementIDs []string + tagFamilies []tagFamily + tagValuesDecoder encoding.BytesBlockDecoder + tagProjection []pbv1.TagProjection + bm blockMetadata + idx int + minTimestamp int64 + maxTimestamp int64 } func (bc *blockCursor) reset() { @@ -438,13 +439,17 @@ func (bc *blockCursor) reset() { bc.tagFamilies = tff[:0] } -func (bc *blockCursor) init(p *part, bm *blockMetadata, queryOpts queryOptions) { +func (bc *blockCursor) init(p *part, bm *blockMetadata, opts queryOptions) { bc.reset() bc.p = p bc.bm.copyFrom(bm) - bc.minTimestamp = queryOpts.minTimestamp - bc.maxTimestamp = queryOpts.maxTimestamp - bc.tagProjection = queryOpts.TagProjection + bc.minTimestamp = opts.minTimestamp + bc.maxTimestamp = opts.maxTimestamp + bc.tagProjection = opts.TagProjection + if opts.elementRefMap != nil { + seriesID := bc.bm.seriesID + bc.expectedTimestamps = opts.elementRefMap[seriesID] + } } func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) { @@ -543,12 +548,30 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { bc.bm.tagFamilies = tf tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm) - start, end, ok := timestamp.FindRange(tmpBlock.timestamps, bc.minTimestamp, bc.maxTimestamp) - if !ok { - return false + idxList := make([]int, 0) + var start, end int + if bc.expectedTimestamps != nil { + for _, ts := range bc.expectedTimestamps { + idx := timestamp.Find(tmpBlock.timestamps, ts) + if idx == -1 { + continue + } + idxList = append(idxList, idx) + bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[idx]) + bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[idx]) + } + if len(bc.timestamps) == 0 { + return false + } + } else { + s, e, ok := timestamp.FindRange(tmpBlock.timestamps, bc.minTimestamp, bc.maxTimestamp) + start, end = s, e + if !ok { + return false + } + bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[s:e+1]...) + bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[s:e+1]...) } - bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[start:end+1]...) - bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[start:end+1]...) for i, projection := range bc.bm.tagProjection { tf := tagFamily{ @@ -559,13 +582,19 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool { t := tag{ name: name, } - if tmpBlock.tagFamilies[i].tags[blockIndex].name == name { + if len(tmpBlock.tagFamilies[i].tags) != 0 && tmpBlock.tagFamilies[i].tags[blockIndex].name == name { t.valueType = tmpBlock.tagFamilies[i].tags[blockIndex].valueType if len(tmpBlock.tagFamilies[i].tags[blockIndex].values) != len(tmpBlock.timestamps) { logger.Panicf("unexpected number of values for tags %q: got %d; want %d", tmpBlock.tagFamilies[i].tags[blockIndex].name, len(tmpBlock.tagFamilies[i].tags[blockIndex].values), len(tmpBlock.timestamps)) } - t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...) + if bc.expectedTimestamps != nil { + for _, idx := range idxList { + t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[idx]) + } + } else { + t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...) + } } blockIndex++ tf.tags = append(tf.tags, t) diff --git a/banyand/stream/index.go b/banyand/stream/index.go index a6f31db15..7c02bf0d4 100644 --- a/banyand/stream/index.go +++ b/banyand/stream/index.go @@ -87,6 +87,9 @@ func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, fil if pl == nil { pl = roaring.DummyPostingList } + if pl.IsEmpty() { + continue + } timestamps := pl.ToSlice() sort.Slice(timestamps, func(i, j int) bool { return timestamps[i] < timestamps[j] diff --git a/banyand/stream/iter_builder.go b/banyand/stream/iter_builder.go index 62fca5f00..f225acf96 100644 --- a/banyand/stream/iter_builder.go +++ b/banyand/stream/iter_builder.go @@ -31,20 +31,20 @@ import ( type filterFn func(itemID uint64) bool func (s *stream) buildSeriesByIndex(tableWrappers []storage.TSTableWrapper[*tsTable], - seriesList pbv1.SeriesList, sso pbv1.StreamSortOptions, + seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions, ) (series []*searcherIterator, err error) { timeFilter := func(itemID uint64) bool { - return sso.TimeRange.Contains(int64(itemID)) + return sqo.TimeRange.Contains(int64(itemID)) } - indexRuleForSorting := sso.Order.Index + indexRuleForSorting := sqo.Order.Index if len(indexRuleForSorting.Tags) != 1 { return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags)) } sortedTag := indexRuleForSorting.Tags[0] tl := newTagLocation() - for i := range sso.TagProjection { - for j := range sso.TagProjection[i].Names { - if sso.TagProjection[i].Names[j] == sortedTag { + for i := range sqo.TagProjection { + for j := range sqo.TagProjection[i].Names { + if sqo.TagProjection[i].Names[j] == sortedTag { tl.familyIndex, tl.tagIndex = i, j } } @@ -52,13 +52,13 @@ func (s *stream) buildSeriesByIndex(tableWrappers []storage.TSTableWrapper[*tsTa if !tl.valid() { return nil, fmt.Errorf("sorted tag %s not found in tag projection", sortedTag) } - entityMap, tagSpecIndex, tagProjIndex, sidToIndex := s.genIndex(sso.TagProjection, seriesList) + entityMap, tagSpecIndex, tagProjIndex, sidToIndex := s.genIndex(sqo.TagProjection, seriesList) sids := seriesList.IDs() for _, tw := range tableWrappers { seriesFilter := make(map[common.SeriesID]filterFn) - if sso.Filter != nil { + if sqo.Filter != nil { for i := range sids { - pl, errExe := sso.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { + pl, errExe := sqo.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) { return tw.Table().Index().store, nil }, sids[i]) if errExe != nil { @@ -79,14 +79,14 @@ func (s *stream) buildSeriesByIndex(tableWrappers []storage.TSTableWrapper[*tsTa IndexRuleID: indexRuleForSorting.GetMetadata().GetId(), Analyzer: indexRuleForSorting.GetAnalyzer(), } - inner, err = tw.Table().Index().Sort(sids, fieldKey, sso.Order.Sort, sso.MaxElementSize) + inner, err = tw.Table().Index().Sort(sids, fieldKey, sqo.Order.Sort, sqo.MaxElementSize) if err != nil { return nil, err } if inner != nil { series = append(series, newSearcherIterator(s.l, inner, tw.Table(), - seriesFilter, timeFilter, sso.TagProjection, tl, + seriesFilter, timeFilter, sqo.TagProjection, tl, tagSpecIndex, tagProjIndex, sidToIndex, seriesList, entityMap)) } } diff --git a/banyand/stream/query.go b/banyand/stream/query.go index 06a274436..ce29a7255 100644 --- a/banyand/stream/query.go +++ b/banyand/stream/query.go @@ -38,6 +38,7 @@ import ( ) type queryOptions struct { + elementRefMap map[common.SeriesID][]int64 pbv1.StreamQueryOptions minTimestamp int64 maxTimestamp int64 @@ -156,58 +157,77 @@ func (qr *queryResult) Pull() *pbv1.StreamResult { if len(qr.data) == 0 { return nil } - // TODO:// Parallel load - tmpBlock := generateBlock() - defer releaseBlock(tmpBlock) + + cursorChan := make(chan int, len(qr.data)) for i := 0; i < len(qr.data); i++ { - if !qr.data[i].loadData(tmpBlock) { - qr.data = append(qr.data[:i], qr.data[i+1:]...) - i-- - } - if qr.schema.GetEntity() == nil || len(qr.schema.GetEntity().GetTagNames()) == 0 { - continue - } - sidIndex := qr.sidToIndex[qr.data[i].bm.seriesID] - series := qr.seriesList[sidIndex] - entityMap := make(map[string]int) - tagFamilyMap := make(map[string]int) - for idx, entity := range qr.schema.GetEntity().GetTagNames() { - entityMap[entity] = idx + 1 - } - for idx, tagFamily := range qr.data[i].tagFamilies { - tagFamilyMap[tagFamily.name] = idx + 1 - } - for _, tagFamilyProj := range qr.data[i].tagProjection { - for j, tagProj := range tagFamilyProj.Names { - offset := qr.tagNameIndex[tagProj] - tagFamilySpec := qr.schema.GetTagFamilies()[offset.FamilyOffset] - tagSpec := tagFamilySpec.GetTags()[offset.TagOffset] - if tagSpec.IndexedOnly { - continue - } - entityPos := entityMap[tagProj] - tagFamilyPos := tagFamilyMap[tagFamilyProj.Family] - if entityPos == 0 { - continue - } - if tagFamilyPos == 0 { - qr.data[i].tagFamilies[tagFamilyPos-1] = tagFamily{ - name: tagFamilyProj.Family, - tags: make([]tag, 0), + go func(i int) { + tmpBlock := generateBlock() + defer releaseBlock(tmpBlock) + if !qr.data[i].loadData(tmpBlock) { + cursorChan <- i + return + } + if qr.schema.GetEntity() == nil || len(qr.schema.GetEntity().GetTagNames()) == 0 { + cursorChan <- -1 + return + } + sidIndex := qr.sidToIndex[qr.data[i].bm.seriesID] + series := qr.seriesList[sidIndex] + entityMap := make(map[string]int) + tagFamilyMap := make(map[string]int) + for idx, entity := range qr.schema.GetEntity().GetTagNames() { + entityMap[entity] = idx + 1 + } + for idx, tagFamily := range qr.data[i].tagFamilies { + tagFamilyMap[tagFamily.name] = idx + 1 + } + for _, tagFamilyProj := range qr.data[i].tagProjection { + for j, tagProj := range tagFamilyProj.Names { + offset := qr.tagNameIndex[tagProj] + tagFamilySpec := qr.schema.GetTagFamilies()[offset.FamilyOffset] + tagSpec := tagFamilySpec.GetTags()[offset.TagOffset] + if tagSpec.IndexedOnly { + continue + } + entityPos := entityMap[tagProj] + tagFamilyPos := tagFamilyMap[tagFamilyProj.Family] + if entityPos == 0 { + continue + } + if tagFamilyPos == 0 { + qr.data[i].tagFamilies[tagFamilyPos-1] = tagFamily{ + name: tagFamilyProj.Family, + tags: make([]tag, 0), + } + } + valueType := pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1]) + qr.data[i].tagFamilies[tagFamilyPos-1].tags[j] = tag{ + name: tagProj, + values: mustEncodeTagValue(tagProj, tagSpec.GetType(), series.EntityValues[entityPos-1], len(qr.data[i].timestamps)), + valueType: valueType, } - } - valueType := pbv1.MustTagValueToValueType(series.EntityValues[entityPos-1]) - qr.data[i].tagFamilies[tagFamilyPos-1].tags[j] = tag{ - name: tagProj, - values: mustEncodeTagValue(tagProj, tagSpec.GetType(), series.EntityValues[entityPos-1], len(qr.data[i].timestamps)), - valueType: valueType, } } - } - if qr.orderByTimestampDesc() { - qr.data[i].idx = len(qr.data[i].timestamps) - 1 + if qr.orderByTimestampDesc() { + qr.data[i].idx = len(qr.data[i].timestamps) - 1 + } + cursorChan <- -1 + }(i) + } + + blankCursorList := []int{} + for completed := 0; completed < len(qr.data); completed++ { + result := <-cursorChan + if result != -1 { + blankCursorList = append(blankCursorList, result) } } + sort.Slice(blankCursorList, func(i, j int) bool { + return blankCursorList[i] > blankCursorList[j] + }) + for _, index := range blankCursorList { + qr.data = append(qr.data[:index], qr.data[index+1:]...) + } qr.loaded = true heap.Init(qr) } @@ -343,88 +363,118 @@ func (s *stream) genIndex(tagProj []pbv1.TagProjection, seriesList pbv1.SeriesLi return entityMap, tagSpecIndex, tagProjIndex, sidToIndex } -func (s *stream) Filter(ctx context.Context, sfo pbv1.StreamFilterOptions) (sfr pbv1.StreamFilterResult, err error) { - if sfo.TimeRange == nil || len(sfo.Entities) < 1 { +func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) { + if sqo.TimeRange == nil || len(sqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") } - if len(sfo.TagProjection) == 0 { + if len(sqo.TagProjection) == 0 { return nil, errors.New("invalid query options: tagProjection is required") } db := s.databaseSupplier.SupplyTSDB() + var result queryResult if db == nil { - return sfr, nil + return &result, nil } tsdb := db.(storage.TSDB[*tsTable, option]) - tabWrappers := tsdb.SelectTSTables(*sfo.TimeRange) - sort.Slice(tabWrappers, func(i, j int) bool { - return tabWrappers[i].GetTimeRange().Start.Before(tabWrappers[j].GetTimeRange().Start) - }) + tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange) defer func() { for i := range tabWrappers { tabWrappers[i].DecRef() } }() - - series := make([]*pbv1.Series, len(sfo.Entities)) - for i := range sfo.Entities { + series := make([]*pbv1.Series, len(sqo.Entities)) + for i := range sqo.Entities { series[i] = &pbv1.Series{ - Subject: sfo.Name, - EntityValues: sfo.Entities[i], + Subject: sqo.Name, + EntityValues: sqo.Entities[i], } } - seriesList, err := tsdb.Lookup(ctx, series) + sl, err := tsdb.Lookup(ctx, series) if err != nil { return nil, err } - if len(seriesList) == 0 { - return sfr, nil - } - entityMap, tagSpecIndex, tagProjIndex, sidToIndex := s.genIndex(sfo.TagProjection, seriesList) - ces := newColumnElements() - for _, tw := range tabWrappers { - if len(ces.timestamp) >= sfo.MaxElementSize { - break - } - index := tw.Table().Index() - erl, err := index.Search(ctx, seriesList, sfo.Filter, sfo.TimeRange) - if err != nil { - return nil, err + if len(sl) < 1 { + return &result, nil + } + var sids []common.SeriesID + for i := range sl { + sids = append(sids, sl[i].ID) + } + var parts []*part + qo := queryOptions{ + StreamQueryOptions: sqo, + minTimestamp: sqo.TimeRange.Start.UnixNano(), + maxTimestamp: sqo.TimeRange.End.UnixNano(), + } + var n int + for i := range tabWrappers { + s := tabWrappers[i].Table().currentSnapshot() + if s == nil { + continue } - if len(ces.timestamp)+len(erl) > sfo.MaxElementSize { - erl = erl[:sfo.MaxElementSize-len(ces.timestamp)] + parts, n = s.getParts(parts, qo.minTimestamp, qo.maxTimestamp) + if n < 1 { + s.decRef() + continue } - for _, er := range erl { - e, count, err := tw.Table().getElement(er.seriesID, er.timestamp, sfo.TagProjection) - if err != nil { - return nil, err - } - if len(tagProjIndex) != 0 { - for entity, offset := range tagProjIndex { - tagSpec := tagSpecIndex[entity] - if tagSpec.IndexedOnly { - continue - } - series := seriesList[sidToIndex[er.seriesID]] - entityPos := entityMap[entity] - 1 - e.tagFamilies[offset.FamilyOffset].tags[offset.TagOffset] = tag{ - name: entity, - values: mustEncodeTagValue(entity, tagSpec.GetType(), series.EntityValues[entityPos], count), - valueType: pbv1.MustTagValueToValueType(series.EntityValues[entityPos]), - } - } + result.snapshots = append(result.snapshots, s) + } + bma := generateBlockMetadataArray() + defer releaseBlockMetadataArray(bma) + // TODO: cache tstIter + var ti tstIter + defer ti.reset() + originalSids := make([]common.SeriesID, len(sids)) + copy(originalSids, sids) + sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) + ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) + if ti.Error() != nil { + return nil, fmt.Errorf("cannot init tstIter: %w", ti.Error()) + } + for ti.nextBlock() { + bc := generateBlockCursor() + p := ti.piHeap[0] + bc.init(p.p, p.curBlock, qo) + result.data = append(result.data, bc) + } + if ti.Error() != nil { + return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) + } + + entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, sl) + result.entityMap = entityMap + result.sidToIndex = sidToIndex + result.tagNameIndex = make(map[string]partition.TagLocator) + result.schema = s.schema + result.seriesList = sl + for i, si := range originalSids { + result.sidToIndex[si] = i + } + for i, tagFamilySpec := range s.schema.GetTagFamilies() { + for j, tagSpec := range tagFamilySpec.GetTags() { + result.tagNameIndex[tagSpec.GetName()] = partition.TagLocator{ + FamilyOffset: i, + TagOffset: j, } - ces.BuildFromElement(e, sfo.TagProjection) } } - return ces, nil + result.orderByTS = true + if sqo.Order == nil { + result.ascTS = true + return &result, nil + } + if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { + result.ascTS = true + } + return &result, nil } -func (s *stream) Sort(ctx context.Context, sso pbv1.StreamSortOptions) (ssr pbv1.StreamSortResult, err error) { - if sso.TimeRange == nil || len(sso.Entities) < 1 { +func (s *stream) Sort(ctx context.Context, sqo pbv1.StreamQueryOptions) (ssr pbv1.StreamSortResult, err error) { + if sqo.TimeRange == nil || len(sqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") } - if len(sso.TagProjection) == 0 { + if len(sqo.TagProjection) == 0 { return nil, errors.New("invalid query options: tagProjection is required") } db := s.databaseSupplier.SupplyTSDB() @@ -432,17 +482,18 @@ func (s *stream) Sort(ctx context.Context, sso pbv1.StreamSortOptions) (ssr pbv1 return ssr, nil } tsdb := db.(storage.TSDB[*tsTable, option]) - tabWrappers := tsdb.SelectTSTables(*sso.TimeRange) + tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange) defer func() { for i := range tabWrappers { tabWrappers[i].DecRef() } }() - series := make([]*pbv1.Series, len(sso.Entities)) - for i := range sso.Entities { + + series := make([]*pbv1.Series, len(sqo.Entities)) + for i := range sqo.Entities { series[i] = &pbv1.Series{ - Subject: sso.Name, - EntityValues: sso.Entities[i], + Subject: sqo.Name, + EntityValues: sqo.Entities[i], } } seriesList, err := tsdb.Lookup(ctx, series) @@ -453,16 +504,15 @@ func (s *stream) Sort(ctx context.Context, sso pbv1.StreamSortOptions) (ssr pbv1 return ssr, nil } - iters, err := s.buildSeriesByIndex(tabWrappers, seriesList, sso) + iters, err := s.buildSeriesByIndex(tabWrappers, seriesList, sqo) if err != nil { return nil, err } - if len(iters) == 0 { return ssr, nil } - it := newItemIter(iters, sso.Order.Sort) + it := newItemIter(iters, sqo.Order.Sort) defer func() { err = multierr.Append(err, it.Close()) }() @@ -471,15 +521,27 @@ func (s *stream) Sort(ctx context.Context, sso pbv1.StreamSortOptions) (ssr pbv1 for it.Next() { nextItem := it.Val() e := nextItem.element - ces.BuildFromElement(e, sso.TagProjection) - if len(ces.timestamp) >= sso.MaxElementSize { + ces.BuildFromElement(e, sqo.TagProjection) + if len(ces.timestamp) >= sqo.MaxElementSize { break } } return ces, err } -func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) { +// newItemIter returns a ItemIterator which mergers several tsdb.Iterator by input sorting order. +func newItemIter(iters []*searcherIterator, s modelv1.Sort) itersort.Iterator[item] { + var ii []itersort.Iterator[item] + for _, iter := range iters { + ii = append(ii, iter) + } + if s == modelv1.Sort_SORT_DESC { + return itersort.NewItemIter[item](ii, true) + } + return itersort.NewItemIter[item](ii, false) +} + +func (s *stream) Filter(ctx context.Context, sqo pbv1.StreamQueryOptions) (sqr pbv1.StreamQueryResult, err error) { if sqo.TimeRange == nil || len(sqo.Entities) < 1 { return nil, errors.New("invalid query options: timeRange and series are required") } @@ -489,7 +551,7 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S db := s.databaseSupplier.SupplyTSDB() var result queryResult if db == nil { - return &result, nil + return sqr, nil } tsdb := db.(storage.TSDB[*tsTable, option]) tabWrappers := tsdb.SelectTSTables(*sqo.TimeRange) @@ -498,6 +560,7 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S tabWrappers[i].DecRef() } }() + series := make([]*pbv1.Series, len(sqo.Entities)) for i := range sqo.Entities { series[i] = &pbv1.Series{ @@ -505,24 +568,46 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S EntityValues: sqo.Entities[i], } } - sl, err := tsdb.Lookup(ctx, series) + seriesList, err := tsdb.Lookup(ctx, series) if err != nil { return nil, err } + if len(seriesList) == 0 { + return sqr, nil + } - if len(sl) < 1 { - return &result, nil + var elementRefList []elementRef + for _, tw := range tabWrappers { + index := tw.Table().Index() + erl, err := index.Search(ctx, seriesList, sqo.Filter, sqo.TimeRange) + if err != nil { + return nil, err + } + elementRefList = append(elementRefList, erl...) + if len(elementRefList) > sqo.MaxElementSize { + elementRefList = elementRefList[:sqo.MaxElementSize] + break + } } - var sids []common.SeriesID - for i := range sl { - sids = append(sids, sl[i].ID) + var elementRefMap map[common.SeriesID][]int64 + if len(elementRefList) != 0 { + elementRefMap = make(map[common.SeriesID][]int64) + for _, ref := range elementRefList { + if _, ok := elementRefMap[ref.seriesID]; !ok { + elementRefMap[ref.seriesID] = []int64{ref.timestamp} + } else { + elementRefMap[ref.seriesID] = append(elementRefMap[ref.seriesID], ref.timestamp) + } + } } - var parts []*part qo := queryOptions{ StreamQueryOptions: sqo, minTimestamp: sqo.TimeRange.Start.UnixNano(), maxTimestamp: sqo.TimeRange.End.UnixNano(), + elementRefMap: elementRefMap, } + + var parts []*part var n int for i := range tabWrappers { s := tabWrappers[i].Table().currentSnapshot() @@ -539,31 +624,41 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S bma := generateBlockMetadataArray() defer releaseBlockMetadataArray(bma) // TODO: cache tstIter - var tstIter tstIter - defer tstIter.reset() + var ti tstIter + defer ti.reset() + var sids []common.SeriesID + for i := 0; i < len(seriesList); i++ { + sid := seriesList[i].ID + if _, ok := elementRefMap[sid]; !ok { + seriesList = append(seriesList[:i], seriesList[i+1:]...) + i-- + continue + } + sids = append(sids, sid) + } originalSids := make([]common.SeriesID, len(sids)) copy(originalSids, sids) sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) - tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) - if tstIter.Error() != nil { - return nil, fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) + ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) + if ti.Error() != nil { + return nil, fmt.Errorf("cannot init tstIter: %w", ti.Error()) } - for tstIter.nextBlock() { + for ti.nextBlock() { bc := generateBlockCursor() - p := tstIter.piHeap[0] + p := ti.piHeap[0] bc.init(p.p, p.curBlock, qo) result.data = append(result.data, bc) } - if tstIter.Error() != nil { - return nil, fmt.Errorf("cannot iterate tstIter: %w", tstIter.Error()) + if ti.Error() != nil { + return nil, fmt.Errorf("cannot iterate tstIter: %w", ti.Error()) } - entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, sl) + entityMap, _, _, sidToIndex := s.genIndex(sqo.TagProjection, seriesList) result.entityMap = entityMap result.sidToIndex = sidToIndex result.tagNameIndex = make(map[string]partition.TagLocator) result.schema = s.schema - result.seriesList = sl + result.seriesList = seriesList for i, si := range originalSids { result.sidToIndex[si] = i } @@ -575,30 +670,13 @@ func (s *stream) Query(ctx context.Context, sqo pbv1.StreamQueryOptions) (pbv1.S } } } + result.orderByTS = true if sqo.Order == nil { - result.orderByTS = true result.ascTS = true return &result, nil } - if sqo.Order.Index == nil { - result.orderByTS = true - if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { - result.ascTS = true - } - return &result, nil + if sqo.Order.Sort == modelv1.Sort_SORT_ASC || sqo.Order.Sort == modelv1.Sort_SORT_UNSPECIFIED { + result.ascTS = true } - return &result, nil } - -// newItemIter returns a ItemIterator which mergers several tsdb.Iterator by input sorting order. -func newItemIter(iters []*searcherIterator, s modelv1.Sort) itersort.Iterator[item] { - var ii []itersort.Iterator[item] - for _, iter := range iters { - ii = append(ii, iter) - } - if s == modelv1.Sort_SORT_DESC { - return itersort.NewItemIter[item](ii, true) - } - return itersort.NewItemIter[item](ii, false) -} diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go index 0ba53a9d3..8c6f1b7ac 100644 --- a/banyand/stream/stream.go +++ b/banyand/stream/stream.go @@ -61,8 +61,8 @@ type Stream interface { GetSchema() *databasev1.Stream GetIndexRules() []*databasev1.IndexRule Query(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) - Sort(ctx context.Context, opts pbv1.StreamSortOptions) (pbv1.StreamSortResult, error) - Filter(ctx context.Context, opts pbv1.StreamFilterOptions) (pbv1.StreamFilterResult, error) + Sort(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamSortResult, error) + Filter(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) } var _ Stream = (*stream)(nil) diff --git a/pkg/pb/v1/metadata.go b/pkg/pb/v1/metadata.go index dce4d8321..140a0c324 100644 --- a/pkg/pb/v1/metadata.go +++ b/pkg/pb/v1/metadata.go @@ -132,27 +132,6 @@ type TagProjection struct { // StreamQueryOptions is the options of a stream query. type StreamQueryOptions struct { - Name string - TimeRange *timestamp.TimeRange - Entities [][]*modelv1.TagValue - Filter index.Filter - Order *OrderBy - TagProjection []TagProjection -} - -// StreamSortOptions is the options of a stream sort. -type StreamSortOptions struct { - Name string - TimeRange *timestamp.TimeRange - Entities [][]*modelv1.TagValue - Filter index.Filter - Order *OrderBy - TagProjection []TagProjection - MaxElementSize int -} - -// StreamFilterOptions is the options of a stream filter. -type StreamFilterOptions struct { Name string TimeRange *timestamp.TimeRange Entities [][]*modelv1.TagValue @@ -173,11 +152,6 @@ type StreamSortResult interface { Pull() *StreamColumnResult } -// StreamFilterResult is the result of a stream filter. -type StreamFilterResult interface { - Pull() *StreamColumnResult -} - // MeasureQueryOptions is the options of a measure query. type MeasureQueryOptions struct { Name string diff --git a/pkg/query/executor/interface.go b/pkg/query/executor/interface.go index 40b73dded..a5edf44c0 100644 --- a/pkg/query/executor/interface.go +++ b/pkg/query/executor/interface.go @@ -31,8 +31,8 @@ import ( // StreamExecutionContext allows retrieving data through the stream module. type StreamExecutionContext interface { Query(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) - Sort(ctx context.Context, opts pbv1.StreamSortOptions) (pbv1.StreamSortResult, error) - Filter(ctx context.Context, opts pbv1.StreamFilterOptions) (pbv1.StreamFilterResult, error) + Sort(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamSortResult, error) + Filter(ctx context.Context, opts pbv1.StreamQueryOptions) (pbv1.StreamQueryResult, error) } // StreamExecutionContextKey is the key of stream execution context in context.Context. diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index 99ce11f4a..2e7a39fb0 100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -73,7 +73,7 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro ec := executor.FromStreamExecutionContext(ctx) if i.order != nil && i.order.Index != nil { - ssr, err := ec.Sort(ctx, pbv1.StreamSortOptions{ + ssr, err := ec.Sort(ctx, pbv1.StreamQueryOptions{ Name: i.metadata.GetName(), TimeRange: &i.timeRange, Entities: i.entities, @@ -93,7 +93,7 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro } if i.filter != nil && i.filter != logical.ENode { - sfr, err := ec.Filter(ctx, pbv1.StreamFilterOptions{ + result, err := ec.Filter(ctx, pbv1.StreamQueryOptions{ Name: i.metadata.GetName(), TimeRange: &i.timeRange, Entities: i.entities, @@ -105,11 +105,10 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro if err != nil { return nil, err } - if sfr == nil { + if result == nil { return nil, nil } - r := sfr.Pull() - return buildElementsFromColumnResult(r), nil + return BuildElementsFromStreamResult(result), nil } result, err := ec.Query(ctx, pbv1.StreamQueryOptions{ @@ -123,7 +122,7 @@ func (i *localIndexScan) Execute(ctx context.Context) ([]*streamv1.Element, erro if err != nil { return nil, fmt.Errorf("failed to query stream: %w", err) } - return buildElementsFromQueryResults(result), nil + return BuildElementsFromStreamResult(result), nil } func (i *localIndexScan) String() string { @@ -167,7 +166,8 @@ func buildElementsFromColumnResult(r *pbv1.StreamColumnResult) (elements []*stre return } -func buildElementsFromQueryResults(result pbv1.StreamQueryResult) (elements []*streamv1.Element) { +// BuildElementsFromStreamResult builds a slice of elements from the given stream query result. +func BuildElementsFromStreamResult(result pbv1.StreamQueryResult) (elements []*streamv1.Element) { deduplication := make(map[string]struct{}) for { r := result.Pull() diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go index 77603ed07..aaf719bbe 100644 --- a/pkg/timestamp/range.go +++ b/pkg/timestamp/range.go @@ -148,3 +148,26 @@ func FindRange[T int64 | uint64](timestamps []T, min, max T) (int, int, bool) { } return start, end, start <= end } + +// Find returns the index of the target in the sorted 'timestamps' slice. +func Find(timestamps []int64, target int64) int { + if len(timestamps) == 0 { + return -1 + } + if timestamps[0] > target || timestamps[len(timestamps)-1] < target { + return -1 + } + left, right := 0, len(timestamps)-1 + for left <= right { + mid := (left + right) / 2 + if timestamps[mid] == target { + return mid + } + if timestamps[mid] < target { + left = mid + 1 + } else { + right = mid - 1 + } + } + return -1 +} diff --git a/test/cases/stream/data/input/order_asc.yaml b/test/cases/stream/data/input/order_asc.yaml new file mode 100644 index 000000000..9963ac2d4 --- /dev/null +++ b/test/cases/stream/data/input/order_asc.yaml @@ -0,0 +1,27 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "sw" +groups: ["default"] +projection: + tagFamilies: + - name: "searchable" + tags: ["trace_id", "duration"] + - name: "data" + tags: ["data_binary"] +orderBy: + sort: "SORT_ASC" diff --git a/test/cases/stream/data/input/order_desc.yaml b/test/cases/stream/data/input/order_desc.yaml new file mode 100644 index 000000000..3c2d97d3f --- /dev/null +++ b/test/cases/stream/data/input/order_desc.yaml @@ -0,0 +1,27 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +name: "sw" +groups: ["default"] +projection: + tagFamilies: + - name: "searchable" + tags: ["trace_id", "duration"] + - name: "data" + tags: ["data_binary"] +orderBy: + sort: "SORT_DESC" diff --git a/test/cases/stream/data/want/order_asc.yaml b/test/cases/stream/data/want/order_asc.yaml new file mode 100644 index 000000000..725cf0847 --- /dev/null +++ b/test/cases/stream/data/want/order_asc.yaml @@ -0,0 +1,103 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +elements: + - elementId: "0" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "1" + - key: duration + value: + int: + value: "1000" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ + - elementId: "1" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "2" + - key: duration + value: + int: + value: "500" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ + - elementId: "2" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "3" + - key: duration + value: + int: + value: "30" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ + - elementId: "3" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "4" + - key: duration + value: + int: + value: "60" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ + - elementId: "4" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "5" + - key: duration + value: + int: + value: "300" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ diff --git a/test/cases/stream/data/want/order_desc.yaml b/test/cases/stream/data/want/order_desc.yaml new file mode 100644 index 000000000..055f9449f --- /dev/null +++ b/test/cases/stream/data/want/order_desc.yaml @@ -0,0 +1,103 @@ +# Licensed to Apache Software Foundation (ASF) under one or more contributor +# license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright +# ownership. Apache Software Foundation (ASF) licenses this file to you under +# the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +elements: + - elementId: "4" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "5" + - key: duration + value: + int: + value: "300" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ + - elementId: "3" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "4" + - key: duration + value: + int: + value: "60" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ + - elementId: "2" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "3" + - key: duration + value: + int: + value: "30" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ + - elementId: "1" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "2" + - key: duration + value: + int: + value: "500" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ + - elementId: "0" + tagFamilies: + - name: searchable + tags: + - key: trace_id + value: + str: + value: "1" + - key: duration + value: + int: + value: "1000" + - name: data + tags: + - key: data_binary + value: + binaryData: YWJjMTIzIT8kKiYoKSctPUB+ diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go index 66408bdd2..200a99794 100644 --- a/test/cases/stream/stream.go +++ b/test/cases/stream/stream.go @@ -49,6 +49,8 @@ var _ = g.DescribeTable("Scanning Streams", func(args helpers.Args) { g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * time.Hour}), g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}), g.Entry("offset", helpers.Args{Input: "offset", Duration: 1 * time.Hour}), + g.Entry("order asc", helpers.Args{Input: "order_asc", Duration: 1 * time.Hour}), + g.Entry("order desc", helpers.Args{Input: "order_desc", Duration: 1 * time.Hour}), g.Entry("nothing", helpers.Args{ Input: "all", Begin: timestamppb.New(time.Unix(0, 0).Truncate(time.Millisecond)),