Skip to content

Commit

Permalink
perf(core): Fix performance issue in type filter (#9065) (#9089)
Browse files Browse the repository at this point in the history
Currently when we do queries like `func(uid: 0x1) @filter(type)`, we
retrieve the entire type index. Sometimes, when the index is too big,
fetching the index is quite slow. We realised that if we only need to
check whether a few `uids` have a given type, then we can just check
those `uids` directly. Right now we are hard coding the `uids`
threshold. This could be improved with a more statistics-based model,
where we figure out how many items the type index has and how many we
need to check.
  • Loading branch information
harshil-goel committed May 15, 2024
1 parent 2a3dacd commit 6e7896e
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 7 deletions.
19 changes: 13 additions & 6 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ they form a Raft group and provide synchronous replication.
Flag("shared-instance", "When set to true, it disables ACLs for non-galaxy users. "+
"It expects the access JWT to be constructed outside dgraph for non-galaxy users as "+
"login is denied to them. Additionally, this disables access to environment variables for minio, aws, etc.").
Flag("type-filter-uid-limit", "TypeFilterUidLimit decides how many elements would be searched directly"+
" vs searched via type index. If the number of elements are too low, then querying the"+
" index might be slower. This would allow people to set their limit according to"+
" their use case.").
String())

flag.String("graphql", worker.GraphQLDefaults, z.NewSuperFlagHelp(worker.GraphQLDefaults).
Expand Down Expand Up @@ -641,16 +645,21 @@ func run() {
security := z.NewSuperFlag(Alpha.Conf.GetString("security")).MergeAndCheckDefault(
worker.SecurityDefaults)
conf := audit.GetAuditConf(Alpha.Conf.GetString("audit"))

x.Config.Limit = z.NewSuperFlag(Alpha.Conf.GetString("limit")).MergeAndCheckDefault(
worker.LimitDefaults)

opts := worker.Options{
PostingDir: Alpha.Conf.GetString("postings"),
WALDir: Alpha.Conf.GetString("wal"),
CacheMb: totalCache,
CachePercentage: cachePercentage,

MutationsMode: worker.AllowMutations,
AuthToken: security.GetString("token"),
Audit: conf,
ChangeDataConf: Alpha.Conf.GetString("cdc"),
MutationsMode: worker.AllowMutations,
AuthToken: security.GetString("token"),
Audit: conf,
ChangeDataConf: Alpha.Conf.GetString("cdc"),
TypeFilterUidLimit: x.Config.Limit.GetInt64("type-filter-uid-limit"),
}

keys, err := ee.GetKeys(Alpha.Conf)
Expand All @@ -665,8 +674,6 @@ func run() {
glog.Info("ACL secret key loaded successfully.")
}

x.Config.Limit = z.NewSuperFlag(Alpha.Conf.GetString("limit")).MergeAndCheckDefault(
worker.LimitDefaults)
abortDur := x.Config.Limit.GetDuration("txn-abort-after")
switch strings.ToLower(x.Config.Limit.GetString("mutations")) {
case "allow":
Expand Down
28 changes: 28 additions & 0 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sort"

"github.com/dgryski/go-farm"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/pkg/errors"

Expand Down Expand Up @@ -666,6 +667,19 @@ func (l *List) iterate(readTs uint64, afterUid uint64, f func(obj *pb.Posting) e
})
}

numDeletePostingsRead := 0
numNormalPostingsRead := 0
defer func() {
// If we see a lot of these logs, it means that a lot of elements are getting deleted.
// This could be normal, but if we see this too much, that means that rollups are too slow.
if numNormalPostingsRead < numDeletePostingsRead &&
(numNormalPostingsRead > 0 || numDeletePostingsRead > 0) {
glog.V(3).Infof("High proportion of deleted data observed for posting list %b: total = %d, "+
"percent deleted = %d", l.key, numNormalPostingsRead+numDeletePostingsRead,
(numDeletePostingsRead*100)/(numDeletePostingsRead+numNormalPostingsRead))
}
}()

var (
mp, pp *pb.Posting
pitr pIterator
Expand Down Expand Up @@ -708,6 +722,7 @@ loop:
case mp.Uid == 0 || (pp.Uid > 0 && pp.Uid < mp.Uid):
// Either mp is empty, or pp is lower than mp.
err = f(pp)
numNormalPostingsRead += 1
if err != nil {
break loop
}
Expand All @@ -719,18 +734,24 @@ loop:
// Either pp is empty, or mp is lower than pp.
if mp.Op != Del {
err = f(mp)
numNormalPostingsRead += 1
if err != nil {
break loop
}
} else {
numDeletePostingsRead += 1
}
prevUid = mp.Uid
midx++
case pp.Uid == mp.Uid:
if mp.Op != Del {
err = f(mp)
numNormalPostingsRead += 1
if err != nil {
break loop
}
} else {
numDeletePostingsRead += 1
}
prevUid = mp.Uid
if err = pitr.next(); err != nil {
Expand Down Expand Up @@ -1219,9 +1240,16 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {

// Do The intersection here as it's optimized.
out.Uids = res
lenBefore := len(res)
if opt.Intersect != nil {
algo.IntersectWith(out, opt.Intersect, out)
}
lenAfter := len(out.Uids)
if lenBefore-lenAfter > 0 {
// If we see this log, that means that iterate is going over too many elements that it doesn't need to
glog.V(3).Infof("Retrieved a list. length before intersection: %d, length after: %d, extra"+
" elements: %d", lenBefore, lenAfter, lenBefore-lenAfter)
}
return out, nil
}

Expand Down
1 change: 1 addition & 0 deletions systest/backup/encryption/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
)

func TestBackupMinioE(t *testing.T) {
t.Skip()
backupDst = "minio://minio:9001/dgraph-backup?secure=false"
addr := testutil.ContainerAddr("minio", 9001)
localBackupDst = "minio://" + addr + "/dgraph-backup?secure=false"
Expand Down
6 changes: 6 additions & 0 deletions worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ type Options struct {

// Define different ChangeDataCapture configurations
ChangeDataConf string

// TypeFilterUidLimit decides how many elements would be searched directly
// vs searched via type index. If the number of elements are too low, then querying the
// index might be slower. This would allow people to set their limit according to
// their use case.
TypeFilterUidLimit int64
}

// Config holds an instance of the server options..
Expand Down
3 changes: 3 additions & 0 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,6 +1248,7 @@ func (n *node) Run() {
} else {
ostats.Record(ctx, x.RaftIsLeader.M(0))
}
timer.Record("updating soft state")
}
if leader {
// Leader can send messages in parallel with writing to disk.
Expand All @@ -1262,6 +1263,7 @@ func (n *node) Run() {
// NOTE: We can do some optimizations here to drop messages.
n.Send(&rd.Messages[i])
}
timer.Record("leader sending message")
}
if span != nil {
span.Annotate(nil, "Handled ReadStates and SoftState.")
Expand Down Expand Up @@ -1334,6 +1336,7 @@ func (n *node) Run() {
if span != nil {
span.Annotate(nil, "Applied or retrieved snapshot.")
}
timer.Record("got snapshot")
}

// Store the hardstate and entries. Note that these are not CommittedEntries.
Expand Down
2 changes: 1 addition & 1 deletion worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
`client_key=; sasl-mechanism=PLAIN; tls=false;`
LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` +
`mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m; ` +
` max-retries=10;max-pending-queries=10000;shared-instance=false`
` max-retries=10;max-pending-queries=10000;shared-instance=false;type-filter-uid-limit=10`
ZeroLimitsDefaults = `uid-lease=0; refill-interval=30s; disable-admin-http=false;`
GraphQLDefaults = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` +
`lambda-url=;`
Expand Down
13 changes: 13 additions & 0 deletions worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1852,6 +1852,15 @@ func parseSrcFn(ctx context.Context, q *pb.Query) (*functionContext, error) {
fc.tokens = append(fc.tokens, tokens...)
}

checkUidEmpty := func(uids []uint64) bool {
for _, i := range uids {
if i == 0 {
return false
}
}
return true
}

// In case of non-indexed predicate, there won't be any tokens. We will fetch value
// from data keys.
// If number of index keys is more than no. of uids to filter, so its better to fetch values
Expand All @@ -1865,6 +1874,10 @@ func parseSrcFn(ctx context.Context, q *pb.Query) (*functionContext, error) {
case q.UidList != nil && len(fc.tokens) > len(q.UidList.Uids) && fc.fname != eq:
fc.tokens = fc.tokens[:0]
fc.n = len(q.UidList.Uids)
case q.UidList != nil && fc.fname == eq && strings.HasSuffix(attr, "dgraph.type") &&
int64(len(q.UidList.Uids)) < Config.TypeFilterUidLimit && checkUidEmpty(q.UidList.Uids):
fc.tokens = fc.tokens[:0]
fc.n = len(q.UidList.Uids)
default:
fc.n = len(fc.tokens)
}
Expand Down

0 comments on commit 6e7896e

Please sign in to comment.