recording the cost for different types of queries (#1829)
* new callback for tracking bytes read for different query types

* bug fix: correct the invocations of the done logic; account for regexp query cost

* removing duplicate code

* accounting for the cost of other geo query types, plus code cleanup

* unit test fixes

* rename bytesRead to search_cost

* adding an abort invocation to indicate that the context was cancelled

* code cleanup with respect to the callback messages

* comments around the SearchResult struct

* bug fix: handling a nil snapshot in the StatsMap() API
Thejas-bhat committed Jun 13, 2023
1 parent d352a81 commit c4a521b
Showing 24 changed files with 181 additions and 55 deletions.
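
For a sense of what the change exposes to callers, here is a minimal sketch of reading the renamed per-query cost and its cumulative counterpart (the index path and query text below are illustrative, not taken from this commit):

	package main

	import (
		"fmt"
		"log"

		"github.com/blevesearch/bleve/v2"
	)

	func main() {
		// illustrative: open an existing index; the path is made up
		idx, err := bleve.Open("example.bleve")
		if err != nil {
			log.Fatal(err)
		}
		defer idx.Close()

		res, err := idx.Search(bleve.NewSearchRequest(bleve.NewQueryStringQuery("united")))
		if err != nil {
			log.Fatal(err)
		}
		// per-query cost: bytes read from the mmapped index files to serve
		// this search (this commit renames the field from BytesRead to Cost)
		fmt.Printf("search cost: %d bytes\n", res.Cost)

		// cumulative counterpart, tracked in the index stats map
		stats, _ := idx.StatsMap()["index"].(map[string]interface{})
		fmt.Printf("total bytes read at query time: %v\n",
			stats["num_bytes_read_at_query_time"])
	}
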
4 changes: 4 additions & 0 deletions index/scorch/scorch.go
@@ -588,6 +588,10 @@ func (s *Scorch) StatsMap() map[string]interface{} {
m := s.stats.ToMap()

indexSnapshot := s.currentSnapshot()
+	if indexSnapshot == nil {
+		return nil
+	}

defer func() {
_ = indexSnapshot.Close()
}()
8 changes: 5 additions & 3 deletions index/scorch/snapshot_index_tfr.go
@@ -102,10 +102,10 @@ func (i *IndexSnapshotTermFieldReader) Next(preAlloced *index.TermFieldDoc) (*in
// this is because a series of loadChunk calls may occur, and their
// bytesRead values have to be added together before being reported
// upstream.
-	if delta := i.iterators[i.segmentOffset].BytesRead() - prevBytesRead; delta > 0 {
-		i.incrementBytesRead(delta)
+	bytesRead := i.iterators[i.segmentOffset].BytesRead()
+	if bytesRead > prevBytesRead {
+		i.incrementBytesRead(bytesRead - prevBytesRead)
}

return rv, nil
}
i.segmentOffset++
@@ -204,6 +204,8 @@ func (i *IndexSnapshotTermFieldReader) Close() error {
// reader's bytesRead value
statsCallbackFn.(search.SearchIOStatsCallbackFunc)(i.bytesRead)
}

+	search.RecordSearchCost(i.ctx, search.AddM, i.bytesRead)
}

if i.snapshot != nil {
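
search.RecordSearchCost and the message constants it takes (search.AddM above; search.DoneM and search.AbortM appear in later files) are defined outside the hunks shown on this page. A rough sketch of the shape the call sites imply is below; the callback type, the constant values, and the context key string values are assumptions, not code from this commit:

	package search

	import "context"

	// a sketch inferred from the call sites in this commit
	type SearchIncrementalCostCallbackMsg uint
	type SearchQueryType uint

	const (
		AddM   SearchIncrementalCostCallbackMsg = iota // add bytes to the running cost
		DoneM                                          // the query completed normally
		AbortM                                         // the context was cancelled
	)

	const (
		Term SearchQueryType = iota // default bucket for untagged queries
		Geo                         // tagged by the geo queries below
		Numeric                     // tagged by the numeric range query below
	)

	// SearchIncrementalCostCallbackFn is the assumed consumer-facing hook.
	type SearchIncrementalCostCallbackFn func(
		SearchIncrementalCostCallbackMsg, SearchQueryType, uint64)

	// QueryTypeKey is referenced by the query changes below; the string
	// values of both keys here are assumed.
	const (
		QueryTypeKey             = "_query_type_key"
		SearchIncrementalCostKey = "_search_incremental_cost_key"
	)

	func RecordSearchCost(ctx context.Context,
		msg SearchIncrementalCostCallbackMsg, bytes uint64) {
		if ctx == nil {
			return
		}
		// the query type is tagged onto the context by each query's Searcher()
		queryType, ok := ctx.Value(QueryTypeKey).(SearchQueryType)
		if !ok {
			queryType = Term
		}
		if cb, ok := ctx.Value(SearchIncrementalCostKey).(SearchIncrementalCostCallbackFn); ok {
			cb(msg, queryType, bytes)
		}
	}
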
16 changes: 11 additions & 5 deletions index_impl.go
@@ -474,9 +474,9 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
// accounted by invoking this callback when the TFR is closed.
// 2. the docvalues portion (accounted in collector) and the retrieval
// of stored fields bytes (by LoadAndHighlightFields)
-	var totalBytesRead uint64
+	var totalSearchCost uint64
sendBytesRead := func(bytesRead uint64) {
-		totalBytesRead += bytesRead
+		totalSearchCost += bytesRead
}

ctx = context.WithValue(ctx, search.SearchIOStatsCallbackKey,
@@ -495,11 +495,13 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
err = serr
}
if sr != nil {
-		sr.BytesRead = totalBytesRead
+		sr.Cost = totalSearchCost
}
if sr, ok := indexReader.(*scorch.IndexSnapshot); ok {
-		sr.UpdateIOStats(totalBytesRead)
+		sr.UpdateIOStats(totalSearchCost)
}

+	search.RecordSearchCost(ctx, search.DoneM, 0)
}()

if req.Facets != nil {
@@ -574,6 +576,7 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
}
}

+	var storedFieldsCost uint64
for _, hit := range hits {
if i.name != "" {
hit.Index = i.name
@@ -582,9 +585,12 @@ func (i *indexImpl) SearchInContext(ctx context.Context, req *SearchRequest) (sr
if err != nil {
return nil, err
}
-		totalBytesRead += storedFieldsBytes
+		storedFieldsCost += storedFieldsBytes
}

+	totalSearchCost += storedFieldsCost
+	search.RecordSearchCost(ctx, search.AddM, storedFieldsCost)

atomic.AddUint64(&i.stats.searches, 1)
searchDuration := time.Since(searchStart)
atomic.AddUint64(&i.stats.searchTime, uint64(searchDuration))
26 changes: 13 additions & 13 deletions index_test.go
@@ -401,7 +401,7 @@ func TestBytesRead(t *testing.T) {
}
stats, _ := idx.StatsMap()["index"].(map[string]interface{})
prevBytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)
-	if prevBytesRead != 32349 && res.BytesRead == prevBytesRead {
+	if prevBytesRead != 32349 && res.Cost == prevBytesRead {
t.Fatalf("expected bytes read for query string 32349, got %v",
prevBytesRead)
}
@@ -415,7 +415,7 @@ func TestBytesRead(t *testing.T) {
}
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 23 && res.BytesRead == bytesRead-prevBytesRead {
+	if bytesRead-prevBytesRead != 23 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for query string 23, got %v",
bytesRead-prevBytesRead)
}
@@ -431,7 +431,7 @@ func TestBytesRead(t *testing.T) {
}
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 8468 && res.BytesRead == bytesRead-prevBytesRead {
+	if bytesRead-prevBytesRead != 8468 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for fuzzy query is 8468, got %v",
bytesRead-prevBytesRead)
}
@@ -448,7 +448,7 @@ func TestBytesRead(t *testing.T) {

stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if !approxSame(bytesRead-prevBytesRead, 150) && res.BytesRead == bytesRead-prevBytesRead {
+	if !approxSame(bytesRead-prevBytesRead, 150) && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for faceted query is around 150, got %v",
bytesRead-prevBytesRead)
}
@@ -466,7 +466,7 @@ func TestBytesRead(t *testing.T) {

stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 924 && res.BytesRead == bytesRead-prevBytesRead {
+	if bytesRead-prevBytesRead != 924 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for numeric range query is 924, got %v",
bytesRead-prevBytesRead)
}
@@ -481,7 +481,7 @@ func TestBytesRead(t *testing.T) {

stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 60 && res.BytesRead == bytesRead-prevBytesRead {
+	if bytesRead-prevBytesRead != 60 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for query with highlighter is 60, got %v",
bytesRead-prevBytesRead)
}
@@ -498,7 +498,7 @@ func TestBytesRead(t *testing.T) {
// since it's created afresh and not reused
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 83 && res.BytesRead == bytesRead-prevBytesRead {
+	if bytesRead-prevBytesRead != 83 && res.Cost == bytesRead-prevBytesRead {
t.Fatalf("expected bytes read for disjunction query is 83, got %v",
bytesRead-prevBytesRead)
}
@@ -580,7 +580,7 @@ func TestBytesReadStored(t *testing.T) {

stats, _ := idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ := stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead != 25928 && bytesRead == res.BytesRead {
+	if bytesRead != 25928 && bytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 25928, got %v", bytesRead)
}
prevBytesRead := bytesRead
@@ -592,7 +592,7 @@ func TestBytesReadStored(t *testing.T) {
}
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 15 && bytesRead-prevBytesRead == res.BytesRead {
+	if bytesRead-prevBytesRead != 15 && bytesRead-prevBytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 15, got %v", bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead
@@ -607,7 +607,7 @@ func TestBytesReadStored(t *testing.T) {
stats, _ = idx.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)

-	if bytesRead-prevBytesRead != 26478 && bytesRead-prevBytesRead == res.BytesRead {
+	if bytesRead-prevBytesRead != 26478 && bytesRead-prevBytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 26478, got %v",
bytesRead-prevBytesRead)
}
@@ -651,7 +651,7 @@ func TestBytesReadStored(t *testing.T) {

stats, _ = idx1.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead != 18114 && bytesRead == res.BytesRead {
+	if bytesRead != 18114 && bytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 18114, got %v", bytesRead)
}
prevBytesRead = bytesRead
@@ -662,7 +662,7 @@ func TestBytesReadStored(t *testing.T) {
}
stats, _ = idx1.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 12 && bytesRead-prevBytesRead == res.BytesRead {
+	if bytesRead-prevBytesRead != 12 && bytesRead-prevBytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 12, got %v", bytesRead-prevBytesRead)
}
prevBytesRead = bytesRead
@@ -675,7 +675,7 @@ func TestBytesReadStored(t *testing.T) {

stats, _ = idx1.StatsMap()["index"].(map[string]interface{})
bytesRead, _ = stats["num_bytes_read_at_query_time"].(uint64)
-	if bytesRead-prevBytesRead != 42 && bytesRead-prevBytesRead == res.BytesRead {
+	if bytesRead-prevBytesRead != 42 && bytesRead-prevBytesRead == res.Cost {
t.Fatalf("expected the bytes read stat to be around 42, got %v", bytesRead-prevBytesRead)
}
}
30 changes: 21 additions & 9 deletions search.go
@@ -485,15 +485,27 @@ func (ss *SearchStatus) Merge(other *SearchStatus) {

// A SearchResult describes the results of executing
// a SearchRequest.
+//
+// Status - Whether the search was executed on the underlying indexes
+// successfully or failed, along with the corresponding errors.
+// Request - The SearchRequest that was executed.
+// Hits - The list of documents that matched the query, with their
+// corresponding scores, score explanations, location info, and so on.
+// Total - The total number of documents that matched the query.
+// Cost - Indicates how expensive the query was with respect to bytes read
+// from the mmapped index files.
+// MaxScore - The maximum score seen across all document hits for this query.
+// Took - The time taken to execute the search.
+// Facets - The facet results for the search.
type SearchResult struct {
-	Status *SearchStatus `json:"status"`
-	Request *SearchRequest `json:"request"`
-	Hits search.DocumentMatchCollection `json:"hits"`
-	Total uint64 `json:"total_hits"`
-	BytesRead uint64 `json:"bytesRead"`
-	MaxScore float64 `json:"max_score"`
-	Took time.Duration `json:"took"`
-	Facets search.FacetResults `json:"facets"`
+	Status *SearchStatus `json:"status"`
+	Request *SearchRequest `json:"request"`
+	Hits search.DocumentMatchCollection `json:"hits"`
+	Total uint64 `json:"total_hits"`
+	Cost uint64 `json:"cost"`
+	MaxScore float64 `json:"max_score"`
+	Took time.Duration `json:"took"`
+	Facets search.FacetResults `json:"facets"`
}

func (sr *SearchResult) Size() int {
@@ -566,7 +578,7 @@ func (sr *SearchResult) Merge(other *SearchResult) {
sr.Status.Merge(other.Status)
sr.Hits = append(sr.Hits, other.Hits...)
sr.Total += other.Total
-	sr.BytesRead += other.BytesRead
+	sr.Cost += other.Cost
if other.MaxScore > sr.MaxScore {
sr.MaxScore = other.MaxScore
}
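
Because Merge now sums Cost, a scatter-gather caller sees the combined bytes read across all partial results. A tiny illustrative fragment (the values are made up):

	a := &bleve.SearchResult{Status: &bleve.SearchStatus{}, Cost: 1024}
	b := &bleve.SearchResult{Status: &bleve.SearchStatus{}, Cost: 512}
	a.Merge(b)
	// a.Cost == 1536; note the JSON field is now emitted as "cost"
	// rather than "bytesRead"
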
4 changes: 4 additions & 0 deletions search/collector/topn.go
@@ -200,6 +200,7 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher,
hc.needDocIds = hc.needDocIds || loadID
select {
case <-ctx.Done():
+		search.RecordSearchCost(ctx, search.AbortM, 0)
return ctx.Err()
default:
next, err = searcher.Next(searchContext)
@@ -208,6 +209,7 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher,
if hc.total%CheckDoneEvery == 0 {
select {
case <-ctx.Done():
+			search.RecordSearchCost(ctx, search.AbortM, 0)
return ctx.Err()
default:
}
@@ -232,6 +234,8 @@ func (hc *TopNCollector) Collect(ctx context.Context, searcher search.Searcher,
// total bytes read as part of docValues being read for every hit,
// which must be accounted for by invoking the callback.
statsCallbackFn.(search.SearchIOStatsCallbackFunc)(hc.bytesRead)

+		search.RecordSearchCost(ctx, search.AddM, hc.bytesRead)
}

// help finalize/flush the results in case
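
The two AbortM call sites above fire when the collector notices a cancelled context; for example (idx and req are assumed to be an existing index handle and search request):

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancelled before collection starts
	_, err := idx.SearchInContext(ctx, req)
	// err is context.Canceled; a cost callback registered on ctx would
	// receive an AbortM message here instead of DoneM
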
2 changes: 2 additions & 0 deletions search/query/geo_boundingbox.go
@@ -63,6 +63,8 @@ func (q *GeoBoundingBoxQuery) Searcher(ctx context.Context, i index.IndexReader,
field = m.DefaultSearchField()
}

+	ctx = context.WithValue(ctx, search.QueryTypeKey, search.Geo)

if q.BottomRight[0] < q.TopLeft[0] {
// cross date line, rewrite as two parts

2 changes: 2 additions & 0 deletions search/query/geo_boundingpolygon.go
@@ -61,6 +61,8 @@ func (q *GeoBoundingPolygonQuery) Searcher(ctx context.Context, i index.IndexRea
field = m.DefaultSearchField()
}

+	ctx = context.WithValue(ctx, search.QueryTypeKey, search.Geo)

return searcher.NewGeoBoundedPolygonSearcher(ctx, i, q.Points, field, q.BoostVal.Value(), options)
}

2 changes: 2 additions & 0 deletions search/query/geo_distance.go
@@ -64,6 +64,8 @@ func (q *GeoDistanceQuery) Searcher(ctx context.Context, i index.IndexReader, m
field = m.DefaultSearchField()
}

+	ctx = context.WithValue(ctx, search.QueryTypeKey, search.Geo)

dist, err := geo.ParseDistance(q.Distance)
if err != nil {
return nil, err
2 changes: 2 additions & 0 deletions search/query/geo_shape.go
@@ -107,6 +107,8 @@ func (q *GeoShapeQuery) Searcher(ctx context.Context, i index.IndexReader,
field = m.DefaultSearchField()
}

+	ctx = context.WithValue(ctx, search.QueryTypeKey, search.Geo)

return searcher.NewGeoShapeSearcher(ctx, i, q.Geometry.Shape, q.Geometry.Relation, field,
q.BoostVal.Value(), options)
}
1 change: 1 addition & 0 deletions search/query/numeric_range.go
@@ -77,6 +77,7 @@ func (q *NumericRangeQuery) Searcher(ctx context.Context, i index.IndexReader, m
if q.FieldVal == "" {
field = m.DefaultSearchField()
}
+	ctx = context.WithValue(ctx, search.QueryTypeKey, search.Numeric)
return searcher.NewNumericRangeSearcher(ctx, i, q.Min, q.Max, q.InclusiveMin, q.InclusiveMax, field, q.BoostVal.Value(), options)
}

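
Each geo query above tags its context with search.Geo, and the numeric range query with search.Numeric, before building its searcher; this is what lets RecordSearchCost attribute bytes to a query type. Reusing the assumed names from the RecordSearchCost sketch earlier, a consumer could bucket costs per type like so:

	costByType := map[search.SearchQueryType]uint64{}
	cb := search.SearchIncrementalCostCallbackFn(
		func(msg search.SearchIncrementalCostCallbackMsg,
			qt search.SearchQueryType, bytes uint64) {
			if msg == search.AddM {
				costByType[qt] += bytes // e.g. search.Geo vs search.Numeric
			}
		})
	ctx := context.WithValue(context.Background(), search.SearchIncrementalCostKey, cb)
	_, _ = idx.SearchInContext(ctx, req) // idx and req assumed as before
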
4 changes: 0 additions & 4 deletions search/search.go
@@ -27,10 +27,6 @@ var reflectStaticSizeDocumentMatch int
var reflectStaticSizeSearchContext int
var reflectStaticSizeLocation int

-const SearchIOStatsCallbackKey = "_search_io_stats_callback_key"
-
-type SearchIOStatsCallbackFunc func(uint64)

func init() {
var dm DocumentMatch
reflectStaticSizeDocumentMatch = int(reflect.TypeOf(dm).Size())
13 changes: 8 additions & 5 deletions search/searcher/search_fuzzy.go
@@ -59,7 +59,8 @@ func NewFuzzySearcher(ctx context.Context, indexReader index.IndexReader, term s
}

if ctx != nil {
-		reportIOStats(dictBytesRead, ctx)
+		reportIOStats(ctx, dictBytesRead)
+		search.RecordSearchCost(ctx, search.AddM, dictBytesRead)
}

return NewMultiTermSearcher(ctx, indexReader, candidates, field,
@@ -71,13 +72,15 @@ type fuzzyCandidates struct {
bytesRead uint64
}

-func reportIOStats(bytesRead uint64, ctx context.Context) {
+func reportIOStats(ctx context.Context, bytesRead uint64) {
// Fuzzy and regexp-like queries essentially load a dictionary,
// which potentially incurs a cost that must be accounted for by
// using the callback to report the value.
-	statsCallbackFn := ctx.Value(search.SearchIOStatsCallbackKey)
-	if statsCallbackFn != nil {
-		statsCallbackFn.(search.SearchIOStatsCallbackFunc)(bytesRead)
+	if ctx != nil {
+		statsCallbackFn := ctx.Value(search.SearchIOStatsCallbackKey)
+		if statsCallbackFn != nil {
+			statsCallbackFn.(search.SearchIOStatsCallbackFunc)(bytesRead)
}
}
}

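
As the comment in reportIOStats notes, fuzzy and regexp-like queries pay an up-front dictionary cost, reported once when the candidate terms are enumerated rather than per hit; for example (the field name here is assumed):

	fq := bleve.NewFuzzyQuery("appl")
	fq.SetField("name") // assumes a "name" text field in the index mapping
	res, err := idx.Search(bleve.NewSearchRequest(fq))
	if err == nil {
		// res.Cost includes the dictionary bytes read while building the
		// candidate term set, on top of the per-hit postings reads
		fmt.Printf("fuzzy query cost: %d bytes\n", res.Cost)
	}
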
11 changes: 8 additions & 3 deletions search/searcher/search_geoboundingbox.go
@@ -49,7 +49,7 @@ func NewGeoBoundingBoxSearcher(ctx context.Context, indexReader index.IndexReade
return nil, err
}

-		return NewFilteringSearcher(ctx, boxSearcher, buildRectFilter(dvReader,
+		return NewFilteringSearcher(ctx, boxSearcher, buildRectFilter(ctx, dvReader,
field, minLon, minLat, maxLon, maxLat)), nil
}
}
@@ -85,7 +85,7 @@ func NewGeoBoundingBoxSearcher(ctx context.Context, indexReader index.IndexReade
}
// add filter to check points near the boundary
onBoundarySearcher = NewFilteringSearcher(ctx, rawOnBoundarySearcher,
-			buildRectFilter(dvReader, field, minLon, minLat, maxLon, maxLat))
+			buildRectFilter(ctx, dvReader, field, minLon, minLat, maxLon, maxLat))
openedSearchers = append(openedSearchers, onBoundarySearcher)
}

@@ -201,7 +201,7 @@ func buildIsIndexedFunc(ctx context.Context, indexReader index.IndexReader, fiel
return isIndexed, closeF, err
}

-func buildRectFilter(dvReader index.DocValueReader, field string,
+func buildRectFilter(ctx context.Context, dvReader index.DocValueReader, field string,
minLon, minLat, maxLon, maxLat float64) FilterFunc {
return func(d *search.DocumentMatch) bool {
// check geo matches against all numeric type terms indexed
@@ -222,6 +222,11 @@ func buildRectFilter(ctx context.Context, dvReader index.DocValueReader, field string,
}
})
if err == nil && found {
+			bytes := dvReader.BytesRead()
+			if bytes > 0 {
+				reportIOStats(ctx, bytes)
+				search.RecordSearchCost(ctx, search.AddM, bytes)
+			}
for i := range lons {
if geo.BoundingBoxContains(lons[i], lats[i],
minLon, minLat, maxLon, maxLat) {
