Skip to content

Commit

Permalink
Improve filtering performance of Stream (#440)
Browse files Browse the repository at this point in the history
* Improve filtering performance of Stream

---------

Co-authored-by: 吴晟 Wu Sheng <wu.sheng@foxmail.com>
  • Loading branch information
ButterBright and wu-sheng committed May 24, 2024
1 parent 3729364 commit 715674c
Show file tree
Hide file tree
Showing 17 changed files with 630 additions and 281 deletions.
2 changes: 2 additions & 0 deletions banyand/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ package main
import (
"fmt"
"os"
"runtime"

"github.com/apache/skywalking-banyandb/pkg/cmdsetup"
"github.com/apache/skywalking-banyandb/pkg/signal"
)

func main() {
runtime.GOMAXPROCS(runtime.NumCPU() * 2)
if err := cmdsetup.NewRoot(new(signal.Handler)).Execute(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
Expand Down
39 changes: 27 additions & 12 deletions banyand/measure/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,21 +373,36 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult {
if len(qr.data) == 0 {
return nil
}
// TODO:// Parallel load
tmpBlock := generateBlock()
defer releaseBlock(tmpBlock)

cursorChan := make(chan int, len(qr.data))
for i := 0; i < len(qr.data); i++ {
if !qr.data[i].loadData(tmpBlock) {
qr.data = append(qr.data[:i], qr.data[i+1:]...)
i--
}
if i < 0 {
continue
}
if qr.orderByTimestampDesc() {
qr.data[i].idx = len(qr.data[i].timestamps) - 1
go func(i int) {
tmpBlock := generateBlock()
defer releaseBlock(tmpBlock)
if !qr.data[i].loadData(tmpBlock) {
cursorChan <- i
return
}
if qr.orderByTimestampDesc() {
qr.data[i].idx = len(qr.data[i].timestamps) - 1
}
cursorChan <- -1
}(i)
}

blankCursorList := []int{}
for completed := 0; completed < len(qr.data); completed++ {
result := <-cursorChan
if result != -1 {
blankCursorList = append(blankCursorList, result)
}
}
sort.Slice(blankCursorList, func(i, j int) bool {
return blankCursorList[i] > blankCursorList[j]
})
for _, index := range blankCursorList {
qr.data = append(qr.data[:index], qr.data[index+1:]...)
}
qr.loaded = true
heap.Init(qr)
}
Expand Down
65 changes: 14 additions & 51 deletions banyand/stream/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
Expand All @@ -54,6 +55,7 @@ const (
)

type parameter struct {
scenario string
batchCount int
timestampCount int
seriesCount int
Expand All @@ -63,9 +65,9 @@ type parameter struct {
}

var pList = [3]parameter{
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 1, endTimestamp: 1000},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 900, endTimestamp: 1000},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 300, endTimestamp: 400},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 1, endTimestamp: 1000, scenario: "large-scale"},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 900, endTimestamp: 1000, scenario: "latest"},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 300, endTimestamp: 400, scenario: "historical"},
}

type mockIndex map[string]map[common.SeriesID]posting.List
Expand Down Expand Up @@ -287,47 +289,7 @@ func generateStream(db storage.TSDB[*tsTable, option]) *stream {
}
}

func generateStreamFilterOptions(p parameter, index mockIndex) pbv1.StreamFilterOptions {
timeRange := timestamp.TimeRange{
Start: time.Unix(int64(p.startTimestamp), 0),
End: time.Unix(int64(p.endTimestamp), 0),
IncludeStart: true,
IncludeEnd: true,
}
entities := make([][]*modelv1.TagValue, 0)
for i := 1; i <= p.seriesCount; i++ {
entity := []*modelv1.TagValue{
{
Value: &modelv1.TagValue_Str{
Str: &modelv1.Str{
Value: entityTagValuePrefix + strconv.Itoa(i),
},
},
},
}
entities = append(entities, entity)
}
num := generateRandomNumber(int64(p.tagCardinality))
value := filterTagValuePrefix + strconv.Itoa(num)
filter := mockFilter{
index: index,
value: value,
}
tagProjection := pbv1.TagProjection{
Family: "benchmark-family",
Names: []string{"entity-tag", "filter-tag"},
}
return pbv1.StreamFilterOptions{
Name: "benchmark",
TimeRange: &timeRange,
Entities: entities,
Filter: filter,
TagProjection: []pbv1.TagProjection{tagProjection},
MaxElementSize: math.MaxInt32,
}
}

