From 20a7a77f74721648a706ade8b78c97d708a530a1 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 15 Sep 2023 12:15:14 +0530 Subject: [PATCH 01/29] updated test --- algo/uidlist_test.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/algo/uidlist_test.go b/algo/uidlist_test.go index 05eafba68e3..1340fad7ad9 100644 --- a/algo/uidlist_test.go +++ b/algo/uidlist_test.go @@ -389,8 +389,18 @@ func BenchmarkListIntersectCompressBin(b *testing.B) { sort.Slice(v1, func(i, j int) bool { return v1[i] < v1[j] }) dst2 := &pb.List{} + dst1 := &pb.List{} compressedUids := codec.Encode(v1, 256) + b.Run(fmt.Sprintf("linJump:IntersectWith:ratio=%v:size=%d:overlap=%.2f:", r, sz, overlap), + func(b *testing.B) { + for k := 0; k < b.N; k++ { + dec := codec.Decoder{Pack: compressedUids} + dec.Seek(0, codec.SeekStart) + IntersectCompressedWithLinJump(&dec, u1, &dst1.Uids) + } + }) + b.Run(fmt.Sprintf("compressed:IntersectWith:ratio=%v:size=%d:overlap=%.2f:", r, sz, overlap), func(b *testing.B) { for k := 0; k < b.N; k++ { @@ -399,7 +409,6 @@ func BenchmarkListIntersectCompressBin(b *testing.B) { IntersectCompressedWithBin(&dec, u1, &dst2.Uids) } }) - fmt.Println() codec.FreePack(compressedUids) } From 4e458f2e03a49314432c1636185daa0ffc86d5e3 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 15 Sep 2023 14:32:48 +0530 Subject: [PATCH 02/29] Fixed CompressedBin algo, updated heuristic based on benchmarks --- algo/uidlist.go | 45 ++++++++++++++++++------------- algo/uidlist_test.go | 2 +- codec/codec.go | 64 +++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 88 insertions(+), 23 deletions(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index 1465d4ff2fe..166808e1975 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -60,7 +60,7 @@ func IntersectCompressedWith(pack *pb.UidPack, afterUID uint64, v, o *pb.List) { // Select appropriate function based on heuristics. 
ratio := float64(m) / float64(n) - if ratio < 500 { + if ratio < 100 { IntersectCompressedWithLinJump(&dec, v.Uids, &dst) } else { IntersectCompressedWithBin(&dec, v.Uids, &dst) @@ -94,7 +94,7 @@ func IntersectCompressedWithLinJump(dec *codec.Decoder, v []uint64, o *[]uint64) // https://link.springer.com/chapter/10.1007/978-3-642-12476-1_3 // Call seek on dec before calling this function func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { - ld := dec.ApproxLen() + ld := codec.ExactLen(dec.Pack) lq := len(q) if lq == 0 { @@ -105,13 +105,19 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { } // Pick the shorter list and do binary search - if ld < lq { + if ld <= lq { for { blockUids := dec.Uids() if len(blockUids) == 0 { break } - IntersectWithBin(blockUids, q, o) + if ld*10 < lq { + IntersectWithBin(blockUids, q, o) + } else { + // For small enough difference between two arrays, we should just + // do lin intersect + IntersectWithLin(blockUids, q, o) + } lastUid := blockUids[len(blockUids)-1] qidx := sort.Search(len(q), func(idx int) bool { return q[idx] >= lastUid @@ -125,26 +131,29 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { return } - var uids []uint64 - for _, u := range q { + uids := dec.Uids() + qidx := -1 + for { + qidx += 1 + if qidx >= len(q) { + return + } + u := q[qidx] if len(uids) == 0 || u > uids[len(uids)-1] { - uids = dec.Seek(u, codec.SeekStart) + if lq*10 < ld { + uids = dec.LinearSeek(u) + } else { + uids = dec.SeekToBlock(u, codec.SeekStart) + } if len(uids) == 0 { return } } - uidIdx := sort.Search(len(uids), func(idx int) bool { - return uids[idx] >= u - }) - if uidIdx >= len(uids) { - // We know that u < max(uids). If we didn't find it here, it's not here. - continue - } - if uids[uidIdx] == u { - *o = append(*o, u) - uidIdx++ + _, off := IntersectWithJump(uids, q[qidx:], o) + if off == 0 { + off = 1 // if v[k] isn't in u, move forward } - uids = uids[uidIdx:] + qidx += off } } diff --git a/algo/uidlist_test.go b/algo/uidlist_test.go index 1340fad7ad9..8bf9a224ee5 100644 --- a/algo/uidlist_test.go +++ b/algo/uidlist_test.go @@ -373,7 +373,7 @@ func BenchmarkListIntersectCompressBin(b *testing.B) { for _, r := range rs { sz1 := sz sz2 := int(float64(sz) * r) - if sz2 > 1000000 || sz2 == 0 { + if sz2 > 10000000 || sz2 == 0 { break } diff --git a/codec/codec.go b/codec/codec.go index 4ebc17a341d..30c6cbe3b63 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -223,6 +223,59 @@ func (d *Decoder) ApproxLen() int { type searchFunc func(int) bool +// SeekToBlock will find the nearest block, and unpack it. Unlike Seek, it doesn't +// apply search in the resulting uid list and then move the pointer forward. When we are going +// to intersect the list later, this function is useful. +func (d *Decoder) SeekToBlock(uid uint64, whence seekPos) []uint64 { + if d.Pack == nil { + return []uint64{} + } + prevBlockIdx := d.blockIdx + d.blockIdx = 0 + if uid == 0 { + return d.UnpackBlock() + } + + pack := d.Pack + blocksFunc := func() searchFunc { + var f searchFunc + switch whence { + case SeekStart: + f = func(i int) bool { return pack.Blocks[i+prevBlockIdx].Base >= uid } + case SeekCurrent: + f = func(i int) bool { return pack.Blocks[i+prevBlockIdx].Base > uid } + } + return f + } + + idx := sort.Search(len(pack.Blocks[prevBlockIdx:]), blocksFunc()) + prevBlockIdx + // The first block.Base >= uid. + if idx == 0 { + return d.UnpackBlock() + } + // The uid is the first entry in the block. 
+ if idx < len(pack.Blocks) && pack.Blocks[idx].Base == uid { + d.blockIdx = idx + return d.UnpackBlock() + } + + // Either the idx = len(pack.Blocks) that means it wasn't found in any of the block's base. Or, + // we found the first block index whose base is greater than uid. In these cases, go to the + // previous block and search there. + d.blockIdx = idx - 1 // Move to the previous block. If blockIdx<0, unpack will deal with it. + if d.blockIdx != prevBlockIdx { + d.UnpackBlock() // And get all their uids. + } + + if uid < d.uids[len(d.uids)-1] { + return d.uids + } + + // Could not find any uid in the block, which is >= uid. The next block might still have valid + // entries > uid. + return d.Next() +} + // Seek will search for uid in a packed block using the specified whence position. // The value of whence must be one of the predefined values SeekStart or SeekCurrent. // SeekStart searches uid and includes it as part of the results. @@ -233,6 +286,7 @@ func (d *Decoder) Seek(uid uint64, whence seekPos) []uint64 { if d.Pack == nil { return []uint64{} } + prevBlockIdx := d.blockIdx d.blockIdx = 0 if uid == 0 { return d.UnpackBlock() @@ -243,14 +297,14 @@ func (d *Decoder) Seek(uid uint64, whence seekPos) []uint64 { var f searchFunc switch whence { case SeekStart: - f = func(i int) bool { return pack.Blocks[i].Base >= uid } + f = func(i int) bool { return pack.Blocks[i+prevBlockIdx].Base >= uid } case SeekCurrent: - f = func(i int) bool { return pack.Blocks[i].Base > uid } + f = func(i int) bool { return pack.Blocks[i+prevBlockIdx].Base > uid } } return f } - idx := sort.Search(len(pack.Blocks), blocksFunc()) + idx := sort.Search(len(pack.Blocks[prevBlockIdx:]), blocksFunc()) + prevBlockIdx // The first block.Base >= uid. if idx == 0 { return d.UnpackBlock() @@ -265,7 +319,9 @@ func (d *Decoder) Seek(uid uint64, whence seekPos) []uint64 { // we found the first block index whose base is greater than uid. In these cases, go to the // previous block and search there. d.blockIdx = idx - 1 // Move to the previous block. If blockIdx<0, unpack will deal with it. - d.UnpackBlock() // And get all their uids. + if d.blockIdx != prevBlockIdx { + d.UnpackBlock() // And get all their uids. 
+ } uidsFunc := func() searchFunc { var f searchFunc From a0b1e23590343fa31d7a51b6b6cc2218d74c3f71 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 15 Sep 2023 17:47:28 +0530 Subject: [PATCH 03/29] updated algo --- algo/uidlist.go | 33 +++++++++++++++++++-------------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index 166808e1975..c0e966b6279 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -111,22 +111,25 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { if len(blockUids) == 0 { break } - if ld*10 < lq { - IntersectWithBin(blockUids, q, o) + if ld*10 < len(q) { + q = q[IntersectWithBin(blockUids, q, o):] + dec.Next() } else { // For small enough difference between two arrays, we should just // do lin intersect - IntersectWithLin(blockUids, q, o) + _, off := IntersectWithLin(blockUids, q, o) + if off == 0 { + off = 1 + } + if off == len(q) { + return + } + q = q[off:] + if len(q) == 0 { + return + } + dec.Seek(q[0], codec.SeekStart) } - lastUid := blockUids[len(blockUids)-1] - qidx := sort.Search(len(q), func(idx int) bool { - return q[idx] >= lastUid - }) - if qidx >= len(q) { - return - } - q = q[qidx:] - dec.Next() } return } @@ -242,7 +245,8 @@ func IntersectWithJump(u, v []uint64, o *[]uint64) (int, int) { // IntersectWithBin is based on the paper // "Fast Intersection Algorithms for Sorted Sequences" // https://link.springer.com/chapter/10.1007/978-3-642-12476-1_3 -func IntersectWithBin(d, q []uint64, o *[]uint64) { +// Returns where to move the second array(q) to. O means not found +func IntersectWithBin(d, q []uint64, o *[]uint64) int { ld := len(d) lq := len(q) @@ -251,7 +255,7 @@ func IntersectWithBin(d, q []uint64, o *[]uint64) { d, q = q, d } if ld == 0 || lq == 0 || d[ld-1] < q[0] || q[lq-1] < d[0] { - return + return 0 } val := d[0] @@ -265,6 +269,7 @@ func IntersectWithBin(d, q []uint64, o *[]uint64) { }) binIntersect(d, q[minq:maxq], o) + return maxq } // binIntersect is the recursive function used. From 09458787324a19e41d6bb810303703232c8f1205 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 15 Sep 2023 17:51:25 +0530 Subject: [PATCH 04/29] updated algo --- algo/uidlist.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index c0e966b6279..5e65473525a 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -60,7 +60,7 @@ func IntersectCompressedWith(pack *pb.UidPack, afterUID uint64, v, o *pb.List) { // Select appropriate function based on heuristics. 
ratio := float64(m) / float64(n) - if ratio < 100 { + if ratio < 10 { IntersectCompressedWithLinJump(&dec, v.Uids, &dst) } else { IntersectCompressedWithBin(&dec, v.Uids, &dst) From b8b96d6f234e4195d2821e9ea5067f69f536a067 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 15 Sep 2023 19:15:06 +0530 Subject: [PATCH 05/29] Fixed tests --- algo/uidlist.go | 15 ++++---------- codec/codec.go | 53 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index 5e65473525a..7a1f4903b27 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -113,23 +113,16 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { } if ld*10 < len(q) { q = q[IntersectWithBin(blockUids, q, o):] - dec.Next() } else { // For small enough difference between two arrays, we should just // do lin intersect _, off := IntersectWithLin(blockUids, q, o) - if off == 0 { - off = 1 - } - if off == len(q) { - return - } q = q[off:] - if len(q) == 0 { - return - } - dec.Seek(q[0], codec.SeekStart) } + if len(q) == 0 { + return + } + dec.Next() } return } diff --git a/codec/codec.go b/codec/codec.go index 30c6cbe3b63..e1b8d119da6 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -286,41 +286,58 @@ func (d *Decoder) Seek(uid uint64, whence seekPos) []uint64 { if d.Pack == nil { return []uint64{} } + prevBlockIdx := d.blockIdx - d.blockIdx = 0 + pack := d.Pack + blocksToSearch := pack.Blocks + offset := 0 + if len(d.uids) > 0 { + first := d.uids[0] + second := d.uids[len(d.uids)-1] + + if uid >= first && uid <= second { + blocksToSearch = nil + } else if uid > second { + blocksToSearch = blocksToSearch[prevBlockIdx:] + offset = prevBlockIdx + } else { + blocksToSearch = blocksToSearch[:prevBlockIdx] + } + } + if uid == 0 { return d.UnpackBlock() } - pack := d.Pack blocksFunc := func() searchFunc { var f searchFunc switch whence { case SeekStart: - f = func(i int) bool { return pack.Blocks[i+prevBlockIdx].Base >= uid } + f = func(i int) bool { return blocksToSearch[i].Base >= uid } case SeekCurrent: - f = func(i int) bool { return pack.Blocks[i+prevBlockIdx].Base > uid } + f = func(i int) bool { return blocksToSearch[i].Base > uid } } return f } - idx := sort.Search(len(pack.Blocks[prevBlockIdx:]), blocksFunc()) + prevBlockIdx - // The first block.Base >= uid. - if idx == 0 { - return d.UnpackBlock() - } - // The uid is the first entry in the block. - if idx < len(pack.Blocks) && pack.Blocks[idx].Base == uid { - d.blockIdx = idx - return d.UnpackBlock() - } - // Either the idx = len(pack.Blocks) that means it wasn't found in any of the block's base. Or, // we found the first block index whose base is greater than uid. In these cases, go to the // previous block and search there. - d.blockIdx = idx - 1 // Move to the previous block. If blockIdx<0, unpack will deal with it. - if d.blockIdx != prevBlockIdx { - d.UnpackBlock() // And get all their uids. + if blocksToSearch != nil { + idx := sort.Search(len(blocksToSearch), blocksFunc()) + offset + // The first block.Base >= uid. + if idx == 0 { + return d.UnpackBlock() + } + + // The uid is the first entry in the block. + if idx < len(pack.Blocks) && pack.Blocks[idx].Base == uid { + d.blockIdx = idx + return d.UnpackBlock() + } + + d.blockIdx = idx - 1 // Move to the previous block. If blockIdx<0, unpack will deal with it. + d.UnpackBlock() // And get all their uids. 
} uidsFunc := func() searchFunc { From 7bf96050758ceafa988d0207c8272a4a48eb9dcf Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 29 Sep 2023 14:55:18 +0530 Subject: [PATCH 06/29] fixed test --- algo/uidlist_test.go | 4 ++-- codec/codec.go | 54 ++++++++++++++------------------------------ 2 files changed, 19 insertions(+), 39 deletions(-) diff --git a/algo/uidlist_test.go b/algo/uidlist_test.go index 8bf9a224ee5..48dd289c23d 100644 --- a/algo/uidlist_test.go +++ b/algo/uidlist_test.go @@ -554,7 +554,7 @@ func TestIntersectCompressedWithLinJump(t *testing.T) { } func TestIntersectCompressedWithBin(t *testing.T) { - lengths := []int{0, 1, 3, 11, 100} + lengths := []int{0, 1, 3, 11, 100, 500, 1000} for _, N1 := range lengths { for _, N2 := range lengths { @@ -579,7 +579,7 @@ func TestIntersectCompressedWithBin(t *testing.T) { } func TestIntersectCompressedWithBinMissingSize(t *testing.T) { - lengths := []int{0, 1, 3, 11, 100} + lengths := []int{0, 1, 3, 11, 100, 500, 1000} for _, N1 := range lengths { for _, N2 := range lengths { diff --git a/codec/codec.go b/codec/codec.go index e1b8d119da6..2967d732247 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -286,59 +286,39 @@ func (d *Decoder) Seek(uid uint64, whence seekPos) []uint64 { if d.Pack == nil { return []uint64{} } - - prevBlockIdx := d.blockIdx - pack := d.Pack - blocksToSearch := pack.Blocks - offset := 0 - if len(d.uids) > 0 { - first := d.uids[0] - second := d.uids[len(d.uids)-1] - - if uid >= first && uid <= second { - blocksToSearch = nil - } else if uid > second { - blocksToSearch = blocksToSearch[prevBlockIdx:] - offset = prevBlockIdx - } else { - blocksToSearch = blocksToSearch[:prevBlockIdx] - } - } - + d.blockIdx = 0 if uid == 0 { return d.UnpackBlock() } + pack := d.Pack blocksFunc := func() searchFunc { var f searchFunc switch whence { case SeekStart: - f = func(i int) bool { return blocksToSearch[i].Base >= uid } + f = func(i int) bool { return pack.Blocks[i].Base >= uid } case SeekCurrent: - f = func(i int) bool { return blocksToSearch[i].Base > uid } + f = func(i int) bool { return pack.Blocks[i].Base > uid } } return f } + idx := sort.Search(len(pack.Blocks), blocksFunc()) + // The first block.Base >= uid. + if idx == 0 { + return d.UnpackBlock() + } + // The uid is the first entry in the block. + if idx < len(pack.Blocks) && pack.Blocks[idx].Base == uid { + d.blockIdx = idx + return d.UnpackBlock() + } + // Either the idx = len(pack.Blocks) that means it wasn't found in any of the block's base. Or, // we found the first block index whose base is greater than uid. In these cases, go to the // previous block and search there. - if blocksToSearch != nil { - idx := sort.Search(len(blocksToSearch), blocksFunc()) + offset - // The first block.Base >= uid. - if idx == 0 { - return d.UnpackBlock() - } - - // The uid is the first entry in the block. - if idx < len(pack.Blocks) && pack.Blocks[idx].Base == uid { - d.blockIdx = idx - return d.UnpackBlock() - } - - d.blockIdx = idx - 1 // Move to the previous block. If blockIdx<0, unpack will deal with it. - d.UnpackBlock() // And get all their uids. - } + d.blockIdx = idx - 1 // Move to the previous block. If blockIdx<0, unpack will deal with it. + d.UnpackBlock() // And get all their uids. 
uidsFunc := func() searchFunc { var f searchFunc From d350c12e695ba5f975de24f233b3f8599ad20b1e Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 29 Sep 2023 15:47:20 +0530 Subject: [PATCH 07/29] Fixed failing tests, updated existing test to be more robust --- algo/uidlist.go | 5 ++--- algo/uidlist_test.go | 45 ++++++++++++++++++++++++++++++++++++++++---- codec/codec.go | 7 ++++++- 3 files changed, 49 insertions(+), 8 deletions(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index 7a1f4903b27..cb4c08688a6 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -128,9 +128,8 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { } uids := dec.Uids() - qidx := -1 + qidx := 0 for { - qidx += 1 if qidx >= len(q) { return } @@ -139,7 +138,7 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { if lq*10 < ld { uids = dec.LinearSeek(u) } else { - uids = dec.SeekToBlock(u, codec.SeekStart) + uids = dec.SeekToBlock(u, codec.SeekCurrent) } if len(uids) == 0 { return diff --git a/algo/uidlist_test.go b/algo/uidlist_test.go index 48dd289c23d..50fddb30b38 100644 --- a/algo/uidlist_test.go +++ b/algo/uidlist_test.go @@ -502,6 +502,43 @@ func sortUint64(nums []uint64) { sort.Slice(nums, func(i, j int) bool { return nums[i] < nums[j] }) } +func fillNumsDiff(N1, N2, N3 int) ([]uint64, []uint64, []uint64) { + rand.Seed(time.Now().UnixNano()) + + commonNums := make([]uint64, N1) + blockNums := make([]uint64, N1+N2) + otherNums := make([]uint64, N1+N3) + allC := make(map[uint64]bool) + + for i := 0; i < N1; i++ { + val := rand.Uint64() % 1000 + commonNums[i] = val + blockNums[i] = val + otherNums[i] = val + allC[val] = true + } + + for i := N1; i < N1+N2; i++ { + val := rand.Uint64() % 1000 + blockNums[i] = val + allC[val] = true + } + + for i := N1; i < N1+N3; i++ { + val := rand.Uint64() + for ok := true; ok; _, ok = allC[val] { + val = rand.Uint64() % 1000 + } + otherNums[i] = val + } + + sortUint64(commonNums) + sortUint64(blockNums) + sortUint64(otherNums) + + return commonNums, blockNums, otherNums +} + func fillNums(N1, N2 int) ([]uint64, []uint64, []uint64) { rand.Seed(time.Now().UnixNano()) @@ -554,12 +591,12 @@ func TestIntersectCompressedWithLinJump(t *testing.T) { } func TestIntersectCompressedWithBin(t *testing.T) { - lengths := []int{0, 1, 3, 11, 100, 500, 1000} + //lengths := []int{0, 1, 3, 11, 100, 500, 1000} - for _, N1 := range lengths { - for _, N2 := range lengths { + for _, N1 := range []int{11} { + for _, N2 := range []int{3} { // Intersection of blockNums and otherNums is commonNums. - commonNums, blockNums, otherNums := fillNums(N1, N2) + commonNums, blockNums, otherNums := fillNumsDiff(N1/10, N1, N2) enc := codec.Encoder{BlockSize: 10} for _, num := range blockNums { diff --git a/codec/codec.go b/codec/codec.go index 2967d732247..36b1b3f1228 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -236,6 +236,11 @@ func (d *Decoder) SeekToBlock(uid uint64, whence seekPos) []uint64 { return d.UnpackBlock() } + // If for some reason we are searching an older uid, we need to search the entire pack + if prevBlockIdx > 0 && uid < d.Pack.Blocks[prevBlockIdx].Base { + prevBlockIdx = 0 + } + pack := d.Pack blocksFunc := func() searchFunc { var f searchFunc @@ -267,7 +272,7 @@ func (d *Decoder) SeekToBlock(uid uint64, whence seekPos) []uint64 { d.UnpackBlock() // And get all their uids. 
} - if uid < d.uids[len(d.uids)-1] { + if uid <= d.uids[len(d.uids)-1] { return d.uids } From 36b27222408033fe3994b377c05e93de13975863 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 29 Sep 2023 19:54:15 +0530 Subject: [PATCH 08/29] Fixed comment --- algo/uidlist.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index cb4c08688a6..5c0b6013743 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -24,7 +24,8 @@ import ( "github.com/dgraph-io/dgraph/protos/pb" ) -const jump = 32 // Jump size in InsersectWithJump. +const jump = 32 // Jump size in InsersectWithJump. +const linVsBinRatio = 10 // When is linear search better than binary // ApplyFilter applies a filter to our UIDList. func ApplyFilter(u *pb.List, f func(uint64, int) bool) { @@ -60,7 +61,7 @@ func IntersectCompressedWith(pack *pb.UidPack, afterUID uint64, v, o *pb.List) { // Select appropriate function based on heuristics. ratio := float64(m) / float64(n) - if ratio < 10 { + if ratio < linVsBinRatio { IntersectCompressedWithLinJump(&dec, v.Uids, &dst) } else { IntersectCompressedWithBin(&dec, v.Uids, &dst) @@ -111,7 +112,7 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { if len(blockUids) == 0 { break } - if ld*10 < len(q) { + if ld*linVsBinRatio < len(q) { q = q[IntersectWithBin(blockUids, q, o):] } else { // For small enough difference between two arrays, we should just @@ -135,7 +136,7 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { } u := q[qidx] if len(uids) == 0 || u > uids[len(uids)-1] { - if lq*10 < ld { + if lq*linVsBinRatio < ld { uids = dec.LinearSeek(u) } else { uids = dec.SeekToBlock(u, codec.SeekCurrent) From 170f37cc28548700ca9db582c6d009942927e88d Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 11 Oct 2023 20:23:35 +0530 Subject: [PATCH 09/29] Fixed comments --- algo/uidlist.go | 11 ++++------- codec/codec.go | 16 ++++++++-------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index 5c0b6013743..be1dcf4074e 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -112,14 +112,11 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { if len(blockUids) == 0 { break } - if ld*linVsBinRatio < len(q) { - q = q[IntersectWithBin(blockUids, q, o):] - } else { - // For small enough difference between two arrays, we should just - // do lin intersect - _, off := IntersectWithLin(blockUids, q, o) - q = q[off:] + _, off := IntersectWithJump(blockUids, q, o) + if off == 0 { + off = 1 // if v[k] isn't in u, move forward } + q = q[off:] if len(q) == 0 { return } diff --git a/codec/codec.go b/codec/codec.go index 36b1b3f1228..60359f474fa 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -223,9 +223,10 @@ func (d *Decoder) ApproxLen() int { type searchFunc func(int) bool -// SeekToBlock will find the nearest block, and unpack it. Unlike Seek, it doesn't -// apply search in the resulting uid list and then move the pointer forward. When we are going -// to intersect the list later, this function is useful. +// SeekToBlock will find the block containing the uid, and unpack it. When we are going to +// intersect the list later, this function is useful. As this function skips the search function +// and returns the entire block, it is faster than Seek. Unlike seek, we don't truncate the uids +// returned, which would be done by the intersect function anyways. 
func (d *Decoder) SeekToBlock(uid uint64, whence seekPos) []uint64 { if d.Pack == nil { return []uint64{} @@ -241,25 +242,24 @@ func (d *Decoder) SeekToBlock(uid uint64, whence seekPos) []uint64 { prevBlockIdx = 0 } - pack := d.Pack blocksFunc := func() searchFunc { var f searchFunc switch whence { case SeekStart: - f = func(i int) bool { return pack.Blocks[i+prevBlockIdx].Base >= uid } + f = func(i int) bool { return d.Pack.Blocks[i+prevBlockIdx].Base >= uid } case SeekCurrent: - f = func(i int) bool { return pack.Blocks[i+prevBlockIdx].Base > uid } + f = func(i int) bool { return d.Pack.Blocks[i+prevBlockIdx].Base > uid } } return f } - idx := sort.Search(len(pack.Blocks[prevBlockIdx:]), blocksFunc()) + prevBlockIdx + idx := sort.Search(len(d.Pack.Blocks[prevBlockIdx:]), blocksFunc()) + prevBlockIdx // The first block.Base >= uid. if idx == 0 { return d.UnpackBlock() } // The uid is the first entry in the block. - if idx < len(pack.Blocks) && pack.Blocks[idx].Base == uid { + if idx < len(d.Pack.Blocks) && d.Pack.Blocks[idx].Base == uid { d.blockIdx = idx return d.UnpackBlock() } From f1e65ae75fbf242bd54ae475aab351bd3057dada Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Thu, 12 Oct 2023 17:27:27 +0530 Subject: [PATCH 10/29] Fixed failing test --- algo/uidlist.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/algo/uidlist.go b/algo/uidlist.go index be1dcf4074e..cb37232f3bd 100644 --- a/algo/uidlist.go +++ b/algo/uidlist.go @@ -113,9 +113,6 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) { break } _, off := IntersectWithJump(blockUids, q, o) - if off == 0 { - off = 1 // if v[k] isn't in u, move forward - } q = q[off:] if len(q) == 0 { return From f55f273900bbd014ace60fa11c998151628940ae Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Fri, 11 Aug 2023 09:53:59 +0530 Subject: [PATCH 11/29] added single get call --- posting/list_test.go | 35 ++++++++++++++++++ posting/lists.go | 19 ++++++++++ posting/mvcc.go | 25 +++++++++++++ worker/task.go | 85 +++++++++++++++++++++++++++----------------- 4 files changed, 131 insertions(+), 33 deletions(-) diff --git a/posting/list_test.go b/posting/list_test.go index 5256938e5d1..586e587542f 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -435,6 +435,41 @@ func TestAddMutation_mrjn1(t *testing.T) { require.Equal(t, 0, ol.Length(txn.StartTs, 0)) } +func TestReadSingleValue(t *testing.T) { + defer setMaxListSize(maxListSize) + maxListSize = math.MaxInt32 + + // We call pl.Iterate and then stop iterating in the first loop when we are reading + // single values. This test confirms that the two functions, getFirst from this file + // and GetSingeValueForKey works without an issue. 
+ + key := x.DataKey(x.GalaxyAttr("value"), 1240) + ol, err := getNew(key, ps, math.MaxUint64) + require.NoError(t, err) + N := int(1e2) + for i := 2; i <= N; i += 2 { + edge := &pb.DirectedEdge{ + Value: []byte("ho hey there" + strconv.Itoa(i)), + } + txn := Txn{StartTs: uint64(i)} + addMutationHelper(t, ol, edge, Set, &txn) + require.NoError(t, ol.commitMutation(uint64(i), uint64(i)+1)) + kData := ol.getMutation(uint64(i)) + writer := NewTxnWriter(pstore) + if err := writer.SetAt(key, kData, BitDeltaPosting, uint64(i)); err != nil { + require.NoError(t, err) + } + writer.Flush() + + for j := 3; j < i+6; j++ { + k, err, _ := GetSingleValueForKey(key, uint64(j)) + require.NoError(t, err) + checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) + } + + } +} + func TestRollupMaxTsIsSet(t *testing.T) { defer setMaxListSize(maxListSize) maxListSize = math.MaxInt32 diff --git a/posting/lists.go b/posting/lists.go index 5c70fc5914b..c3e97769b92 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -195,6 +195,25 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return lc.SetIfAbsent(skey, pl), nil } +func (lc *LocalCache) GetSingleItem(key []byte) (*List, error) { + pl, err, _ := GetSingleValueForKey(key, lc.startTs) + if err != nil { + return nil, err + } + + l := new(List) + l.key = key + l.plist = pl + + lc.RLock() + if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { + l.setMutation(lc.startTs, delta) + } + lc.RUnlock() + + return l, nil +} + // Get retrieves the cached version of the list associated with the given key. func (lc *LocalCache) Get(key []byte) (*List, error) { return lc.getInternal(key, true) diff --git a/posting/mvcc.go b/posting/mvcc.go index d228ad10f0d..1cc9ad04f4d 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -457,6 +457,31 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return l, nil } +func GetSingleValueForKey(key []byte, readTs uint64) (*pb.PostingList, error, int) { + //fmt.Println("KEY:", key) + txn := pstore.NewTransactionAt(readTs, false) + item, err := txn.Get(key) + if err != nil { + return nil, err, 0 + } + pl := &pb.PostingList{} + k := 0 + + err = item.Value(func(val []byte) error { + k = len(key) + len(val) + if err := pl.Unmarshal(val); err != nil { + return err + } + return nil + }) + + if err != nil { + return nil, err, 0 + } + + return pl, nil, k +} + func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { cachedVal, ok := lCache.Get(key) if ok { diff --git a/worker/task.go b/worker/task.go index 8c73c58b606..329bb2d5191 100644 --- a/worker/task.go +++ b/worker/task.go @@ -391,49 +391,68 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er key := x.DataKey(q.Attr, q.UidList.Uids[i]) // Get or create the posting list for an entity, attribute combination. - pl, err := qs.cache.Get(key) - if err != nil { - return err - } + var pl *posting.List + pickMultiplePostings := q.ExpandAll || (listType && len(q.Langs) == 0) + + var vals []types.Val + fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored - // If count is being requested, there is no need to populate value and facets matrix. 
- if q.DoCount { - count, err := countForValuePostings(args, pl, facetsTree, listType) - if err != nil && err != posting.ErrNoValue { + if pickMultiplePostings { + pl, err = qs.cache.Get(key) + if err != nil { return err } - out.Counts = append(out.Counts, uint32(count)) - // Add an empty UID list to make later processing consistent. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - continue - } + // If count is being requested, there is no need to populate value and facets matrix. + if q.DoCount { + count, err := countForValuePostings(args, pl, facetsTree, listType) + if err != nil && err != posting.ErrNoValue { + return err + } + out.Counts = append(out.Counts, uint32(count)) + // Add an empty UID list to make later processing consistent. + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + continue + } - vals, fcs, err := retrieveValuesAndFacets(args, pl, facetsTree, listType) - switch { - case err == posting.ErrNoValue || (err == nil && len(vals) == 0): - // This branch is taken when the value does not exist in the pl or - // the number of values retrieved is zero (there could still be facets). - // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and - // LangMatrix so that all these data structure have predictable layouts. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) - out.ValueMatrix = append(out.ValueMatrix, - &pb.ValueList{Values: []*pb.TaskValue{}}) if q.ExpandAll { - // To keep the cardinality same as that of ValueMatrix. - out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + langTags, err := pl.GetLangTags(args.q.ReadTs) + if err != nil { + return err + } + out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) + } + + vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) + switch { + case err == posting.ErrNoValue || (err == nil && len(vals) == 0): + // This branch is taken when the value does not exist in the pl or + // the number of values retrieved is zero (there could still be facets). + // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and + // LangMatrix so that all these data structure have predictable layouts. + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) + out.ValueMatrix = append(out.ValueMatrix, + &pb.ValueList{Values: []*pb.TaskValue{}}) + if q.ExpandAll { + // To keep the cardinality same as that of ValueMatrix. + out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + } + continue + case err != nil: + return err + } + + } else { + pl, err = qs.cache.GetSingleItem(key) + if err != nil { + return err } - continue - case err != nil: - return err - } - if q.ExpandAll { - langTags, err := pl.GetLangTags(args.q.ReadTs) + val, err := pl.AllValues(q.ReadTs) if err != nil { return err } - out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) + vals = append(vals, val...) 
} uidList := new(pb.List) From 8595d65a54834efd0cab769691132caec42c68d4 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 16 Aug 2023 20:19:40 +0530 Subject: [PATCH 12/29] second commit --- worker/task.go | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/worker/task.go b/worker/task.go index 329bb2d5191..16b51d62179 100644 --- a/worker/task.go +++ b/worker/task.go @@ -443,16 +443,31 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er } } else { - pl, err = qs.cache.GetSingleItem(key) + pl, err, _ := posting.GetSingleValueForKey(key, q.ReadTs) if err != nil { return err } - val, err := pl.AllValues(q.ReadTs) - if err != nil { - return err + for _, p := range pl.Postings { + vals = append(vals, types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + }) + + if q.FacetParam != nil { + fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) + } } - vals = append(vals, val...) + //pl, err = qs.cache.GetSingleItem(key) + //if err != nil { + // return err + //} + + //val, err := pl.AllValues(q.ReadTs) + //if err != nil { + // return err + //} + //vals = append(vals, val...) } uidList := new(pb.List) From 54e98a92519f6cbf39247bcc172c459b0e7a4b26 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Mon, 28 Aug 2023 18:24:05 +0530 Subject: [PATCH 13/29] updated cache --- posting/lists.go | 60 +++++++++++++++++++++++------- posting/mvcc.go | 38 +++++++++++++++++-- worker/task.go | 97 ++++++++++++++++++------------------------------ 3 files changed, 116 insertions(+), 79 deletions(-) diff --git a/posting/lists.go b/posting/lists.go index c3e97769b92..2e4cc2f236c 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -152,12 +152,13 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List { return updated } -func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) { +func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, error) { getNewPlistNil := func() (*List, error) { lc.RLock() defer lc.RUnlock() if lc.plists == nil { - return getNew(key, pstore, lc.startTs) + pl, err, _ := GetSingleValueForKey(key, lc.startTs) + return pl, err } return nil, nil } @@ -172,9 +173,10 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) } var pl *List + var k int if readFromDisk { var err error - pl, err = getNew(key, pstore, lc.startTs) + pl, err, k = GetSingleValueForKey(key, lc.startTs) if err != nil { return nil, err } @@ -185,6 +187,8 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) } } + fmt.Println(k) + // If we just brought this posting list into memory and we already have a delta for it, let's // apply it before returning the list. 
lc.RLock() @@ -192,26 +196,54 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) pl.setMutation(lc.startTs, delta) } lc.RUnlock() - return lc.SetIfAbsent(skey, pl), nil + return pl, nil } -func (lc *LocalCache) GetSingleItem(key []byte) (*List, error) { - pl, err, _ := GetSingleValueForKey(key, lc.startTs) - if err != nil { - return nil, err +func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) { + getNewPlistNil := func() (*List, error) { + lc.RLock() + defer lc.RUnlock() + if lc.plists == nil { + return getNew(key, pstore, lc.startTs) + } + return nil, nil + } + + if l, err := getNewPlistNil(); l != nil || err != nil { + return l, err } - l := new(List) - l.key = key - l.plist = pl + skey := string(key) + if pl := lc.getNoStore(skey); pl != nil { + return pl, nil + } + var pl *List + if readFromDisk { + var err error + pl, err = getNew(key, pstore, lc.startTs) + if err != nil { + return nil, err + } + } else { + pl = &List{ + key: key, + plist: new(pb.PostingList), + } + } + + // If we just brought this posting list into memory and we already have a delta for it, let's + // apply it before returning the list. lc.RLock() - if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { - l.setMutation(lc.startTs, delta) + if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 { + pl.setMutation(lc.startTs, delta) } lc.RUnlock() + return lc.SetIfAbsent(skey, pl), nil +} - return l, nil +func (lc *LocalCache) GetSingle(key []byte) (*List, error) { + return lc.getSingleInternal(key, true) } // Get retrieves the cached version of the list associated with the given key. diff --git a/posting/mvcc.go b/posting/mvcc.go index 1cc9ad04f4d..61cbb334f65 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -457,19 +457,49 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return l, nil } -func GetSingleValueForKey(key []byte, readTs uint64) (*pb.PostingList, error, int) { +func GetSingleValueForKey(key []byte, readTs uint64) (*List, error, int) { + cachedVal, ok := lCache.Get(key) + if ok { + l, ok := cachedVal.(*List) + if ok && l != nil { + // No need to clone the immutable layer or the key since mutations will not modify it. 
+ lCopy := &List{ + minTs: l.minTs, + maxTs: l.maxTs, + key: key, + plist: l.plist, + } + l.RLock() + if l.mutationMap != nil { + lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap)) + for ts, pl := range l.mutationMap { + lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList) + } + } + l.RUnlock() + return lCopy, nil, 0 + } + } + + if pstore.IsClosed() { + return nil, badger.ErrDBClosed, 0 + } + + l := new(List) + l.key = key + l.plist = new(pb.PostingList) + //fmt.Println("KEY:", key) txn := pstore.NewTransactionAt(readTs, false) item, err := txn.Get(key) if err != nil { return nil, err, 0 } - pl := &pb.PostingList{} k := 0 err = item.Value(func(val []byte) error { k = len(key) + len(val) - if err := pl.Unmarshal(val); err != nil { + if err := l.plist.Unmarshal(val); err != nil { return err } return nil @@ -479,7 +509,7 @@ func GetSingleValueForKey(key []byte, readTs uint64) (*pb.PostingList, error, in return nil, err, 0 } - return pl, nil, k + return l, nil, k } func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { diff --git a/worker/task.go b/worker/task.go index 16b51d62179..133232b96c1 100644 --- a/worker/task.go +++ b/worker/task.go @@ -399,75 +399,50 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er if pickMultiplePostings { pl, err = qs.cache.Get(key) - if err != nil { - return err - } - // If count is being requested, there is no need to populate value and facets matrix. - if q.DoCount { - count, err := countForValuePostings(args, pl, facetsTree, listType) - if err != nil && err != posting.ErrNoValue { - return err - } - out.Counts = append(out.Counts, uint32(count)) - // Add an empty UID list to make later processing consistent. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - continue - } - - if q.ExpandAll { - langTags, err := pl.GetLangTags(args.q.ReadTs) - if err != nil { - return err - } - out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) - } - - vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) - switch { - case err == posting.ErrNoValue || (err == nil && len(vals) == 0): - // This branch is taken when the value does not exist in the pl or - // the number of values retrieved is zero (there could still be facets). - // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and - // LangMatrix so that all these data structure have predictable layouts. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) - out.ValueMatrix = append(out.ValueMatrix, - &pb.ValueList{Values: []*pb.TaskValue{}}) - if q.ExpandAll { - // To keep the cardinality same as that of ValueMatrix. - out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) - } - continue - case err != nil: + } else { + pl, err = qs.cache.GetSingle(key) + } + if err != nil { + return err + } + // If count is being requested, there is no need to populate value and facets matrix. + if q.DoCount { + count, err := countForValuePostings(args, pl, facetsTree, listType) + if err != nil && err != posting.ErrNoValue { return err } + out.Counts = append(out.Counts, uint32(count)) + // Add an empty UID list to make later processing consistent. 
+ out.UidMatrix = append(out.UidMatrix, &pb.List{}) + continue + } - } else { - pl, err, _ := posting.GetSingleValueForKey(key, q.ReadTs) + if q.ExpandAll { + langTags, err := pl.GetLangTags(args.q.ReadTs) if err != nil { return err } + out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) + } - for _, p := range pl.Postings { - vals = append(vals, types.Val{ - Tid: types.TypeID(p.ValType), - Value: p.Value, - }) - - if q.FacetParam != nil { - fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) - } + vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) + switch { + case err == posting.ErrNoValue || (err == nil && len(vals) == 0): + // This branch is taken when the value does not exist in the pl or + // the number of values retrieved is zero (there could still be facets). + // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and + // LangMatrix so that all these data structure have predictable layouts. + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) + out.ValueMatrix = append(out.ValueMatrix, + &pb.ValueList{Values: []*pb.TaskValue{}}) + if q.ExpandAll { + // To keep the cardinality same as that of ValueMatrix. + out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) } - //pl, err = qs.cache.GetSingleItem(key) - //if err != nil { - // return err - //} - - //val, err := pl.AllValues(q.ReadTs) - //if err != nil { - // return err - //} - //vals = append(vals, val...) + continue + case err != nil: + return err } uidList := new(pb.List) From b1b90f041aced93c1747ec31c27e56c4619d860a Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 29 Aug 2023 03:32:59 +0530 Subject: [PATCH 14/29] updated comments --- posting/list_test.go | 7 +++++-- posting/lists.go | 3 ++- posting/mvcc.go | 40 +++++++++++++++++++++++++++++++--------- worker/task.go | 19 +++++++++++++++---- 4 files changed, 53 insertions(+), 16 deletions(-) diff --git a/posting/list_test.go b/posting/list_test.go index 586e587542f..40ff62cab98 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -18,6 +18,7 @@ package posting import ( "context" + "fmt" "math" "math/rand" "os" @@ -461,10 +462,12 @@ func TestReadSingleValue(t *testing.T) { } writer.Flush() - for j := 3; j < i+6; j++ { + for j := 2; j < i+6; j++ { k, err, _ := GetSingleValueForKey(key, uint64(j)) require.NoError(t, err) - checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) + p := getFirst(t, k, uint64(j)) + fmt.Println("Here", p) + checkValue(t, ol, string(p.Value), uint64(j)) } } diff --git a/posting/lists.go b/posting/lists.go index 2e4cc2f236c..09fc82b19f7 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -178,7 +178,7 @@ func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, e var err error pl, err, k = GetSingleValueForKey(key, lc.startTs) if err != nil { - return nil, err + return pl, err } } else { pl = &List{ @@ -196,6 +196,7 @@ func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, e pl.setMutation(lc.startTs, delta) } lc.RUnlock() + fmt.Println("Here6") return pl, nil } diff --git a/posting/mvcc.go b/posting/mvcc.go index 61cbb334f65..ce0d31e3411 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -493,20 +493,42 @@ func GetSingleValueForKey(key []byte, readTs uint64) (*List, error, int) { txn := pstore.NewTransactionAt(readTs, false) item, err := txn.Get(key) if err != nil { - return nil, err, 0 + return l, err, 0 } k := 0 - err = 
item.Value(func(val []byte) error { - k = len(key) + len(val) - if err := l.plist.Unmarshal(val); err != nil { - return err + l.maxTs = x.Max(l.maxTs, item.Version()) + + switch item.UserMeta() { + case BitEmptyPosting: + l.minTs = item.Version() + case BitCompletePosting: + if err := unmarshalOrCopy(l.plist, item); err != nil { + return l, nil, k } - return nil - }) + l.minTs = item.Version() - if err != nil { - return nil, err, 0 + case BitDeltaPosting: + err := item.Value(func(val []byte) error { + pl := &pb.PostingList{} + if err := pl.Unmarshal(val); err != nil { + return err + } + pl.CommitTs = item.Version() + for _, mpost := range pl.Postings { + // commitTs, startTs are meant to be only in memory, not + // stored on disk. + mpost.CommitTs = item.Version() + } + if l.mutationMap == nil { + l.mutationMap = make(map[uint64]*pb.PostingList) + } + l.mutationMap[pl.CommitTs] = pl + return nil + }) + if err != nil { + return l, nil, k + } } return l, nil, k diff --git a/worker/task.go b/worker/task.go index 133232b96c1..dd93282f8f0 100644 --- a/worker/task.go +++ b/worker/task.go @@ -392,15 +392,26 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er // Get or create the posting list for an entity, attribute combination. var pl *posting.List - pickMultiplePostings := q.ExpandAll || (listType && len(q.Langs) == 0) + pickMultiplePostings := q.DoCount || q.ExpandAll || listType || len(q.Langs) > 0 var vals []types.Val fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored - if pickMultiplePostings { - pl, err = qs.cache.Get(key) + if !pickMultiplePostings { + //fmt.Println("HERE GETTING SINGLE KEY", key) + //vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) + pl, _ = qs.cache.GetSingle(key) + + //vals1, _, _ := retrieveValuesAndFacets(args, pl1, facetsTree, listType) + //fmt.Println("Here getting key", len(vals), vals, len(vals1), vals1, len(vals) != len(vals1)) + //if len(vals) != len(vals1) { + // fmt.Println("HERE") + //} + //vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) + //fmt.Println("Here getting full key", len(vals), len(fcs.FacetsList), vals[0]) } else { - pl, err = qs.cache.GetSingle(key) + + pl, err = qs.cache.Get(key) } if err != nil { return err From 734b16c82c37458b56b6bd0e29f2b27fab0b71f0 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 29 Aug 2023 04:40:08 +0530 Subject: [PATCH 15/29] fixed bug --- posting/lists.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/posting/lists.go b/posting/lists.go index 09fc82b19f7..50041b02c44 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -174,12 +174,9 @@ func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, e var pl *List var k int + var err error if readFromDisk { - var err error pl, err, k = GetSingleValueForKey(key, lc.startTs) - if err != nil { - return pl, err - } } else { pl = &List{ key: key, @@ -197,7 +194,7 @@ func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, e } lc.RUnlock() fmt.Println("Here6") - return pl, nil + return pl, err } func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) { From efdfa9dfa38685740e326d4e66c829956d5d3946 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Sat, 2 Sep 2023 03:45:49 +0530 Subject: [PATCH 16/29] trying potential optimization --- posting/lists.go | 33 ++++++++++++++++ worker/task.go | 97 ++++++++++++++++++++++++------------------------ 2 files changed, 82 
insertions(+), 48 deletions(-) diff --git a/posting/lists.go b/posting/lists.go index 50041b02c44..c92414d3e19 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -240,6 +240,39 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return lc.SetIfAbsent(skey, pl), nil } +func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { + pl := &pb.PostingList{} + lc.RLock() + if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { + err := pl.Unmarshal(delta) + lc.RUnlock() + if err != nil { + return pl, nil + } + } else { + lc.RUnlock() + } + + txn := pstore.NewTransactionAt(lc.startTs, false) + item, err := txn.Get(key) + if err != nil { + return pl, err + } + + err = item.Value(func(val []byte) error { + if err := pl.Unmarshal(val); err != nil { + return err + } + return nil + }) + + if err != nil { + return pl, err + } + + return pl, nil +} + func (lc *LocalCache) GetSingle(key []byte) (*List, error) { return lc.getSingleInternal(key, true) } diff --git a/worker/task.go b/worker/task.go index dd93282f8f0..99ce53f7f13 100644 --- a/worker/task.go +++ b/worker/task.go @@ -391,69 +391,70 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er key := x.DataKey(q.Attr, q.UidList.Uids[i]) // Get or create the posting list for an entity, attribute combination. - var pl *posting.List pickMultiplePostings := q.DoCount || q.ExpandAll || listType || len(q.Langs) > 0 var vals []types.Val fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored if !pickMultiplePostings { - //fmt.Println("HERE GETTING SINGLE KEY", key) - //vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) - pl, _ = qs.cache.GetSingle(key) - - //vals1, _, _ := retrieveValuesAndFacets(args, pl1, facetsTree, listType) - //fmt.Println("Here getting key", len(vals), vals, len(vals1), vals1, len(vals) != len(vals1)) - //if len(vals) != len(vals1) { - // fmt.Println("HERE") - //} - //vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) - //fmt.Println("Here getting full key", len(vals), len(fcs.FacetsList), vals[0]) - } else { + pl, _ := qs.cache.GetSinglePosting(key) + for _, p := range pl.Postings { + vals = append(vals, types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + }) + + if q.FacetParam != nil { + fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) + } + } - pl, err = qs.cache.Get(key) - } - if err != nil { - return err - } - // If count is being requested, there is no need to populate value and facets matrix. - if q.DoCount { - count, err := countForValuePostings(args, pl, facetsTree, listType) - if err != nil && err != posting.ErrNoValue { + } else { + pl, err := qs.cache.Get(key) + if err != nil { return err } - out.Counts = append(out.Counts, uint32(count)) - // Add an empty UID list to make later processing consistent. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - continue - } - if q.ExpandAll { - langTags, err := pl.GetLangTags(args.q.ReadTs) - if err != nil { + // If count is being requested, there is no need to populate value and facets matrix. + if q.DoCount { + count, err := countForValuePostings(args, pl, facetsTree, listType) + if err != nil && err != posting.ErrNoValue { + return err + } + out.Counts = append(out.Counts, uint32(count)) + // Add an empty UID list to make later processing consistent. 
+ out.UidMatrix = append(out.UidMatrix, &pb.List{}) + continue + } + + vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) + + switch { + case err == posting.ErrNoValue || (err == nil && len(vals) == 0): + // This branch is taken when the value does not exist in the pl or + // the number of values retrieved is zero (there could still be facets). + // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and + // LangMatrix so that all these data structure have predictable layouts. + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) + out.ValueMatrix = append(out.ValueMatrix, + &pb.ValueList{Values: []*pb.TaskValue{}}) + if q.ExpandAll { + // To keep the cardinality same as that of ValueMatrix. + out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + } + continue + case err != nil: return err } - out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) - } - vals, fcs, err = retrieveValuesAndFacets(args, pl, facetsTree, listType) - switch { - case err == posting.ErrNoValue || (err == nil && len(vals) == 0): - // This branch is taken when the value does not exist in the pl or - // the number of values retrieved is zero (there could still be facets). - // We add empty lists to the UidMatrix, FaceMatrix, ValueMatrix and - // LangMatrix so that all these data structure have predictable layouts. - out.UidMatrix = append(out.UidMatrix, &pb.List{}) - out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) - out.ValueMatrix = append(out.ValueMatrix, - &pb.ValueList{Values: []*pb.TaskValue{}}) if q.ExpandAll { - // To keep the cardinality same as that of ValueMatrix. - out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + langTags, err := pl.GetLangTags(args.q.ReadTs) + if err != nil { + return err + } + out.LangMatrix = append(out.LangMatrix, &pb.LangList{Lang: langTags}) } - continue - case err != nil: - return err } uidList := new(pb.List) From 9cbee9031c9fb3024ebd035254686683fd3c968b Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 5 Sep 2023 06:10:32 +0530 Subject: [PATCH 17/29] Added one more optimization --- posting/lists.go | 13 ++++++++++++- worker/task.go | 11 ++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/posting/lists.go b/posting/lists.go index c92414d3e19..987547f88dc 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -202,7 +202,18 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) lc.RLock() defer lc.RUnlock() if lc.plists == nil { - return getNew(key, pstore, lc.startTs) + if readFromDisk { + return getNew(key, pstore, lc.startTs) + } else { + pl := &List{ + key: key, + plist: new(pb.PostingList), + } + if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { + pl.setMutation(lc.startTs, delta) + } + return pl, nil + } } return nil, nil } diff --git a/worker/task.go b/worker/task.go index 99ce53f7f13..02087e4c30e 100644 --- a/worker/task.go +++ b/worker/task.go @@ -376,6 +376,8 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er outputs := make([]*pb.Result, numGo) listType := schema.State().IsList(q.Attr) + pickMultiplePostings := q.DoCount || q.ExpandAll || listType || len(q.Langs) > 0 + //pickMultiplePostings := true calculate := func(start, end int) error { x.AssertTrue(start%width == 0) @@ -391,24 +393,23 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er key := x.DataKey(q.Attr, q.UidList.Uids[i]) // Get or create the posting 
list for an entity, attribute combination. - pickMultiplePostings := q.DoCount || q.ExpandAll || listType || len(q.Langs) > 0 var vals []types.Val fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored if !pickMultiplePostings { pl, _ := qs.cache.GetSinglePosting(key) - for _, p := range pl.Postings { - vals = append(vals, types.Val{ + vals = make([]types.Val, len(pl.Postings)) + for i, p := range pl.Postings { + vals[i] = types.Val{ Tid: types.TypeID(p.ValType), Value: p.Value, - }) + } if q.FacetParam != nil { fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) } } - } else { pl, err := qs.cache.Get(key) if err != nil { From 2f46efaa5de35ef182bc8172561ff8e38a44780e Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 5 Sep 2023 09:01:19 +0530 Subject: [PATCH 18/29] Fixed one bug --- posting/list.go | 4 ++++ posting/lists.go | 12 ++++++++++++ worker/task.go | 16 +++++++++++++++- 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/posting/list.go b/posting/list.go index cba042d7126..4977f538aca 100644 --- a/posting/list.go +++ b/posting/list.go @@ -321,6 +321,10 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting { return p } +func HasDeleteAll(mpost *pb.Posting) bool { + return hasDeleteAll(mpost) +} + func hasDeleteAll(mpost *pb.Posting) bool { return mpost.Op == Del && bytes.Equal(mpost.Value, []byte(x.Star)) && len(mpost.LangTag) == 0 } diff --git a/posting/lists.go b/posting/lists.go index 987547f88dc..2daa07f1ea6 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -253,11 +253,20 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { pl := &pb.PostingList{} + validatePl := func() { + for _, postings := range pl.Postings { + if hasDeleteAll(postings) { + pl = nil + return + } + } + } lc.RLock() if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { err := pl.Unmarshal(delta) lc.RUnlock() if err != nil { + validatePl() return pl, nil } } else { @@ -267,6 +276,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { txn := pstore.NewTransactionAt(lc.startTs, false) item, err := txn.Get(key) if err != nil { + validatePl() return pl, err } @@ -278,9 +288,11 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { }) if err != nil { + validatePl() return pl, err } + validatePl() return pl, nil } diff --git a/worker/task.go b/worker/task.go index 02087e4c30e..be996a330ea 100644 --- a/worker/task.go +++ b/worker/task.go @@ -398,7 +398,21 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored if !pickMultiplePostings { - pl, _ := qs.cache.GetSinglePosting(key) + pl, err := qs.cache.GetSinglePosting(key) + if pl == nil || err == posting.ErrNoValue { + out.UidMatrix = append(out.UidMatrix, &pb.List{}) + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) + out.ValueMatrix = append(out.ValueMatrix, + &pb.ValueList{Values: []*pb.TaskValue{}}) + if q.ExpandAll { + // To keep the cardinality same as that of ValueMatrix. 
+ out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) + } + continue + } + if err != nil { + return err + } vals = make([]types.Val, len(pl.Postings)) for i, p := range pl.Postings { vals[i] = types.Val{ From 2ec917c8dabc5645c823d7a03484d203a0498775 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 5 Sep 2023 17:10:27 +0530 Subject: [PATCH 19/29] Fixed another small bug --- worker/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/task.go b/worker/task.go index be996a330ea..dbb2a480066 100644 --- a/worker/task.go +++ b/worker/task.go @@ -399,7 +399,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er if !pickMultiplePostings { pl, err := qs.cache.GetSinglePosting(key) - if pl == nil || err == posting.ErrNoValue { + if pl == nil || err == posting.ErrNoValue || err == badger.ErrKeyNotFound { out.UidMatrix = append(out.UidMatrix, &pb.List{}) out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) out.ValueMatrix = append(out.ValueMatrix, From c6fe0f2c7e1dc533c501b8ec6c99909780fe6f31 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Tue, 5 Sep 2023 19:08:30 +0530 Subject: [PATCH 20/29] fixed another test --- worker/task.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/worker/task.go b/worker/task.go index dbb2a480066..0c44dbf07c5 100644 --- a/worker/task.go +++ b/worker/task.go @@ -398,8 +398,8 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored if !pickMultiplePostings { - pl, err := qs.cache.GetSinglePosting(key) - if pl == nil || err == posting.ErrNoValue || err == badger.ErrKeyNotFound { + pl, _ := qs.cache.GetSinglePosting(key) + if pl == nil { out.UidMatrix = append(out.UidMatrix, &pb.List{}) out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) out.ValueMatrix = append(out.ValueMatrix, @@ -410,9 +410,6 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er } continue } - if err != nil { - return err - } vals = make([]types.Val, len(pl.Postings)) for i, p := range pl.Postings { vals[i] = types.Val{ From 6bfc6db41f2370c53b8c1b6a0278a18f15473d8c Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 6 Sep 2023 07:36:29 +0530 Subject: [PATCH 21/29] fixed bug related to lang --- worker/task.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/worker/task.go b/worker/task.go index 0c44dbf07c5..80706062b5d 100644 --- a/worker/task.go +++ b/worker/task.go @@ -376,7 +376,8 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er outputs := make([]*pb.Result, numGo) listType := schema.State().IsList(q.Attr) - pickMultiplePostings := q.DoCount || q.ExpandAll || listType || len(q.Langs) > 0 + hasLang := schema.State().HasLang(q.Attr) + getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang //pickMultiplePostings := true calculate := func(start, end int) error { @@ -397,7 +398,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er var vals []types.Val fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored - if !pickMultiplePostings { + if !getMultiplePosting { pl, _ := qs.cache.GetSinglePosting(key) if pl == nil { out.UidMatrix = append(out.UidMatrix, &pb.List{}) From 673a21deeb8057fa2bfc2a6037833eefd557c67b Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 6 Sep 2023 09:32:17 +0530 Subject: [PATCH 22/29] 
Fixed another bug --- posting/lists.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/posting/lists.go b/posting/lists.go index 2daa07f1ea6..190db4516a1 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -254,12 +254,18 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { pl := &pb.PostingList{} validatePl := func() { + i := 0 for _, postings := range pl.Postings { if hasDeleteAll(postings) { pl = nil return } + if postings.Op != Del { + pl.Postings[i] = postings + i++ + } } + pl.Postings = pl.Postings[:i] } lc.RLock() if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { From 1b2d755a1c62c7bf28cbbe1175853d551fceee0e Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 6 Sep 2023 12:58:09 +0530 Subject: [PATCH 23/29] Fixed another test --- worker/task.go | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/worker/task.go b/worker/task.go index 80706062b5d..1a29016c2dc 100644 --- a/worker/task.go +++ b/worker/task.go @@ -378,7 +378,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er listType := schema.State().IsList(q.Attr) hasLang := schema.State().HasLang(q.Attr) getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang - //pickMultiplePostings := true + //getMultiplePosting := true calculate := func(start, end int) error { x.AssertTrue(start%width == 0) @@ -400,28 +400,27 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er if !getMultiplePosting { pl, _ := qs.cache.GetSinglePosting(key) - if pl == nil { + if pl != nil { + vals = make([]types.Val, len(pl.Postings)) + for i, p := range pl.Postings { + vals[i] = types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + } + + // TODO Apply facet tree before + if q.FacetParam != nil { + fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) + } + } + } + if pl == nil || len(vals) == 0 { out.UidMatrix = append(out.UidMatrix, &pb.List{}) out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) out.ValueMatrix = append(out.ValueMatrix, &pb.ValueList{Values: []*pb.TaskValue{}}) - if q.ExpandAll { - // To keep the cardinality same as that of ValueMatrix. 
- out.LangMatrix = append(out.LangMatrix, &pb.LangList{}) - } continue } - vals = make([]types.Val, len(pl.Postings)) - for i, p := range pl.Postings { - vals[i] = types.Val{ - Tid: types.TypeID(p.ValType), - Value: p.Value, - } - - if q.FacetParam != nil { - fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) - } - } } else { pl, err := qs.cache.Get(key) if err != nil { From 7410b8bac9160869eec3329f61f1ac3202e9bf18 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 6 Sep 2023 14:39:26 +0530 Subject: [PATCH 24/29] minor refactor --- posting/list.go | 4 ---- posting/lists.go | 8 ++------ posting/mvcc.go | 16 +++++++--------- worker/task.go | 28 +++++++++++++--------------- 4 files changed, 22 insertions(+), 34 deletions(-) diff --git a/posting/list.go b/posting/list.go index 4977f538aca..cba042d7126 100644 --- a/posting/list.go +++ b/posting/list.go @@ -321,10 +321,6 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting { return p } -func HasDeleteAll(mpost *pb.Posting) bool { - return hasDeleteAll(mpost) -} - func hasDeleteAll(mpost *pb.Posting) bool { return mpost.Op == Del && bytes.Equal(mpost.Value, []byte(x.Star)) && len(mpost.LangTag) == 0 } diff --git a/posting/lists.go b/posting/lists.go index 190db4516a1..ba41331c081 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -157,7 +157,7 @@ func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, e lc.RLock() defer lc.RUnlock() if lc.plists == nil { - pl, err, _ := GetSingleValueForKey(key, lc.startTs) + pl, err := GetSingleValueForKey(key, lc.startTs) return pl, err } return nil, nil @@ -173,10 +173,9 @@ func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, e } var pl *List - var k int var err error if readFromDisk { - pl, err, k = GetSingleValueForKey(key, lc.startTs) + pl, err = GetSingleValueForKey(key, lc.startTs) } else { pl = &List{ key: key, @@ -184,8 +183,6 @@ func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, e } } - fmt.Println(k) - // If we just brought this posting list into memory and we already have a delta for it, let's // apply it before returning the list. 
lc.RLock() @@ -193,7 +190,6 @@ func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, e pl.setMutation(lc.startTs, delta) } lc.RUnlock() - fmt.Println("Here6") return pl, err } diff --git a/posting/mvcc.go b/posting/mvcc.go index ce0d31e3411..e55fc77a527 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -457,7 +457,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return l, nil } -func GetSingleValueForKey(key []byte, readTs uint64) (*List, error, int) { +func GetSingleValueForKey(key []byte, readTs uint64) (*List, error) { cachedVal, ok := lCache.Get(key) if ok { l, ok := cachedVal.(*List) @@ -477,25 +477,23 @@ func GetSingleValueForKey(key []byte, readTs uint64) (*List, error, int) { } } l.RUnlock() - return lCopy, nil, 0 + return lCopy, nil } } if pstore.IsClosed() { - return nil, badger.ErrDBClosed, 0 + return nil, badger.ErrDBClosed } l := new(List) l.key = key l.plist = new(pb.PostingList) - //fmt.Println("KEY:", key) txn := pstore.NewTransactionAt(readTs, false) item, err := txn.Get(key) if err != nil { - return l, err, 0 + return l, err } - k := 0 l.maxTs = x.Max(l.maxTs, item.Version()) @@ -504,7 +502,7 @@ func GetSingleValueForKey(key []byte, readTs uint64) (*List, error, int) { l.minTs = item.Version() case BitCompletePosting: if err := unmarshalOrCopy(l.plist, item); err != nil { - return l, nil, k + return l, nil } l.minTs = item.Version() @@ -527,11 +525,11 @@ func GetSingleValueForKey(key []byte, readTs uint64) (*List, error, int) { return nil }) if err != nil { - return l, nil, k + return l, nil } } - return l, nil, k + return l, nil } func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { diff --git a/worker/task.go b/worker/task.go index 1a29016c2dc..94d8dffb73f 100644 --- a/worker/task.go +++ b/worker/task.go @@ -400,27 +400,25 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er if !getMultiplePosting { pl, _ := qs.cache.GetSinglePosting(key) - if pl != nil { - vals = make([]types.Val, len(pl.Postings)) - for i, p := range pl.Postings { - vals[i] = types.Val{ - Tid: types.TypeID(p.ValType), - Value: p.Value, - } - - // TODO Apply facet tree before - if q.FacetParam != nil { - fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) - } - } - } - if pl == nil || len(vals) == 0 { + if pl == nil || len(pl.Postings) == 0 { out.UidMatrix = append(out.UidMatrix, &pb.List{}) out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) out.ValueMatrix = append(out.ValueMatrix, &pb.ValueList{Values: []*pb.TaskValue{}}) continue } + vals = make([]types.Val, len(pl.Postings)) + for i, p := range pl.Postings { + vals[i] = types.Val{ + Tid: types.TypeID(p.ValType), + Value: p.Value, + } + + // TODO Apply facet tree before + if q.FacetParam != nil { + fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) + } + } } else { pl, err := qs.cache.Get(key) if err != nil { From 734a6ea383be81f2028312711eed26292d6e215d Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Wed, 6 Sep 2023 16:11:26 +0530 Subject: [PATCH 25/29] Updated test --- posting/list_test.go | 21 ++++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/posting/list_test.go b/posting/list_test.go index 40ff62cab98..4f588c7e1d7 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -18,7 +18,6 @@ package posting import ( "context" - "fmt" "math" "math/rand" "os" @@ -447,7 +446,7 @@ func 
TestReadSingleValue(t *testing.T) { key := x.DataKey(x.GalaxyAttr("value"), 1240) ol, err := getNew(key, ps, math.MaxUint64) require.NoError(t, err) - N := int(1e2) + N := int(10000) for i := 2; i <= N; i += 2 { edge := &pb.DirectedEdge{ Value: []byte("ho hey there" + strconv.Itoa(i)), @@ -462,11 +461,23 @@ func TestReadSingleValue(t *testing.T) { } writer.Flush() - for j := 2; j < i+6; j++ { - k, err, _ := GetSingleValueForKey(key, uint64(j)) + if i%10 == 0 { + // Do frequent rollups, and store data in old timestamp + kvs, err := ol.Rollup(nil, txn.StartTs-3) + require.NoError(t, err) + require.NoError(t, writePostingListToDisk(kvs)) + ol, err = getNew(key, ps, math.MaxUint64) + require.NoError(t, err) + } + + j := 2 + if j < int(ol.minTs) { + j = int(ol.minTs) + } + for ; j < i+6; j++ { + k, err := GetSingleValueForKey(key, uint64(j)) require.NoError(t, err) p := getFirst(t, k, uint64(j)) - fmt.Println("Here", p) checkValue(t, ol, string(p.Value), uint64(j)) } From 3b32245c81e2cdd2b88d7cc8d5078915d517383a Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Thu, 7 Sep 2023 11:18:47 +0530 Subject: [PATCH 26/29] minor refactor --- posting/list_test.go | 6 +- posting/lists.go | 128 +++++++++++++------------------------------ posting/mvcc.go | 75 ------------------------- 3 files changed, 40 insertions(+), 169 deletions(-) diff --git a/posting/list_test.go b/posting/list_test.go index 4f588c7e1d7..9911dc60b45 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -475,10 +475,10 @@ func TestReadSingleValue(t *testing.T) { j = int(ol.minTs) } for ; j < i+6; j++ { - k, err := GetSingleValueForKey(key, uint64(j)) + tx := NewTxn(uint64(j)) + k, err := tx.cache.GetSinglePosting(key) require.NoError(t, err) - p := getFirst(t, k, uint64(j)) - checkValue(t, ol, string(p.Value), uint64(j)) + checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) } } diff --git a/posting/lists.go b/posting/lists.go index ba41331c081..3d569766d7d 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -152,64 +152,12 @@ func (lc *LocalCache) SetIfAbsent(key string, updated *List) *List { return updated } -func (lc *LocalCache) getSingleInternal(key []byte, readFromDisk bool) (*List, error) { - getNewPlistNil := func() (*List, error) { - lc.RLock() - defer lc.RUnlock() - if lc.plists == nil { - pl, err := GetSingleValueForKey(key, lc.startTs) - return pl, err - } - return nil, nil - } - - if l, err := getNewPlistNil(); l != nil || err != nil { - return l, err - } - - skey := string(key) - if pl := lc.getNoStore(skey); pl != nil { - return pl, nil - } - - var pl *List - var err error - if readFromDisk { - pl, err = GetSingleValueForKey(key, lc.startTs) - } else { - pl = &List{ - key: key, - plist: new(pb.PostingList), - } - } - - // If we just brought this posting list into memory and we already have a delta for it, let's - // apply it before returning the list. 
- lc.RLock() - if delta, ok := lc.deltas[skey]; ok && len(delta) > 0 { - pl.setMutation(lc.startTs, delta) - } - lc.RUnlock() - return pl, err -} - func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) { getNewPlistNil := func() (*List, error) { lc.RLock() defer lc.RUnlock() if lc.plists == nil { - if readFromDisk { - return getNew(key, pstore, lc.startTs) - } else { - pl := &List{ - key: key, - plist: new(pb.PostingList), - } - if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { - pl.setMutation(lc.startTs, delta) - } - return pl, nil - } + return getNew(key, pstore, lc.startTs) } return nil, nil } @@ -248,60 +196,58 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) } func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { - pl := &pb.PostingList{} - validatePl := func() { - i := 0 - for _, postings := range pl.Postings { - if hasDeleteAll(postings) { - pl = nil - return - } - if postings.Op != Del { - pl.Postings[i] = postings - i++ + + getPostings := func() (*pb.PostingList, error) { + pl := &pb.PostingList{} + lc.RLock() + if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { + err := pl.Unmarshal(delta) + if err != nil { + lc.RUnlock() + return pl, nil } } - pl.Postings = pl.Postings[:i] - } - lc.RLock() - if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { - err := pl.Unmarshal(delta) lc.RUnlock() + + txn := pstore.NewTransactionAt(lc.startTs, false) + item, err := txn.Get(key) if err != nil { - validatePl() - return pl, nil + return pl, err } - } else { - lc.RUnlock() - } - txn := pstore.NewTransactionAt(lc.startTs, false) - item, err := txn.Get(key) - if err != nil { - validatePl() + err = item.Value(func(val []byte) error { + if err := pl.Unmarshal(val); err != nil { + return err + } + return nil + }) + return pl, err } - err = item.Value(func(val []byte) error { - if err := pl.Unmarshal(val); err != nil { - return err - } - return nil - }) - + pl, err := getPostings() + if err == badger.ErrKeyNotFound { + err = nil + } if err != nil { - validatePl() return pl, err } - validatePl() + // Filter and remove STAR_ALL and OP_DELETE Postings + idx := 0 + for _, postings := range pl.Postings { + if hasDeleteAll(postings) { + return nil, nil + } + if postings.Op != Del { + pl.Postings[idx] = postings + idx++ + } + } + pl.Postings = pl.Postings[:idx] return pl, nil } -func (lc *LocalCache) GetSingle(key []byte) (*List, error) { - return lc.getSingleInternal(key, true) -} - // Get retrieves the cached version of the list associated with the given key. func (lc *LocalCache) Get(key []byte) (*List, error) { return lc.getInternal(key, true) diff --git a/posting/mvcc.go b/posting/mvcc.go index e55fc77a527..d228ad10f0d 100644 --- a/posting/mvcc.go +++ b/posting/mvcc.go @@ -457,81 +457,6 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) { return l, nil } -func GetSingleValueForKey(key []byte, readTs uint64) (*List, error) { - cachedVal, ok := lCache.Get(key) - if ok { - l, ok := cachedVal.(*List) - if ok && l != nil { - // No need to clone the immutable layer or the key since mutations will not modify it. 
- lCopy := &List{ - minTs: l.minTs, - maxTs: l.maxTs, - key: key, - plist: l.plist, - } - l.RLock() - if l.mutationMap != nil { - lCopy.mutationMap = make(map[uint64]*pb.PostingList, len(l.mutationMap)) - for ts, pl := range l.mutationMap { - lCopy.mutationMap[ts] = proto.Clone(pl).(*pb.PostingList) - } - } - l.RUnlock() - return lCopy, nil - } - } - - if pstore.IsClosed() { - return nil, badger.ErrDBClosed - } - - l := new(List) - l.key = key - l.plist = new(pb.PostingList) - - txn := pstore.NewTransactionAt(readTs, false) - item, err := txn.Get(key) - if err != nil { - return l, err - } - - l.maxTs = x.Max(l.maxTs, item.Version()) - - switch item.UserMeta() { - case BitEmptyPosting: - l.minTs = item.Version() - case BitCompletePosting: - if err := unmarshalOrCopy(l.plist, item); err != nil { - return l, nil - } - l.minTs = item.Version() - - case BitDeltaPosting: - err := item.Value(func(val []byte) error { - pl := &pb.PostingList{} - if err := pl.Unmarshal(val); err != nil { - return err - } - pl.CommitTs = item.Version() - for _, mpost := range pl.Postings { - // commitTs, startTs are meant to be only in memory, not - // stored on disk. - mpost.CommitTs = item.Version() - } - if l.mutationMap == nil { - l.mutationMap = make(map[uint64]*pb.PostingList) - } - l.mutationMap[pl.CommitTs] = pl - return nil - }) - if err != nil { - return l, nil - } - } - - return l, nil -} - func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) { cachedVal, ok := lCache.Get(key) if ok { From e893711c9fd98788862b664f106f01ff5f9f493f Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Thu, 12 Oct 2023 17:50:01 +0530 Subject: [PATCH 27/29] Fixed comments --- posting/list_test.go | 1 - posting/lists.go | 27 +++++++++++++++++++-------- worker/task.go | 19 +++++++++++-------- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/posting/list_test.go b/posting/list_test.go index 9911dc60b45..eefa2662848 100644 --- a/posting/list_test.go +++ b/posting/list_test.go @@ -480,7 +480,6 @@ func TestReadSingleValue(t *testing.T) { require.NoError(t, err) checkValue(t, ol, string(k.Postings[0].Value), uint64(j)) } - } } diff --git a/posting/lists.go b/posting/lists.go index 3d569766d7d..af5fefbaa37 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -195,24 +195,35 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return lc.SetIfAbsent(skey, pl), nil } +// GetSinglePosting retrieves the cached version of the first item in the list associated with the +// given key. This is used for retrieving the value of a scalar predicats. 
func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { + getDeltas := func() *pb.PostingList { + lc.RLock() + defer lc.RUnlock() - getPostings := func() (*pb.PostingList, error) { pl := &pb.PostingList{} - lc.RLock() if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { err := pl.Unmarshal(delta) if err != nil { - lc.RUnlock() - return pl, nil + return pl } } - lc.RUnlock() + return nil + } + + getPostings := func() (*pb.PostingList, error) { + pl := getDeltas() + if pl != nil { + return pl, nil + } + + pl = &pb.PostingList{} txn := pstore.NewTransactionAt(lc.startTs, false) item, err := txn.Get(key) if err != nil { - return pl, err + return nil, err } err = item.Value(func(val []byte) error { @@ -227,10 +238,10 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { pl, err := getPostings() if err == badger.ErrKeyNotFound { - err = nil + return nil, nil } if err != nil { - return pl, err + return nil, err } // Filter and remove STAR_ALL and OP_DELETE Postings diff --git a/worker/task.go b/worker/task.go index 94d8dffb73f..6ea7a5a4b2b 100644 --- a/worker/task.go +++ b/worker/task.go @@ -376,9 +376,14 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er outputs := make([]*pb.Result, numGo) listType := schema.State().IsList(q.Attr) + + // These are certain special cases where we can get away with reading only the latest value + // Lang doesn't work because we would be storing various different languages at various + // time. So when we go to read the latest value, we might get a different language. + // Similarly with DoCount and ExpandAll and Facets. List types are also not supported + // because list is stored by time, and we combine all the list items at various timestamps. 
hasLang := schema.State().HasLang(q.Attr) - getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang - //getMultiplePosting := true + getMultiplePosting := q.DoCount || q.ExpandAll || listType || hasLang || q.FacetParam != nil calculate := func(start, end int) error { x.AssertTrue(start%width == 0) @@ -399,7 +404,10 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored if !getMultiplePosting { - pl, _ := qs.cache.GetSinglePosting(key) + pl, err := qs.cache.GetSinglePosting(key) + if err != nil { + return err + } if pl == nil || len(pl.Postings) == 0 { out.UidMatrix = append(out.UidMatrix, &pb.List{}) out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{}) @@ -413,11 +421,6 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er Tid: types.TypeID(p.ValType), Value: p.Value, } - - // TODO Apply facet tree before - if q.FacetParam != nil { - fcs.FacetsList = append(fcs.FacetsList, &pb.Facets{Facets: facets.CopyFacets(p.Facets, q.FacetParam)}) - } } } else { pl, err := qs.cache.Get(key) From c1bd8393625476183e037f9a5089dd0c8281ecaf Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Thu, 7 Sep 2023 11:52:11 +0530 Subject: [PATCH 28/29] first commit --- algo/uidlist_test.go | 1 + codec/codec.go | 1 + go.mod | 6 ++++- go.sum | 12 ++++++---- posting/lists.go | 55 ++++++++++++++++++++++++++++++++++++++++++++ worker/sort.go | 30 +----------------------- worker/task.go | 18 ++++++++++++--- 7 files changed, 86 insertions(+), 37 deletions(-) diff --git a/algo/uidlist_test.go b/algo/uidlist_test.go index 50fddb30b38..cd6dc194ace 100644 --- a/algo/uidlist_test.go +++ b/algo/uidlist_test.go @@ -370,6 +370,7 @@ func BenchmarkListIntersectRandom(b *testing.B) { func BenchmarkListIntersectCompressBin(b *testing.B) { randomTests := func(sz int, overlap float64) { rs := []float64{0.01, 0.1, 1, 10, 100} + //rs := []float64{0.002, 0.01, 0.1} for _, r := range rs { sz1 := sz sz2 := int(float64(sz) * r) diff --git a/codec/codec.go b/codec/codec.go index 60359f474fa..b12a7d45c50 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -173,6 +173,7 @@ func (d *Decoder) UnpackBlock() []uint64 { if d.blockIdx >= len(d.Pack.Blocks) { return d.uids } + block := d.Pack.Blocks[d.blockIdx] last := block.Base diff --git a/go.mod b/go.mod index 634c273b634..45c7cf969f2 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/dgraph-io/dgraph go 1.19 +replace github.com/dgraph-io/badger/v4 => ../badger + require ( contrib.go.opencensus.io/exporter/jaeger v0.1.0 contrib.go.opencensus.io/exporter/prometheus v0.1.0 @@ -40,7 +42,7 @@ require ( github.com/mitchellh/panicwrap v1.0.0 github.com/paulmach/go.geojson v0.0.0-20170327170536-40612a87147b github.com/pkg/errors v0.9.1 - github.com/pkg/profile v1.2.1 + github.com/pkg/profile v1.7.0 github.com/prometheus/client_golang v1.14.0 github.com/soheilhy/cmux v0.1.4 github.com/spf13/cast v1.3.0 @@ -85,10 +87,12 @@ require ( github.com/eapache/go-resiliency v1.4.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/felixge/fgprof v0.9.3 // indirect github.com/frankban/quicktest v1.10.2 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/flatbuffers v1.12.1 // indirect + github.com/google/pprof 
v0.0.0-20211214055906-6f57359322fd // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect diff --git a/go.sum b/go.sum index 84b63f1b1ad..95283418f7e 100644 --- a/go.sum +++ b/go.sum @@ -145,8 +145,6 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= -github.com/dgraph-io/badger/v4 v4.2.0 h1:kJrlajbXXL9DFTNuhhu9yCx7JJa4qpYWxtE8BzuWsEs= -github.com/dgraph-io/badger/v4 v4.2.0/go.mod h1:qfCqhPoWDFJRx1gp5QwwyGo8xk1lbHUxvK9nK0OGAak= github.com/dgraph-io/dgo/v230 v230.0.1 h1:kR7gI7/ZZv0jtG6dnedNgNOCxe1cbSG8ekF+pNfReks= github.com/dgraph-io/dgo/v230 v230.0.1/go.mod h1:5FerO2h4LPOxR2XTkOAtqUUPaFdQ+5aBOHXPBJ3nT10= github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM= @@ -199,6 +197,8 @@ github.com/etcd-io/bbolt v1.3.3/go.mod h1:ZF2nL25h33cCyBtcyWeZ2/I3HQOfTP+0PIEvHj github.com/fasthttp-contrib/websocket v0.0.0-20160511215533-1f3b11f56072/go.mod h1:duJ4Jxv5lDcvg4QuQr0oowTf7dz4/CR8NtyCooz9HL8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga6PJ7M= +github.com/felixge/fgprof v0.9.3 h1:VvyZxILNuCiUCSXtPtYmmtGvb65nqXh2QFWc0Wpf2/g= +github.com/felixge/fgprof v0.9.3/go.mod h1:RdbpDgzqYVh/T9fPELJyV7EYJuHB55UTEULNun8eiPw= github.com/flosch/pongo2 v0.0.0-20190707114632-bbf5a6c351f4/go.mod h1:T9YF2M40nIgbVgp3rreNmTged+9HrbNTIQf1PsaIiTA= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -316,6 +316,8 @@ github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20201023163331-3e6fc7fc9c4c/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201203190320-1bf35d6f28c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/pprof v0.0.0-20201218002935-b9804c9f04c2/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20211214055906-6f57359322fd h1:1FjCyPC+syAzJ5/2S8fqdZK1R22vvA0J7JZKcuOIQ7Y= +github.com/google/pprof v0.0.0-20211214055906-6f57359322fd/go.mod h1:KgnwoLYCZ8IQu3XUZ8Nc/bM9CCZFOyjUNOSygVozoDg= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -394,6 +396,7 @@ github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d/go.mod h1:+NfK9FKe github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/imkira/go-interpol v1.1.0/go.mod 
h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= @@ -557,8 +560,8 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/profile v1.2.1 h1:F++O52m40owAmADcojzM+9gyjmMOY/T4oYJkgFDH8RE= -github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= +github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA= +github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo= github.com/pkg/sftp v1.13.1/go.mod h1:3HaPG6Dq1ILlpPZRO0HVMrsydcdLt6HRDccSgb87qRg= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -899,6 +902,7 @@ golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/posting/lists.go b/posting/lists.go index af5fefbaa37..f95d8c95c23 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -195,6 +195,61 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error) return lc.SetIfAbsent(skey, pl), nil } +func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, error) { + results := make([]*pb.PostingList, len(keys)) + remaining_keys := make([][]byte, 0) + lc.RLock() + for i, key := range keys { + pl := &pb.PostingList{} + if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 { + err := pl.Unmarshal(delta) + if err != nil { + results[i] = pl + } + } else { + remaining_keys = append(remaining_keys, key) + } + } + lc.RUnlock() + + txn := pstore.NewTransactionAt(lc.startTs, false) + items, err := txn.GetBatch(remaining_keys) + idx := 0 + + for i := 0; i < len(results); i++ { + if results[i] != nil { + continue + } + pl := &pb.PostingList{} + err = items[idx].Value(func(val []byte) error { + if err := pl.Unmarshal(val); err != nil { + return err + } + return nil + }) + idx += 1 + results[i] = pl + } + + for i := 0; i < len(results); i++ { + pl := results[i] + idx := 0 + for _, postings := range pl.Postings { + if hasDeleteAll(postings) { + return nil, nil + } + if postings.Op != Del { + pl.Postings[idx] = postings + idx++ + } + } + pl.Postings = pl.Postings[:idx] + results[i] = pl + } + + return results, err +} + // GetSinglePosting retrieves the cached version of the first item in the list associated with the 
// given key. This is used for retrieving the value of a scalar predicats. func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) { diff --git a/worker/sort.go b/worker/sort.go index a1867dd0d13..558c4e134d5 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -21,7 +21,6 @@ import ( "encoding/hex" "sort" "strings" - "time" "github.com/golang/glog" "github.com/pkg/errors" @@ -512,34 +511,7 @@ func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error cctx, cancel := context.WithCancel(ctx) defer cancel() - resCh := make(chan *sortresult, 2) - go func() { - select { - case <-time.After(3 * time.Millisecond): - // Wait between ctx chan and time chan. - case <-ctx.Done(): - resCh <- &sortresult{err: ctx.Err()} - return - } - r := sortWithoutIndex(cctx, ts) - resCh <- r - }() - - go func() { - sr := sortWithIndex(cctx, ts) - resCh <- sr - }() - - r := <-resCh - if r.err == nil { - cancel() - // wait for other goroutine to get cancelled - <-resCh - } else { - span.Annotatef(nil, "processSort error: %v", r.err) - r = <-resCh - } - + r := sortWithIndex(cctx, ts) if r.err != nil { return nil, r.err } diff --git a/worker/task.go b/worker/task.go index 6ea7a5a4b2b..445628fb17b 100644 --- a/worker/task.go +++ b/worker/task.go @@ -390,6 +390,7 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er out := &pb.Result{} outputs[start/width] = out + cache := make([]*pb.PostingList, 0) for i := start; i < end; i++ { select { case <-ctx.Done(): @@ -404,9 +405,20 @@ func (qs *queryState) handleValuePostings(ctx context.Context, args funcArgs) er fcs := &pb.FacetsList{FacetsList: make([]*pb.Facets, 0)} // TODO Figure out how it is stored if !getMultiplePosting { - pl, err := qs.cache.GetSinglePosting(key) - if err != nil { - return err + if len(cache) == 0 { + keys := make([][]byte, 10) + keys[0] = key + for j := i + 1; j < i+10 && j < end; j++ { + keys[j-i] = x.DataKey(q.Attr, q.UidList.Uids[j]) + } + cache, err = qs.cache.GetBatchSinglePosting(keys) + if err != nil { + return err + } + } + pl := cache[0] + if len(cache) > 1 { + cache = cache[1:] } if pl == nil || len(pl.Postings) == 0 { out.UidMatrix = append(out.UidMatrix, &pb.List{}) From 7ba5300fe9ed16fe183ee245fe8c403377bd7503 Mon Sep 17 00:00:00 2001 From: Harshil Goel Date: Sun, 1 Oct 2023 18:54:25 +0530 Subject: [PATCH 29/29] Tried to add posting list cache for single values --- posting/lists.go | 34 +++++++++++++++++++++++++++++++--- worker/sort.go | 30 +++++++++++++++++++++++++++++- 2 files changed, 60 insertions(+), 4 deletions(-) diff --git a/posting/lists.go b/posting/lists.go index f95d8c95c23..eea54425efc 100644 --- a/posting/lists.go +++ b/posting/lists.go @@ -111,6 +111,8 @@ type LocalCache struct { // plists are posting lists in memory. They can be discarded to reclaim space. plists map[string]*List + + postings map[string]*pb.PostingList } // NewLocalCache returns a new LocalCache instance. @@ -120,13 +122,17 @@ func NewLocalCache(startTs uint64) *LocalCache { deltas: make(map[string][]byte), plists: make(map[string]*List), maxVersions: make(map[string]uint64), + postings: make(map[string]*pb.PostingList), } } // NoCache returns a new LocalCache instance, which won't cache anything. Useful to pass startTs // around. 
 func NoCache(startTs uint64) *LocalCache {
-	return &LocalCache{startTs: startTs}
+	return &LocalCache{
+		startTs:  startTs,
+		postings: make(map[string]*pb.PostingList),
+	}
 }
 
 func (lc *LocalCache) getNoStore(key string) *List {
@@ -138,6 +144,20 @@ func (lc *LocalCache) getNoStore(key string) *List {
 	return nil
 }
 
+// SetPostingIfAbsent adds the posting list for the specified key to the cache. If a list
+// for the same key already exists, the cache will not be modified and the existing list
+// will be returned instead. This behavior is meant to prevent the goroutines
+// using the cache from ending up with an orphaned version of a list.
+func (lc *LocalCache) SetPostingIfAbsent(key string, updated *pb.PostingList) *pb.PostingList {
+	lc.Lock()
+	defer lc.Unlock()
+	if pl, ok := lc.postings[key]; ok {
+		return pl
+	}
+	lc.postings[key] = updated
+	return updated
+}
+
 // SetIfAbsent adds the list for the specified key to the cache. If a list for the same
 // key already exists, the cache will not be modified and the existing list
 // will be returned instead. This behavior is meant to prevent the goroutines
@@ -200,8 +220,10 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
 	remaining_keys := make([][]byte, 0)
 	lc.RLock()
 	for i, key := range keys {
-		pl := &pb.PostingList{}
-		if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
+		if pl, ok := lc.postings[string(key)]; ok && pl != nil {
+			results[i] = pl
+		} else if delta, ok := lc.deltas[string(key)]; ok && len(delta) > 0 {
+			pl := &pb.PostingList{}
 			err := pl.Unmarshal(delta)
 			if err != nil {
 				results[i] = pl
@@ -214,6 +236,10 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
 
 	txn := pstore.NewTransactionAt(lc.startTs, false)
 	items, err := txn.GetBatch(remaining_keys)
+	if err != nil {
+		fmt.Println(err, keys)
+		return nil, err
+	}
 	idx := 0
 
 	for i := 0; i < len(results); i++ {
@@ -245,6 +271,7 @@ func (lc *LocalCache) GetBatchSinglePosting(keys [][]byte) ([]*pb.PostingList, e
 		}
 		pl.Postings = pl.Postings[:idx]
 		results[i] = pl
+		lc.SetPostingIfAbsent(string(keys[i]), pl)
 	}
 
 	return results, err
@@ -311,6 +338,7 @@ func (lc *LocalCache) GetSinglePosting(key []byte) (*pb.PostingList, error) {
 		}
 	}
 	pl.Postings = pl.Postings[:idx]
+	lc.SetPostingIfAbsent(string(key), pl)
 	return pl, nil
 }
 
diff --git a/worker/sort.go b/worker/sort.go
index 558c4e134d5..a1867dd0d13 100644
--- a/worker/sort.go
+++ b/worker/sort.go
@@ -21,6 +21,7 @@ import (
 	"encoding/hex"
 	"sort"
 	"strings"
+	"time"
 
 	"github.com/golang/glog"
 	"github.com/pkg/errors"
@@ -511,7 +512,34 @@ func processSort(ctx context.Context, ts *pb.SortMessage) (*pb.SortResult, error
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	r := sortWithIndex(cctx, ts)
+	resCh := make(chan *sortresult, 2)
+	go func() {
+		select {
+		case <-time.After(3 * time.Millisecond):
+			// Wait between ctx chan and time chan.
+		case <-ctx.Done():
+			resCh <- &sortresult{err: ctx.Err()}
+			return
+		}
+		r := sortWithoutIndex(cctx, ts)
+		resCh <- r
+	}()
+
+	go func() {
+		sr := sortWithIndex(cctx, ts)
+		resCh <- sr
+	}()
+
+	r := <-resCh
+	if r.err == nil {
+		cancel()
+		// wait for other goroutine to get cancelled
+		<-resCh
+	} else {
+		span.Annotatef(nil, "processSort error: %v", r.err)
+		r = <-resCh
+	}
+
 	if r.err != nil {
 		return nil, r.err
 	}
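
For context, a condensed sketch of the read path these patches converge on: scalar predicates skip the full posting-list machinery, fetch only the latest postings per key (batched through GetBatchSinglePosting and memoised per transaction via SetPostingIfAbsent), and convert them straight into values. This is illustrative only and assumes it is built inside the dgraph module; it reuses the x.DataKey, schema.State and LocalCache.GetBatchSinglePosting calls shown in the diffs above, but the package name, the wantsSinglePosting/readScalarValues helpers and the simplified error handling are hypothetical stand-ins, not code from this PR.

package example

import (
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/schema"
	"github.com/dgraph-io/dgraph/types"
	"github.com/dgraph-io/dgraph/x"
)

// wantsSinglePosting mirrors the getMultiplePosting check in worker/task.go:
// the latest-posting fast path only applies to plain scalar predicates. The
// real check also consults q.DoCount, q.ExpandAll and q.FacetParam.
func wantsSinglePosting(attr string) bool {
	return !schema.State().IsList(attr) && !schema.State().HasLang(attr)
}

// readScalarValues sketches the fast path of handleValuePostings: build the
// data keys for a slice of UIDs, fetch the latest posting list for each key in
// one batched call, and convert the postings into types.Val, leaving empty
// slots so the caller's matrices stay aligned with the UID list.
func readScalarValues(lc *posting.LocalCache, attr string, uids []uint64) ([][]types.Val, error) {
	keys := make([][]byte, len(uids))
	for i, uid := range uids {
		keys[i] = x.DataKey(attr, uid)
	}
	pls, err := lc.GetBatchSinglePosting(keys)
	if err != nil {
		return nil, err
	}
	out := make([][]types.Val, len(uids))
	for i, pl := range pls {
		if pl == nil || len(pl.Postings) == 0 {
			continue // analogous to appending empty matrices in worker/task.go
		}
		vals := make([]types.Val, len(pl.Postings))
		for j, p := range pl.Postings {
			vals[j] = types.Val{Tid: types.TypeID(p.ValType), Value: p.Value}
		}
		out[i] = vals
	}
	return out, nil
}

The 10-key batching in worker/task.go and the per-transaction postings map added in the last commit both exist to amortise Badger lookups across the UID list; note that txn.GetBatch itself comes from the local badger checkout referenced by the go.mod replace directive rather than a released badger/v4.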