Skip to content

Commit

Permalink
awstesting/integration/performance/s3UploadManager: Extend s3UploadMa…
Browse files Browse the repository at this point in the history
…nager To Support Benchmarking
  • Loading branch information
skmcgrail committed Sep 16, 2019
1 parent f8cc8cb commit 1c07843
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 80 deletions.
34 changes: 34 additions & 0 deletions awstesting/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -39,6 +40,39 @@ func UniqueID() string {
return fmt.Sprintf("%x", uuid)
}

// CreateFileOfSize will return an *os.File that is of size bytes
func CreateFileOfSize(dir string, size int64) (*os.File, error) {
file, err := ioutil.TempFile(dir, "s3Bench")
if err != nil {
return nil, err
}

err = file.Truncate(size)
if err != nil {
file.Close()
os.Remove(file.Name())
return nil, err
}

return file, nil
}

// SizeToName returns a human-readable string for the given size bytes
func SizeToName(size int) string {
units := []string{"B", "KB", "MB", "GB"}
i := 0
for size >= 1024 {
size /= 1024
i++
}

if i > len(units)-1 {
i = len(units) - 1
}

return fmt.Sprintf("%d%s", size, units[i])
}

// SessionWithDefaultRegion returns a copy of the integration session with the
// region set if one was not already provided.
func SessionWithDefaultRegion(region string) *session.Session {
Expand Down
25 changes: 24 additions & 1 deletion awstesting/integration/performance/s3UploadManager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,18 @@
Uploads a file to a S3 bucket using the SDK's S3 upload manager. Allows passing
in custom configuration for the HTTP client and SDK's Upload Manager behavior.

## Usage Example:
## Build
### Standalone
```sh
go build -tags "integration perftest" -o s3UploadPerfGo ./awstesting/integration/performance/s3UploadManager
```
### Benchmarking
```sh
go test -tags "integration perftest" -c -o s3UploadPerfGo ./awstesting/integration/performance/s3UploadManager
```

## Usage Example:
### Standalone
```sh
AWS_REGION=us-west-2 AWS_PROFILE=aws-go-sdk-team-test ./s3UploadPerfGo \
-bucket aws-sdk-go-data \
Expand All @@ -18,3 +28,16 @@ AWS_REGION=us-west-2 AWS_PROFILE=aws-go-sdk-team-test ./s3UploadPerfGo \
-client.timeout.connect=1s \
-client.timeout.response-header=1s
```

### Benchmarking
```sh
AWS_REGION=us-west-2 AWS_PROFILE=aws-go-sdk-team-test ./s3UploadPerfGo \
-test.bench=. \
-test.benchmem \
-test.benchtime 1x \
-bucket aws-sdk-go-data \
-client.idle-conns 1000 \
-client.idle-conns-host 300 \
-client.timeout.connect=1s \
-client.timeout.response-header=1s
```
22 changes: 15 additions & 7 deletions awstesting/integration/performance/s3UploadManager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,19 @@ import (
"flag"
"fmt"
"net/http"
"os"
"strings"
"time"

"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

type Config struct {
Bucket, Key string
Filename string
LogVerbose bool
Bucket string
Filename string
Size int64
TempDir string
LogVerbose bool

SDK SDKConfig
Client ClientConfig
Expand All @@ -24,10 +27,11 @@ type Config struct {
func (c *Config) SetupFlags(prefix string, flagset *flag.FlagSet) {
flagset.StringVar(&c.Bucket, "bucket", "",
"The S3 bucket `name` to upload the object to.")
flagset.StringVar(&c.Key, "key", "",
"The S3 object key `name` to name the uploaded object.")
flagset.StringVar(&c.Filename, "file", "",
"The `path` of the local file to upload.")
flagset.Int64Var(&c.Size, "size", 0,
"The S3 object size in bytes to upload")
flagset.StringVar(&c.TempDir, "temp", os.TempDir(), "location to create temporary files")
flagset.BoolVar(&c.LogVerbose, "verbose", false,
"The output log will include verbose request information")

Expand All @@ -38,8 +42,8 @@ func (c *Config) SetupFlags(prefix string, flagset *flag.FlagSet) {
func (c *Config) Validate() error {
var errs Errors

if len(c.Bucket) == 0 || len(c.Key) == 0 || len(c.Filename) == 0 {
errs = append(errs, fmt.Errorf("bucket, key, and filename are required"))
if len(c.Bucket) == 0 || (c.Size <= 0 && c.Filename == "") {
errs = append(errs, fmt.Errorf("bucket and filename/size are required"))
}

if err := c.SDK.Validate(); err != nil {
Expand All @@ -60,7 +64,9 @@ type SDKConfig struct {
PartSize int64
Concurrency int
WithUnsignedPayload bool
WithContentMD5 bool
ExpectContinue bool
BufferProvider s3manager.ReadSeekerWriteToProvider
}

func (c *SDKConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
Expand All @@ -72,6 +78,8 @@ func (c *SDKConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
"Specifies the number of parts to upload `at once`.")
flagset.BoolVar(&c.WithUnsignedPayload, prefix+"unsigned", false,
"Specifies if the SDK will use UNSIGNED_PAYLOAD for part SHA256 in request signature.")
flagset.BoolVar(&c.WithContentMD5, prefix+"content-md5", true,
"Specifies if the SDK should compute the content md5 header for S3 uploads.")

flagset.BoolVar(&c.ExpectContinue, prefix+"100-continue", true,
"Specifies if the SDK requests will wait for the 100 continue response before sending request payload.")
Expand Down
178 changes: 107 additions & 71 deletions awstesting/integration/performance/s3UploadManager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,86 +4,50 @@ package main

import (
"flag"
"fmt"
"log"
"os"
"path/filepath"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/awstesting/integration"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

var config Config

func init() {
config.SetupFlags("", flag.CommandLine)
}

func main() {
if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
flag.CommandLine.PrintDefaults()
log.Fatalf("failed to parse CLI commands")
}
if err := config.Validate(); err != nil {
flag.CommandLine.PrintDefaults()
log.Fatalf("invalid arguments")
}
parseCommandLine()

client := NewClient(config.Client)
log.SetOutput(os.Stderr)

file, err := os.Open(config.Filename)
if err != nil {
log.Fatalf("unable to open file to upload, %v", err)
}
defer file.Close()
var (
file *os.File
err error
)

sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
HTTPClient: client,
S3Disable100Continue: aws.Bool(!config.SDK.ExpectContinue),
},
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
log.Fatalf("failed to load session, %v", err)
if config.Filename != "" {
file, err = os.Open(config.Filename)
if err != nil {
log.Fatalf("failed to open file: %v", err)
}
} else {
file, err = integration.CreateFileOfSize(config.TempDir, config.Size)
if err != nil {
log.Fatalf("failed to create file: %v", err)
}
defer os.Remove(file.Name())
}

defer file.Close()

traces := make(chan *RequestTrace, config.SDK.Concurrency)
uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = config.SDK.PartSize
u.Concurrency = config.SDK.Concurrency

u.RequestOptions = append(u.RequestOptions,
func(r *request.Request) {
id := "op"
if v, ok := r.Params.(*s3.UploadPartInput); ok {
id = strconv.FormatInt(*v.PartNumber, 10)
}
tracer := NewRequestTrace(r.Context(), r.Operation.Name, id)
r.SetContext(tracer)

r.Handlers.Send.PushFront(tracer.OnSendAttempt)
r.Handlers.CompleteAttempt.PushBack(tracer.OnCompleteAttempt)
r.Handlers.CompleteAttempt.PushBack(func(rr *request.Request) {
})
r.Handlers.Complete.PushBack(tracer.OnComplete)
r.Handlers.Complete.PushBack(func(rr *request.Request) {
traces <- tracer
})

if config.SDK.WithUnsignedPayload {
if r.Operation.Name != "UploadPart" {
return
}
r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
}
},
)
})
requestTracer := uploadRequestTracer(traces)
uploader := newUploader(config.Client, config.SDK, requestTracer)

metricReportDone := make(chan struct{})
go func() {
Expand All @@ -108,18 +72,19 @@ func main() {
"CreateMultipartUpload",
"CompleteMultipartUpload",
"UploadPart",
"PutObject",
} {
if trace, ok := metrics[name]; ok {
printAttempts(name, trace, config.LogVerbose)
}
}
}()

fmt.Println("Starting upload...")
log.Println("starting upload...")
start := time.Now()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: &config.Bucket,
Key: &config.Key,
Key: aws.String(filepath.Base(file.Name())),
Body: file,
})
if err != nil {
Expand All @@ -130,29 +95,100 @@ func main() {
fileInfo, _ := file.Stat()
size := fileInfo.Size()
dur := time.Since(start)
fmt.Printf("Upload finished, Size: %d, Dur: %s, Throughput: %.5f GB/s\n",
log.Printf("Upload finished, Size: %d, Dur: %s, Throughput: %.5f GB/s",
size, dur, (float64(size)/(float64(dur)/float64(time.Second)))/float64(1e9),
)

<-metricReportDone
}

func printAttempts(op string, trace *RequestTrace, verbose bool) {
fmt.Println(op+":",
"latency:", trace.finish.Sub(trace.start),
"requests:", len(trace.attempts),
"errors:", len(trace.errs),
)
func parseCommandLine() {
config.SetupFlags("", flag.CommandLine)

if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
flag.CommandLine.PrintDefaults()
log.Fatalf("failed to parse CLI commands")
}
if err := config.Validate(); err != nil {
flag.CommandLine.PrintDefaults()
log.Fatalf("invalid arguments: %v", err)
}
}

func printAttempts(op string, trace *RequestTrace, verbose bool) {
if !verbose {
return
}

log.Printf("%s: latency:%s requests:%d errors:%d",
op,
trace.finish.Sub(trace.start),
len(trace.attempts),
len(trace.errs),
)

for _, a := range trace.attempts {
fmt.Printf(" * %s\n", a)
log.Printf(" * %s", a)
}
if err := trace.Err(); err != nil {
fmt.Println("Operation Errors:", err)
log.Printf("Operation Errors: %v", err)
}
log.Println()
}

func uploadRequestTracer(traces chan<- *RequestTrace) request.Option {
tracerOption := func(r *request.Request) {
id := "op"
if v, ok := r.Params.(*s3.UploadPartInput); ok {
id = strconv.FormatInt(*v.PartNumber, 10)
}
tracer := NewRequestTrace(r.Context(), r.Operation.Name, id)
r.SetContext(tracer)

r.Handlers.Send.PushFront(tracer.OnSendAttempt)
r.Handlers.CompleteAttempt.PushBack(tracer.OnCompleteAttempt)
r.Handlers.Complete.PushBack(tracer.OnComplete)
r.Handlers.Complete.PushBack(func(rr *request.Request) {
traces <- tracer
})
}
fmt.Println()

return tracerOption
}

func SetUnsignedPayload(r *request.Request) {
if r.Operation.Name != "UploadPart" && r.Operation.Name != "PutObject" {
return
}
r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
}

func newUploader(clientConfig ClientConfig, sdkConfig SDKConfig, options ...request.Option) *s3manager.Uploader {
client := NewClient(clientConfig)

if sdkConfig.WithUnsignedPayload {
options = append(options, SetUnsignedPayload)
}

sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
HTTPClient: client,
S3Disable100Continue: aws.Bool(!sdkConfig.ExpectContinue),
S3DisableContentMD5Validation: aws.Bool(!sdkConfig.WithContentMD5),
},
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
log.Fatalf("failed to load session, %v", err)
}

uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = sdkConfig.PartSize
u.Concurrency = sdkConfig.Concurrency
u.BufferProvider = sdkConfig.BufferProvider

u.RequestOptions = append(u.RequestOptions, options...)
})

return uploader
}

0 comments on commit 1c07843

Please sign in to comment.