Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change uid intersection policy in some cases #8919

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 55 additions & 0 deletions algo/uidlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,23 @@ func ApplyFilter(u *pb.List, f func(uint64, int) bool) {
u.Uids = out
}

// IntersectCompressedWithAlternate intersects a packed list of UIDs with the
// sorted list v and writes the result to o. Unlike IntersectCompressedWith,
// it first unpacks the whole pack (from afterUID onwards) into a plain list
// and then runs the regular merge intersection — intended for the case where
// v is much larger than the packed list.
func IntersectCompressedWithAlternate(pack *pb.UidPack, afterUID uint64, v, o *pb.List) {
	if pack == nil {
		return
	}
	dec := &codec.Decoder{Pack: pack}
	dec.Seek(afterUID, codec.SeekStart)

	// Unpack the remaining blocks into one flat, sorted UID list.
	// Pre-size with the codec's estimate to avoid repeated slice growth.
	k := &pb.List{Uids: make([]uint64, 0, codec.ApproxLen(pack))}
	for ; dec.Valid(); dec.Next() {
		k.Uids = append(k.Uids, dec.Uids()...)
	}

	IntersectWith(k, v, o)
}

// IntersectCompressedWith intersects a packed list of UIDs with another list
// and writes the output to o.
func IntersectCompressedWith(pack *pb.UidPack, afterUID uint64, v, o *pb.List) {
Expand Down Expand Up @@ -136,6 +153,44 @@ func IntersectCompressedWithBin(dec *codec.Decoder, q []uint64, o *[]uint64) {
}
}

// IntersectMap intersects the set of UIDs in u with the sorted list v,
// keeping only UIDs >= afterUid, and appends the matches to o.
// Membership is tested against the map keys only; the bool values are
// ignored, so entries stored as false still count as present.
func IntersectMap(u map[uint64]bool, afterUid uint64, v, o *pb.List) {
	if o.Uids == nil {
		// The intersection can never exceed the smaller of the two inputs.
		sz := len(v.Uids)
		if len(u) < sz {
			sz = len(u)
		}
		o.Uids = make([]uint64, 0, sz)
	}

	for _, uid := range v.Uids {
		if uid < afterUid {
			// v.Uids is sorted, so these are the leading entries.
			continue
		}
		if _, ok := u[uid]; ok {
			o.Uids = append(o.Uids, uid)
		}
	}
}

// IntersectWithAfter intersects u with v (both sorted) into o, then drops
// every UID in the result that is smaller than afterUid.
//
// Fixes two defects in the original scan: `index = i - 1` retained one UID
// below afterUid (off-by-one, inconsistent with the `uid < afterUid` skip in
// IntersectMap), and when no UID reached afterUid the whole result was kept
// instead of being emptied.
func IntersectWithAfter(u, v, o *pb.List, afterUid uint64) {
	IntersectWith(u, v, o)
	// o.Uids is sorted: keep the suffix starting at the first entry that is
	// >= afterUid. If no such entry exists, the result is empty.
	start := len(o.Uids)
	for i, uid := range o.Uids {
		if uid >= afterUid {
			start = i
			break
		}
	}
	o.Uids = o.Uids[start:]
}

