forked from 742481030/rclone
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.go
205 lines (188 loc) · 4.44 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
// Package pool implements a memory pool similar in concept to
// sync.Pool but with more determinism.
package pool
import (
"fmt"
"log"
"sync"
"time"
"github.com/rclone/rclone/lib/mmap"
)
// Pool of internal buffers
//
// We hold buffers in cache. Every time we Get or Put we update
// minFill which is the minimum len(cache) seen.
//
// Every flushTime we remove minFill buffers from the cache as they
// were not used in the previous flushTime interval.
type Pool struct {
mu sync.Mutex
cache [][]byte
minFill int // the minimum fill of the cache
bufferSize int
poolSize int
timer *time.Timer
inUse int
alloced int
flushTime time.Duration
flushPending bool
alloc func(int) ([]byte, error)
free func([]byte) error
}
// New makes a buffer pool
//
// flushTime is the interval the buffer pools is flushed
// bufferSize is the size of the allocations
// poolSize is the maximum number of free buffers in the pool
// useMmap should be set to use mmap allocations
func New(flushTime time.Duration, bufferSize, poolSize int, useMmap bool) *Pool {
bp := &Pool{
cache: make([][]byte, 0, poolSize),
poolSize: poolSize,
flushTime: flushTime,
bufferSize: bufferSize,
}
if useMmap {
bp.alloc = mmap.Alloc
bp.free = mmap.Free
} else {
bp.alloc = func(size int) ([]byte, error) {
return make([]byte, size), nil
}
bp.free = func([]byte) error {
return nil
}
}
bp.timer = time.AfterFunc(flushTime, bp.flushAged)
return bp
}
// get gets the last buffer in bp.cache
//
// Call with mu held
func (bp *Pool) get() []byte {
n := len(bp.cache) - 1
buf := bp.cache[n]
bp.cache[n] = nil // clear buffer pointer from bp.cache
bp.cache = bp.cache[:n]
return buf
}
// put puts the buffer on the end of bp.cache
//
// Call with mu held
func (bp *Pool) put(buf []byte) {
bp.cache = append(bp.cache, buf)
}
// flush n entries from the entire buffer pool
// Call with mu held
func (bp *Pool) flush(n int) {
for i := 0; i < n; i++ {
bp.freeBuffer(bp.get())
}
bp.minFill = len(bp.cache)
}
// Flush the entire buffer pool
func (bp *Pool) Flush() {
bp.mu.Lock()
bp.flush(len(bp.cache))
bp.mu.Unlock()
}
// Remove bp.minFill buffers
func (bp *Pool) flushAged() {
bp.mu.Lock()
bp.flushPending = false
bp.flush(bp.minFill)
// If there are still items in the cache, schedule another flush
if len(bp.cache) != 0 {
bp.kickFlusher()
}
bp.mu.Unlock()
}
// InUse returns the number of buffers in use which haven't been
// returned to the pool
func (bp *Pool) InUse() int {
bp.mu.Lock()
defer bp.mu.Unlock()
return bp.inUse
}
// InPool returns the number of buffers in the pool
func (bp *Pool) InPool() int {
bp.mu.Lock()
defer bp.mu.Unlock()
return len(bp.cache)
}
// Alloced returns the number of buffers allocated and not yet freed
func (bp *Pool) Alloced() int {
bp.mu.Lock()
defer bp.mu.Unlock()
return bp.alloced
}
// starts or resets the buffer flusher timer - call with mu held
func (bp *Pool) kickFlusher() {
if bp.flushPending {
return
}
bp.flushPending = true
bp.timer.Reset(bp.flushTime)
}
// Make sure minFill is correct - call with mu held
func (bp *Pool) updateMinFill() {
if len(bp.cache) < bp.minFill {
bp.minFill = len(bp.cache)
}
}
// Get a buffer from the pool or allocate one
func (bp *Pool) Get() []byte {
bp.mu.Lock()
var buf []byte
waitTime := time.Millisecond
for {
if len(bp.cache) > 0 {
buf = bp.get()
break
} else {
var err error
buf, err = bp.alloc(bp.bufferSize)
if err == nil {
bp.alloced++
break
}
log.Printf("Failed to get memory for buffer, waiting for %v: %v", waitTime, err)
bp.mu.Unlock()
time.Sleep(waitTime)
bp.mu.Lock()
waitTime *= 2
}
}
bp.inUse++
bp.updateMinFill()
bp.mu.Unlock()
return buf
}
// freeBuffer returns mem to the os if required - call with lock held
func (bp *Pool) freeBuffer(mem []byte) {
err := bp.free(mem)
if err != nil {
log.Printf("Failed to free memory: %v", err)
}
bp.alloced--
}
// Put returns the buffer to the buffer cache or frees it
//
// Note that if you try to return a buffer of the wrong size to Put it
// will panic.
func (bp *Pool) Put(buf []byte) {
bp.mu.Lock()
defer bp.mu.Unlock()
buf = buf[0:cap(buf)]
if len(buf) != bp.bufferSize {
panic(fmt.Sprintf("Returning buffer sized %d but expecting %d", len(buf), bp.bufferSize))
}
if len(bp.cache) < bp.poolSize {
bp.put(buf)
} else {
bp.freeBuffer(buf)
}
bp.inUse--
bp.updateMinFill()
bp.kickFlusher()
}