Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

recording the cost for different types of queries #1829

Merged
merged 10 commits into from Jun 13, 2023
4 changes: 4 additions & 0 deletions index/scorch/scorch.go
Expand Up @@ -588,6 +588,10 @@ func (s *Scorch) StatsMap() map[string]interface{} {
m := s.stats.ToMap()

indexSnapshot := s.currentSnapshot()
if indexSnapshot == nil {
return nil
}

defer func() {
_ = indexSnapshot.Close()
}()
Expand Down
8 changes: 5 additions & 3 deletions index/scorch/snapshot_index_tfr.go
Expand Up @@ -102,10 +102,10 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in
// this is because there are chances of having a series of loadChunk calls,
// and they have to be added together before sending the bytesRead at this point
// upstream.
if delta := i.iterators[i.segmentOffset].BytesRead() - prevBytesRead; delta > 0 {
i.incrementBytesRead(delta)
bytesRead := i.iterators[i.segmentOffset].BytesRead()
if bytesRead > prevBytesRead {
i.incrementBytesRead(bytesRead - prevBytesRead)
}

return rv, nil
}
i.segmentOffset++
Expand Down Expand Up @@ -204,6 +204,8 @@ func (i *IndexSnapshotTermFieldReader) Close() error {
// reader's bytesRead value
statsCallbackFn.(search.SearchIOStatsCallbackFunc)(i.bytesRead)
}

search.RecordSearchCost(i.ctx, search.AddM, i.bytesRead)
}

if i.snapshot != nil {
Expand Down
16 changes: 11 additions & 5 deletions index_impl.go
Expand Up @@ -474,9 +474,9 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
// accounted by invoking this callback when the TFR is closed.
// 2. the docvalues portion (accounted in collector) and the retrieval
// of stored fields bytes (by LoadAndHighlightFields)
var totalBytesRead uint64
var totalSearchCost uint64
sendBytesRead := func(bytesRead uint64) {
totalBytesRead += bytesRead
totalSearchCost += bytesRead
}

ctx = context.WithValue(ctx, search.SearchIOStatsCallbackKey,
Expand All @@ -495,11 +495,13 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
err = serr
}
if sr != nil {
sr.BytesRead = totalBytesRead
sr.Cost = totalSearchCost
}
metonymic-smokey marked this conversation as resolved.
Show resolved Hide resolved
if sr, ok := indexReader.(*scorch.IndexSnapshot); ok {
sr.UpdateIOStats(totalBytesRead)
sr.UpdateIOStats(totalSearchCost)
}

search.RecordSearchCost(ctx, search.DoneM, 0)
}()

if req.Facets != nil {
Expand Down Expand Up @@ -574,6 +576,7 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
}
}

