Skip to content

Commit

Permalink
Preallocate buffer pool buffers
Browse files Browse the repository at this point in the history
This should avoid many reallocation on the buffers.
  • Loading branch information
felixbuenemann committed Sep 6, 2020
1 parent 8360608 commit 5c01baa
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 28 deletions.
46 changes: 31 additions & 15 deletions chunk/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,42 @@ import (
. "github.com/claudetech/loggo/default"
)

var bufPoolSize uint64

var bufPool = sync.Pool{
New: func() interface{} {
id := atomic.AddUint64(&bufPoolSize, 1)
Log.Debugf("Allocate buffer %v", id)
return &Buffer{id: id}
},
// BufferPool manages a pool of buffers
type BufferPool struct {
size uint64
pool sync.Pool
}

// NewBufferPool creates a new buffer pool
func NewBufferPool(bufferSize int64) *BufferPool {
bp := new(BufferPool)
bp.pool = sync.Pool{
New: func() interface{} {
id := atomic.AddUint64(&bp.size, 1)
Log.Debugf("Allocate buffer %v", id)
buffer := bytes.NewBuffer(make([]byte, bufferSize))
return &Buffer{*buffer, id, 0, bp}
},
}
return bp
}

// Get a buffer from the pool
func (bp *BufferPool) Get() *Buffer {
return bp.pool.Get().(*Buffer)
}

// Put a buffer into the pool
func (bp *BufferPool) Put(buffer *Buffer) {
bp.pool.Put(buffer)
}

// Buffer is a managed memory buffer with a reference counter
type Buffer struct {
id uint64
bytes.Buffer
id uint64
refs int64
}

// NewBuffer gets a buffer from the pool
func NewBuffer() *Buffer {
return bufPool.Get().(*Buffer)
pool *BufferPool
}

// Ref increases the reference count of the buffer
Expand All @@ -46,6 +62,6 @@ func (b *Buffer) Unref() {
if refs == 0 {
Log.Debugf("Release buffer %v", b.id)
b.Reset()
bufPool.Put(b)
b.pool.Put(b)
}
}
26 changes: 14 additions & 12 deletions chunk/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,23 @@ import (

// Downloader handles concurrent chunk downloads
type Downloader struct {
Client *drive.Client
queue chan *Request
callbacks map[string][]DownloadCallback
lock sync.Mutex
Client *drive.Client
queue chan *Request
callbacks map[string][]DownloadCallback
lock sync.Mutex
bufferPool *BufferPool
}

// DownloadCallback is called after a download has finished
type DownloadCallback func(error, *Buffer)

// NewDownloader creates a new download manager
func NewDownloader(threads int, client *drive.Client) (*Downloader, error) {
func NewDownloader(threads int, client *drive.Client, bufferPool *BufferPool) (*Downloader, error) {
manager := Downloader{
Client: client,
queue: make(chan *Request, 100),
callbacks: make(map[string][]DownloadCallback, 100),
Client: client,
queue: make(chan *Request, 100),
callbacks: make(map[string][]DownloadCallback, 100),
bufferPool: bufferPool,
}

for i := 0; i < threads; i++ {
Expand Down Expand Up @@ -58,7 +60,7 @@ func (d *Downloader) thread() {

func (d *Downloader) download(client *http.Client, req *Request) {
Log.Debugf("Starting download %v (preload: %v)", req.id, req.preload)
buffer, err := downloadFromAPI(client, req, 0)
buffer, err := d.downloadFromAPI(client, req, 0)

d.lock.Lock()
callbacks := d.callbacks[req.id]
Expand All @@ -72,7 +74,7 @@ func (d *Downloader) download(client *http.Client, req *Request) {
d.lock.Unlock()
}

func downloadFromAPI(client *http.Client, request *Request, delay int64) (*Buffer, error) {
func (d *Downloader) downloadFromAPI(client *http.Client, request *Request, delay int64) (*Buffer, error) {
// sleep if request is throttled
if delay > 0 {
time.Sleep(time.Duration(delay) * time.Second)
Expand Down Expand Up @@ -123,7 +125,7 @@ func downloadFromAPI(client *http.Client, request *Request, delay int64) (*Buffe
} else {
delay = delay * 2
}
return downloadFromAPI(client, request, delay)
return d.downloadFromAPI(client, request, delay)
}

// return an error if other error occurred
Expand All @@ -132,7 +134,7 @@ func downloadFromAPI(client *http.Client, request *Request, delay int64) (*Buffe
request.object.ObjectID, request.object.Name, res.StatusCode)
}

buffer := NewBuffer()
buffer := d.bufferPool.Get()
buffer.Ref()
n, err := buffer.ReadFrom(reader)
if nil != err {
Expand Down
4 changes: 3 additions & 1 deletion chunk/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ func NewManager(
return nil, fmt.Errorf("max-chunks must be greater than 2 and bigger than the load ahead value")
}

downloader, err := NewDownloader(loadThreads, client)
bufferPool := NewBufferPool(chunkSize)

downloader, err := NewDownloader(loadThreads, client, bufferPool)
if nil != err {
return nil, err
}
Expand Down

0 comments on commit 5c01baa

Please sign in to comment.