diff --git a/algo/uidlist.go b/algo/uidlist.go
index 1465d4ff2fe..cb37232f3bd 100644
--- a/algo/uidlist.go
+++ b/algo/uidlist.go
@@ -24,7 +24,8 @@ import (
 	"github.com/dgraph-io/dgraph/protos/pb"
 )
 
-const jump = 32 // Jump size in InsersectWithJump.
+const jump = 32          // Jump size in IntersectWithJump.
+const linVsBinRatio = 10 // Size ratio of the two lists below which linear intersection beats binary search.
 
 // ApplyFilter applies a filter to our UIDList.
 func ApplyFilter(u *pb.List, f func(uint64, int) bool) {
@@ -60,7 +61,7 @@ func IntersectCompressedWith(pack *pb.UidPack, afterUID uint64, v, o *pb.List) {
 
 	// Select appropriate function based on heuristics.
 	ratio := float64(m) / float64(n)
-	if ratio < 500 {
+	if ratio < linVsBinRatio {
 		IntersectCompressedWithLinJump(&dec, v.Uids, &dst)
 	} else {
 		IntersectCompressedWithBin(&dec, v.Uids, &dst)
@@ -94,7 +95,7 @@ func IntersectCompressedWithLinJump(dec *codec.Decoder, v []uint64, o *[]uint64)
 // https://link.springer.com/chapter/10.1007/978-3-642-12476-1_3
 // Call seek on dec before calling this function
 func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) {
-	ld := dec.ApproxLen()
+	ld := codec.ExactLen(dec.Pack)
 	lq := len(q)
 
 	if lq == 0 {
@@ -105,46 +106,44 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) {
 	}
 
 	// Pick the shorter list and do binary search
-	if ld < lq {
+	if ld <= lq {
 		for {
 			blockUids := dec.Uids()
 			if len(blockUids) == 0 {
 				break
 			}
-			IntersectWithBin(blockUids, q, o)
-			lastUid := blockUids[len(blockUids)-1]
-			qidx := sort.Search(len(q), func(idx int) bool {
-				return q[idx] >= lastUid
-			})
-			if qidx >= len(q) {
+			_, off := IntersectWithJump(blockUids, q, o)
+			q = q[off:]
+			if len(q) == 0 {
 				return
 			}
-			q = q[qidx:]
 			dec.Next()
 		}
 		return
 	}
 
-	var uids []uint64
-	for _, u := range q {
+	uids := dec.Uids()
+	qidx := 0
+	for {
+		if qidx >= len(q) {
+			return
+		}
+		u := q[qidx]
 		if len(uids) == 0 || u > uids[len(uids)-1] {
-			uids = dec.Seek(u, codec.SeekStart)
+			if lq*linVsBinRatio < ld {
+				uids = dec.LinearSeek(u)
+			} else {
+				uids = dec.SeekToBlock(u, codec.SeekCurrent)
+			}
 			if len(uids) == 0 {
 				return
 			}
 		}
-		uidIdx := sort.Search(len(uids), func(idx int) bool {
-			return uids[idx] >= u
-		})
-		if uidIdx >= len(uids) {
-			// We know that u < max(uids). If we didn't find it here, it's not here.
-			continue
-		}
-		if uids[uidIdx] == u {
-			*o = append(*o, u)
-			uidIdx++
+		_, off := IntersectWithJump(uids, q[qidx:], o)
+		if off == 0 {
+			off = 1 // q[qidx] is not in uids; move forward by one.
 		}
-		uids = uids[uidIdx:]
+		qidx += off
 	}
 }
 
@@ -233,7 +232,8 @@ func IntersectWithJump(u, v []uint64, o *[]uint64) (int, int) {
 // IntersectWithBin is based on the paper
 // "Fast Intersection Algorithms for Sorted Sequences"
 // https://link.springer.com/chapter/10.1007/978-3-642-12476-1_3
-func IntersectWithBin(d, q []uint64, o *[]uint64) {
+// It returns how far into the second list (q) the intersection reached; 0 means the lists cannot overlap.
+func IntersectWithBin(d, q []uint64, o *[]uint64) int {
 	ld := len(d)
 	lq := len(q)
 
@@ -242,7 +242,7 @@ func IntersectWithBin(d, q []uint64, o *[]uint64) {
 		d, q = q, d
 	}
 	if ld == 0 || lq == 0 || d[ld-1] < q[0] || q[lq-1] < d[0] {
-		return
+		return 0
 	}
 
 	val := d[0]
@@ -256,6 +256,7 @@ func IntersectWithBin(d, q []uint64, o *[]uint64) {
 	})
 
 	binIntersect(d, q[minq:maxq], o)
+	return maxq
 }
 
 // binIntersect is the recursive function used.
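The heuristic introduced above chooses between a merge-style linear intersection and a per-element binary search depending on how lopsided the two list sizes are. Below is a small, self-contained sketch of that idea on plain sorted slices; it is not the codec-backed Dgraph code, and the threshold constant and function names are assumptions for illustration only.

```go
// Illustrative sketch: pick linear vs. binary intersection based on a size ratio.
package main

import (
	"fmt"
	"sort"
)

const linVsBinRatio = 10 // assumed threshold, mirroring the constant in uidlist.go

// intersectLinear walks both sorted lists in lockstep: O(m+n).
func intersectLinear(a, b []uint64) []uint64 {
	var out []uint64
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			i++
		case a[i] > b[j]:
			j++
		default:
			out = append(out, a[i])
			i++
			j++
		}
	}
	return out
}

// intersectBinary binary-searches each element of the shorter list in the
// longer one: O(m log n), which wins when the longer list is much bigger.
func intersectBinary(short, long []uint64) []uint64 {
	var out []uint64
	for _, u := range short {
		idx := sort.Search(len(long), func(i int) bool { return long[i] >= u })
		if idx < len(long) && long[idx] == u {
			out = append(out, u)
		}
		long = long[idx:] // both lists are sorted, so never look back
	}
	return out
}

// intersect picks a strategy based on the size ratio, as IntersectCompressedWith does.
func intersect(a, b []uint64) []uint64 {
	if len(a) > len(b) {
		a, b = b, a // make a the shorter list
	}
	if len(a)*linVsBinRatio < len(b) {
		return intersectBinary(a, b)
	}
	return intersectLinear(a, b)
}

func main() {
	fmt.Println(intersect([]uint64{2, 4, 8, 16}, []uint64{1, 2, 3, 4, 5, 8, 13, 21})) // [2 4 8]
}
```

The trade-off is the usual O(m+n) merge versus O(m log n) search: once one list is roughly an order of magnitude longer than the other, searching each element of the shorter list wins.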
diff --git a/algo/uidlist_test.go b/algo/uidlist_test.go index 05eafba68e3..cd6dc194ace 100644 --- a/algo/uidlist_test.go +++ b/algo/uidlist_test.go @@ -370,10 +370,11 @@ func BenchmarkListIntersectRandom(b *testing.B) { func BenchmarkListIntersectCompressBin(b *testing.B) { randomTests := func(sz int, overlap float64) { rs := []float64{0.01, 0.1, 1, 10, 100} + //rs := []float64{0.002, 0.01, 0.1} for _, r := range rs { sz1 := sz sz2 := int(float64(sz) * r) - if sz2 > 1000000 || sz2 == 0 { + if sz2 > 10000000 || sz2 == 0 { break } @@ -389,8 +390,18 @@ func BenchmarkListIntersectCompressBin(b *testing.B) { sort.Slice(v1, func(i, j int) bool { return v1[i] < v1[j] }) dst2 := &pb.List{} + dst1 := &pb.List{} compressedUids := codec.Encode(v1, 256) + b.Run(fmt.Sprintf("linJump:IntersectWith:ratio=%v:size=%d:overlap=%.2f:", r, sz, overlap), + func(b *testing.B) { + for k := 0; k < b.N; k++ { + dec := codec.Decoder{Pack: compressedUids} + dec.Seek(0, codec.SeekStart) + IntersectCompressedWithLinJump(&dec, u1, &dst1.Uids) + } + }) + b.Run(fmt.Sprintf("compressed:IntersectWith:ratio=%v:size=%d:overlap=%.2f:", r, sz, overlap), func(b *testing.B) { for k := 0; k < b.N; k++ { @@ -399,7 +410,6 @@ func BenchmarkListIntersectCompressBin(b *testing.B) { IntersectCompressedWithBin(&dec, u1, &dst2.Uids) } }) - fmt.Println() codec.FreePack(compressedUids) } @@ -493,6 +503,43 @@ func sortUint64(nums []uint64) { sort.Slice(nums, func(i, j int) bool { return nums[i] < nums[j] }) } +func fillNumsDiff(N1, N2, N3 int) ([]uint64, []uint64, []uint64) { + rand.Seed(time.Now().UnixNano()) + + commonNums := make([]uint64, N1) + blockNums := make([]uint64, N1+N2) + otherNums := make([]uint64, N1+N3) + allC := make(map[uint64]bool) + + for i := 0; i < N1; i++ { + val := rand.Uint64() % 1000 + commonNums[i] = val + blockNums[i] = val + otherNums[i] = val + allC[val] = true + } + + for i := N1; i < N1+N2; i++ { + val := rand.Uint64() % 1000 + blockNums[i] = val + allC[val] = true + } + + for i := N1; i < N1+N3; i++ { + val := rand.Uint64() + for ok := true; ok; _, ok = allC[val] { + val = rand.Uint64() % 1000 + } + otherNums[i] = val + } + + sortUint64(commonNums) + sortUint64(blockNums) + sortUint64(otherNums) + + return commonNums, blockNums, otherNums +} + func fillNums(N1, N2 int) ([]uint64, []uint64, []uint64) { rand.Seed(time.Now().UnixNano()) @@ -545,12 +592,12 @@ func TestIntersectCompressedWithLinJump(t *testing.T) { } func TestIntersectCompressedWithBin(t *testing.T) { - lengths := []int{0, 1, 3, 11, 100} + //lengths := []int{0, 1, 3, 11, 100, 500, 1000} - for _, N1 := range lengths { - for _, N2 := range lengths { + for _, N1 := range []int{11} { + for _, N2 := range []int{3} { // Intersection of blockNums and otherNums is commonNums. 
- commonNums, blockNums, otherNums := fillNums(N1, N2) + commonNums, blockNums, otherNums := fillNumsDiff(N1/10, N1, N2) enc := codec.Encoder{BlockSize: 10} for _, num := range blockNums { @@ -570,7 +617,7 @@ func TestIntersectCompressedWithBin(t *testing.T) { } func TestIntersectCompressedWithBinMissingSize(t *testing.T) { - lengths := []int{0, 1, 3, 11, 100} + lengths := []int{0, 1, 3, 11, 100, 500, 1000} for _, N1 := range lengths { for _, N2 := range lengths { diff --git a/codec/codec.go b/codec/codec.go index 4ebc17a341d..b12a7d45c50 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -173,6 +173,7 @@ func (d *Decoder) UnpackBlock() []uint64 { if d.blockIdx >= len(d.Pack.Blocks) { return d.uids } + block := d.Pack.Blocks[d.blockIdx] last := block.Base @@ -223,6 +224,64 @@ func (d *Decoder) ApproxLen() int { type searchFunc func(int) bool +// SeekToBlock will find the block containing the uid, and unpack it. When we are going to +// intersect the list later, this function is useful. As this function skips the search function +// and returns the entire block, it is faster than Seek. Unlike seek, we don't truncate the uids +// returned, which would be done by the intersect function anyways. +func (d *Decoder) SeekToBlock(uid uint64, whence seekPos) []uint64 { + if d.Pack == nil { + return []uint64{} + } + prevBlockIdx := d.blockIdx + d.blockIdx = 0 + if uid == 0 { + return d.UnpackBlock() + } + + // If for some reason we are searching an older uid, we need to search the entire pack + if prevBlockIdx > 0 && uid < d.Pack.Blocks[prevBlockIdx].Base { + prevBlockIdx = 0 + } + + blocksFunc := func() searchFunc { + var f searchFunc + switch whence { + case SeekStart: + f = func(i int) bool { return d.Pack.Blocks[i+prevBlockIdx].Base >= uid } + case SeekCurrent: + f = func(i int) bool { return d.Pack.Blocks[i+prevBlockIdx].Base > uid } + } + return f + } + + idx := sort.Search(len(d.Pack.Blocks[prevBlockIdx:]), blocksFunc()) + prevBlockIdx + // The first block.Base >= uid. + if idx == 0 { + return d.UnpackBlock() + } + // The uid is the first entry in the block. + if idx < len(d.Pack.Blocks) && d.Pack.Blocks[idx].Base == uid { + d.blockIdx = idx + return d.UnpackBlock() + } + + // Either the idx = len(pack.Blocks) that means it wasn't found in any of the block's base. Or, + // we found the first block index whose base is greater than uid. In these cases, go to the + // previous block and search there. + d.blockIdx = idx - 1 // Move to the previous block. If blockIdx<0, unpack will deal with it. + if d.blockIdx != prevBlockIdx { + d.UnpackBlock() // And get all their uids. + } + + if uid <= d.uids[len(d.uids)-1] { + return d.uids + } + + // Could not find any uid in the block, which is >= uid. The next block might still have valid + // entries > uid. + return d.Next() +} + // Seek will search for uid in a packed block using the specified whence position. // The value of whence must be one of the predefined values SeekStart or SeekCurrent. // SeekStart searches uid and includes it as part of the results. 
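SeekToBlock, added above, differs from Seek in that it only binary-searches the block bases and hands back the whole unpacked block, leaving it to the intersection code to discard entries below the sought uid. The standalone sketch below models just that block-base search; toyBlock and toyPack are hypothetical stand-ins for the real pb.UidPack structures, and the real decoder additionally advances to the next block once the current one is exhausted.

```go
// Illustrative sketch: find the block that may contain uid without truncating it.
package main

import (
	"fmt"
	"sort"
)

type toyBlock struct {
	base uint64   // smallest uid in the block
	uids []uint64 // sorted uids; uids[0] == base
}

type toyPack struct {
	blocks []toyBlock // blocks sorted by base
}

// seekToBlock returns the uids of the block whose range covers uid: the last
// block with base <= uid, or the first block if uid is below every base.
func seekToBlock(p toyPack, uid uint64) []uint64 {
	if len(p.blocks) == 0 {
		return nil
	}
	// First block whose base is >= uid.
	idx := sort.Search(len(p.blocks), func(i int) bool { return p.blocks[i].base >= uid })
	if idx < len(p.blocks) && p.blocks[idx].base == uid {
		return p.blocks[idx].uids // uid is the first entry of this block
	}
	if idx == 0 {
		return p.blocks[0].uids // uid is below the first base
	}
	// Otherwise uid, if present at all, lives in the previous block. The whole
	// block is returned; the caller discards entries < uid during intersection.
	return p.blocks[idx-1].uids
}

func main() {
	p := toyPack{blocks: []toyBlock{
		{base: 1, uids: []uint64{1, 3, 7}},
		{base: 10, uids: []uint64{10, 11, 19}},
		{base: 25, uids: []uint64{25, 31}},
	}}
	fmt.Println(seekToBlock(p, 12)) // [10 11 19]: the covering block, untruncated
}
```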
diff --git a/go.mod b/go.mod index 634c273b634..45c7cf969f2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/dgraph-io/dgraph go 1.19 +replace github.com/dgraph-io/badger/v4 => ../badger + require ( contrib.go.opencensus.io/exporter/jaeger v0.1.0 contrib.go.opencensus.io/exporter/prometheus v0.1.0 @@ -40,7 +42,7 @@ require ( github.com/mitchellh/panicwrap v1.0.0 github.com/paulmach/go.geojson v0.0.0-20170327170536-40612a87147b github.com/pkg/errors v0.9.1 - github.com/pkg/profile v1.2.1 + github.com/pkg/profile v1.7.0 github.com/prometheus/client_golang v1.14.0 github.com/soheilhy/cmux v0.1.4 github.com/spf13/cast v1.3.0 @@ -85,10 +87,12 @@ require ( github.com/eapache/go-resiliency v1.4.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/felixge/fgprof v0.9.3 // indirect github.com/frankban/quicktest v1.10.2 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/flatbuffers v1.12.1 // indirect + github.com/google/pprof v0.0.0-20211214055906-6f57359322fd // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/go.sum b/go.sum index 84b63f1b1ad..95283418f7e 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= -github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= github.com/dgraph-io/dgo/v230 v230.0.1 h1:kR7gI7/ZZv0jtG6dnedNgNOCxe1cbSG8ekF+pNfReks= github.com/dgraph-io/dgo/v230 v230.0.1/go.mod h1:5FerO2h4LPOxR2XTkOAtqUUPaFdQ+5aBOHXPBJ3nT10= github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM= @@ -199,6 +197,8 @@ github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHj github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g= +github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -316,6 +316,8 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod 
h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y= +github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -394,6 +396,7 @@ github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKe github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -557,8 +560,8 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/profile v1.2.1 h1:F++O52m40owAmADcojzM+9gyjmMOY/T4oYJkgFDH8RE= -github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA= +github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -899,6 +902,7 @@ golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/posting/list_test.go b/posting/list_test.go index 5256938e5d1..eefa2662848 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -435,6 +435,54 @@ func TestAddMutation_mrjn1(t *testing.T) { require.Equal(t, 0, ol.Length(txn.StartTs, 0)) } +func TestReadSingleValue(t *testing.T) { + defer setMaxListSize(maxListSize) + maxListSize = math.MaxInt32 + + // We call pl.Iterate 
and then stop iterating in the first loop when we are reading + // single values. This test confirms that the two functions, getFirst from this file + // and GetSingeValueForKey works without an issue. + + key := x.DataKey(x.GalaxyAttr("value"), 1240) + ol, err := getNew(key, ps, math.MaxUint64) + require.NoError(t, err) + N := int(10000) + for i := 2; i <= N; i += 2 { + edge := &pb.DirectedEdge{ + Value: []byte("ho hey there" + strconv.Itoa(i)), + } + txn := Txn{StartTs: uint64(i)} + addMutationHelper(t, ol, edge, Set, &txn) + require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + kData := ol.getMutation(uint64(i)) + writer := NewTxnWriter(pstore) + if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil { + require.NoError(t, err) + } + writer.Flush() + + if i%10 == 0 { + // Do frequent rollups, and store data in old timestamp + kvs, err := ol.Rollup(nil, txn.StartTs-3) + require.NoError(t, err) + require.NoError(t, writePostingListToDisk(kvs)) + ol, err = getNew(key, ps, math.MaxUint64) + require.NoError(t, err) + } + + j := 2 + if j < int(ol.minTs) { + j = int(ol.minTs) + } + for ; j < i+6; j++ { + tx := NewTxn(uint64(j)) + k, err := tx.cache.GetSinglePosting(key) + require.NoError(t, err) + checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) + } + } +} + func TestRollupMaxTsIsSet(t *testing.T) { defer setMaxListSize(maxListSize) maxListSize = math.MaxInt32 diff --git a/posting/lists.go b/posting/lists.go index 5c70fc5914b..eea54425efc 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -111,6 +111,8 @@ type LocalCache struct { // plists are posting lists in memory. They can be discarded to reclaim space. plists map[string]*List + + postings map[string]*pb.PostingList } // NewLocalCache returns a new LocalCache instance. @@ -120,13 +122,17 @@ func NewLocalCache(startTs uint64) *LocalCache { deltas: make(map[string][]byte), plists: make(map[string]*List), maxVersions: make(map[string]uint64), + postings: make(map[string]*pb.PostingList), } } // NoCache returns a new LocalCache instance, which won't cache anything. Useful to pass startTs // around. func NoCache(startTs uint64) *LocalCache { - return &LocalCache{startTs: startTs} + return &LocalCache{ + startTs: startTs, + postings: make(map[string]*pb.PostingList), + } } func (lc *LocalCache) getNoStore(key string) *List { @@ -138,6 +144,20 @@ func (lc *LocalCache) getNoStore(key string) *List { return nil } +// SetIfAbsent adds the list for the specified key to the cache. If a list for the same +// key already exists, the cache will not be modified and the existing list +// will be returned instead. This behavior is meant to prevent the goroutines +// using the cache from ending up with an orphaned version of a list. +func (lc *LocalCache) SetPostingIfAbsent(key string, updated *pb.PostingList) *pb.PostingList { + lc.Lock() + defer lc.Unlock() + if pl, ok := lc.postings[key]; ok { + return pl + } + lc.postings[key] = updated + return updated +} + // SetIfAbsent adds the list for the specified key to the cache. If a list for the same // key already exists, the cache will not be modified and the existing list // will be returned instead. 
This behavior is meant to prevent the goroutines
@@ -195,6 +215,133 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)
 	return lc.SetIfAbsent(skey, pl), nil
 }
 
+// GetBatchSinglePosting fetches the posting list for each key in the batch, trying the local
+// postings cache first, then the in-memory deltas, and finally the store for whatever is left.
+func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, error) {
+	results := make([]*pb.PostingList, len(keys))
+	remainingKeys := make([][]byte, 0)
+	lc.RLock()
+	for i, key := range keys {
+		if pl, ok := lc.postings[string(key)]; ok && pl != nil {
+			results[i] = pl
+		} else if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
+			pl := &pb.PostingList{}
+			if err := pl.Unmarshal(delta); err == nil {
+				results[i] = pl
+			}
+		} else {
+			remainingKeys = append(remainingKeys, key)
+		}
+	}
+	lc.RUnlock()
+
+	txn := pstore.NewTransactionAt(lc.startTs, false)
+	items, err := txn.GetBatch(remainingKeys)
+	if err != nil {
+		fmt.Println(err, keys)
+		return nil, err
+	}
+	idx := 0
+
+	for i := 0; i < len(results); i++ {
+		if results[i] != nil {
+			continue
+		}
+		pl := &pb.PostingList{}
+		err = items[idx].Value(func(val []byte) error {
+			return pl.Unmarshal(val)
+		})
+		if err != nil {
+			return nil, err
+		}
+		idx++
+		results[i] = pl
+	}
+
+	for i := 0; i < len(results); i++ {
+		pl := results[i]
+		idx := 0
+		for _, postings := range pl.Postings {
+			if hasDeleteAll(postings) {
+				return nil, nil
+			}
+			if postings.Op != Del {
+				pl.Postings[idx] = postings
+				idx++
+			}
+		}
+		pl.Postings = pl.Postings[:idx]
+		results[i] = pl
+		lc.SetPostingIfAbsent(string(keys[i]), pl)
+	}
+
+	return results, nil
+}
+
+// GetSinglePosting retrieves the cached version of the first item in the list associated with the
+// given key. This is used for retrieving the value of a scalar predicate.
+func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
+	getDeltas := func() *pb.PostingList {
+		lc.RLock()
+		defer lc.RUnlock()
+
+		pl := &pb.PostingList{}
+		if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
+			if err := pl.Unmarshal(delta); err == nil {
+				return pl
+			}
+		}
+
+		return nil
+	}
+
+	getPostings := func() (*pb.PostingList, error) {
+		pl := getDeltas()
+		if pl != nil {
+			return pl, nil
+		}
+
+		pl = &pb.PostingList{}
+		txn := pstore.NewTransactionAt(lc.startTs, false)
+		item, err := txn.Get(key)
+		if err != nil {
+			return nil, err
+		}
+
+		err = item.Value(func(val []byte) error {
+			return pl.Unmarshal(val)
+		})
+
+		return pl, err
+	}
+
+	pl, err := getPostings()
+	if err == badger.ErrKeyNotFound {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	// Filter and remove STAR_ALL and OP_DELETE Postings
+	idx := 0
+	for _, postings := range pl.Postings {
+		if hasDeleteAll(postings) {
+			return nil, nil
+		}
+		if postings.Op != Del {
+			pl.Postings[idx] = postings
+			idx++
+		}
+	}
+	pl.Postings = pl.Postings[:idx]
+	lc.SetPostingIfAbsent(string(key), pl)
+	return pl, nil
+}
+
 // Get retrieves the cached version of the list associated with the given key.
func (lc *LocalCache) Get(key []byte) (*List, error) { return lc.getInternal(key, true) diff --git a/worker/task.go b/worker/task.go index 8c73c58b606..445628fb17b 100644 --- a/worker/task.go +++ b/worker/task.go @@ -377,11 +377,20 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er outputs := make([]*pb.Result, numGo) listType := schema.State().IsList(q.Attr) + // These are certain special cases where we can get away with reading only the latest value + // Lang doesn't work because we would be storing various different languages at various + // time. So when we go to read the latest value, we might get a different language. + // Similarly with DoCount and ExpandAll and Facets. List types are also not supported + // because list is stored by time, and we combine all the list items at various timestamps. + hasLang := schema.State().HasLang(q.Attr) + getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil + calculate := func(start, end int) error { x.AssertTrue(start%width == 0) out := &pb.Result{} outputs[start/width] = out + cache := make([]*pb.PostingList, 0) for i := start; i < end; i++ { select { case <-ctx.Done(): @@ -391,49 +400,86 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er key := x.DataKey(q.Attr, q.UidList.Uids[i]) // Get or create the posting list for an entity, attribute combination. - pl, err := qs.cache.Get(key) - if err != nil { - return err - } - // If count is being requested, there is no need to populate value and facets matrix. - if q.DoCount { - count, err := countForValuePostings(args, pl, facetsTree, listType) - if err != nil && err != posting.ErrNoValue { + var vals []types.Val + fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored + + if !getMultiplePosting { + if len(cache) == 0 { + keys := make([][]byte, 10) + keys[0] = key + for j := i + 1; j < i+10 && j < end; j++ { + keys[j-i] = x.DataKey(q.Attr, q.UidList.Uids[j]) + } + cache, err = qs.cache.GetBatchSinglePosting(keys) + if err != nil { + return err + } + } + pl := cache[0] + if len(cache) > 1 { + cache = cache[1:] + } + if pl == nil || len(pl.Postings) == 0 { + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) + out.ValueMatrix = append(out.ValueMatrix, + &pb.ValueList{Values: []*pb.TaskValue{}}) + continue + } + vals = make([]types.Val, len(pl.Postings)) + for i, p := range pl.Postings { + vals[i] = types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + } + } + } else { + pl, err := qs.cache.Get(key) + if err != nil { return err } - out.Counts = append(out.Counts, uint32(count)) - // Add an empty UID list to make later processing consistent. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - continue - } - vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType) - switch { - case err == posting.ErrNoValue || (err == nil && len(vals) == 0): - // This branch is taken when the value does not exist in the pl or - // the number of values retrieved is zero (there could still be facets). - // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and - // LangMatrix so that all these data structure have predictable layouts. 
- out.UidMatrix = append(out.UidMatrix, &pb.List{}) - out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) - out.ValueMatrix = append(out.ValueMatrix, - &pb.ValueList{Values: []*pb.TaskValue{}}) - if q.ExpandAll { - // To keep the cardinality same as that of ValueMatrix. - out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + // If count is being requested, there is no need to populate value and facets matrix. + if q.DoCount { + count, err := countForValuePostings(args, pl, facetsTree, listType) + if err != nil && err != posting.ErrNoValue { + return err + } + out.Counts = append(out.Counts, uint32(count)) + // Add an empty UID list to make later processing consistent. + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + continue } - continue - case err != nil: - return err - } - if q.ExpandAll { - langTags, err := pl.GetLangTags(args.q.ReadTs) - if err != nil { + vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) + + switch { + case err == posting.ErrNoValue || (err == nil && len(vals) == 0): + // This branch is taken when the value does not exist in the pl or + // the number of values retrieved is zero (there could still be facets). + // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and + // LangMatrix so that all these data structure have predictable layouts. + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) + out.ValueMatrix = append(out.ValueMatrix, + &pb.ValueList{Values: []*pb.TaskValue{}}) + if q.ExpandAll { + // To keep the cardinality same as that of ValueMatrix. + out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + } + continue + case err != nil: return err } - out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) + + if q.ExpandAll { + langTags, err := pl.GetLangTags(args.q.ReadTs) + if err != nil { + return err + } + out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) + } } uidList := new(pb.List)
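The handleValuePostings change prefetches single-value postings for a window of up to ten UIDs at a time via GetBatchSinglePosting and serves the following iterations from that window, falling back to the full posting-list path whenever langs, facets, counts, expand-all, or list types are involved. The sketch below shows only the generic prefetch-window pattern with a hypothetical fetchBatch callback; it is not the Dgraph code, and it simply refills the window whenever it runs empty.

```go
// Illustrative sketch: serve per-key lookups from a small prefetched window.
package main

import "fmt"

const batchSize = 10 // mirrors the hard-coded window size in the diff

// processAll fetches values for keys in windows of batchSize and hands each
// value to process, so consecutive iterations share one batched lookup.
func processAll(keys []string, fetchBatch func([]string) ([]string, error), process func(string)) error {
	var window []string
	for i := 0; i < len(keys); i++ {
		if len(window) == 0 {
			end := i + batchSize
			if end > len(keys) {
				end = len(keys)
			}
			vals, err := fetchBatch(keys[i:end])
			if err != nil {
				return err
			}
			window = vals
		}
		process(window[0])
		window = window[1:] // consume exactly one entry per key
	}
	return nil
}

func main() {
	fetch := func(ks []string) ([]string, error) { // pretend this is one KV round trip
		out := make([]string, len(ks))
		for i, k := range ks {
			out[i] = "value-of-" + k
		}
		return out, nil
	}
	keys := []string{"a", "b", "c", "d"}
	if err := processAll(keys, fetch, func(v string) { fmt.Println(v) }); err != nil {
		fmt.Println("error:", err)
	}
}
```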