// IntersectWith intersects u with v. The update is made to o.
// u, v should be sorted.
func IntersectWith(u, v, o *pb.List) {
Expand Down
49 changes: 24 additions & 25 deletions algo/uidlist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,16 +323,15 @@ func BenchmarkListIntersectRandom(b *testing.B) {
sort.Slice(u1, func(i, j int) bool { return u1[i] < u1[j] })
sort.Slice(v1, func(i, j int) bool { return v1[i] < v1[j] })

u := newList(u1)
v := newList(v1)
dst1 := &pb.List{}
dst2 := &pb.List{}
compressedUids := codec.Encode(u1, 256)

b.Run(fmt.Sprintf(":size=%d:overlap=%.2f:", arrSz, overlap),
b.Run(fmt.Sprintf(":compressed2:size=%d:overlap=%.2f:", arrSz, overlap),
func(b *testing.B) {
for k := 0; k < b.N; k++ {
IntersectWith(u, v, dst1)
IntersectCompressedWithAlternate(compressedUids, 0, v, dst1)
}
})

Expand Down Expand Up @@ -369,11 +368,11 @@ func BenchmarkListIntersectRandom(b *testing.B) {

func BenchmarkListIntersectRatio(b *testing.B) {
randomTests := func(sz int, overlap float64) {
rs := []int{1, 10, 50, 100, 500, 1000, 10000, 100000, 1000000}
rs := []float64{100, 200, 400, 500, 1000}
for _, r := range rs {
sz1 := sz
sz2 := sz * r
if sz2 > 1000000 {
sz2 := int(float64(sz) * r)
if sz2 > 1000000 || sz2 == 0 {
break
}

Expand All @@ -388,40 +387,40 @@ func BenchmarkListIntersectRatio(b *testing.B) {
sort.Slice(u1, func(i, j int) bool { return u1[i] < u1[j] })
sort.Slice(v1, func(i, j int) bool { return v1[i] < v1[j] })

u := &pb.List{Uids: u1}
//u := &pb.List{Uids: u1}
v := &pb.List{Uids: v1}
dst1 := &pb.List{}
dst2 := &pb.List{}
compressedUids := codec.Encode(v1, 256)
compressedUids := codec.Encode(u1, 256)

fmt.Printf("len: %d, compressed: %d, bytes/int: %f\n",
len(v1), compressedUids.Size(), float64(compressedUids.Size())/float64(len(v1)))
b.Run(fmt.Sprintf(":IntersectWith:ratio=%d:size=%d:overlap=%.2f:", r, sz, overlap),
b.Run(fmt.Sprintf("a:IntersectWith:ratio=%f:size=%d:overlap=%.2f:", r, sz, overlap),
func(b *testing.B) {
for k := 0; k < b.N; k++ {
IntersectWith(u, v, dst1)
IntersectCompressedWithAlternate(compressedUids, 0, v, dst1)
}
})
b.Run(fmt.Sprintf("compressed:IntersectWith:ratio=%d:size=%d:overlap=%.2f:", r, sz, overlap),
b.Run(fmt.Sprintf("compressed:IntersectWith:ratio=%f:size=%d:overlap=%.2f:", r, sz, overlap),
func(b *testing.B) {
for k := 0; k < b.N; k++ {
IntersectCompressedWith(compressedUids, 0, u, dst2)
IntersectCompressedWith(compressedUids, 0, v, dst2)
}
})
fmt.Println()
i := 0
j := 0
for i < len(dst1.Uids) {
if dst1.Uids[i] != dst2.Uids[j] {
b.Errorf("Unexpected error in intersection")
}
// Behaviour of bin intersect is not defined when duplicates are present
i = skipDuplicate(dst1.Uids, i)
j = skipDuplicate(dst2.Uids, j)
}
if j < len(dst2.Uids) {
b.Errorf("Unexpected error in intersection")
}
//i := 0
//j := 0
//for i < len(dst1.Uids) {
// if dst1.Uids[i] != dst2.Uids[j] {
// b.Errorf("Unexpected error in intersection")
// }
// // Behaviour of bin intersect is not defined when duplicates are present
// i = skipDuplicate(dst1.Uids, i)
// j = skipDuplicate(dst2.Uids, j)
//}
//if j < len(dst2.Uids) {
// b.Errorf("Unexpected error in intersection")
//}

codec.FreePack(compressedUids)
}
Expand Down
6 changes: 4 additions & 2 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,6 @@ func run() {
os.Exit(1)
}

worker.SetConfiguration(&opts)

ips, err := getIPsFromString(security.GetString("whitelist"))
x.Check(err)

Expand Down Expand Up @@ -701,6 +699,10 @@ func run() {
}
x.WorkerConfig.Parse(Alpha.Conf)

// We need to set configurations now that we have updated TmpDir and other folders
// so that we validate the correct folders.
worker.SetConfiguration(&opts)

if telemetry.GetBool("reports") {
go edgraph.PeriodicallyPostTelemetry()
}
Expand Down
2 changes: 1 addition & 1 deletion posting/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
)

// uids reads the full UID list from l at the given read timestamp,
// failing the enclosing test immediately on error.
func uids(l *List, readTs uint64) []uint64 {
	res, err := l.Uids(context.Background(), ListOptions{ReadTs: readTs})
	x.Check(err)
	return res.Uids
}
Expand Down
33 changes: 31 additions & 2 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/ristretto/z"

otrace "go.opencensus.io/trace"
)

var (
Expand Down Expand Up @@ -71,6 +73,7 @@ const (
// List stores the in-memory representation of a posting list.
type List struct {
x.SafeMutex
immutable *pb.List
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment that this is unpacked posting list?

key []byte
plist *pb.PostingList
mutationMap map[uint64]*pb.PostingList
Expand Down Expand Up @@ -875,6 +878,21 @@ func (l *List) Rollup(alloc *z.Allocator, readTs uint64) ([]*bpb.KV, error) {
return kvs, nil
}

func (l *List) CreateUnpacked() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like all callers to this are commented out. Is it dead code?

if l.plist.Pack == nil {
return
}
l.immutable = &pb.List{
Uids: make([]uint64, 0),
}
decoder := codec.NewDecoder(l.plist.Pack)
for ; decoder.Valid(); decoder.Next() {
for _, uid := range decoder.Uids() {
l.immutable.Uids = append(l.immutable.Uids, uid)
}
}
}

// ToBackupPostingList uses rollup to generate a single list with no splits.
// It's used during backup so that each backed up posting list is stored in a single key.
func (l *List) ToBackupPostingList(
Expand Down Expand Up @@ -1058,6 +1076,7 @@ func (l *List) encode(out *rollupOutput, readTs uint64, split bool) error {
return errors.Wrapf(err, "cannot iterate through the list")
}
plist.Pack = enc.Done()
//l.CreateUnpacked()
if plist.Pack != nil {
if plist.Pack.BlockSize != uint32(blockSize) {
return errors.Errorf("actual block size %d is different from expected value %d",
Expand Down Expand Up @@ -1137,7 +1156,9 @@ func (l *List) ApproxLen() int {
// Uids returns the UIDs given some query params.
// We have to apply the filtering before applying (offset, count).
// WARNING: Calling this function just to get UIDs is expensive
func (l *List) Uids(opt ListOptions) (*pb.List, error) {
func (l *List) Uids(ctx context.Context, opt ListOptions) (*pb.List, error) {
ctx, span := otrace.StartSpan(ctx, "Posting.Uids")
defer span.End()
if opt.First == 0 {
opt.First = math.MaxInt32
}
Expand All @@ -1151,11 +1172,18 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
l.RUnlock()
return out, ErrTsTooOld
}
algo.IntersectCompressedWith(l.plist.Pack, opt.AfterUid, opt.Intersect, out)
stop := x.SpanTimer(span, "IntersectCompressedWith")
if len(opt.Intersect.Uids) > 5*codec.ApproxLen(l.plist.Pack) {
algo.IntersectCompressedWithAlternate(l.plist.Pack, opt.AfterUid, opt.Intersect, out)
} else {
algo.IntersectCompressedWith(l.plist.Pack, opt.AfterUid, opt.Intersect, out)
}
stop()
l.RUnlock()
return out, nil
}

span.Annotate(nil, "Starting to read latest data from badger")
err := l.iterate(opt.ReadTs, opt.AfterUid, func(p *pb.Posting) error {
if p.PostingType == pb.Posting_REF {
res = append(res, p.Uid)
Expand All @@ -1178,6 +1206,7 @@ func (l *List) Uids(opt ListOptions) (*pb.List, error) {
}

// Do The intersection here as it's optimized.
span.Annotate(nil, "Starting normal Intersect")
out.Uids = res
if opt.Intersect != nil {
algo.IntersectWith(out, opt.Intersect, out)
Expand Down
18 changes: 9 additions & 9 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func TestMillion(t *testing.T) {

t.Logf("Completed a million writes.\n")
opt := ListOptions{ReadTs: uint64(N) + 1}
l, err := ol.Uids(opt)
l, err := ol.Uids(context.Background(), opt)
require.NoError(t, err)
require.Equal(t, commits, len(l.Uids), "List of Uids received: %+v", l.Uids)
for i, uid := range l.Uids {
Expand All @@ -526,7 +526,7 @@ func TestAddMutation_mrjn2(t *testing.T) {
for i := 1; i < 10; i++ {
// Each of these txns see their own write.
opt := ListOptions{ReadTs: uint64(i)}
list, err := ol.Uids(opt)
list, err := ol.Uids(context.Background(), opt)
require.NoError(t, err)
require.EqualValues(t, 1, len(list.Uids))
require.EqualValues(t, uint64(i), list.Uids[0])
Expand Down Expand Up @@ -578,7 +578,7 @@ func TestAddMutation_mrjn2(t *testing.T) {
require.EqualValues(t, 1, ol.Length(12, 0)) // Find committed 11.
require.EqualValues(t, 2, ol.Length(15, 0)) // Find committed 14.
opts := ListOptions{ReadTs: 15}
list, err := ol.Uids(opts)
list, err := ol.Uids(context.Background(), opts)
require.NoError(t, err)
require.EqualValues(t, 7, list.Uids[0])
require.EqualValues(t, 9, list.Uids[1])
Expand Down Expand Up @@ -1095,7 +1095,7 @@ func TestMultiPartListBasic(t *testing.T) {
size := int(1e5)
ol, commits := createMultiPartList(t, size, false)
opt := ListOptions{ReadTs: uint64(size) + 1}
l, err := ol.Uids(opt)
l, err := ol.Uids(context.Background(), opt)
require.NoError(t, err)
require.Equal(t, commits, len(l.Uids), "List of Uids received: %+v", l.Uids)
for i, uid := range l.Uids {
Expand Down Expand Up @@ -1249,9 +1249,9 @@ func TestMultiPartListWriteToDisk(t *testing.T) {
require.NoError(t, err)

opt := ListOptions{ReadTs: uint64(size) + 1}
originalUids, err := originalList.Uids(opt)
originalUids, err := originalList.Uids(context.Background(), opt)
require.NoError(t, err)
newUids, err := newList.Uids(opt)
newUids, err := newList.Uids(context.Background(), opt)
require.NoError(t, err)
require.Equal(t, commits, len(originalUids.Uids))
require.Equal(t, len(originalUids.Uids), len(newUids.Uids))
Expand Down Expand Up @@ -1315,7 +1315,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {

// Verify all entries are in the list.
opt := ListOptions{ReadTs: math.MaxUint64}
l, err := ol.Uids(opt)
l, err := ol.Uids(context.Background(), opt)
require.NoError(t, err)
require.Equal(t, size, len(l.Uids), "List of Uids received: %+v", l.Uids)
for i, uid := range l.Uids {
Expand Down Expand Up @@ -1351,7 +1351,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
}
// Verify that the entries were actually deleted.
opt = ListOptions{ReadTs: math.MaxUint64}
l, err = ol.Uids(opt)
l, err = ol.Uids(context.Background(), opt)
require.NoError(t, err)
require.Equal(t, 50000, len(l.Uids), "List of Uids received: %+v", l.Uids)
for i, uid := range l.Uids {
Expand Down Expand Up @@ -1386,7 +1386,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {

// Verify all entries are once again in the list.
opt = ListOptions{ReadTs: math.MaxUint64}
l, err = ol.Uids(opt)
l, err = ol.Uids(context.Background(), opt)
require.NoError(t, err)
require.Equal(t, size, len(l.Uids), "List of Uids received: %+v", l.Uids)
for i, uid := range l.Uids {
Expand Down
1 change: 1 addition & 0 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (lc *LocalCache) getInternal(key []byte, readFromDisk bool) (*List, error)

skey := string(key)
if pl := lc.getNoStore(skey); pl != nil {
//pl.CreateUnpacked()
return pl, nil
}

Expand Down
5 changes: 5 additions & 0 deletions posting/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,10 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}
}()

defer func() {
//l.CreateUnpacked()
}()

// Iterates from highest Ts to lowest Ts
for it.Valid() {
item := it.Item()
Expand Down Expand Up @@ -454,6 +458,7 @@ func ReadPostingList(key []byte, it *badger.Iterator) (*List, error) {
}
it.Next()
}

return l, nil
}

Expand Down