Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve filtering performance of Stream #440

Merged
merged 20 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 83 additions & 15 deletions banyand/stream/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,16 +408,17 @@ func releaseBlock(b *block) {
var blockPool sync.Pool

// blockCursor holds the read state for one block: the part it comes from, a
// copy of the block's metadata, the timestamp bounds and tag projection taken
// from the caller's options, and the timestamps, element IDs and tag families
// collected from the block.
// NOTE(review): this listing is a rendered diff, so the field list appears
// twice: first as it was before the change, then again with
// expectedTimestamps added.
type blockCursor struct {
p *part
timestamps []int64
elementIDs []string
tagFamilies []tagFamily
tagValuesDecoder encoding.BytesBlockDecoder
tagProjection []pbv1.TagProjection
bm blockMetadata
idx int
minTimestamp int64
maxTimestamp int64
p *part
timestamps []int64
// expectedTimestamps lists the timestamps searchData looks up in the block;
// initBlockCursor fills it from filterOptions.elementRefMap for the block's series.
expectedTimestamps []int64
elementIDs []string
tagFamilies []tagFamily
tagValuesDecoder encoding.BytesBlockDecoder
tagProjection []pbv1.TagProjection
bm blockMetadata
idx int
minTimestamp int64
maxTimestamp int64
}

func (bc *blockCursor) reset() {
Expand All @@ -438,13 +439,22 @@ func (bc *blockCursor) reset() {
bc.tagFamilies = tff[:0]
}

// initBlockCursor resets bc, binds it to part p, copies the block metadata bm
// into bc.bm, and then takes the timestamp bounds and tag projection from opts.
// When opts is a filterOptions it additionally loads the expected timestamps
// stored in opts.elementRefMap under the block's series ID.
// NOTE(review): this listing is a rendered diff. The method signature on the
// next line and the three queryOpts assignments further down are the
// pre-change version; the generic function and the type switch replace them.
func (bc *blockCursor) init(p *part, bm *blockMetadata, queryOpts queryOptions) {
func initBlockCursor[T queryOptions | filterOptions](bc *blockCursor, p *part, bm *blockMetadata, opts T) {
hanahmily marked this conversation as resolved.
Show resolved Hide resolved
bc.reset()
bc.p = p
bc.bm.copyFrom(bm)
bc.minTimestamp = queryOpts.minTimestamp
bc.maxTimestamp = queryOpts.maxTimestamp
bc.tagProjection = queryOpts.TagProjection
// opts has a type-parameter type, so it is converted to interface{} first in
// order to type-switch on the concrete options type.
switch opts := interface{}(opts).(type) {
case queryOptions:
bc.minTimestamp = opts.minTimestamp
bc.maxTimestamp = opts.maxTimestamp
bc.tagProjection = opts.TagProjection
case filterOptions:
bc.minTimestamp = opts.minTimestamp
bc.maxTimestamp = opts.maxTimestamp
// bc.bm was populated by copyFrom above, so its seriesID is the block's.
// Presumably elementRefMap is a map keyed by series ID, in which case a
// missing series leaves expectedTimestamps nil - confirm against its declaration.
seriesID := bc.bm.seriesID
bc.expectedTimestamps = opts.elementRefMap[seriesID]
bc.tagProjection = opts.TagProjection
}
hanahmily marked this conversation as resolved.
Show resolved Hide resolved
}

func (bc *blockCursor) copyAllTo(r *pbv1.StreamResult, desc bool) {
Expand Down Expand Up @@ -559,7 +569,7 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
t := tag{
name: name,
}
if tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
if len(tmpBlock.tagFamilies[i].tags) != 0 && tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
t.valueType = tmpBlock.tagFamilies[i].tags[blockIndex].valueType
if len(tmpBlock.tagFamilies[i].tags[blockIndex].values) != len(tmpBlock.timestamps) {
logger.Panicf("unexpected number of values for tags %q: got %d; want %d",
Expand All @@ -575,6 +585,64 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
return true
}

// searchData reads the cursor's block into tmpBlock, restricted to the
// projected tag families, and copies into bc only the rows whose timestamps
// appear in bc.expectedTimestamps. It returns false when none of the expected
// timestamps is found in the block, and true otherwise.
// Side effects: tmpBlock is reset and refilled, and bc.bm.tagProjection and
// bc.bm.tagFamilies are overwritten.
func (bc *blockCursor) searchData(tmpBlock *block) bool {
hanahmily marked this conversation as resolved.
Show resolved Hide resolved
tmpBlock.reset()
bc.bm.tagProjection = bc.tagProjection
// Keep only the tag families named by the projection. tf stays nil when no
// projected family exists in the block metadata.
var tf map[string]*dataBlock
for i := range bc.tagProjection {
for tfName, block := range bc.bm.tagFamilies {
if bc.tagProjection[i].Family == tfName {
if tf == nil {
tf = make(map[string]*dataBlock, len(bc.tagProjection))
}
tf[tfName] = block
}
}
}
bc.bm.tagFamilies = tf
tmpBlock.mustReadFrom(&bc.tagValuesDecoder, bc.p, bc.bm)

// Resolve every expected timestamp to a row index in tmpBlock. idxList
// remembers the matched rows so the tag values can be picked from the same
// rows below; timestamps for which Find returns -1 are skipped.
idxList := make([]int, 0)
for _, ts := range bc.expectedTimestamps {
idx := timestamp.Find(tmpBlock.timestamps, ts)
if idx == -1 {
continue
}
idxList = append(idxList, idx)
bc.timestamps = append(bc.timestamps, tmpBlock.timestamps[idx])
bc.elementIDs = append(bc.elementIDs, tmpBlock.elementIDs[idx])
}
hanahmily marked this conversation as resolved.
Show resolved Hide resolved
if len(bc.timestamps) == 0 {
return false
}

// Build one tagFamily per projection entry.
// NOTE(review): tmpBlock.tagFamilies is indexed by projection position here,
// which assumes mustReadFrom fills it in projection order - confirm.
for i, projection := range bc.bm.tagProjection {
tf := tagFamily{
name: projection.Family,
}
blockIndex := 0
for _, name := range projection.Names {
t := tag{
name: name,
}
// The length check only guards an empty tag list.
// NOTE(review): blockIndex advances once per projected name, so this still
// indexes out of range if the block holds fewer tags than projection.Names -
// confirm that mustReadFrom sizes tags to the projection.
if len(tmpBlock.tagFamilies[i].tags) != 0 && tmpBlock.tagFamilies[i].tags[blockIndex].name == name {
t.valueType = tmpBlock.tagFamilies[i].tags[blockIndex].valueType
// Every tag must carry exactly one value per row of the block.
if len(tmpBlock.tagFamilies[i].tags[blockIndex].values) != len(tmpBlock.timestamps) {
logger.Panicf("unexpected number of values for tags %q: got %d; want %d",
tmpBlock.tagFamilies[i].tags[blockIndex].name, len(tmpBlock.tagFamilies[i].tags[blockIndex].values), len(tmpBlock.timestamps))
}
// Copy only the values of the rows matched above.
for _, idx := range idxList {
t.values = append(t.values, tmpBlock.tagFamilies[i].tags[blockIndex].values[idx])
}
}
// A tag that did not match is still appended, with its name only and no
// valueType or values.
blockIndex++
tf.tags = append(tf.tags, t)
}
bc.tagFamilies = append(bc.tagFamilies, tf)
}
return true
}

var blockCursorPool sync.Pool

func generateBlockCursor() *blockCursor {
Expand Down
Loading
Loading