Skip to content

Commit

Permalink
Make blockingQuery efficient with 'not found' results.
Browse files Browse the repository at this point in the history
By using the query results as state.

Blocking queries are efficient when the query matches some results,
because the ModifyIndex of those results, returned as queryMeta.Mindex,
will never change unless the items themselves change.

Blocking queries for non-existent items are not efficient because the
queryMeta.Index can (and often does) change when other entities are
written.

This commit reduces the churn of these queries by using a different
comparison for "has changed". Instead of using the modified index, we
use the existence of the results. If the previous result was "not found"
and the new result is still "not found", we know we can ignore the
modified index and continue to block.

This is done by setting the minQueryIndex to the returned
queryMeta.Index, which prevents the query from returning before a state
change is observed.
  • Loading branch information
dnephin committed Jan 17, 2022
1 parent 39e2837 commit ce99688
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
2 changes: 1 addition & 1 deletion agent/consul/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE

reply.Index = index
if entry == nil {
return nil
return errNotFound
}

reply.Entry = entry
Expand Down
17 changes: 16 additions & 1 deletion agent/consul/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,9 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
var ws memdb.WatchSet
err := fn(ws, s.fsm.State())
s.setQueryMeta(queryMeta, queryOpts.GetToken())
if errors.Is(err, errNotFound) {
return nil
}
return err
}

Expand All @@ -947,6 +950,8 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
// decrement the count when the function returns.
defer atomic.AddUint64(&s.queriesBlocking, ^uint64(0))

var notFound bool

for {
if queryOpts.GetRequireConsistent() {
if err := s.consistentRead(); err != nil {
Expand All @@ -966,7 +971,15 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s

err := fn(ws, state)
s.setQueryMeta(queryMeta, queryOpts.GetToken())
if err != nil {
switch {
case errors.Is(err, errNotFound):
if notFound {
// query result has not changed
minQueryIndex = queryMeta.GetIndex()
}

notFound = true
case err != nil:
return err
}

Expand All @@ -989,6 +1002,8 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
}
}

var errNotFound = fmt.Errorf("no data found for query")

// setQueryMeta is used to populate the QueryMeta data for an RPC call
//
// Note: This method must be called *after* filtering query results with ACLs.
Expand Down

0 comments on commit ce99688

Please sign in to comment.