var storedFieldsCost uint64
for _, hit := range hits {
if i.name != "" {
hit.Index = i.name
Expand All @@ -582,9 +585,12 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
if err != nil {
return nil, err
}
totalBytesRead += storedFieldsBytes
storedFieldsCost += storedFieldsBytes
}

totalSearchCost += storedFieldsCost
search.RecordSearchCost(ctx, search.AddM, storedFieldsCost)

atomic.AddUint64(&i.stats.searches, 1)
searchDuration := time.Since(searchStart)
atomic.AddUint64(&i.stats.searchTime, uint64(searchDuration))
Expand Down
26 changes: 13 additions & 13 deletions index_test.go
Expand Up @@ -401,7 +401,7 @@ func TestBytesRead(t *testing.T) {
}
stats, _ := idx.StatsMap()["index"].(map[string]interface{})
prevBytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)
if prevBytesRead != 32349 && res.BytesRead == prevBytesRead {
if prevBytesRead != 32349 && res.Cost == prevBytesRead {
t.Fatalf("expected bytes read for query string 32349, got %v",
prevBytesRead)
}
Expand All @@ -415,7 +415,7 @@ func TestBytesRead(t *testing.T) {
}
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 23 && res.BytesRead == bytesRead-prevBytesRead {
if bytesRead-prevBytesRead != 23 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for query string 23, got %v",
bytesRead-prevBytesRead)
}
Expand All @@ -431,7 +431,7 @@ func TestBytesRead(t *testing.T) {
}
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 8468 && res.BytesRead == bytesRead-prevBytesRead {
if bytesRead-prevBytesRead != 8468 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for fuzzy query is 8468, got %v",
bytesRead-prevBytesRead)
}
Expand All @@ -448,7 +448,7 @@ func TestBytesRead(t *testing.T) {

stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if !approxSame(bytesRead-prevBytesRead, 150) && res.BytesRead == bytesRead-prevBytesRead {
if !approxSame(bytesRead-prevBytesRead, 150) && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for faceted query is around 150, got %v",
bytesRead-prevBytesRead)
}
Expand All @@ -466,7 +466,7 @@ func TestBytesRead(t *testing.T) {

stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 924 && res.BytesRead == bytesRead-prevBytesRead {
if bytesRead-prevBytesRead != 924 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for numeric range query is 924, got %v",
bytesRead-prevBytesRead)
}
Expand All @@ -481,7 +481,7 @@ func TestBytesRead(t *testing.T) {

stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 60 && res.BytesRead == bytesRead-prevBytesRead {
if bytesRead-prevBytesRead != 60 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for query with highlighter is 60, got %v",
bytesRead-prevBytesRead)
}
Expand All @@ -498,7 +498,7 @@ func TestBytesRead(t *testing.T) {
// since it's created afresh and not reused
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 83 && res.BytesRead == bytesRead-prevBytesRead {
if bytesRead-prevBytesRead != 83 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for disjunction query is 83, got %v",
bytesRead-prevBytesRead)
}
Expand Down Expand Up @@ -580,7 +580,7 @@ func TestBytesReadStored(t *testing.T) {

stats, _ := idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead != 25928 && bytesRead == res.BytesRead {
if bytesRead != 25928 && bytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 25928, got %v", bytesRead)
}
prevBytesRead := bytesRead
Expand All @@ -592,7 +592,7 @@ func TestBytesReadStored(t *testing.T) {
}
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 15 && bytesRead-prevBytesRead == res.BytesRead {
if bytesRead-prevBytesRead != 15 && bytesRead-prevBytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 15, got %v", bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead
Expand All @@ -607,7 +607,7 @@ func TestBytesReadStored(t *testing.T) {
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)

if bytesRead-prevBytesRead != 26478 && bytesRead-prevBytesRead == res.BytesRead {
if bytesRead-prevBytesRead != 26478 && bytesRead-prevBytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 26478, got %v",
bytesRead-prevBytesRead)
}
Expand Down Expand Up @@ -651,7 +651,7 @@ func TestBytesReadStored(t *testing.T) {

stats, _ = idx1.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead != 18114 && bytesRead == res.BytesRead {
if bytesRead != 18114 && bytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 18114, got %v", bytesRead)
}
prevBytesRead = bytesRead
Expand All @@ -662,7 +662,7 @@ func TestBytesReadStored(t *testing.T) {
}
stats, _ = idx1.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 12 && bytesRead-prevBytesRead == res.BytesRead {
if bytesRead-prevBytesRead != 12 && bytesRead-prevBytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 12, got %v", bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead
Expand All @@ -675,7 +675,7 @@ func TestBytesReadStored(t *testing.T) {

stats, _ = idx1.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
if bytesRead-prevBytesRead != 42 && bytesRead-prevBytesRead == res.BytesRead {
if bytesRead-prevBytesRead != 42 && bytesRead-prevBytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 42, got %v", bytesRead-prevBytesRead)
}
}
Expand Down
30 changes: 21 additions & 9 deletions search.go
Expand Up @@ -485,15 +485,27 @@ func (ss *SearchStatus) Merge(other *SearchStatus) {

// A SearchResult describes the results of executing
// a SearchRequest.
//
// Status - Whether the search was executed on the underlying indexes successfully
// or failed, and the corresponding errors.
// Request - The SearchRequest that was executed.
// Hits - The list of documents that matched the query and their corresponding
// scores, score explanation, location info and so on.
// Total - The total number of documents that matched the query.
// Cost - indicates how expensive the query was with respect to bytes read
// from the mmapped index files.
// MaxScore - The maximum score across all document hits seen for this query.
// Took - The time taken to execute the search.
// Facets - The facet results for the search.
type SearchResult struct {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you add commentary above this construct explaining what each of the response attributes stands for? This would then include our definition of Cost, so that users can benefit from it through the godocs.

Status *SearchStatus `json:"status"`
Request *SearchRequest `json:"request"`
Hits search.DocumentMatchCollection `json:"hits"`
Total uint64 `json:"total_hits"`
BytesRead uint64 `json:"bytesRead"`
MaxScore float64 `json:"max_score"`
Took time.Duration `json:"took"`
Facets search.FacetResults `json:"facets"`
Status *SearchStatus `json:"status"`
Request *SearchRequest `json:"request"`
Hits search.DocumentMatchCollection `json:"hits"`
Total uint64 `json:"total_hits"`
Cost uint64 `json:"cost"`
MaxScore float64 `json:"max_score"`
Took time.Duration `json:"took"`
Facets search.FacetResults `json:"facets"`
}

func (sr *SearchResult) Size() int {
Expand Down Expand Up @@ -566,7 +578,7 @@ func (sr *SearchResult) Merge(other *SearchResult) {
sr.Status.Merge(other.Status)
sr.Hits = append(sr.Hits, other.Hits...)
sr.Total += other.Total
sr.BytesRead += other.BytesRead
sr.Cost += other.Cost
if other.MaxScore > sr.MaxScore {
sr.MaxScore = other.MaxScore
}
Expand Down
4 changes: 4 additions & 0 deletions search/collector/topn.go
Expand Up @@ -200,6 +200,7 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher,
hc.needDocIds = hc.needDocIds || loadID
select {
case <-ctx.Done():
search.RecordSearchCost(ctx, search.AbortM, 0)
return ctx.Err()
default:
next, err = searcher.Next(searchContext)
Expand All @@ -208,6 +209,7 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher,
if hc.total%CheckDoneEvery == 0 {
select {
case <-ctx.Done():
search.RecordSearchCost(ctx, search.AbortM, 0)
return ctx.Err()
default:
}
Expand All @@ -232,6 +234,8 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher,
// total bytes read as part of docValues being read every hit
// which must be accounted by invoking the callback.
statsCallbackFn.(search.SearchIOStatsCallbackFunc)(hc.bytesRead)

search.RecordSearchCost(ctx, search.AddM, hc.bytesRead)
}

// help finalize/flush the results in case
Expand Down
2 changes: 2 additions & 0 deletions search/query/geo_boundingbox.go
Expand Up @@ -63,6 +63,8 @@ func (q *GeoBoundingBoxQuery) Searcher(ctx context.Context, i index.IndexReader,
field = m.DefaultSearchField()
}

ctx = context.WithValue(ctx, search.QueryTypeKey, search.Geo)

if q.BottomRight[0] < q.TopLeft[0] {
// cross date line, rewrite as two parts

Expand Down
2 changes: 2 additions & 0 deletions search/query/geo_boundingpolygon.go
Expand Up @@ -61,6 +61,8 @@ func (q *GeoBoundingPolygonQuery) Searcher(ctx context.Context, i index.IndexRea
field = m.DefaultSearchField()
}

ctx = context.WithValue(ctx, search.QueryTypeKey, search.Geo)

return searcher.NewGeoBoundedPolygonSearcher(ctx, i, q.Points, field, q.BoostVal.Value(), options)
}

Expand Down
2 changes: 2 additions & 0 deletions search/query/geo_distance.go
Expand Up @@ -64,6 +64,8 @@ func (q *GeoDistanceQuery) Searcher(ctx context.Context, i index.IndexReader, m
field = m.DefaultSearchField()
}

ctx = context.WithValue(ctx, search.QueryTypeKey, search.Geo)

dist, err := geo.ParseDistance(q.Distance)
if err != nil {
return nil, err
Expand Down
2 changes: 2 additions & 0 deletions search/query/geo_shape.go
Expand Up @@ -107,6 +107,8 @@ func (q *GeoShapeQuery) Searcher(ctx context.Context, i index.IndexReader,
field = m.DefaultSearchField()
}

ctx = context.WithValue(ctx, search.QueryTypeKey, search.Geo)

return searcher.NewGeoShapeSearcher(ctx, i, q.Geometry.Shape, q.Geometry.Relation, field,
q.BoostVal.Value(), options)
}
Expand Down
1 change: 1 addition & 0 deletions search/query/numeric_range.go
Expand Up @@ -77,6 +77,7 @@ func (q *NumericRangeQuery) Searcher(ctx context.Context, i index.IndexReader, m
if q.FieldVal == "" {
field = m.DefaultSearchField()
}
ctx = context.WithValue(ctx, search.QueryTypeKey, search.Numeric)
return searcher.NewNumericRangeSearcher(ctx, i, q.Min, q.Max, q.InclusiveMin, q.InclusiveMax, field, q.BoostVal.Value(), options)
}

Expand Down
4 changes: 0 additions & 4 deletions search/search.go
Expand Up @@ -27,10 +27,6 @@ var reflectStaticSizeDocumentMatch int
var reflectStaticSizeSearchContext int
var reflectStaticSizeLocation int

const SearchIOStatsCallbackKey = "_search_io_stats_callback_key"

type SearchIOStatsCallbackFunc func(uint64)

func init() {
var dm DocumentMatch
reflectStaticSizeDocumentMatch = int(reflect.TypeOf(dm).Size())
Expand Down
13 changes: 8 additions & 5 deletions search/searcher/search_fuzzy.go
Expand Up @@ -59,7 +59,8 @@ func NewFuzzySearcher(ctx context.Context, indexReader index.IndexReader, term s
}

if ctx != nil {
reportIOStats(dictBytesRead, ctx)
reportIOStats(ctx, dictBytesRead)
search.RecordSearchCost(ctx, search.AddM, dictBytesRead)
}

return NewMultiTermSearcher(ctx, indexReader, candidates, field,
Expand All @@ -71,13 +72,15 @@ type fuzzyCandidates struct {
bytesRead uint64
}

func reportIOStats(bytesRead uint64, ctx context.Context) {
func reportIOStats(ctx context.Context, bytesRead uint64) {
// The fuzzy, regexp like queries essentially load a dictionary,
// which potentially incurs a cost that must be accounted by
// using the callback to report the value.
statsCallbackFn := ctx.Value(search.SearchIOStatsCallbackKey)
if statsCallbackFn != nil {
statsCallbackFn.(search.SearchIOStatsCallbackFunc)(bytesRead)
if ctx != nil {
statsCallbackFn := ctx.Value(search.SearchIOStatsCallbackKey)
if statsCallbackFn != nil {
statsCallbackFn.(search.SearchIOStatsCallbackFunc)(bytesRead)
}
}
}

Expand Down
11 changes: 8 additions & 3 deletions search/searcher/search_geoboundingbox.go
Expand Up @@ -49,7 +49,7 @@ func NewGeoBoundingBoxSearcher(ctx context.Context, indexReader index.IndexReade
return nil, err
}

return NewFilteringSearcher(ctx, boxSearcher, buildRectFilter(dvReader,
return NewFilteringSearcher(ctx, boxSearcher, buildRectFilter(ctx, dvReader,
field, minLon, minLat, maxLon, maxLat)), nil
}
}
Expand Down Expand Up @@ -85,7 +85,7 @@ func NewGeoBoundingBoxSearcher(ctx context.Context, indexReader index.IndexReade
}
// add filter to check points near the boundary
onBoundarySearcher = NewFilteringSearcher(ctx, rawOnBoundarySearcher,
buildRectFilter(dvReader, field, minLon, minLat, maxLon, maxLat))
buildRectFilter(ctx, dvReader, field, minLon, minLat, maxLon, maxLat))
openedSearchers = append(openedSearchers, onBoundarySearcher)
}

Expand Down Expand Up @@ -201,7 +201,7 @@ func buildIsIndexedFunc(ctx context.Context, indexReader index.IndexReader, fiel
return isIndexed, closeF, err
}

func buildRectFilter(dvReader index.DocValueReader, field string,
func buildRectFilter(ctx context.Context, dvReader index.DocValueReader, field string,
minLon, minLat, maxLon, maxLat float64) FilterFunc {
return func(d *search.DocumentMatch) bool {
// check geo matches against all numeric type terms indexed
Expand All @@ -222,6 +222,11 @@ func buildRectFilter(dvReader index.DocValueReader, field string,
}
})
if err == nil && found {
bytes := dvReader.BytesRead()
if bytes > 0 {
reportIOStats(ctx, bytes)
search.RecordSearchCost(ctx, search.AddM, bytes)
}
for i := range lons {
if geo.BoundingBoxContains(lons[i], lats[i],
minLon, minLat, maxLon, maxLat) {
Expand Down