Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve filtering performance of Stream #440

Merged
merged 20 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions banyand/cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ package main
import (
"fmt"
"os"
"runtime"

"github.com/apache/skywalking-banyandb/pkg/cmdsetup"
"github.com/apache/skywalking-banyandb/pkg/signal"
)

func main() {
runtime.GOMAXPROCS(runtime.NumCPU() * 2)
if err := cmdsetup.NewRoot(new(signal.Handler)).Execute(); err != nil {
_, _ = fmt.Fprintln(os.Stderr, err)
os.Exit(1)
Expand Down
39 changes: 27 additions & 12 deletions banyand/measure/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,21 +373,36 @@ func (qr *queryResult) Pull() *pbv1.MeasureResult {
if len(qr.data) == 0 {
return nil
}
// TODO:// Parallel load
tmpBlock := generateBlock()
defer releaseBlock(tmpBlock)

cursorChan := make(chan int, len(qr.data))
for i := 0; i < len(qr.data); i++ {
if !qr.data[i].loadData(tmpBlock) {
qr.data = append(qr.data[:i], qr.data[i+1:]...)
i--
}
if i < 0 {
continue
}
if qr.orderByTimestampDesc() {
qr.data[i].idx = len(qr.data[i].timestamps) - 1
go func(i int) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please limit the maximum number of goroutines to twice the number of CPU cores, as configured via GOMAXPROCS. You can find more information about GOMAXPROCS here.

tmpBlock := generateBlock()
defer releaseBlock(tmpBlock)
if !qr.data[i].loadData(tmpBlock) {
cursorChan <- i
return
}
if qr.orderByTimestampDesc() {
qr.data[i].idx = len(qr.data[i].timestamps) - 1
}
cursorChan <- -1
}(i)
}

blankCursorList := []int{}
for completed := 0; completed < len(qr.data); completed++ {
result := <-cursorChan
if result != -1 {
blankCursorList = append(blankCursorList, result)
}
}
sort.Slice(blankCursorList, func(i, j int) bool {
return blankCursorList[i] > blankCursorList[j]
})
for _, index := range blankCursorList {
qr.data = append(qr.data[:index], qr.data[index+1:]...)
}
qr.loaded = true
heap.Init(qr)
}
Expand Down
65 changes: 14 additions & 51 deletions banyand/stream/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
logicalstream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
Expand All @@ -54,6 +55,7 @@ const (
)

type parameter struct {
scenario string
batchCount int
timestampCount int
seriesCount int
Expand All @@ -63,9 +65,9 @@ type parameter struct {
}

var pList = [3]parameter{
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 1, endTimestamp: 1000},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 900, endTimestamp: 1000},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 300, endTimestamp: 400},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 1, endTimestamp: 1000, scenario: "large-scale"},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 900, endTimestamp: 1000, scenario: "latest"},
{batchCount: 2, timestampCount: 500, seriesCount: 100, tagCardinality: 10, startTimestamp: 300, endTimestamp: 400, scenario: "historical"},
}

type mockIndex map[string]map[common.SeriesID]posting.List
Expand Down Expand Up @@ -287,47 +289,7 @@ func generateStream(db storage.TSDB[*tsTable, option]) *stream {
}
}

// generateStreamFilterOptions assembles the pbv1.StreamFilterOptions used by
// the filter benchmark: an inclusive time range derived from p, one
// single-string-tag entity per series, and a mock filter bound to a randomly
// chosen filter-tag value drawn from p.tagCardinality possibilities.
func generateStreamFilterOptions(p parameter, index mockIndex) pbv1.StreamFilterOptions {
	// Inclusive range covering [startTimestamp, endTimestamp], in seconds.
	tr := timestamp.TimeRange{
		Start:        time.Unix(int64(p.startTimestamp), 0),
		End:          time.Unix(int64(p.endTimestamp), 0),
		IncludeStart: true,
		IncludeEnd:   true,
	}
	// One entity per series; each entity consists of a single string tag value.
	entityList := make([][]*modelv1.TagValue, 0, p.seriesCount)
	for n := 1; n <= p.seriesCount; n++ {
		entityList = append(entityList, []*modelv1.TagValue{
			{
				Value: &modelv1.TagValue_Str{
					Str: &modelv1.Str{
						Value: entityTagValuePrefix + strconv.Itoa(n),
					},
				},
			},
		})
	}
	// Pick one of the possible filter-tag values at random so each benchmark
	// run filters on a different value.
	picked := filterTagValuePrefix + strconv.Itoa(generateRandomNumber(int64(p.tagCardinality)))
	return pbv1.StreamFilterOptions{
		Name:      "benchmark",
		TimeRange: &tr,
		Entities:  entityList,
		Filter: mockFilter{
			index: index,
			value: picked,
		},
		TagProjection: []pbv1.TagProjection{{
			Family: "benchmark-family",
			Names:  []string{"entity-tag", "filter-tag"},
		}},
		MaxElementSize: math.MaxInt32,
	}
}