func generateStreamSortOptions(p parameter, index mockIndex) pbv1.StreamSortOptions {
func generateStreamQueryOptions(p parameter, index mockIndex) pbv1.StreamQueryOptions {
timeRange := timestamp.TimeRange{
Start: time.Unix(int64(p.startTimestamp), 0),
End: time.Unix(int64(p.endTimestamp), 0),
Expand Down Expand Up @@ -368,7 +330,7 @@ func generateStreamSortOptions(p parameter, index mockIndex) pbv1.StreamSortOpti
Family: "benchmark-family",
Names: []string{"entity-tag", "filter-tag"},
}
return pbv1.StreamSortOptions{
return pbv1.StreamQueryOptions{
Name: "benchmark",
TimeRange: &timeRange,
Entities: entities,
Expand All @@ -385,10 +347,11 @@ func BenchmarkFilter(b *testing.B) {
esList, docsList, idx := generateData(p)
db := write(b, p, esList, docsList)
s := generateStream(db)
sfo := generateStreamFilterOptions(p, idx)
b.Run("filter", func(b *testing.B) {
_, err := s.Filter(context.TODO(), sfo)
sqo := generateStreamQueryOptions(p, idx)
b.Run("filter-"+p.scenario, func(b *testing.B) {
res, err := s.Filter(context.TODO(), sqo)
require.NoError(b, err)
logicalstream.BuildElementsFromStreamResult(res)
})
}
}
Expand All @@ -399,9 +362,9 @@ func BenchmarkSort(b *testing.B) {
esList, docsList, idx := generateData(p)
db := write(b, p, esList, docsList)
s := generateStream(db)
sso := generateStreamSortOptions(p, idx)
b.Run("sort", func(b *testing.B) {
_, err := s.Sort(context.TODO(), sso)
sqo := generateStreamQueryOptions(p, idx)
b.Run("sort-"+p.scenario, func(b *testing.B) {
_, err := s.Sort(context.TODO(), sqo)
require.NoError(b, err)
})
}
Expand Down
71 changes: 50 additions & 21 deletions banyand/stream/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,16 +408,17 @@ func releaseBlock(b *block) {
var blockPool sync.Pool

type blockCursor struct {
p *part
timestamps []int64
elementIDs []string
tagFamilies []tagFamily
tagValuesDecoder encoding.BytesBlockDecoder
tagProjection []pbv1.TagProjection
bm blockMetadata
idx int
minTimestamp int64
maxTimestamp int64
p *part
timestamps []int64
expectedTimestamps []int64
elementIDs []string
tagFamilies []tagFamily
tagValuesDecoder encoding.BytesBlockDecoder
tagProjection []pbv1.TagProjection
bm blockMetadata
idx int
minTimestamp int64
maxTimestamp int64
}

func (bc *blockCursor) reset() {
Expand All @@ -438,13 +439,17 @@ func (bc *blockCursor) reset() {
bc.tagFamilies = tff[:0]
}

func (bc *blockCursor) init(p *part, bm *blockMetadata, queryOpts queryOptions) {
func (bc *blockCursor) init(p *part, bm *blockMetadata, opts queryOptions) {
bc.reset()
bc.p = p
bc.bm.copyFrom(bm)
bc.minTimestamp = queryOpts.minTimestamp
bc.maxTimestamp = queryOpts.maxTimestamp
bc.tagProjection = queryOpts.TagProjection
bc.minTimestamp = opts.minTimestamp
bc.maxTimestamp = opts.maxTimestamp
bc.tagProjection = opts.TagProjection
if opts.elementRefMap != nil {
seriesID := bc.bm.seriesID
bc.expectedTimestamps = opts.elementRefMap[seriesID]
}
}

func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) {
Expand Down Expand Up @@ -543,12 +548,30 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
bc.bm.tagFamilies = tf
tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm)

start, end, ok := timestamp.FindRange(tmpBlock.timestamps, bc.minTimestamp, bc.maxTimestamp)
if !ok {
return false
idxList := make([]int, 0)
var start, end int
if bc.expectedTimestamps != nil {
for _, ts := range bc.expectedTimestamps {
idx := timestamp.Find(tmpBlock.timestamps, ts)
if idx == -1 {
continue
}
idxList = append(idxList, idx)
bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[idx])
bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[idx])
}
if len(bc.timestamps) == 0 {
return false
}
} else {
s, e, ok := timestamp.FindRange(tmpBlock.timestamps, bc.minTimestamp, bc.maxTimestamp)
start, end = s, e
if !ok {
return false
}
bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[s:e+1]...)
bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[s:e+1]...)
}
bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[start:end+1]...)
bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[start:end+1]...)

for i, projection := range bc.bm.tagProjection {
tf := tagFamily{
Expand All @@ -559,13 +582,19 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
t := tag{
name: name,
}
if tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
if len(tmpBlock.tagFamilies[i].tags) != 0 && tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
t.valueType = tmpBlock.tagFamilies[i].tags[blockIndex].valueType
if len(tmpBlock.tagFamilies[i].tags[blockIndex].values) != len(tmpBlock.timestamps) {
logger.Panicf("unexpected number of values for tags %q: got %d; want %d",
tmpBlock.tagFamilies[i].tags[blockIndex].name, len(tmpBlock.tagFamilies[i].tags[blockIndex].values), len(tmpBlock.timestamps))
}
t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...)
if bc.expectedTimestamps != nil {
for _, idx := range idxList {
t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[idx])
}
} else {
t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...)
}
}
blockIndex++
tf.tags = append(tf.tags, t)
Expand Down
3 changes: 3 additions & 0 deletions banyand/stream/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, fil
if pl == nil {
pl = roaring.DummyPostingList
}
if pl.IsEmpty() {
continue
}
timestamps := pl.ToSlice()
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
Expand Down
22 changes: 11 additions & 11 deletions banyand/stream/iter_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,34 @@ import (
type filterFn func(itemID uint64) bool

func (s *stream) buildSeriesByIndex(tableWrappers []storage.TSTableWrapper[*tsTable],
seriesList pbv1.SeriesList, sso pbv1.StreamSortOptions,
seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions,
) (series []*searcherIterator, err error) {
timeFilter := func(itemID uint64) bool {
return sso.TimeRange.Contains(int64(itemID))
return sqo.TimeRange.Contains(int64(itemID))
}
indexRuleForSorting := sso.Order.Index
indexRuleForSorting := sqo.Order.Index
if len(indexRuleForSorting.Tags) != 1 {
return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags))
}
sortedTag := indexRuleForSorting.Tags[0]
tl := newTagLocation()
for i := range sso.TagProjection {
for j := range sso.TagProjection[i].Names {
if sso.TagProjection[i].Names[j] == sortedTag {
for i := range sqo.TagProjection {
for j := range sqo.TagProjection[i].Names {
if sqo.TagProjection[i].Names[j] == sortedTag {
tl.familyIndex, tl.tagIndex = i, j
}
}
}
if !tl.valid() {
return nil, fmt.Errorf("sorted tag %s not found in tag projection", sortedTag)
}
entityMap, tagSpecIndex, tagProjIndex, sidToIndex := s.genIndex(sso.TagProjection, seriesList)
entityMap, tagSpecIndex, tagProjIndex, sidToIndex := s.genIndex(sqo.TagProjection, seriesList)
sids := seriesList.IDs()
for _, tw := range tableWrappers {
seriesFilter := make(map[common.SeriesID]filterFn)
if sso.Filter != nil {
if sqo.Filter != nil {
for i := range sids {
pl, errExe := sso.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) {
pl, errExe := sqo.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) {
return tw.Table().Index().store, nil
}, sids[i])
if errExe != nil {
Expand All @@ -79,14 +79,14 @@ func (s *stream) buildSeriesByIndex(tableWrappers []storage.TSTableWrapper[*tsTa
IndexRuleID: indexRuleForSorting.GetMetadata().GetId(),
Analyzer: indexRuleForSorting.GetAnalyzer(),
}
inner, err = tw.Table().Index().Sort(sids, fieldKey, sso.Order.Sort, sso.MaxElementSize)
inner, err = tw.Table().Index().Sort(sids, fieldKey, sqo.Order.Sort, sqo.MaxElementSize)
if err != nil {
return nil, err
}

if inner != nil {
series = append(series, newSearcherIterator(s.l, inner, tw.Table(),
seriesFilter, timeFilter, sso.TagProjection, tl,
seriesFilter, timeFilter, sqo.TagProjection, tl,
tagSpecIndex, tagProjIndex, sidToIndex, seriesList, entityMap))
}
}
Expand Down

0 comments on commit 715674c

Please sign in to comment.