Skip to content

Use sync.Pool for reclaiming unused posting.List #223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,11 @@ func (s *state) handleNQuads(wg *sync.WaitGroup) {
}

key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, dataStore)

plist, decr := posting.GetOrCreate(key, dataStore)
plist.AddMutationWithIndex(ctx, edge, posting.Set)
decr() // Don't defer, just call because we're in a channel loop.

atomic.AddUint64(&s.ctr.processed, 1)
}
}
Expand Down
5 changes: 3 additions & 2 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func init() {
indexLog = trace.NewEventLog("index", "Logger")
x.AddInit(func() {
if indexConfigFile == nil || len(*indexConfigFile) == 0 {
indexLog.Printf("No valid config file", *indexConfigFile)
indexLog.Printf("No valid config file: %v", *indexConfigFile)
return
}
f, err := ioutil.ReadFile(*indexConfigFile)
Expand Down Expand Up @@ -112,7 +112,8 @@ func processIndexTerm(attr string, uid uint64, term []byte, del bool) {
Attribute: attr,
}
key := IndexKey(edge.Attribute, term)
plist := GetOrCreate(key, indexStore)
plist, decr := GetOrCreate(key, indexStore)
defer decr()
x.Assertf(plist != nil, "plist is nil [%s] %d %s", key, edge.ValueId, edge.Attribute)

ctx := context.Background()
Expand Down
25 changes: 23 additions & 2 deletions posting/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type List struct {
lastCompact time.Time
wg sync.WaitGroup
deleteMe int32
refcount int32

// Mutations
mlayer map[int]types.Posting // Stores only replace instructions.
Expand All @@ -73,10 +74,30 @@ type List struct {
dirtyTs int64 // Use atomics for this.
}

func NewList() *List {
l := new(List)
func (l *List) refCount() int32 { return atomic.LoadInt32(&l.refcount) }
func (l *List) incr() int32 { return atomic.AddInt32(&l.refcount, 1) }
func (l *List) decr() {
val := atomic.AddInt32(&l.refcount, -1)
x.Assertf(val >= 0, "List reference should never be less than zero: %v", val)
if val > 0 {
return
}
listPool.Put(l)
}

var listPool = sync.Pool{
New: func() interface{} {
return &List{}
},
}

func getNew() *List {
l := listPool.Get().(*List)
*l = List{}
l.wg.Add(1)
l.mlayer = make(map[int]types.Posting)
x.Assert(len(l.key) == 0)
l.refcount = 1
return l
}

Expand Down
16 changes: 8 additions & 8 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func checkUids(t *testing.T, l *List, uids ...uint64) error {
}

func TestAddMutation(t *testing.T) {
l := NewList()
l := getNew()
key := Key(1, "name")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestAddMutation(t *testing.T) {
l.MergeIfDirty(ctx)

// Try reading the same data in another PostingList.
dl := NewList()
dl := getNew()
dl.init(key, ps)
if err := checkUids(t, dl, uids...); err != nil {
t.Error(err)
Expand Down Expand Up @@ -242,7 +242,7 @@ func checkValue(ol *List, val string) error {
}

func TestAddMutation_Value(t *testing.T) {
ol := NewList()
ol := getNew()
key := Key(10, "value")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
Expand Down Expand Up @@ -309,7 +309,7 @@ func TestAddMutation_Value(t *testing.T) {
}

func TestAddMutation_jchiu1(t *testing.T) {
ol := NewList()
ol := getNew()
key := Key(10, "value")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
Expand Down Expand Up @@ -386,7 +386,7 @@ func TestAddMutation_jchiu1(t *testing.T) {
}

func TestAddMutation_jchiu2(t *testing.T) {
ol := NewList()
ol := getNew()
key := Key(10, "value")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestAddMutation_jchiu2(t *testing.T) {
}

func TestAddMutation_jchiu3(t *testing.T) {
ol := NewList()
ol := getNew()
key := Key(10, "value")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
Expand Down Expand Up @@ -543,7 +543,7 @@ func TestAddMutation_jchiu3(t *testing.T) {
}

func TestAddMutation_mrjn1(t *testing.T) {
ol := NewList()
ol := getNew()
key := Key(10, "value")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
Expand Down Expand Up @@ -646,7 +646,7 @@ func TestAddMutation_mrjn1(t *testing.T) {

func benchmarkAddMutations(n int, b *testing.B) {
// logrus.SetLevel(logrus.DebugLevel)
l := NewList()
l := getNew()
key := Key(1, "name")
dir, err := ioutil.TempDir("", "storetest_")
if err != nil {
Expand Down
48 changes: 36 additions & 12 deletions posting/lists.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/zond/gotomic"

"github.com/dgraph-io/dgraph/store"
"github.com/dgraph-io/dgraph/x"
)

var maxmemory = flag.Int("stw_ram_mb", 4096,
Expand Down Expand Up @@ -263,26 +264,48 @@ func Init() {
go checkMemoryUsage()
}

// GetOrCreate stores the List corresponding to key(if its not there already)
// to lhmap and returns it.
func GetOrCreate(key []byte, pstore *store.Store) *List {
func getFromMap(gotomicKey gotomic.IntKey) *List {
lp, _ := lhmap.Get(gotomicKey)
if lp == nil {
return nil
}
result := lp.(*List)
result.incr()
return result
}

// GetOrCreate stores the List corresponding to key, if it's not there already.
// to lhmap and returns it. It also returns a reference decrement function to be called by caller.
//
// plist, decr := GetOrCreate(key, store)
// defer decr()
// ... // Use plist
func GetOrCreate(key []byte, pstore *store.Store) (rlist *List, decr func()) {
fp := farm.Fingerprint64(key)
gotomicKey := gotomic.IntKey(fp)

stopTheWorld.RLock()
defer stopTheWorld.RUnlock()
lp, _ := lhmap.Get(gotomicKey)
if lp != nil {
return lp.(*List)
if lp := getFromMap(gotomicKey); lp != nil {
return lp, lp.decr
}

l := NewList()
if inserted := lhmap.PutIfMissing(gotomicKey, l); inserted {
l.init(key, pstore)
return l
{
l := getNew() // This retrieves a new *List and increments its ref count.
if inserted := lhmap.PutIfMissing(gotomicKey, l); inserted {
l.incr() // Increment reference counter for the caller.
l.init(key, pstore)
return l, l.decr
}
// If we're unable to insert this, decrement the reference count.
// This would undo the increment in the newList() call, and allow this list to be reused.
l.decr()
}
if lp := getFromMap(gotomicKey); lp != nil {
return lp, lp.decr
}
lp, _ = lhmap.Get(gotomicKey)
return lp.(*List)
x.Assertf(false, "Key should be present.")
return nil, nil
}

func mergeAndUpdate(l *List, c *counters) {
Expand All @@ -308,6 +331,7 @@ func processOne(k gotomic.Hashable, c *counters) {
if l == nil {
return
}
defer l.decr()
l.SetForDeletion() // No more AddMutation.
mergeAndUpdate(l, c)
}
Expand Down
4 changes: 2 additions & 2 deletions posting/lmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func BenchmarkGet(b *testing.B) {
for pb.Next() {
// i := uint64(rand.Int63())
_ = uint64(rand.Int63())
NewList()
getNew()
// lmap.Get(i)
}
})
Expand All @@ -38,7 +38,7 @@ func BenchmarkGetLinear(b *testing.B) {
for i := 0; i < b.N; i++ {
k := uint64(i)
if l, ok := m[k]; !ok {
l = NewList()
l = getNew()
m[k] = l
}
}
Expand Down
37 changes: 21 additions & 16 deletions query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ func TestNewGraph(t *testing.T) {
}
}

func getOrCreate(key []byte, ps *store.Store) *posting.List {
l, _ := posting.GetOrCreate(key, ps)
return l
}

func populateGraph(t *testing.T) (string, *store.Store) {
// logrus.SetLevel(logrus.DebugLevel)
dir, err := ioutil.TempDir("", "storetest_")
Expand Down Expand Up @@ -154,54 +159,54 @@ func populateGraph(t *testing.T) (string, *store.Store) {
Source: "testing",
Timestamp: time.Now(),
}
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "friend"), ps))

edge.ValueId = 24
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "friend"), ps))

edge.ValueId = 25
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "friend"), ps))

edge.ValueId = 31
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "friend"), ps))

edge.ValueId = 101
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "friend"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "friend"), ps))

// Now let's add a few properties for the main user.
edge.Value = []byte("Michonne")
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "name"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "name"), ps))

edge.Value = []byte("female")
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "gender"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "gender"), ps))

