Skip to content

Commit

Permalink
awstesting/integration/performance/s3UploadManager: Extend s3UploadMa…
Browse files Browse the repository at this point in the history
…nager To Support Benchmarking
  • Loading branch information
skmcgrail committed Aug 30, 2019
1 parent bc9d7f0 commit afa1a91
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 87 deletions.
34 changes: 34 additions & 0 deletions awstesting/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"crypto/rand"
"fmt"
"io"
"io/ioutil"
"os"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -39,6 +40,39 @@ func UniqueID() string {
return fmt.Sprintf("%x", uuid)
}

// CreateFileOfSize will return an *os.File that is of size bytes
func CreateFileOfSize(size int64) (*os.File, error) {
file, err := ioutil.TempFile(os.TempDir(), "s3Bench")
if err != nil {
return nil, err
}

err = file.Truncate(size)
if err != nil {
file.Close()
os.Remove(file.Name())
return nil, err
}

return file, nil
}

// SizeToName returns a human-readable string for the given size bytes
func SizeToName(size int) string {
units := []string{"B", "KB", "MB", "GB"}
i := 0
for size >= 1024 {
size /= 1024
i++
}

if i > len(units)-1 {
i = len(units) - 1
}

return fmt.Sprintf("%d%s", size, units[i])
}

// SessionWithDefaultRegion returns a copy of the integration session with the
// region set if one was not already provided.
func SessionWithDefaultRegion(region string) *session.Session {
Expand Down
15 changes: 7 additions & 8 deletions awstesting/integration/performance/s3UploadManager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
)

