-
Notifications
You must be signed in to change notification settings - Fork 13
/
alloc.go
100 lines (89 loc) · 2.09 KB
/
alloc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package alloc
import (
"bytes"
"sync"
"sync/atomic"
"github.com/Yiling-J/theine-go/internal/nvm/directio"
)
// alignDown rounds num toward zero to a multiple of alignment. For
// non-negative num that is the largest multiple of alignment not exceeding
// num. It panics with an integer divide-by-zero if alignment is 0.
func alignDown(num int, alignment int) int {
	// Integer division truncates, so dividing and multiplying back discards
	// the remainder.
	whole := num / alignment
	return whole * alignment
}
// alignUp rounds a non-negative num up to the nearest multiple of alignment;
// a value that is already a multiple is returned unchanged.
func alignUp(num int, alignment int) int {
	// Adding alignment-1 pushes every non-multiple past the next boundary
	// while leaving exact multiples short of it, so rounding down afterwards
	// yields the ceiling.
	bumped := num + (alignment - 1)
	return alignDown(bumped, alignment)
}
// Allocator hands out byte slices carved sequentially from pooled region
// buffers. Requested sizes are rounded up to a multiple of blockSize.
type Allocator struct {
// pool supplies *BufferItem values; NewAllocator sets its New function to
// wrap a directio.AlignedBlock of regionSize bytes.
pool sync.Pool
// itemPool recycles *AllocItem wrappers between Allocate and Deallocate.
itemPool sync.Pool
// blockSize is the granularity Allocate rounds every request up to.
blockSize int
// bucketSize is stored by NewAllocator; nothing in this file reads it.
bucketSize int
// regionSize is the size passed to directio.AlignedBlock for each new buffer.
regionSize int
// mu is held by Allocate while it reads or replaces current.
mu sync.Mutex
// current is the buffer Allocate is currently carving slices from.
current *BufferItem
}
// BufferItem is one region buffer together with the bookkeeping that decides
// when it may go back to the pool.
type BufferItem struct {
// pool is the sync.Pool that Deallocate returns this buffer to; NewAllocator
// and Allocate point it at the owning Allocator's pool.
pool *sync.Pool
// buffer wraps the region's bytes; Allocate takes slices from it with Next,
// so its unread length is the space still available.
buffer *bytes.Buffer
// full is set by Allocate when the buffer could not satisfy a request while
// items were still outstanding; Deallocate checks it before pooling.
full atomic.Bool
// count is the number of outstanding AllocItems carved from this buffer:
// incremented by Allocate, decremented by Deallocate.
count atomic.Int32
}
// AllocItem is a single allocation returned by Allocator.Allocate. Release it
// with Deallocate when finished.
type AllocItem struct {
// buffer is the BufferItem that Data was carved from.
buffer *BufferItem
// allocator is the Allocator that produced the item; Deallocate returns the
// item to that allocator's itemPool.
allocator *Allocator
// Data is the slice obtained from the buffer for this allocation.
Data []byte
}
// Deallocate releases the allocation. It decrements the outstanding-item
// count of the buffer the item was carved from and, when that count reaches
// zero on a buffer already marked full, returns the buffer to its pool. The
// item itself is then handed back to the allocator's item pool.
//
// Calling Deallocate on a nil *AllocItem is a no-op.
func (a *AllocItem) Deallocate() {
	if a == nil {
		return
	}
	region := a.buffer
	// Only the release that drops the count to zero may pool the buffer, and
	// only once Allocate has marked the buffer full.
	if outstanding := region.count.Add(-1); outstanding == 0 && region.full.Load() {
		region.pool.Put(region)
	}
	a.allocator.itemPool.Put(a)
}
// NewAllocator builds an Allocator and eagerly takes its first region buffer
// from the pool.
//
// bucketSize is stored on the allocator as-is. regionSize is the size passed
// to directio.AlignedBlock whenever the pool has to create a new buffer.
// blockSize is the granularity Allocate rounds requests up to.
func NewAllocator(bucketSize int, regionSize int, blockSize int) *Allocator {
	// Constructors used by the two pools when they have nothing to recycle.
	newRegion := func() any {
		block := directio.AlignedBlock(regionSize)
		return &BufferItem{buffer: bytes.NewBuffer(block)}
	}
	newItem := func() any {
		return new(AllocItem)
	}

	a := &Allocator{
		blockSize:  blockSize,
		bucketSize: bucketSize,
		regionSize: regionSize,
	}
	a.pool.New = newRegion
	a.itemPool.New = newItem

	// Start with one buffer ready, pointing back at the pool so Deallocate
	// can return it.
	first := a.pool.Get().(*BufferItem)
	first.pool = &a.pool
	a.current = first
	return a
}
// Allocate carves a slice out of the current region buffer and returns it
// wrapped in an AllocItem taken from the item pool. Release the item with
// Deallocate.
//
// size is rounded up to a multiple of the allocator's blockSize. If the
// current buffer has fewer unread bytes than that, one of two things happens:
// with no outstanding items the buffer is rewound and reused in place;
// otherwise it is marked full (so the last Deallocate returns it to the pool)
// and a buffer taken from the pool becomes current.
//
// Caveat: a rounded size larger than an entire buffer is not rejected.
// bytes.Buffer.Next returns only what the buffer holds, so Data may be
// shorter than requested in that case.
func (a *Allocator) Allocate(size int) *AllocItem {
	// Round before locking: blockSize is not modified after construction in
	// the code visible here, and a divide-by-zero panic (blockSize == 0) must
	// not fire while the mutex is held.
	size = alignUp(size, a.blockSize)
	item := a.itemPool.Get().(*AllocItem)

	a.mu.Lock()
	// Deferred so the allocator is not left locked if anything below panics
	// (for example the pool's New function).
	defer a.mu.Unlock()

	current := a.current
	if current.buffer.Len() < size {
		if current.count.Load() != 0 {
			// Items carved from this buffer are still in use. Flag it so the
			// final Deallocate puts it back in the pool, then switch to a
			// buffer from the pool.
			current.full.Store(true)
			current = a.pool.Get().(*BufferItem)
			current.pool = &a.pool
			current.count.Store(0)
			current.full.Store(false)
			a.current = current
		}
		// Rewind: Reset moves the read offset back to zero, after which the
		// backing array is re-wrapped at full capacity so Next can hand the
		// whole region out again.
		current.buffer.Reset()
		current.buffer = bytes.NewBuffer(current.buffer.Bytes()[:current.buffer.Cap()])
	}

	item.Data = current.buffer.Next(size)
	item.buffer = current
	item.allocator = a
	current.count.Add(1)
	return item
}