edge.Value = []byte("alive")
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "status"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "status"), ps))

edge.Value = []byte("38")
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "age"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "age"), ps))

edge.Value = []byte("98.99")
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "survival_rate"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "survival_rate"), ps))

edge.Value = []byte("true")
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "sword_present"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "sword_present"), ps))

// Now let's add a name for each of the friends, except 101.
edge.Value = []byte("Rick Grimes")
addEdge(t, edge, posting.GetOrCreate(posting.Key(23, "name"), ps))
addEdge(t, edge, getOrCreate(posting.Key(23, "name"), ps))

edge.Value = []byte("Glenn Rhee")
addEdge(t, edge, posting.GetOrCreate(posting.Key(24, "name"), ps))
addEdge(t, edge, getOrCreate(posting.Key(24, "name"), ps))

edge.Value = []byte("Daryl Dixon")
addEdge(t, edge, posting.GetOrCreate(posting.Key(25, "name"), ps))
addEdge(t, edge, getOrCreate(posting.Key(25, "name"), ps))

edge.Value = []byte("Andrea")
addEdge(t, edge, posting.GetOrCreate(posting.Key(31, "name"), ps))
addEdge(t, edge, getOrCreate(posting.Key(31, "name"), ps))

edge.Value = []byte("mich")
addEdge(t, edge, posting.GetOrCreate(posting.Key(1, "_xid_"), ps))
addEdge(t, edge, getOrCreate(posting.Key(1, "_xid_"), ps))

