awstesting/integration/performance/s3DownloadManager: CLI Configurable Benchmarking Options (#2866)
skmcgrail committed Oct 2, 2019
1 parent 2c24a0a commit c92c8c5
Showing 5 changed files with 286 additions and 70 deletions.
@@ -1,4 +1,4 @@
// +build integration,perftest
// +build go1.13,integration,perftest

package main

@@ -24,6 +24,9 @@ func NewClient(cfg ClientConfig) *http.Client {
TLSHandshakeTimeout: cfg.Timeouts.TLSHandshake,
ExpectContinueTimeout: cfg.Timeouts.ExpectContinue,
ResponseHeaderTimeout: cfg.Timeouts.ResponseHeader,

ReadBufferSize: cfg.ReadBufferSize,
WriteBufferSize: cfg.WriteBufferSize,
}

return &http.Client{
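For reference, here is a minimal sketch (not part of the commit) of how the two knobs surfaced above plug into http.Transport. ReadBufferSize and WriteBufferSize were added to http.Transport in Go 1.13, which is why the build constraint now requires go1.13; a zero value falls back to the net/http default (currently 4 KB). The helper name newTunedClient and the 64 KiB values are illustrative assumptions.

package main

import (
	"fmt"
	"net/http"
)

// newTunedClient builds an HTTP client with explicit transport buffer sizes.
// Both fields exist on http.Transport only from Go 1.13 onward.
func newTunedClient(readBuf, writeBuf int) *http.Client {
	tr := &http.Transport{
		ReadBufferSize:  readBuf,  // bytes; 0 means use the net/http default
		WriteBufferSize: writeBuf, // bytes; 0 means use the net/http default
	}
	return &http.Client{Transport: tr}
}

func main() {
	c := newTunedClient(64<<10, 64<<10) // illustrative 64 KiB buffers
	tr := c.Transport.(*http.Transport)
	fmt.Println("read buffer:", tr.ReadBufferSize, "write buffer:", tr.WriteBufferSize)
}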
29 changes: 21 additions & 8 deletions awstesting/integration/performance/s3DownloadManager/config.go
@@ -1,4 +1,4 @@
// +build integration,perftest
// +build go1.13,integration,perftest

package main

@@ -13,30 +13,36 @@ import (
)

type Config struct {
Bucket string
Size int64
LogVerbose bool

SDK SDKConfig
Client ClientConfig
Bucket string
Size int64
Key string
LogVerbose bool
UploadPartSize int64

SDK SDKConfig
Client ClientConfig
Profiler Profiler
}

func (c *Config) SetupFlags(prefix string, flagset *flag.FlagSet) {
flagset.StringVar(&c.Bucket, "bucket", "",
"The S3 bucket `name` to download the object from.")
flagset.Int64Var(&c.Size, "size", 0,
"The S3 object size in bytes to be first uploaded then downloaded")
flagset.StringVar(&c.Key, "key", "", "The S3 object key to download")
flagset.BoolVar(&c.LogVerbose, "verbose", false,
"The output log will include verbose request information")
flagset.Int64Var(&c.UploadPartSize, "upload-part-size", 0, "the upload part size when uploading a file to s3")

c.SDK.SetupFlags(prefix, flagset)
c.Client.SetupFlags(prefix, flagset)
c.Profiler.SetupFlags(prefix, flagset)
}

func (c *Config) Validate() error {
var errs Errors

if len(c.Bucket) == 0 || c.Size <= 0 {
if len(c.Bucket) == 0 || (c.Size <= 0 && len(c.Key) == 0) {
errs = append(errs, fmt.Errorf("bucket and filename/size are required"))
}

@@ -79,6 +85,10 @@ type ClientConfig struct {

MaxIdleConns int
MaxIdleConnsPerHost int

// Go 1.13
ReadBufferSize int
WriteBufferSize int
}

func (c *ClientConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
@@ -95,6 +105,9 @@ func (c *ClientConfig) SetupFlags(prefix string, flagset *flag.FlagSet) {
flagset.IntVar(&c.MaxIdleConnsPerHost, prefix+"idle-conns-host", http.DefaultMaxIdleConnsPerHost,
"Specifies max idle connection pool per host, will be truncated by idle-conns.")

flagset.IntVar(&c.ReadBufferSize, prefix+"read-buffer", defTR.ReadBufferSize, "size of the transport read buffer used")
flagset.IntVar(&c.WriteBufferSize, prefix+"write-buffer", defTR.WriteBufferSize, "size of the transport write buffer used")

c.Timeouts.SetupFlags(prefix, flagset)
}

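As a self-contained illustration of the prefixed SetupFlags pattern config.go relies on (the program name, the "client." prefix, and the defaults below are assumptions, not values taken from the repository):

package main

import (
	"flag"
	"fmt"
	"net/http"
	"os"
)

// ClientConfig mirrors only the two new fields for the sake of the example.
type ClientConfig struct {
	ReadBufferSize  int
	WriteBufferSize int
}

// SetupFlags registers the options on a shared FlagSet under a caller-chosen prefix.
func (c *ClientConfig) SetupFlags(prefix string, fs *flag.FlagSet) {
	defTR := http.DefaultTransport.(*http.Transport)
	fs.IntVar(&c.ReadBufferSize, prefix+"read-buffer", defTR.ReadBufferSize,
		"size of the transport read buffer used")
	fs.IntVar(&c.WriteBufferSize, prefix+"write-buffer", defTR.WriteBufferSize,
		"size of the transport write buffer used")
}

func main() {
	var cfg ClientConfig
	fs := flag.NewFlagSet("s3DownloadManager", flag.ExitOnError)
	cfg.SetupFlags("client.", fs) // hypothetical prefix for the example
	fs.Parse(os.Args[1:])
	fmt.Printf("client config: %+v\n", cfg)
}

Every nested option group (SDK, Client, Timeouts, Profiler) hangs its flags off the same FlagSet this way, which is what makes the whole benchmark configurable from the command line.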
146 changes: 131 additions & 15 deletions awstesting/integration/performance/s3DownloadManager/main.go
@@ -1,4 +1,4 @@
// +build integration,perftest
// +build go1.13,integration,perftest

package main

@@ -8,14 +8,17 @@ import (
"io"
"log"
"os"
"path/filepath"
"runtime"
"runtime/pprof"
"runtime/trace"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/awstesting"
"github.com/aws/aws-sdk-go/awstesting/integration"
"github.com/aws/aws-sdk-go/internal/sdkio"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
@@ -27,10 +30,31 @@ func main() {

log.SetOutput(os.Stderr)

log.Printf("uploading %s file to s3://%s\n", integration.SizeToName(int(config.Size)), config.Bucket)
key, err := setupDownloadTest(config.Bucket, config.Size)
if err != nil {
log.Fatalf("failed to setup download testing: %v", err)
config.Profiler.Start()
defer config.Profiler.Stop()

var err error
key := config.Key
size := config.Size
if len(key) == 0 {
uploadPartSize := getUploadPartSize(size, config.UploadPartSize, config.SDK.PartSize)
log.Printf("uploading %s file to s3://%s\n", integration.SizeToName(int(config.Size)), config.Bucket)
key, err = setupDownloadTest(config.Bucket, config.Size, uploadPartSize)
if err != nil {
log.Fatalf("failed to setup download testing: %v", err)
}

defer func() {
log.Printf("cleaning up s3://%s/%s\n", config.Bucket, key)
if err = teardownDownloadTest(config.Bucket, key); err != nil {
log.Fatalf("failed to teardwn test artifacts: %v", err)
}
}()
} else {
size, err = getObjectSize(config.Bucket, key)
if err != nil {
log.Fatalf("failed to get object size: %v", err)
}
}

traces := make(chan *RequestTrace, config.SDK.Concurrency)
@@ -52,15 +76,10 @@

dur := time.Since(start)
log.Printf("Download finished, Size: %d, Dur: %s, Throughput: %.5f GB/s",
config.Size, dur, (float64(config.Size)/(float64(dur)/float64(time.Second)))/float64(1e9),
size, dur, (float64(size)/(float64(dur)/float64(time.Second)))/float64(1e9),
)

<-metricReportDone

log.Printf("cleaning up s3://%s/%s\n", config.Bucket, key)
if err = teardownDownloadTest(config.Bucket, key); err != nil {
log.Fatalf("failed to teardwn test artifacts: %v", err)
}
}

func parseCommandLine() {
@@ -76,9 +95,9 @@
}
}

func setupDownloadTest(bucket string, size int64) (key string, err error) {
func setupDownloadTest(bucket string, fileSize, partSize int64) (key string, err error) {
er := &awstesting.EndlessReader{}
lr := io.LimitReader(er, size)
lr := io.LimitReader(er, fileSize)

key = integration.UniqueID()

@@ -88,7 +107,8 @@ func setupDownloadTest(bucket string, size int64) (key string, err error) {
}))

uploader := s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
u.PartSize = 100 * sdkio.MebiByte
u.PartSize = partSize
u.Concurrency = runtime.NumCPU() * 2
u.RequestOptions = append(u.RequestOptions, func(r *request.Request) {
if r.Operation.Name != "UploadPart" && r.Operation.Name != "PutObject" {
return
@@ -216,3 +236,99 @@ func newDownloader(clientConfig ClientConfig, sdkConfig SDKConfig, options ...re

return downloader
}

func getObjectSize(bucket, key string) (int64, error) {
sess := session.Must(session.NewSession())
svc := s3.New(sess)
resp, err := svc.HeadObject(&s3.HeadObjectInput{
Bucket: &bucket,
Key: &key,
})
if err != nil {
return 0, err
}

return *resp.ContentLength, nil
}

type Profiler struct {
outputDir string

enableCPU bool
enableTrace bool

cpuFile *os.File
traceFile *os.File
}

func (p *Profiler) SetupFlags(prefix string, flagSet *flag.FlagSet) {
prefix += "profiler."

flagSet.StringVar(&p.outputDir, prefix+"output-dir", os.TempDir(), "output directory to write profiling data")
flagSet.BoolVar(&p.enableCPU, prefix+"cpu", false, "enable CPU profiling")
flagSet.BoolVar(&p.enableTrace, prefix+"trace", false, "enable tracing")
}

func (p *Profiler) Start() {
var err error

uuid := integration.UniqueID()
if p.enableCPU {
p.cpuFile, err = p.createFile(uuid, "cpu")
if err != nil {
panic(fmt.Sprintf("failed to create cpu profile file: %v", err))
}
err = pprof.StartCPUProfile(p.cpuFile)
if err != nil {
panic(fmt.Sprintf("failed to start cpu profile: %v", err))
}
}
if p.enableTrace {
p.traceFile, err = p.createFile(uuid, "trace")
if err != nil {
panic(fmt.Sprintf("failed to create trace file: %v", err))
}
err = trace.Start(p.traceFile)
if err != nil {
panic(fmt.Sprintf("failed to tracing: %v", err))
}
}
}

func (p *Profiler) logAndCloseFile(profile string, file *os.File) {
info, err := file.Stat()
if err != nil {
log.Printf("failed to stat %s profile: %v", profile, err)
} else {
log.Printf("writing %s profile to: %v", profile, filepath.Join(p.outputDir, info.Name()))
}
file.Close()
}

func (p *Profiler) Stop() {
if p.enableCPU {
pprof.StopCPUProfile()
p.logAndCloseFile("cpu", p.cpuFile)
}
if p.enableTrace {
trace.Stop()
p.logAndCloseFile("trace", p.traceFile)
}
}

func (p *Profiler) createFile(prefix, name string) (*os.File, error) {
return os.OpenFile(filepath.Join(p.outputDir, prefix+"."+name+".profile"), os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
}
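The profile files written above are standard Go artifacts: the CPU profile can be inspected with go tool pprof, and the execution trace with go tool trace.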

func getUploadPartSize(fileSize, uploadPartSize, downloadPartSize int64) int64 {
partSize := uploadPartSize

if partSize == 0 {
partSize = downloadPartSize
}
if fileSize/partSize > s3manager.MaxUploadParts {
partSize = (fileSize / s3manager.MaxUploadParts) + 1
}

return partSize
}
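getUploadPartSize prefers the explicit -upload-part-size, falls back to the SDK download part size, and then grows the result so the upload never exceeds s3manager.MaxUploadParts (10,000 parts). Below is a small self-contained check of that arithmetic; the 100 GiB file size and 8 MiB part size are example inputs, not values from the commit.

package main

import "fmt"

const maxUploadParts = 10000 // mirrors s3manager.MaxUploadParts

// getUploadPartSize is copied from the diff above, with the SDK constant inlined.
func getUploadPartSize(fileSize, uploadPartSize, downloadPartSize int64) int64 {
	partSize := uploadPartSize
	if partSize == 0 {
		partSize = downloadPartSize
	}
	if fileSize/partSize > maxUploadParts {
		partSize = (fileSize / maxUploadParts) + 1
	}
	return partSize
}

func main() {
	const gib, mib = int64(1) << 30, int64(1) << 20
	// 100 GiB at 8 MiB parts would need 12,800 parts, over the 10,000 cap,
	// so the part size is bumped to fileSize/10000 + 1 (about 10.24 MiB).
	fmt.Println(getUploadPartSize(100*gib, 0, 8*mib)) // prints 10737419
}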