Skip to content

Commit

Permalink
service/s3/s3manager: Fix resource leak on UploadPart failures
Browse files Browse the repository at this point in the history
  • Loading branch information
skmcgrail committed Feb 25, 2020
1 parent 4bdcb07 commit 9e8576b
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 22 deletions.
19 changes: 13 additions & 6 deletions awstesting/integration/performance/s3UploadManager/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,17 @@ func BenchmarkUpload(b *testing.B) {
sdkConfig.BufferProvider = s3manager.NewBufferedReadSeekerWriteToPool(bufferSize)
}

reader := aws.ReadSeekCloser(io.LimitReader(&awstesting.EndlessReader{}, fileSize))

b.ResetTimer()
for i := 0; i < b.N; i++ {
benchUpload(b, benchConfig.bucket, integration.UniqueID(), reader, sdkConfig, benchConfig.clientConfig)
for {
b.ResetTimer()
reader := aws.ReadSeekCloser(io.LimitReader(&awstesting.EndlessReader{}, fileSize))
err := benchUpload(b, benchConfig.bucket, integration.UniqueID(), reader, sdkConfig, benchConfig.clientConfig)
if err != nil {
b.Logf("upload failed, retrying: %v", err)
continue
}
break
}
}
})
}
Expand All @@ -169,16 +175,17 @@ func BenchmarkUpload(b *testing.B) {
}
}

func benchUpload(b *testing.B, bucket, key string, reader io.ReadSeeker, sdkConfig SDKConfig, clientConfig ClientConfig) {
func benchUpload(b *testing.B, bucket, key string, reader io.ReadSeeker, sdkConfig SDKConfig, clientConfig ClientConfig) error {
uploader := newUploader(clientConfig, sdkConfig, SetUnsignedPayload)
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: &bucket,
Key: &key,
Body: reader,
})
if err != nil {
b.Fatalf("failed to upload object, %v", err)
return err
}
return nil
}

func TestMain(m *testing.M) {
Expand Down
20 changes: 11 additions & 9 deletions service/s3/s3manager/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,14 @@ func (u *uploader) nextReader() (io.ReadSeeker, int, func(), error) {

default:
part := u.cfg.partPool.Get()
n, err := readFillBuf(r, part)
n, err := readFillBuf(r, *part)
u.readerPos += int64(n)

cleanup := func() {
u.cfg.partPool.Put(part)
}

return bytes.NewReader(part[0:n]), n, cleanup, err
return bytes.NewReader((*part)[0:n]), n, cleanup, err
}
}

Expand Down Expand Up @@ -673,6 +673,8 @@ func (u *multiuploader) readChunk(ch chan chunk) {
u.seterr(err)
}
}

data.cleanup()
}
}

Expand All @@ -690,7 +692,6 @@ func (u *multiuploader) send(c chunk) error {
}

resp, err := u.cfg.S3.UploadPartWithContext(u.ctx, params, u.cfg.RequestOptions...)
c.cleanup()
if err != nil {
return err
}
Expand Down Expand Up @@ -764,8 +765,8 @@ func (u *multiuploader) complete() *s3.CompleteMultipartUploadOutput {
}

// byteSlicePool is the interface for a pool of part buffers. Pointers to
// slices are used so storing a buffer in the pool does not copy the slice
// header into an interface value (which would allocate on every Put).
type byteSlicePool interface {
	// Get returns a buffer of Size() bytes, allocating if the pool is empty.
	Get() *[]byte
	// Put returns a previously obtained buffer to the pool for reuse.
	Put(*[]byte)
	// Size reports the length of the buffers vended by this pool.
	Size() int64
}

Expand All @@ -774,11 +775,11 @@ type partPool struct {
sync.Pool
}