func generateStreamSortOptions(p parameter, index mockIndex) pbv1.StreamSortOptions {
func generateStreamQueryOptions(p parameter, index mockIndex) pbv1.StreamQueryOptions {
timeRange := timestamp.TimeRange{
Start: time.Unix(int64(p.startTimestamp), 0),
End: time.Unix(int64(p.endTimestamp), 0),
Expand Down Expand Up @@ -368,7 +330,7 @@ func generateStreamSortOptions(p parameter, index mockIndex) pbv1.StreamSortOpti
Family: "benchmark-family",
Names: []string{"entity-tag", "filter-tag"},
}
return pbv1.StreamSortOptions{
return pbv1.StreamQueryOptions{
Name: "benchmark",
TimeRange: &timeRange,
Entities: entities,
Expand All @@ -385,10 +347,11 @@ func BenchmarkFilter(b *testing.B) {
esList, docsList, idx := generateData(p)
db := write(b, p, esList, docsList)
s := generateStream(db)
sfo := generateStreamFilterOptions(p, idx)
b.Run("filter", func(b *testing.B) {
_, err := s.Filter(context.TODO(), sfo)
sqo := generateStreamQueryOptions(p, idx)
b.Run("filter-"+p.scenario, func(b *testing.B) {
res, err := s.Filter(context.TODO(), sqo)
require.NoError(b, err)
logicalstream.BuildElementsFromStreamResult(res)
})
}
}
Expand All @@ -399,9 +362,9 @@ func BenchmarkSort(b *testing.B) {
esList, docsList, idx := generateData(p)
db := write(b, p, esList, docsList)
s := generateStream(db)
sso := generateStreamSortOptions(p, idx)
b.Run("sort", func(b *testing.B) {
_, err := s.Sort(context.TODO(), sso)
sqo := generateStreamQueryOptions(p, idx)
b.Run("sort-"+p.scenario, func(b *testing.B) {
_, err := s.Sort(context.TODO(), sqo)
require.NoError(b, err)
})
}
Expand Down
71 changes: 50 additions & 21 deletions banyand/stream/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,16 +408,17 @@ func releaseBlock(b *block) {
var blockPool sync.Pool

type blockCursor struct {
p *part
timestamps []int64
elementIDs []string
tagFamilies []tagFamily
tagValuesDecoder encoding.BytesBlockDecoder
tagProjection []pbv1.TagProjection
bm blockMetadata
idx int
minTimestamp int64
maxTimestamp int64
p *part
timestamps []int64
expectedTimestamps []int64
elementIDs []string
tagFamilies []tagFamily
tagValuesDecoder encoding.BytesBlockDecoder
tagProjection []pbv1.TagProjection
bm blockMetadata
idx int
minTimestamp int64
maxTimestamp int64
}

func (bc *blockCursor) reset() {
Expand All @@ -438,13 +439,17 @@ func (bc *blockCursor) reset() {
bc.tagFamilies = tff[:0]
}

func (bc *blockCursor) init(p *part, bm *blockMetadata, queryOpts queryOptions) {
func (bc *blockCursor) init(p *part, bm *blockMetadata, opts queryOptions) {
bc.reset()
bc.p = p
bc.bm.copyFrom(bm)
bc.minTimestamp = queryOpts.minTimestamp
bc.maxTimestamp = queryOpts.maxTimestamp
bc.tagProjection = queryOpts.TagProjection
bc.minTimestamp = opts.minTimestamp
bc.maxTimestamp = opts.maxTimestamp
bc.tagProjection = opts.TagProjection
if opts.elementRefMap != nil {
seriesID := bc.bm.seriesID
bc.expectedTimestamps = opts.elementRefMap[seriesID]
}
}

func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) {
Expand Down Expand Up @@ -543,12 +548,30 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
bc.bm.tagFamilies = tf
tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm)

start, end, ok := timestamp.FindRange(tmpBlock.timestamps, bc.minTimestamp, bc.maxTimestamp)
if !ok {
return false
idxList := make([]int, 0)
var start, end int
if bc.expectedTimestamps != nil {
for _, ts := range bc.expectedTimestamps {
idx := timestamp.Find(tmpBlock.timestamps, ts)
if idx == -1 {
continue
}
idxList = append(idxList, idx)
bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[idx])
bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[idx])
}
if len(bc.timestamps) == 0 {
return false
}
} else {
s, e, ok := timestamp.FindRange(tmpBlock.timestamps, bc.minTimestamp, bc.maxTimestamp)
start, end = s, e
if !ok {
return false
}
bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[s:e+1]...)
bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[s:e+1]...)
}
bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[start:end+1]...)
bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[start:end+1]...)

for i, projection := range bc.bm.tagProjection {
tf := tagFamily{
Expand All @@ -559,13 +582,19 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
t := tag{
name: name,
}
if tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
if len(tmpBlock.tagFamilies[i].tags) != 0 && tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
t.valueType = tmpBlock.tagFamilies[i].tags[blockIndex].valueType
if len(tmpBlock.tagFamilies[i].tags[blockIndex].values) != len(tmpBlock.timestamps) {
logger.Panicf("unexpected number of values for tags %q: got %d; want %d",
tmpBlock.tagFamilies[i].tags[blockIndex].name, len(tmpBlock.tagFamilies[i].tags[blockIndex].values), len(tmpBlock.timestamps))
}
t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...)
if bc.expectedTimestamps != nil {
for _, idx := range idxList {
t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[idx])
}
} else {
t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[start:end+1]...)
}
}
blockIndex++
tf.tags = append(tf.tags, t)
Expand Down
3 changes: 3 additions & 0 deletions banyand/stream/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func (e *elementIndex) Search(_ context.Context, seriesList pbv1.SeriesList, fil
if pl == nil {
pl = roaring.DummyPostingList
}
if pl.IsEmpty() {
continue
}
Comment on lines +90 to +92
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please move it up a bit.

timestamps := pl.ToSlice()
sort.Slice(timestamps, func(i, j int) bool {
return timestamps[i] < timestamps[j]
Expand Down
22 changes: 11 additions & 11 deletions banyand/stream/iter_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,34 +31,34 @@ import (
type filterFn func(itemID uint64) bool

func (s *stream) buildSeriesByIndex(tableWrappers []storage.TSTableWrapper[*tsTable],
seriesList pbv1.SeriesList, sso pbv1.StreamSortOptions,
seriesList pbv1.SeriesList, sqo pbv1.StreamQueryOptions,
) (series []*searcherIterator, err error) {
timeFilter := func(itemID uint64) bool {
return sso.TimeRange.Contains(int64(itemID))
return sqo.TimeRange.Contains(int64(itemID))
}
indexRuleForSorting := sso.Order.Index
indexRuleForSorting := sqo.Order.Index
if len(indexRuleForSorting.Tags) != 1 {
return nil, fmt.Errorf("only support one tag for sorting, but got %d", len(indexRuleForSorting.Tags))
}
sortedTag := indexRuleForSorting.Tags[0]
tl := newTagLocation()
for i := range sso.TagProjection {
for j := range sso.TagProjection[i].Names {
if sso.TagProjection[i].Names[j] == sortedTag {
for i := range sqo.TagProjection {
for j := range sqo.TagProjection[i].Names {
if sqo.TagProjection[i].Names[j] == sortedTag {
tl.familyIndex, tl.tagIndex = i, j
}
}
}
if !tl.valid() {
return nil, fmt.Errorf("sorted tag %s not found in tag projection", sortedTag)
}
entityMap, tagSpecIndex, tagProjIndex, sidToIndex := s.genIndex(sso.TagProjection, seriesList)
entityMap, tagSpecIndex, tagProjIndex, sidToIndex := s.genIndex(sqo.TagProjection, seriesList)
sids := seriesList.IDs()
for _, tw := range tableWrappers {
seriesFilter := make(map[common.SeriesID]filterFn)
if sso.Filter != nil {
if sqo.Filter != nil {
for i := range sids {
pl, errExe := sso.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) {
pl, errExe := sqo.Filter.Execute(func(_ databasev1.IndexRule_Type) (index.Searcher, error) {
return tw.Table().Index().store, nil
}, sids[i])
if errExe != nil {
Expand All @@ -79,14 +79,14 @@ func (s *stream) buildSeriesByIndex(tableWrappers []storage.TSTableWrapper[*tsTa
IndexRuleID: indexRuleForSorting.GetMetadata().GetId(),
Analyzer: indexRuleForSorting.GetAnalyzer(),
}
inner, err = tw.Table().Index().Sort(sids, fieldKey, sso.Order.Sort, sso.MaxElementSize)
inner, err = tw.Table().Index().Sort(sids, fieldKey, sqo.Order.Sort, sqo.MaxElementSize)
if err != nil {
return nil, err
}

if inner != nil {
series = append(series, newSearcherIterator(s.l, inner, tw.Table(),
seriesFilter, timeFilter, sso.TagProjection, tl,
seriesFilter, timeFilter, sqo.TagProjection, tl,
tagSpecIndex, tagProjIndex, sidToIndex, seriesList, entityMap))
}
}
Expand Down