Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(perf): using tags instead of runtime callers to improve the performance of leak detection #255

Merged
merged 6 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ func TestRistrettoCalloc(t *testing.T) {
rd := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 10000; i++ {
k := rd.Intn(10000)
v := z.Calloc(256)
v := z.Calloc(256, "test")
rd.Read(v)
if !r.Set(k, v, 256) {
z.Free(v)
Expand Down Expand Up @@ -841,7 +841,7 @@ func TestRistrettoCallocTTL(t *testing.T) {
rd := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < 10000; i++ {
k := rd.Intn(10000)
v := z.Calloc(256)
v := z.Calloc(256, "test")
rd.Read(v)
if !r.SetWithTTL(k, v, 256, time.Second) {
z.Free(v)
Expand Down
2 changes: 1 addition & 1 deletion contrib/demo/node_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// Defined in node.go.
func init() {
alloc = z.NewAllocator(10 << 20)
alloc = z.NewAllocator(10 << 20, "demo")
}

func newNode(val int) *node {
Expand Down
2 changes: 1 addition & 1 deletion contrib/demo/node_jemalloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

func newNode(val int) *node {
b := z.Calloc(nodeSz)
b := z.Calloc(nodeSz, "demo")
n := (*node)(unsafe.Pointer(&b[0]))
n.val = val
return n
Expand Down
4 changes: 2 additions & 2 deletions contrib/memtest/withjemalloc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"github.com/dgraph-io/ristretto/z"
)

func Calloc(size int) []byte { return z.Calloc(size) }
func Calloc(size int) []byte { return z.Calloc(size, "memtest") }
func Free(bs []byte) { z.Free(bs) }
func NumAllocBytes() int64 { return z.NumAllocBytes() }

func check() {
if buf := z.CallocNoRef(1); len(buf) == 0 {
if buf := z.CallocNoRef(1, "memtest"); len(buf) == 0 {
log.Fatalf("Not using manual memory management. Compile with jemalloc.")
} else {
z.Free(buf)
Expand Down
14 changes: 8 additions & 6 deletions z/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func init() {
}

// NewAllocator creates an allocator starting with the given size.
func NewAllocator(sz int) *Allocator {
func NewAllocator(sz int, tag string) *Allocator {
ref := atomic.AddUint64(&allocRef, 1)
// We should not allow a zero sized page because addBufferWithMinSize
// will run into an infinite loop trying to double the pagesize.
Expand All @@ -76,12 +76,13 @@ func NewAllocator(sz int) *Allocator {
a := &Allocator{
Ref: ref,
buffers: make([][]byte, 64),
Tag: tag,
}
l2 := uint64(log2(sz))
if bits.OnesCount64(uint64(sz)) > 1 {
l2 += 1
}
a.buffers[0] = Calloc(1 << l2)
a.buffers[0] = Calloc(1<<l2, a.Tag)

allocsMu.Lock()
allocs[ref] = a
Expand Down Expand Up @@ -271,7 +272,7 @@ func (a *Allocator) addBufferAt(bufIdx, minSz int) {
pageSize = maxAlloc
}

buf := Calloc(pageSize)
buf := Calloc(pageSize, a.Tag)
assert(len(a.buffers[bufIdx]) == 0)
a.buffers[bufIdx] = buf
}
Expand Down Expand Up @@ -324,17 +325,18 @@ func NewAllocatorPool(sz int) *AllocatorPool {
return a
}

func (p *AllocatorPool) Get(sz int) *Allocator {
func (p *AllocatorPool) Get(sz int, tag string) *Allocator {
if p == nil {
return NewAllocator(sz)
return NewAllocator(sz, tag)
}
atomic.AddInt64(&p.numGets, 1)
select {
case alloc := <-p.allocCh:
alloc.Reset()
alloc.Tag = tag
return alloc
default:
return NewAllocator(sz)
return NewAllocator(sz, tag)
}
}
func (p *AllocatorPool) Return(a *Allocator) {
Expand Down
16 changes: 8 additions & 8 deletions z/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

func TestAllocate(t *testing.T) {
a := NewAllocator(1024)
a := NewAllocator(1024, "test")
defer a.Release()

check := func() {
Expand All @@ -51,17 +51,17 @@ func TestAllocate(t *testing.T) {
}

func TestAllocateSize(t *testing.T) {
a := NewAllocator(1024)
a := NewAllocator(1024, "test")
require.Equal(t, 1024, len(a.buffers[0]))
a.Release()

b := NewAllocator(1025)
b := NewAllocator(1025, "test")
require.Equal(t, 2048, len(b.buffers[0]))
b.Release()
}

func TestAllocateReset(t *testing.T) {
a := NewAllocator(16)
a := NewAllocator(16, "test")
defer a.Release()

buf := make([]byte, 128)
Expand All @@ -80,7 +80,7 @@ func TestAllocateReset(t *testing.T) {
}

func TestAllocateTrim(t *testing.T) {
a := NewAllocator(16)
a := NewAllocator(16, "test")
defer a.Release()

buf := make([]byte, 128)
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestPowTwo(t *testing.T) {
}

func TestAllocateAligned(t *testing.T) {
a := NewAllocator(1024)
a := NewAllocator(1024, "test")
defer a.Release()

a.Allocate(1)
Expand All @@ -126,7 +126,7 @@ func TestAllocateAligned(t *testing.T) {
}

func TestAllocateConcurrent(t *testing.T) {
a := NewAllocator(63)
a := NewAllocator(63, "test")
defer a.Release()

N := 10240
Expand Down Expand Up @@ -180,7 +180,7 @@ func TestAllocateConcurrent(t *testing.T) {
}

func BenchmarkAllocate(b *testing.B) {
a := NewAllocator(15)
a := NewAllocator(15, "test")
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
buf := a.Allocate(1)
Expand Down
18 changes: 10 additions & 8 deletions z/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Buffer struct {
bufType BufferType
autoMmapAfter int
dir string
tag string
}

type BufferType int
Expand All @@ -72,8 +73,8 @@ const (
const smallBufferSize = 64

// NewBuffer is a helper utility, which creates a virtually unlimited Buffer in UseCalloc mode.
func NewBuffer(sz int) *Buffer {
buf, err := NewBufferWithDir(sz, MaxBufferSize, UseCalloc, "")
func NewBuffer(sz int, tag string) *Buffer {
buf, err := NewBufferWithDir(sz, MaxBufferSize, UseCalloc, "", tag)
if err != nil {
log.Fatalf("while creating buffer: %v", err)
}
Expand All @@ -83,8 +84,8 @@ func NewBuffer(sz int) *Buffer {
// NewBufferWith would allocate a buffer of size sz upfront, with the total size of the buffer not
// exceeding maxSz. Both sz and maxSz can be set to zero, in which case reasonable defaults would be
// used. Buffer can't be used without initialization via NewBuffer.
func NewBufferWith(sz, maxSz int, bufType BufferType) (*Buffer, error) {
buf, err := NewBufferWithDir(sz, maxSz, bufType, "")
func NewBufferWith(sz, maxSz int, bufType BufferType, tag string) (*Buffer, error) {
buf, err := NewBufferWithDir(sz, maxSz, bufType, "", tag)
return buf, err
}

Expand Down Expand Up @@ -125,7 +126,7 @@ func (b *Buffer) doMmap() error {
// not exceeding maxSz. Both sz and maxSz can be set to zero, in which case reasonable defaults
// would be used. Buffer can't be used without initialization via NewBuffer. The buffer is created
// inside dir. The caller should take care of existence of dir.
func NewBufferWithDir(sz, maxSz int, bufType BufferType, dir string) (*Buffer, error) {
func NewBufferWithDir(sz, maxSz int, bufType BufferType, dir, tag string) (*Buffer, error) {
if sz == 0 {
sz = smallBufferSize
}
Expand All @@ -142,11 +143,12 @@ func NewBufferWithDir(sz, maxSz int, bufType BufferType, dir string) (*Buffer, e
maxSz: maxSz,
bufType: UseCalloc, // by default.
dir: dir,
tag: tag,
}

switch bufType {
case UseCalloc:
b.buf = Calloc(sz)
b.buf = Calloc(sz, b.tag)
case UseMmap:
if err := b.doMmap(); err != nil {
return nil, err
Expand Down Expand Up @@ -219,7 +221,7 @@ func (b *Buffer) Grow(n int) {
check(b.doMmap())

} else {
newBuf := Calloc(b.curSz)
newBuf := Calloc(b.curSz, b.tag)
copy(newBuf, b.buf[:b.offset])
Free(b.buf)
b.buf = newBuf
Expand Down Expand Up @@ -430,7 +432,7 @@ func (b *Buffer) SortSliceBetween(start, end int, less LessFunc) {
b: b,
less: less,
small: make([]int, 0, 1024),
tmp: NewBuffer(szTmp),
tmp: NewBuffer(szTmp, b.tag),
}
defer s.tmp.Release()

Expand Down
14 changes: 7 additions & 7 deletions z/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestBuffer(t *testing.T) {
var bytesBuffer bytes.Buffer // This is just for verifying result.
bytesBuffer.Grow(512)

cBuffer, err := NewBufferWith(512, 4<<30, btype)
cBuffer, err := NewBufferWith(512, 4<<30, btype, "test")
require.Nil(t, err)
defer cBuffer.Release()

Expand Down Expand Up @@ -71,7 +71,7 @@ func TestBufferWrite(t *testing.T) {
var wb [128]byte
rand.Read(wb[:])

cb, err := NewBufferWith(32, 4<<30, btype)
cb, err := NewBufferWith(32, 4<<30, btype, "test")
require.Nil(t, err)
defer cb.Release()

Expand All @@ -94,7 +94,7 @@ func TestBufferWrite(t *testing.T) {
}

func TestBufferAutoMmap(t *testing.T) {
buf := NewBuffer(1 << 20)
buf := NewBuffer(1 << 20, "test")
defer buf.Release()
buf.AutoMmapAfter(64 << 20)

Expand Down Expand Up @@ -124,7 +124,7 @@ func TestBufferAutoMmap(t *testing.T) {
}

func TestBufferSimpleSort(t *testing.T) {
buf := NewBuffer(1 << 20)
buf := NewBuffer(1 << 20, "test")
defer buf.Release()
for i := 0; i < 25600; i++ {
b := buf.SliceAllocate(4)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestBufferSlice(t *testing.T) {
for btype := UseCalloc; btype < UseInvalid; btype++ {
name := fmt.Sprintf("Using mode %s", btype)
t.Run(name, func(t *testing.T) {
buf, err := NewBufferWith(0, 0, btype)
buf, err := NewBufferWith(0, 0, btype, "test")
require.Nil(t, err)
defer buf.Release()

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestBufferSort(t *testing.T) {
for btype := UseCalloc; btype < UseInvalid; btype++ {
name := fmt.Sprintf("Using mode %s", btype)
t.Run(name, func(t *testing.T) {
buf, err := NewBufferWith(0, 0, btype)
buf, err := NewBufferWith(0, 0, btype, "test")
require.Nil(t, err)
defer buf.Release()

Expand Down Expand Up @@ -252,7 +252,7 @@ func TestBufferSort(t *testing.T) {

// Test that the APIs returns the expected offsets.
func TestBufferPadding(t *testing.T) {
buf := NewBuffer(1 << 10)
buf := NewBuffer(1 << 10, "test")
defer buf.Release()
sz := rand.Int31n(100)

Expand Down
Loading