type Config struct {
Bucket, Key string
Filename string
LogVerbose bool
Bucket string
Size int64
LogVerbose bool

SDK SDKConfig
Client ClientConfig
Expand All @@ -24,10 +24,8 @@ type Config struct {
func (c *Config) SetupFlags(prefix string, flagset *flag.FlagSet) {
flagset.StringVar(&c.Bucket, "bucket", "",
"The S3 bucket `name` to upload the object to.")
flagset.StringVar(&c.Key, "key", "",
"The S3 object key `name` to name the uploaded object.")
flagset.StringVar(&c.Filename, "file", "",
"The `path` of the local file to upload.")
flagset.Int64Var(&c.Size, "size", 5242880,
"The S3 object size in bytes to upload")
flagset.BoolVar(&c.LogVerbose, "verbose", false,
"The output log will include verbose request information")

Expand All @@ -38,7 +36,7 @@ func (c *Config) SetupFlags(prefix string, flagset *flag.FlagSet) {
func (c *Config) Validate() error {
var errs Errors

if len(c.Bucket) == 0 || len(c.Key) == 0 || len(c.Filename) == 0 {
if len(c.Bucket) == 0 || c.Size <= 0 {
errs = append(errs, fmt.Errorf("bucket, key, and filename are required"))
}

Expand All @@ -61,6 +59,7 @@ type SDKConfig struct {
Concurrency int
WithUnsignedPayload bool
ExpectContinue bool
BufferStrategy s3manager.BufferStrategy
}

func (c *SDKConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
Expand Down
161 changes: 83 additions & 78 deletions awstesting/integration/performance/s3UploadManager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,86 +4,34 @@ package main

import (
"flag"
"fmt"
"log"
"os"
"path/filepath"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/awstesting/integration"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

var config Config

func init() {
config.SetupFlags("", flag.CommandLine)
}

func main() {
if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
flag.CommandLine.PrintDefaults()
log.Fatalf("failed to parse CLI commands")
}
if err := config.Validate(); err != nil {
flag.CommandLine.PrintDefaults()
log.Fatalf("invalid arguments")
}

client := NewClient(config.Client)
parseCommandLine()

file, err := os.Open(config.Filename)
file, err := integration.CreateFileOfSize(config.Size)
if err != nil {
log.Fatalf("unable to open file to upload, %v", err)
log.Fatalf("failed to create file: %v", err)
}
defer os.Remove(file.Name())
defer file.Close()

sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
HTTPClient: client,
S3Disable100Continue: aws.Bool(!config.SDK.ExpectContinue),
},
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
log.Fatalf("failed to load session, %v", err)
}

traces := make(chan *RequestTrace, config.SDK.Concurrency)
uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = config.SDK.PartSize
u.Concurrency = config.SDK.Concurrency

u.RequestOptions = append(u.RequestOptions,
func(r *request.Request) {
id := "op"
if v, ok := r.Params.(*s3.UploadPartInput); ok {
id = strconv.FormatInt(*v.PartNumber, 10)
}
tracer := NewRequestTrace(r.Context(), r.Operation.Name, id)
r.SetContext(tracer)

r.Handlers.Send.PushFront(tracer.OnSendAttempt)
r.Handlers.CompleteAttempt.PushBack(tracer.OnCompleteAttempt)
r.Handlers.CompleteAttempt.PushBack(func(rr *request.Request) {
})
r.Handlers.Complete.PushBack(tracer.OnComplete)
r.Handlers.Complete.PushBack(func(rr *request.Request) {
traces <- tracer
})

if config.SDK.WithUnsignedPayload {
if r.Operation.Name != "UploadPart" {
return
}
r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
}
},
)
})
requestTracer := uploadRequestTracer(traces)
uploader := newUploader(config.Client, config.SDK, SetUnsignedPayload, requestTracer)

metricReportDone := make(chan struct{})
go func() {
Expand Down Expand Up @@ -115,44 +63,101 @@ func main() {
}
}()

fmt.Println("Starting upload...")
start := time.Now()
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: &config.Bucket,
Key: &config.Key,
Key: aws.String(filepath.Base(file.Name())),
Body: file,
})
if err != nil {
log.Fatalf("failed to upload object, %v", err)
}
close(traces)
<-metricReportDone
}

fileInfo, _ := file.Stat()
size := fileInfo.Size()
dur := time.Since(start)
fmt.Printf("Upload finished, Size: %d, Dur: %s, Throughput: %.5f GB/s\n",
size, dur, (float64(size)/(float64(dur)/float64(time.Second)))/float64(1e9),
)
func parseCommandLine() {
config.SetupFlags("", flag.CommandLine)

<-metricReportDone
if err := flag.CommandLine.Parse(os.Args[1:]); err != nil {
flag.CommandLine.PrintDefaults()
log.Fatalf("failed to parse CLI commands")
}
if err := config.Validate(); err != nil {
flag.CommandLine.PrintDefaults()
log.Fatalf("invalid arguments")
}
}

func printAttempts(op string, trace *RequestTrace, verbose bool) {
fmt.Println(op+":",
if !verbose {
return
}

log.Println(op+":",
"latency:", trace.finish.Sub(trace.start),
"requests:", len(trace.attempts),
"errors:", len(trace.errs),
)

if !verbose {
return
}

for _, a := range trace.attempts {
fmt.Printf(" * %s\n", a)
log.Printf(" * %s\n", a)
}
if err := trace.Err(); err != nil {
fmt.Println("Operation Errors:", err)
log.Printf("Operation Errors: %v", err)
}
log.Println()
}

func uploadRequestTracer(traces chan <- *RequestTrace) request.Option{
tracerOption := func(r *request.Request) {
id := "op"
if v, ok := r.Params.(*s3.UploadPartInput); ok {
id = strconv.FormatInt(*v.PartNumber, 10)
}
tracer := NewRequestTrace(r.Context(), r.Operation.Name, id)
r.SetContext(tracer)

r.Handlers.Send.PushFront(tracer.OnSendAttempt)
r.Handlers.CompleteAttempt.PushBack(tracer.OnCompleteAttempt)
r.Handlers.CompleteAttempt.PushBack(func(rr *request.Request) {
})
r.Handlers.Complete.PushBack(tracer.OnComplete)
r.Handlers.Complete.PushBack(func(rr *request.Request) {
traces <- tracer
})
}

return tracerOption
}

func SetUnsignedPayload(r *request.Request) {
if r.Operation.Name != "UploadPart" {
return
}
r.HTTPRequest.Header.Set("X-Amz-Content-Sha256", "UNSIGNED-PAYLOAD")
}

func newUploader(clientConfig ClientConfig, sdkConfig SDKConfig, options ...request.Option) *s3manager.Uploader {
client := NewClient(clientConfig)

sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
HTTPClient: client,
S3Disable100Continue: aws.Bool(!sdkConfig.ExpectContinue),
},
SharedConfigState: session.SharedConfigEnable,
})
if err != nil {
log.Fatalf("failed to load session, %v", err)
}
fmt.Println()

uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = sdkConfig.PartSize
u.Concurrency = sdkConfig.Concurrency
u.BufferStrategy = sdkConfig.BufferStrategy

u.RequestOptions = append(u.RequestOptions, options...)
})

return uploader
}

0 comments on commit afa1a91

Please sign in to comment.