func (p *partPool) Get() []byte {
return p.Pool.Get().([]byte)
func (p *partPool) Get() *[]byte {
return p.Pool.Get().(*[]byte)
}

func (p *partPool) Put(b []byte) {
func (p *partPool) Put(b *[]byte) {
p.Pool.Put(b)
}

Expand All @@ -790,7 +791,8 @@ func newPartPool(partSize int64) *partPool {
p := &partPool{partSize: partSize}

p.New = func() interface{} {
return make([]byte, p.partSize)
bs := make([]byte, p.partSize)
return &bs
}

return p
Expand Down
122 changes: 115 additions & 7 deletions service/s3/s3manager/upload_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io/ioutil"
random "math/rand"
"net/http"
"sync"
"sync/atomic"
"testing"

Expand All @@ -21,16 +22,34 @@ import (
)

// recordedPartPool wraps a partPool to record usage statistics for tests.
type recordedPartPool struct {
	// allocations counts buffers newly allocated by the pool's New func
	// (i.e. pool misses); incremented by the wrapper installed in
	// newRecordedPartPool.
	allocations uint64
	// gets counts every checkout from the pool.
	gets uint64
	// outstanding tracks buffers checked out but not yet returned;
	// it should be zero once an upload has released all its parts.
	outstanding int64
	*partPool
}

func (r *recordedPartPool) Get() []byte {
// newRecordedPartPool builds a recordedPartPool around a fresh partPool,
// instrumenting the pool's New func so every real allocation is counted.
func newRecordedPartPool(partSize int64) *recordedPartPool {
	inner := newPartPool(partSize)
	rec := &recordedPartPool{partPool: inner}

	// Wrap the original allocator to tally pool misses.
	orig := inner.New
	inner.New = func() interface{} {
		atomic.AddUint64(&rec.allocations, 1)
		return orig()
	}

	return rec
}

// Get records that one more part is checked out, then fetches a buffer
// from the wrapped pool.
func (r *recordedPartPool) Get() *[]byte {
	atomic.AddInt64(&r.outstanding, 1)
	atomic.AddUint64(&r.gets, 1)
	return r.partPool.Get()
}

func (r *recordedPartPool) Put(b []byte) {
func (r *recordedPartPool) Put(b *[]byte) {
atomic.AddInt64(&r.outstanding, -1)
r.partPool.Put(b)
}
Expand All @@ -45,15 +64,106 @@ func swapByteSlicePool(f func(partSize int64) byteSlicePool) func() {
}
}

type testReader struct {
br *bytes.Reader
m sync.Mutex
}

func (r *testReader) Read(p []byte) (n int, err error) {
r.m.Lock()
defer r.m.Unlock()
return r.br.Read(p)
}

// TestUploadByteSlicePool verifies that a successful upload returns every
// part buffer it checked out of the byte slice pool (no leaked parts).
func TestUploadByteSlicePool(t *testing.T) {
	cases := map[string]struct {
		PartSize int64
		FileSize int64
	}{
		"single part": {
			PartSize: sdkio.MebiByte * 5,
			FileSize: sdkio.MebiByte * 5,
		},
		"multi-part": {
			PartSize: sdkio.MebiByte * 5,
			FileSize: sdkio.MebiByte * 10,
		},
	}

	for name, tt := range cases {
		t.Run(name, func(t *testing.T) {
			var p *recordedPartPool

			// Swap in an instrumented pool so gets/puts can be balanced.
			unswap := swapByteSlicePool(func(partSize int64) byteSlicePool {
				p = newRecordedPartPool(partSize)
				return p
			})
			defer unswap()

			sess := unit.Session.Copy()
			svc := s3.New(sess)
			svc.Handlers.Unmarshal.Clear()
			svc.Handlers.UnmarshalMeta.Clear()
			svc.Handlers.UnmarshalError.Clear()
			svc.Handlers.Send.Clear()
			svc.Handlers.Send.PushFront(func(r *request.Request) {
				// Drain the request body so the part buffer is fully consumed.
				if r.Body != nil {
					io.Copy(ioutil.Discard, r.Body)
				}

				r.HTTPResponse = &http.Response{
					StatusCode: 200,
					Body:       ioutil.NopCloser(bytes.NewReader([]byte{})),
				}

				// Stub the minimal output each operation requires.
				switch data := r.Data.(type) {
				case *s3.CreateMultipartUploadOutput:
					data.UploadId = aws.String("UPLOAD-ID")
				case *s3.UploadPartOutput:
					data.ETag = aws.String(fmt.Sprintf("ETAG%d", random.Int()))
				case *s3.CompleteMultipartUploadOutput:
					data.Location = aws.String("https://location")
					data.VersionId = aws.String("VERSION-ID")
				case *s3.PutObjectOutput:
					data.VersionId = aws.String("VERSION-ID")
				}
			})

			uploader := NewUploaderWithClient(svc, func(u *Uploader) {
				u.PartSize = tt.PartSize
				u.Concurrency = 50
			})

			expected := s3testing.GetTestBytes(int(tt.FileSize))
			// testReader.Read has a pointer receiver, so a pointer must be
			// passed: a testReader value does not satisfy io.Reader, and
			// passing one by value would also copy its sync.Mutex.
			_, err := uploader.Upload(&UploadInput{
				Bucket: aws.String("bucket"),
				Key:    aws.String("key"),
				Body:   &testReader{br: bytes.NewReader(expected)},
			})
			if err != nil {
				t.Errorf("expected no error, but got %v", err)
			}

			// Every Get must have been balanced by a Put once Upload returns.
			if v := atomic.LoadInt64(&p.outstanding); v != 0 {
				t.Fatalf("expected zero outstanding pool parts, got %d", v)
			}

			gets, allocs := atomic.LoadUint64(&p.gets), atomic.LoadUint64(&p.allocations)

			t.Logf("total gets %v, total allocations %v", gets, allocs)
		})
	}
}

func TestUploadByteSlicePool_Failures(t *testing.T) {
cases := map[string]struct {
PartSize int64
FileSize int64
Operations []string
}{
"single part": {
PartSize: sdkio.MebiByte * 5,
FileSize: sdkio.MebiByte * 5,
FileSize: sdkio.MebiByte * 4,
Operations: []string{
"PutObject",
},
Expand All @@ -76,9 +186,7 @@ func TestUploadByteSlicePool(t *testing.T) {
var p *recordedPartPool

unswap := swapByteSlicePool(func(partSize int64) byteSlicePool {
p = &recordedPartPool{
partPool: newPartPool(partSize),
}
p = newRecordedPartPool(partSize)
return p
})
defer unswap()
Expand Down Expand Up @@ -131,7 +239,7 @@ func TestUploadByteSlicePool(t *testing.T) {
_, err := uploader.Upload(&UploadInput{
Bucket: aws.String("bucket"),
Key: aws.String("key"),
Body: bytes.NewReader(expected),
Body: testReader{br: bytes.NewReader(expected)},
})
if err == nil {
t.Fatalf("expected error but got none")
Expand Down

0 comments on commit 9e8576b

Please sign in to comment.