return dir, ps
}
Expand Down
11 changes: 8 additions & 3 deletions uid/assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func allocateUniqueUid(xid string, instanceIdx uint64,

// Check if this uid has already been allocated.
key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
pl := posting.GetOrCreate(key, uidStore)
pl, decr := posting.GetOrCreate(key, uidStore)
defer decr()

if pl.Length() > 0 {
// Something already present here.
Expand Down Expand Up @@ -207,7 +208,9 @@ func StringKey(xid string) []byte {
// Get returns the uid of the corresponding xid.
func Get(xid string) (uid uint64, rerr error) {
key := StringKey(xid)
pl := posting.GetOrCreate(key, uidStore)
pl, decr := posting.GetOrCreate(key, uidStore)
defer decr()

if pl.Length() == 0 {
return 0, fmt.Errorf("xid: %v doesn't have any uid assigned.", xid)
}
Expand All @@ -232,7 +235,9 @@ func GetOrAssign(xid string, instanceIdx uint64,
}

key := StringKey(xid)
pl := posting.GetOrCreate(key, uidStore)
pl, decr := posting.GetOrCreate(key, uidStore)
defer decr()

if pl.Length() == 0 {
return assignNew(pl, xid, instanceIdx, numInstances)
}
Expand Down
7 changes: 6 additions & 1 deletion uid/assigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@ import (
"github.com/dgraph-io/dgraph/store"
)

func getOrCreate(key []byte, ps *store.Store) *posting.List {
l, _ := posting.GetOrCreate(key, ps)
return l
}

// externalId returns the xid of a given uid by reading from the uidstore.
// It returns an error if there is no corresponding xid.
func externalId(uid uint64) (xid string, rerr error) {
key := posting.Key(uid, "_xid_") // uid -> "_xid_" -> xid
pl := posting.GetOrCreate(key, uidStore)
pl := getOrCreate(key, uidStore)
if pl.Length() == 0 {
return "", errors.New("NO external id")
}
Expand Down
4 changes: 3 additions & 1 deletion worker/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ func runMutations(ctx context.Context, edges []x.DirectedEdge, op byte, left *Mu
}

key := posting.Key(edge.Entity, edge.Attribute)
plist := posting.GetOrCreate(key, ws.dataStore)
plist, decr := posting.GetOrCreate(key, ws.dataStore)
defer decr()

if err := plist.AddMutationWithIndex(ctx, edge, op); err != nil {
if op == posting.Set {
left.Set = append(left.Set, edge)
Expand Down
3 changes: 2 additions & 1 deletion worker/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func processTask(query []byte) ([]byte, error) {
key = posting.Key(q.Uids(i), attr)
}
// Get or create the posting list for an entity, attribute combination.
pl := posting.GetOrCreate(key, store)
pl, decr := posting.GetOrCreate(key, store)
defer decr()

var valoffset flatbuffers.UOffsetT
// If a posting list contains a value, we store that or else we store a nil
Expand Down
Loading