From ae921f700be0731a96636d63a33fdf63d037ce91 Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Wed, 12 Aug 2020 12:22:06 +0200 Subject: [PATCH 01/12] upload: s3: add FailOnError config --- CHANGELOG.md | 1 + upload/s3.go | 19 +++++++++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 461665ad..f64742ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - output: add Stats output [#23](https://github.com/AdRoll/baker/pull/23) - filter: add SetStringFromURL filter [#28](https://github.com/AdRoll/baker/pull/28) - output: add FileWriter output in replacement of Files output [#31](https://github.com/AdRoll/baker/pull/31) +- upload: s3: add `FailOnErrors` configuration [#27](https://github.com/AdRoll/baker/pull/27) ### Changed diff --git a/upload/s3.go b/upload/s3.go index c7a0cb77..ff89dbd3 100644 --- a/upload/s3.go +++ b/upload/s3.go @@ -48,6 +48,7 @@ type S3Config struct { Retries int `help:"Number of retries before a failed upload" default:"3"` Concurrency int `help:"Number of concurrent workers" default:"5"` Interval time.Duration `help:"Period at which the source path is scanned" default:"15s"` + FailOnError bool `help:"Fail when an error happens, instead of only logging" default:"false"` } func (cfg *S3Config) fillDefaults() error { @@ -132,6 +133,9 @@ func (u *S3) Run(upch <-chan string) { defer func() { ticker.Stop() if err := u.uploadDirectory(); err != nil { + if u.Cfg.FailOnError { + log.Fatal(err) + } log.Error(err) } u.wgUpload.Done() @@ -141,6 +145,9 @@ func (u *S3) Run(upch <-chan string) { select { case <-ticker.C: if err := u.uploadDirectory(); err != nil { + if u.Cfg.FailOnError { + log.Fatal(err) + } log.Error(err) } case <-u.quit: @@ -154,7 +161,11 @@ func (u *S3) Run(upch <-chan string) { atomic.AddInt64(&u.totaln, int64(1)) atomic.AddInt64(&u.queuedn, int64(1)) if err != nil { - log.WithFields(log.Fields{"filepath": sourceFilePath}).WithError(err).Error("Couldn't move") + errCtx := log.WithFields(log.Fields{"filepath": sourceFilePath}).WithError(err) + if u.Cfg.FailOnError { + errCtx.Fatal("couldn't move") + } + errCtx.Error("couldn't move") } } @@ -234,7 +245,11 @@ func (u *S3) uploadDirectory() error { break } else { atomic.AddInt64(&u.totalerr, int64(1)) - log.WithError(err).WithFields(log.Fields{"retry#": i + 1}).Error("failed upload") + errCtx := log.WithError(err).WithFields(log.Fields{"retry#": i + 1}) + if u.Cfg.FailOnError { + errCtx.Fatal("failed upload") + } + errCtx.Error("failed upload") } } }(fpath) From 6f08d92368da1a16803f7bc565d714acd8c0c09b Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Thu, 13 Aug 2020 11:28:42 +0200 Subject: [PATCH 02/12] upload.Run() now returns an error --- api.go | 2 +- topology.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/api.go b/api.go index 9647e2e4..97fd96be 100644 --- a/api.go +++ b/api.go @@ -139,7 +139,7 @@ type Upload interface { // Run processes the output result as it comes through the channel. // Run must block forever // upch will receive filenames that Output wants to see uploaded. - Run(upch <-chan string) + Run(upch <-chan string) error // Stop forces the upload to stop as cleanly as possible, which usually // means to finish up all the existing downloads. diff --git a/topology.go b/topology.go index db2a43ab..1e991696 100644 --- a/topology.go +++ b/topology.go @@ -199,7 +199,9 @@ func (t *Topology) Start() { t.wgupl.Add(1) go func() { if t.Upload != nil { - t.Upload.Run(t.upch) + if err := t.Upload.Run(t.upch); err != nil { + log.WithError(err).Fatal("Upload returned an error") + } } else { // Just consume t.upch if there's no uploader available for range t.upch { From 648e6ea5fe839ba4a230fae33741abd0dd603bfc Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Thu, 13 Aug 2020 15:05:23 +0200 Subject: [PATCH 03/12] Add some tests --- upload/s3.go | 29 ++++++----- upload/s3_test.go | 129 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 13 deletions(-) diff --git a/upload/s3.go b/upload/s3.go index ff89dbd3..6c65d4f4 100644 --- a/upload/s3.go +++ b/upload/s3.go @@ -48,7 +48,7 @@ type S3Config struct { Retries int `help:"Number of retries before a failed upload" default:"3"` Concurrency int `help:"Number of concurrent workers" default:"5"` Interval time.Duration `help:"Period at which the source path is scanned" default:"15s"` - FailOnError bool `help:"Fail when an error happens, instead of only logging" default:"false"` + ExitOnError bool `help:"Exit at first error, instead of logging all errors" default:"false"` } func (cfg *S3Config) fillDefaults() error { @@ -105,6 +105,8 @@ type S3 struct { totaln int64 totalerr int64 queuedn int64 + + uploadFn func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error } func newS3(cfg baker.UploadParams) (baker.Upload, error) { @@ -121,10 +123,11 @@ func newS3(cfg baker.UploadParams) (baker.Upload, error) { Cfg: dcfg, uploader: s3manager.NewUploaderWithClient(s3svc), quit: make(chan struct{}), + uploadFn: s3UploadFile, }, nil } -func (u *S3) Run(upch <-chan string) { +func (u *S3) Run(upch <-chan string) error { // Start a goroutine in which we periodically look at the source // path for files and upload the ones we find. u.wgUpload.Add(1) @@ -133,7 +136,7 @@ func (u *S3) Run(upch <-chan string) { defer func() { ticker.Stop() if err := u.uploadDirectory(); err != nil { - if u.Cfg.FailOnError { + if u.Cfg.ExitOnError { log.Fatal(err) } log.Error(err) @@ -145,7 +148,7 @@ func (u *S3) Run(upch <-chan string) { select { case <-ticker.C: if err := u.uploadDirectory(); err != nil { - if u.Cfg.FailOnError { + if u.Cfg.ExitOnError { log.Fatal(err) } log.Error(err) @@ -161,16 +164,16 @@ func (u *S3) Run(upch <-chan string) { atomic.AddInt64(&u.totaln, int64(1)) atomic.AddInt64(&u.queuedn, int64(1)) if err != nil { - errCtx := log.WithFields(log.Fields{"filepath": sourceFilePath}).WithError(err) - if u.Cfg.FailOnError { - errCtx.Fatal("couldn't move") + if u.Cfg.ExitOnError { + return fmt.Errorf("couldn't move: %v", err) } - errCtx.Error("couldn't move") + log.WithFields(log.Fields{"filepath": sourceFilePath}).WithError(err).Error("couldn't move") } } // Stop blocks until the upload goroutine has exited. u.Stop() + return nil } func (u *S3) move(sourceFilePath string) error { @@ -225,9 +228,9 @@ func (u *S3) uploadDirectory() error { ctx.Info("Uploading") sem := make(sem, u.Cfg.Concurrency) ctx.Info("Starting to walk...") - err := filepath.Walk(u.Cfg.StagingPath, func(fpath string, info os.FileInfo, err error) error { - if err != nil { - return err + err := filepath.Walk(u.Cfg.StagingPath, func(fpath string, info os.FileInfo, walkErr error) error { + if walkErr != nil { + return walkErr } if info.IsDir() { return nil @@ -239,14 +242,14 @@ func (u *S3) uploadDirectory() error { defer func() { sem.decr(); wg.Done() }() for i := 0; i < u.Cfg.Retries; i++ { - if err := s3UploadFile(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath); err == nil { + if err := u.uploadFn(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath); err == nil { atomic.AddInt64(&u.totaln, int64(1)) atomic.AddInt64(&u.queuedn, int64(-1)) break } else { atomic.AddInt64(&u.totalerr, int64(1)) errCtx := log.WithError(err).WithFields(log.Fields{"retry#": i + 1}) - if u.Cfg.FailOnError { + if u.Cfg.ExitOnError { errCtx.Fatal("failed upload") } errCtx.Error("failed upload") diff --git a/upload/s3_test.go b/upload/s3_test.go index 27818d57..ac49a907 100644 --- a/upload/s3_test.go +++ b/upload/s3_test.go @@ -2,6 +2,7 @@ package upload import ( "bytes" + "errors" "fmt" "io/ioutil" "net/http" @@ -14,6 +15,7 @@ import ( "github.com/AdRoll/baker" "github.com/AdRoll/baker/testutil" + log "github.com/sirupsen/logrus" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" @@ -174,3 +176,130 @@ func TestS3Upload(t *testing.T) { t.Errorf("Wrong number of unique filename: %d, want %d", len(fnames), nfiles) } } + +func Test_uploadDirectory(t *testing.T) { + // Create a folder to store files to be uploaded + srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") + if err != nil { + t.Fatalf("Can't setup test: %v", err) + } + defer os.Remove(srcDir) + + // Write a bunch of files + numFiles := 10 + for i := 0; i < numFiles; i++ { + fname := filepath.Join(srcDir, fmt.Sprintf("test_file_%d", i)) + + if err := ioutil.WriteFile(fname, []byte("abc"), 0644); err != nil { + t.Fatalf("can't create temp file: %v", err) + } + } + + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { + return nil + } + + s3 := &S3{ + Cfg: &S3Config{ + StagingPath: srcDir, + Concurrency: 5, + Retries: 3, + }, + uploadFn: mockUploadFn, + } + + if err := s3.uploadDirectory(); err != nil { + log.Fatal(err) + } + if int(s3.totaln) != numFiles { + t.Fatalf("uploaded: want: %d, got: %d", numFiles, int(s3.totaln)) + } + + if s3.totalerr != 0 { + t.Fatalf("errors: want: %d, got: %d", 0, s3.totalerr) + } +} + +func Test_uploadDirectoryError(t *testing.T) { + // Create a folder to store files to be uploaded + srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") + if err != nil { + t.Fatalf("Can't setup test: %v", err) + } + defer os.Remove(srcDir) + + // Write a bunch of files + numFiles := 10 + for i := 0; i < numFiles; i++ { + fname := filepath.Join(srcDir, fmt.Sprintf("test_file_%d", i)) + + if err := ioutil.WriteFile(fname, []byte("abc"), 0644); err != nil { + t.Fatalf("can't create temp file: %v", err) + } + } + + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { + return errors.New("Fake error") + } + + t.Run("ExitOnError: false", func(t *testing.T) { + s3 := &S3{ + Cfg: &S3Config{ + StagingPath: srcDir, + Concurrency: 5, + Retries: 3, + }, + uploadFn: mockUploadFn, + } + if err := s3.uploadDirectory(); err != nil { + log.Fatal(err) + } + if int(s3.totaln) != 0 { + t.Fatalf("uploaded: want: %d, got: %d", 0, int(s3.totaln)) + } + + if int(s3.totalerr) != numFiles*s3.Cfg.Retries { + t.Fatalf("errors: want: %d, got: %d", numFiles*s3.Cfg.Retries, int(s3.totalerr)) + } + }) +} + +func Test_move(t *testing.T) { + srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") + if err != nil { + t.Fatalf("Can't setup test: %v", err) + } + defer os.Remove(srcDir) + + trgtDir, err := ioutil.TempDir("/tmp", "upload_s3_test2") + if err != nil { + t.Fatalf("Can't setup test: %v", err) + } + defer os.Remove(trgtDir) + + srcFile := filepath.Join(srcDir, "test_file") + trgtFile := filepath.Join(trgtDir, "test_file") + + if err := ioutil.WriteFile(srcFile, []byte("abc"), 0644); err != nil { + t.Fatalf("can't create temp file: %v", err) + } + + s3 := &S3{ + Cfg: &S3Config{ + StagingPath: trgtDir, + SourceBasePath: srcDir, + }, + } + + if err := s3.move(srcFile); err != nil { + t.Fatal(err) + } + + if _, err := os.Stat(trgtFile); err != nil { + t.Error("moved file not found") + } + + if _, err := os.Stat(srcFile); err == nil { + t.Error("source file still there") + } +} From b906cca7b9f57f08d8bb0426571c752dc2ca1882 Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Thu, 13 Aug 2020 15:29:30 +0200 Subject: [PATCH 04/12] upload: s3: uploadDirectory exits at 1st error if ExitOnError --- upload/s3.go | 15 ++++++++++++--- upload/s3_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/upload/s3.go b/upload/s3.go index 6c65d4f4..4fb5f7b3 100644 --- a/upload/s3.go +++ b/upload/s3.go @@ -228,10 +228,16 @@ func (u *S3) uploadDirectory() error { ctx.Info("Uploading") sem := make(sem, u.Cfg.Concurrency) ctx.Info("Starting to walk...") + globFatalErr := atomic.Value{} err := filepath.Walk(u.Cfg.StagingPath, func(fpath string, info os.FileInfo, walkErr error) error { if walkErr != nil { return walkErr } + // If a fatal error happened in any of the goroutines, then exit immediately + if globFatalErr.Load() != nil { + return globFatalErr.Load().(error) + } + if info.IsDir() { return nil } @@ -242,17 +248,20 @@ func (u *S3) uploadDirectory() error { defer func() { sem.decr(); wg.Done() }() for i := 0; i < u.Cfg.Retries; i++ { + if globFatalErr.Load() != nil { + return + } if err := u.uploadFn(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath); err == nil { atomic.AddInt64(&u.totaln, int64(1)) atomic.AddInt64(&u.queuedn, int64(-1)) break } else { atomic.AddInt64(&u.totalerr, int64(1)) - errCtx := log.WithError(err).WithFields(log.Fields{"retry#": i + 1}) if u.Cfg.ExitOnError { - errCtx.Fatal("failed upload") + globFatalErr.Store(err) + return } - errCtx.Error("failed upload") + log.WithError(err).WithFields(log.Fields{"retry#": i + 1}).Error("failed upload") } } }(fpath) diff --git a/upload/s3_test.go b/upload/s3_test.go index ac49a907..aa174c11 100644 --- a/upload/s3_test.go +++ b/upload/s3_test.go @@ -178,6 +178,7 @@ func TestS3Upload(t *testing.T) { } func Test_uploadDirectory(t *testing.T) { + defer testutil.DisableLogging()() // Create a folder to store files to be uploaded srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") if err != nil { @@ -221,6 +222,8 @@ func Test_uploadDirectory(t *testing.T) { } func Test_uploadDirectoryError(t *testing.T) { + defer testutil.DisableLogging()() + // Create a folder to store files to be uploaded srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") if err != nil { @@ -239,6 +242,7 @@ func Test_uploadDirectoryError(t *testing.T) { } mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { + time.Sleep(100 * time.Millisecond) return errors.New("Fake error") } @@ -262,6 +266,31 @@ func Test_uploadDirectoryError(t *testing.T) { t.Fatalf("errors: want: %d, got: %d", numFiles*s3.Cfg.Retries, int(s3.totalerr)) } }) + + t.Run("ExitOnError: true", func(t *testing.T) { + s3 := &S3{ + Cfg: &S3Config{ + StagingPath: srcDir, + Concurrency: 5, + Retries: 3, + ExitOnError: true, + }, + uploadFn: mockUploadFn, + } + if err := s3.uploadDirectory(); err == nil { + t.Fatalf("expected error") + } + + if int(s3.totaln) != 0 { + t.Fatalf("uploaded: want: %d, got: %d", 0, int(s3.totaln)) + } + + // Uploads run parallelized so we can't expect that only 1 error will happen + // before returning, but for sure they can't be more than the number of concurrency + if int(s3.totalerr) > s3.Cfg.Concurrency { + t.Fatalf("errors: want: <= %d, got: %d", s3.Cfg.Concurrency, int(s3.totalerr)) + } + }) } func Test_move(t *testing.T) { From aae9b99da5ac8f48d4a5d15865ca11e9bfcb2d0a Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Thu, 13 Aug 2020 17:17:31 +0200 Subject: [PATCH 05/12] s3 upload returns error and stops if ExitOnError --- upload/s3.go | 38 +++++---- upload/s3_test.go | 204 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 208 insertions(+), 34 deletions(-) diff --git a/upload/s3.go b/upload/s3.go index 4fb5f7b3..97686c76 100644 --- a/upload/s3.go +++ b/upload/s3.go @@ -128,6 +128,11 @@ func newS3(cfg baker.UploadParams) (baker.Upload, error) { } func (u *S3) Run(upch <-chan string) error { + // Stop blocks until the upload goroutine has exited. + defer u.Stop() + + errCh := make(chan error) + // Start a goroutine in which we periodically look at the source // path for files and upload the ones we find. u.wgUpload.Add(1) @@ -137,7 +142,7 @@ func (u *S3) Run(upch <-chan string) error { ticker.Stop() if err := u.uploadDirectory(); err != nil { if u.Cfg.ExitOnError { - log.Fatal(err) + errCh <- err } log.Error(err) } @@ -149,7 +154,8 @@ func (u *S3) Run(upch <-chan string) error { case <-ticker.C: if err := u.uploadDirectory(); err != nil { if u.Cfg.ExitOnError { - log.Fatal(err) + errCh <- err + return } log.Error(err) } @@ -159,21 +165,24 @@ func (u *S3) Run(upch <-chan string) error { } }() - for sourceFilePath := range upch { - err := u.move(sourceFilePath) - atomic.AddInt64(&u.totaln, int64(1)) - atomic.AddInt64(&u.queuedn, int64(1)) - if err != nil { - if u.Cfg.ExitOnError { - return fmt.Errorf("couldn't move: %v", err) + for { + select { + case err := <-errCh: + return err + case sourceFilePath := <-upch: + err := u.move(sourceFilePath) + atomic.AddInt64(&u.totaln, int64(1)) + atomic.AddInt64(&u.queuedn, int64(1)) + if err != nil { + if u.Cfg.ExitOnError { + return fmt.Errorf("couldn't move: %v", err) + } + log.WithFields(log.Fields{"filepath": sourceFilePath}).WithError(err).Error("couldn't move") } - log.WithFields(log.Fields{"filepath": sourceFilePath}).WithError(err).Error("couldn't move") + case <-u.quit: + return nil } } - - // Stop blocks until the upload goroutine has exited. - u.Stop() - return nil } func (u *S3) move(sourceFilePath string) error { @@ -252,7 +261,6 @@ func (u *S3) uploadDirectory() error { return } if err := u.uploadFn(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath); err == nil { - atomic.AddInt64(&u.totaln, int64(1)) atomic.AddInt64(&u.queuedn, int64(-1)) break } else { diff --git a/upload/s3_test.go b/upload/s3_test.go index aa174c11..f130bfe8 100644 --- a/upload/s3_test.go +++ b/upload/s3_test.go @@ -10,6 +10,7 @@ import ( "path/filepath" "strings" "sync" + "sync/atomic" "testing" "time" @@ -196,7 +197,9 @@ func Test_uploadDirectory(t *testing.T) { } } + var total int64 mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { + atomic.AddInt64(&total, 1) return nil } @@ -212,8 +215,8 @@ func Test_uploadDirectory(t *testing.T) { if err := s3.uploadDirectory(); err != nil { log.Fatal(err) } - if int(s3.totaln) != numFiles { - t.Fatalf("uploaded: want: %d, got: %d", numFiles, int(s3.totaln)) + if int(total) != numFiles { + t.Fatalf("uploaded: want: %d, got: %d", numFiles, int(total)) } if s3.totalerr != 0 { @@ -221,8 +224,9 @@ func Test_uploadDirectory(t *testing.T) { } } -func Test_uploadDirectoryError(t *testing.T) { - defer testutil.DisableLogging()() +// prepareUploadS3TestFolder creates a temp forlder and the selected number of files in it +func prepareUploadS3TestFolder(t *testing.T, numFiles int) (string, []string) { + t.Helper() // Create a folder to store files to be uploaded srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") @@ -232,22 +236,33 @@ func Test_uploadDirectoryError(t *testing.T) { defer os.Remove(srcDir) // Write a bunch of files - numFiles := 10 + var fnames []string for i := 0; i < numFiles; i++ { fname := filepath.Join(srcDir, fmt.Sprintf("test_file_%d", i)) if err := ioutil.WriteFile(fname, []byte("abc"), 0644); err != nil { t.Fatalf("can't create temp file: %v", err) } + + fnames = append(fnames, fname) } + return srcDir, fnames +} + +func Test_uploadDirectoryError(t *testing.T) { + defer testutil.DisableLogging()() + + numFiles := 10 + srcDir, _ := prepareUploadS3TestFolder(t, numFiles) + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { time.Sleep(100 * time.Millisecond) return errors.New("Fake error") } t.Run("ExitOnError: false", func(t *testing.T) { - s3 := &S3{ + s3Comp := &S3{ Cfg: &S3Config{ StagingPath: srcDir, Concurrency: 5, @@ -255,20 +270,20 @@ func Test_uploadDirectoryError(t *testing.T) { }, uploadFn: mockUploadFn, } - if err := s3.uploadDirectory(); err != nil { + if err := s3Comp.uploadDirectory(); err != nil { log.Fatal(err) } - if int(s3.totaln) != 0 { - t.Fatalf("uploaded: want: %d, got: %d", 0, int(s3.totaln)) + if int(s3Comp.totaln) != 0 { + t.Fatalf("uploaded: want: %d, got: %d", 0, int(s3Comp.totaln)) } - if int(s3.totalerr) != numFiles*s3.Cfg.Retries { - t.Fatalf("errors: want: %d, got: %d", numFiles*s3.Cfg.Retries, int(s3.totalerr)) + if int(s3Comp.totalerr) != numFiles*s3Comp.Cfg.Retries { + t.Fatalf("errors: want: %d, got: %d", numFiles*s3Comp.Cfg.Retries, int(s3Comp.totalerr)) } }) t.Run("ExitOnError: true", func(t *testing.T) { - s3 := &S3{ + s3Comp := &S3{ Cfg: &S3Config{ StagingPath: srcDir, Concurrency: 5, @@ -277,22 +292,173 @@ func Test_uploadDirectoryError(t *testing.T) { }, uploadFn: mockUploadFn, } - if err := s3.uploadDirectory(); err == nil { + if err := s3Comp.uploadDirectory(); err == nil { t.Fatalf("expected error") } - if int(s3.totaln) != 0 { - t.Fatalf("uploaded: want: %d, got: %d", 0, int(s3.totaln)) - } - // Uploads run parallelized so we can't expect that only 1 error will happen // before returning, but for sure they can't be more than the number of concurrency - if int(s3.totalerr) > s3.Cfg.Concurrency { - t.Fatalf("errors: want: <= %d, got: %d", s3.Cfg.Concurrency, int(s3.totalerr)) + if int(s3Comp.totalerr) > s3Comp.Cfg.Concurrency { + t.Fatalf("errors: want: <=%d, got: %d", s3Comp.Cfg.Concurrency, int(s3Comp.totalerr)) } }) } +func TestRun(t *testing.T) { + defer testutil.DisableLogging()() + + tmpDir, fnames := prepareUploadS3TestFolder(t, 1) + fname := fnames[0] + + stagingDir, err := ioutil.TempDir("/tmp", "upload_s3_test_staging") + if err != nil { + t.Fatalf("Can't setup test: %v", err) + } + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { + // time.Sleep(100 * time.Millisecond) + // return errors.New("Fake error") + os.Remove(fpath) + return nil + } + + s3Comp := &S3{ + Cfg: &S3Config{ + StagingPath: stagingDir, + SourceBasePath: tmpDir, + Concurrency: 5, + Retries: 3, + Interval: 1 * time.Second, + }, + quit: make(chan struct{}), + uploadFn: mockUploadFn, + } + + upCh := make(chan string) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + if err := s3Comp.Run(upCh); err != nil { + t.Fatal(err) + } + }() + + upCh <- fname + s3Comp.Stop() + wg.Wait() + + if int(s3Comp.totalerr) != 0 { + t.Fatalf("totalerr: want: %d, got: %d", 0, int(s3Comp.totalerr)) + } + + if int(s3Comp.totaln) != 1 { + t.Fatalf("totaln: want: %d, got: %d", 1, int(s3Comp.totaln)) + } +} + +func TestRunExitOnError(t *testing.T) { + defer testutil.DisableLogging()() + + tmpDir, fnames := prepareUploadS3TestFolder(t, 1) + fname := fnames[0] + + stagingDir, err := ioutil.TempDir("/tmp", "upload_s3_test_staging") + if err != nil { + t.Fatalf("Can't setup test: %v", err) + } + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { + time.Sleep(100 * time.Millisecond) + return errors.New("Fake error") + } + + s3Comp := &S3{ + Cfg: &S3Config{ + StagingPath: stagingDir, + SourceBasePath: tmpDir, + ExitOnError: true, + Concurrency: 5, + Retries: 3, + Interval: 1 * time.Second, + }, + quit: make(chan struct{}), + uploadFn: mockUploadFn, + } + + upCh := make(chan string) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + if err := s3Comp.Run(upCh); err != nil { + t.Fatal(err) + } + }() + + upCh <- fname + time.Sleep(100 * time.Millisecond) + s3Comp.Stop() + wg.Wait() + + if int(s3Comp.totalerr) != 1 { + t.Fatalf("totalerr: want: %d, got: %d", 1, int(s3Comp.totalerr)) + } + + if int(s3Comp.totaln) != 1 { + t.Fatalf("totaln: want: %d, got: %d", 1, int(s3Comp.totaln)) + } +} + +func TestRunNotExitOnError(t *testing.T) { + defer testutil.DisableLogging()() + + tmpDir, fnames := prepareUploadS3TestFolder(t, 1) + fname := fnames[0] + + stagingDir, err := ioutil.TempDir("/tmp", "upload_s3_test_staging") + if err != nil { + t.Fatalf("Can't setup test: %v", err) + } + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { + time.Sleep(100 * time.Millisecond) + return errors.New("Fake error") + } + + s3Comp := &S3{ + Cfg: &S3Config{ + StagingPath: stagingDir, + SourceBasePath: tmpDir, + ExitOnError: false, + Concurrency: 5, + Retries: 3, + Interval: 1 * time.Second, + }, + quit: make(chan struct{}), + uploadFn: mockUploadFn, + } + + upCh := make(chan string) + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + if err := s3Comp.Run(upCh); err != nil { + t.Fatal(err) + } + }() + + upCh <- fname + s3Comp.Stop() + wg.Wait() + + if int(s3Comp.totalerr) > 1*s3Comp.Cfg.Retries { + t.Fatalf("totalerr: want: <=%d, got: %d", 1*s3Comp.Cfg.Retries, int(s3Comp.totalerr)) + } + + if int(s3Comp.totaln) != 1 { + t.Fatalf("totaln: want: %d, got: %d", 1, int(s3Comp.totaln)) + } +} + func Test_move(t *testing.T) { srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") if err != nil { From 0e2acce5454f5cca40ed7a513d437efabe94113a Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Thu, 13 Aug 2020 17:19:40 +0200 Subject: [PATCH 06/12] upload changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f64742ab..59047c20 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - output: add Stats output [#23](https://github.com/AdRoll/baker/pull/23) - filter: add SetStringFromURL filter [#28](https://github.com/AdRoll/baker/pull/28) - output: add FileWriter output in replacement of Files output [#31](https://github.com/AdRoll/baker/pull/31) -- upload: s3: add `FailOnErrors` configuration [#27](https://github.com/AdRoll/baker/pull/27) +- upload: s3: add `ExitOnError` configuration [#27](https://github.com/AdRoll/baker/pull/27) +- uploads now return an error instead of panicking and baker deals with it [#27](https://github.com/AdRoll/baker/pull/27) ### Changed From b4e94145bd121f900a2f8decef8ad6defea30ff9 Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Thu, 13 Aug 2020 17:57:33 +0200 Subject: [PATCH 07/12] try to fix error on CI --- upload/s3_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/upload/s3_test.go b/upload/s3_test.go index f130bfe8..87801ea8 100644 --- a/upload/s3_test.go +++ b/upload/s3_test.go @@ -181,7 +181,7 @@ func TestS3Upload(t *testing.T) { func Test_uploadDirectory(t *testing.T) { defer testutil.DisableLogging()() // Create a folder to store files to be uploaded - srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") + srcDir, err := ioutil.TempDir(".", "upload_s3_test") if err != nil { t.Fatalf("Can't setup test: %v", err) } @@ -229,7 +229,7 @@ func prepareUploadS3TestFolder(t *testing.T, numFiles int) (string, []string) { t.Helper() // Create a folder to store files to be uploaded - srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") + srcDir, err := ioutil.TempDir(".", "upload_s3_test") if err != nil { t.Fatalf("Can't setup test: %v", err) } @@ -310,7 +310,7 @@ func TestRun(t *testing.T) { tmpDir, fnames := prepareUploadS3TestFolder(t, 1) fname := fnames[0] - stagingDir, err := ioutil.TempDir("/tmp", "upload_s3_test_staging") + stagingDir, err := ioutil.TempDir(".", "upload_s3_test_staging") if err != nil { t.Fatalf("Can't setup test: %v", err) } @@ -362,7 +362,7 @@ func TestRunExitOnError(t *testing.T) { tmpDir, fnames := prepareUploadS3TestFolder(t, 1) fname := fnames[0] - stagingDir, err := ioutil.TempDir("/tmp", "upload_s3_test_staging") + stagingDir, err := ioutil.TempDir(".", "upload_s3_test_staging") if err != nil { t.Fatalf("Can't setup test: %v", err) } @@ -414,7 +414,7 @@ func TestRunNotExitOnError(t *testing.T) { tmpDir, fnames := prepareUploadS3TestFolder(t, 1) fname := fnames[0] - stagingDir, err := ioutil.TempDir("/tmp", "upload_s3_test_staging") + stagingDir, err := ioutil.TempDir(".", "upload_s3_test_staging") if err != nil { t.Fatalf("Can't setup test: %v", err) } @@ -460,13 +460,13 @@ func TestRunNotExitOnError(t *testing.T) { } func Test_move(t *testing.T) { - srcDir, err := ioutil.TempDir("/tmp", "upload_s3_test") + srcDir, err := ioutil.TempDir(".", "upload_s3_test") if err != nil { t.Fatalf("Can't setup test: %v", err) } defer os.Remove(srcDir) - trgtDir, err := ioutil.TempDir("/tmp", "upload_s3_test2") + trgtDir, err := ioutil.TempDir(".", "upload_s3_test2") if err != nil { t.Fatalf("Can't setup test: %v", err) } From cdaf0411c964ce48bb59d82131c2a665156b1646 Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Mon, 7 Sep 2020 12:18:06 +0200 Subject: [PATCH 08/12] fix behaviour on channel close --- upload/s3.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/upload/s3.go b/upload/s3.go index 97686c76..a1aabf44 100644 --- a/upload/s3.go +++ b/upload/s3.go @@ -164,12 +164,14 @@ func (u *S3) Run(upch <-chan string) error { } } }() - for { select { case err := <-errCh: return err - case sourceFilePath := <-upch: + case sourceFilePath, more := <-upch: + if !more { + return nil + } err := u.move(sourceFilePath) atomic.AddInt64(&u.totaln, int64(1)) atomic.AddInt64(&u.queuedn, int64(1)) From 34894e76fe6d2ee209927c97ab1f62696acf95e5 Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Tue, 8 Sep 2020 11:35:18 +0200 Subject: [PATCH 09/12] address comments --- upload/s3.go | 11 +++---- upload/s3_test.go | 76 +++++++++++++++++------------------------------ 2 files changed, 33 insertions(+), 54 deletions(-) diff --git a/upload/s3.go b/upload/s3.go index a1aabf44..058d1188 100644 --- a/upload/s3.go +++ b/upload/s3.go @@ -239,14 +239,15 @@ func (u *S3) uploadDirectory() error { ctx.Info("Uploading") sem := make(sem, u.Cfg.Concurrency) ctx.Info("Starting to walk...") - globFatalErr := atomic.Value{} + exitErr := atomic.Value{} err := filepath.Walk(u.Cfg.StagingPath, func(fpath string, info os.FileInfo, walkErr error) error { if walkErr != nil { return walkErr } // If a fatal error happened in any of the goroutines, then exit immediately - if globFatalErr.Load() != nil { - return globFatalErr.Load().(error) + e := exitErr.Load() + if e != nil { + return e.(error) } if info.IsDir() { @@ -259,7 +260,7 @@ func (u *S3) uploadDirectory() error { defer func() { sem.decr(); wg.Done() }() for i := 0; i < u.Cfg.Retries; i++ { - if globFatalErr.Load() != nil { + if exitErr.Load() != nil { return } if err := u.uploadFn(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath); err == nil { @@ -268,7 +269,7 @@ func (u *S3) uploadDirectory() error { } else { atomic.AddInt64(&u.totalerr, int64(1)) if u.Cfg.ExitOnError { - globFatalErr.Store(err) + exitErr.Store(err) return } log.WithError(err).WithFields(log.Fields{"retry#": i + 1}).Error("failed upload") diff --git a/upload/s3_test.go b/upload/s3_test.go index 87801ea8..d2303587 100644 --- a/upload/s3_test.go +++ b/upload/s3_test.go @@ -93,20 +93,9 @@ func TestS3Upload(t *testing.T) { // Create many files. const nfiles = 10000 - srcDir, rmSrcDir := testutil.TempDir(t) + srcDir, paths, rmSrcDir := prepareUploadS3TestFolder(t, nfiles) defer rmSrcDir() - paths := make([]string, nfiles) - for i := range paths { - paths[i] = filepath.Join(srcDir, fmt.Sprintf("file%d", i)) - } - - for _, path := range paths { - if err := ioutil.WriteFile(path, []byte("foo"), os.ModePerm); err != nil { - t.Fatal(err) - } - } - cfg := baker.UploadParams{ ComponentParams: baker.ComponentParams{ DecodedConfig: &S3Config{ @@ -181,21 +170,9 @@ func TestS3Upload(t *testing.T) { func Test_uploadDirectory(t *testing.T) { defer testutil.DisableLogging()() // Create a folder to store files to be uploaded - srcDir, err := ioutil.TempDir(".", "upload_s3_test") - if err != nil { - t.Fatalf("Can't setup test: %v", err) - } - defer os.Remove(srcDir) - - // Write a bunch of files numFiles := 10 - for i := 0; i < numFiles; i++ { - fname := filepath.Join(srcDir, fmt.Sprintf("test_file_%d", i)) - - if err := ioutil.WriteFile(fname, []byte("abc"), 0644); err != nil { - t.Fatalf("can't create temp file: %v", err) - } - } + srcDir, _, rmSrcDir := prepareUploadS3TestFolder(t, numFiles) + defer rmSrcDir() var total int64 mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { @@ -225,15 +202,11 @@ func Test_uploadDirectory(t *testing.T) { } // prepareUploadS3TestFolder creates a temp forlder and the selected number of files in it -func prepareUploadS3TestFolder(t *testing.T, numFiles int) (string, []string) { +func prepareUploadS3TestFolder(t *testing.T, numFiles int) (string, []string, func()) { t.Helper() // Create a folder to store files to be uploaded - srcDir, err := ioutil.TempDir(".", "upload_s3_test") - if err != nil { - t.Fatalf("Can't setup test: %v", err) - } - defer os.Remove(srcDir) + srcDir, rmSrcDir := testutil.TempDir(t) // Write a bunch of files var fnames []string @@ -247,17 +220,17 @@ func prepareUploadS3TestFolder(t *testing.T, numFiles int) (string, []string) { fnames = append(fnames, fname) } - return srcDir, fnames + return srcDir, fnames, rmSrcDir } func Test_uploadDirectoryError(t *testing.T) { defer testutil.DisableLogging()() numFiles := 10 - srcDir, _ := prepareUploadS3TestFolder(t, numFiles) + srcDir, _, rmSrcDir := prepareUploadS3TestFolder(t, numFiles) + defer rmSrcDir() mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - time.Sleep(100 * time.Millisecond) return errors.New("Fake error") } @@ -307,16 +280,17 @@ func Test_uploadDirectoryError(t *testing.T) { func TestRun(t *testing.T) { defer testutil.DisableLogging()() - tmpDir, fnames := prepareUploadS3TestFolder(t, 1) + tmpDir, fnames, rmTmpDir := prepareUploadS3TestFolder(t, 1) + defer rmTmpDir() fname := fnames[0] - stagingDir, err := ioutil.TempDir(".", "upload_s3_test_staging") + stagingDir, err := ioutil.TempDir("", t.Name()) if err != nil { t.Fatalf("Can't setup test: %v", err) } + defer os.RemoveAll(stagingDir) + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - // time.Sleep(100 * time.Millisecond) - // return errors.New("Fake error") os.Remove(fpath) return nil } @@ -359,15 +333,17 @@ func TestRun(t *testing.T) { func TestRunExitOnError(t *testing.T) { defer testutil.DisableLogging()() - tmpDir, fnames := prepareUploadS3TestFolder(t, 1) + tmpDir, fnames, rmTmpDir := prepareUploadS3TestFolder(t, 1) + defer rmTmpDir() fname := fnames[0] - stagingDir, err := ioutil.TempDir(".", "upload_s3_test_staging") + stagingDir, err := ioutil.TempDir("", t.Name()) if err != nil { t.Fatalf("Can't setup test: %v", err) } + defer os.RemoveAll(stagingDir) + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - time.Sleep(100 * time.Millisecond) return errors.New("Fake error") } @@ -411,15 +387,17 @@ func TestRunExitOnError(t *testing.T) { func TestRunNotExitOnError(t *testing.T) { defer testutil.DisableLogging()() - tmpDir, fnames := prepareUploadS3TestFolder(t, 1) + tmpDir, fnames, rmTmpDir := prepareUploadS3TestFolder(t, 1) + defer rmTmpDir() fname := fnames[0] - stagingDir, err := ioutil.TempDir(".", "upload_s3_test_staging") + stagingDir, err := ioutil.TempDir("", t.Name()) if err != nil { t.Fatalf("Can't setup test: %v", err) } + defer os.RemoveAll(stagingDir) + mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - time.Sleep(100 * time.Millisecond) return errors.New("Fake error") } @@ -460,17 +438,17 @@ func TestRunNotExitOnError(t *testing.T) { } func Test_move(t *testing.T) { - srcDir, err := ioutil.TempDir(".", "upload_s3_test") + srcDir, err := ioutil.TempDir("", t.Name()) if err != nil { t.Fatalf("Can't setup test: %v", err) } - defer os.Remove(srcDir) + defer os.RemoveAll(srcDir) - trgtDir, err := ioutil.TempDir(".", "upload_s3_test2") + trgtDir, err := ioutil.TempDir("", fmt.Sprintf("%s-trgt", t.Name())) if err != nil { t.Fatalf("Can't setup test: %v", err) } - defer os.Remove(trgtDir) + defer os.RemoveAll(trgtDir) srcFile := filepath.Join(srcDir, "test_file") trgtFile := filepath.Join(trgtDir, "test_file") From 633a156654506b7fd79e793d1c457c13eef1193c Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Tue, 8 Sep 2020 15:23:38 +0200 Subject: [PATCH 10/12] tests refactoring to use S3Mock --- upload/s3.go | 20 ++-- upload/s3_test.go | 299 +++++++++++++++++++++++++--------------------- 2 files changed, 174 insertions(+), 145 deletions(-) diff --git a/upload/s3.go b/upload/s3.go index 058d1188..fe3d802d 100644 --- a/upload/s3.go +++ b/upload/s3.go @@ -105,8 +105,6 @@ type S3 struct { totaln int64 totalerr int64 queuedn int64 - - uploadFn func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error } func newS3(cfg baker.UploadParams) (baker.Upload, error) { @@ -123,7 +121,6 @@ func newS3(cfg baker.UploadParams) (baker.Upload, error) { Cfg: dcfg, uploader: s3manager.NewUploaderWithClient(s3svc), quit: make(chan struct{}), - uploadFn: s3UploadFile, }, nil } @@ -263,17 +260,18 @@ func (u *S3) uploadDirectory() error { if exitErr.Load() != nil { return } - if err := u.uploadFn(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath); err == nil { + err := s3UploadFile(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath) + if err == nil { atomic.AddInt64(&u.queuedn, int64(-1)) break - } else { - atomic.AddInt64(&u.totalerr, int64(1)) - if u.Cfg.ExitOnError { - exitErr.Store(err) - return - } - log.WithError(err).WithFields(log.Fields{"retry#": i + 1}).Error("failed upload") } + + atomic.AddInt64(&u.totalerr, int64(1)) + if u.Cfg.ExitOnError { + exitErr.Store(err) + return + } + log.WithError(err).WithFields(log.Fields{"retry#": i + 1}).Error("failed upload") } }(fpath) return nil diff --git a/upload/s3_test.go b/upload/s3_test.go index d2303587..b85643d2 100644 --- a/upload/s3_test.go +++ b/upload/s3_test.go @@ -2,7 +2,6 @@ package upload import ( "bytes" - "errors" "fmt" "io/ioutil" "net/http" @@ -10,7 +9,6 @@ import ( "path/filepath" "strings" "sync" - "sync/atomic" "testing" "time" @@ -25,14 +23,14 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" ) -// mockS3Service returns a mocked s3.S3 service which records all operations +// mockS3Service(false) returns a mocked s3.S3 service which records all operations // related to Upload S3 API calls. // // Once all interactions with the returned service have ended, and not before // that, ops and params can be accessed. ops and params will hold the list of // AWS S3 API calls and their parameters. For instance, if ops[0] is "PutObject" // then params[0] is a *s3.PutObjectInput. -func mockS3Service() (svc *s3.S3, ops *[]string, params *[]interface{}) { +func mockS3Service(wantErr bool) (svc *s3.S3, ops *[]string, params *[]interface{}) { const respMsg = ` mockValue @@ -59,6 +57,13 @@ func mockS3Service() (svc *s3.S3, ops *[]string, params *[]interface{}) { *ops = append(*ops, r.Operation.Name) *params = append(*params, r.Params) + if wantErr { + r.HTTPResponse = &http.Response{ + StatusCode: 400, + } + return + } + r.HTTPResponse = &http.Response{ StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader([]byte(respMsg))), @@ -81,6 +86,28 @@ func mockS3Service() (svc *s3.S3, ops *[]string, params *[]interface{}) { return svc, ops, params } +// prepareUploadS3TestFolder creates a temp forlder and the selected number of files in it +func prepareUploadS3TestFolder(t *testing.T, numFiles int) (string, []string, func()) { + t.Helper() + + // Create a folder to store files to be uploaded + srcDir, rmSrcDir := testutil.TempDir(t) + + // Write a bunch of files + var fnames []string + for i := 0; i < numFiles; i++ { + fname := filepath.Join(srcDir, fmt.Sprintf("test_file_%d", i)) + + if err := ioutil.WriteFile(fname, []byte("abc"), 0644); err != nil { + t.Fatalf("can't create temp file: %v", err) + } + + fnames = append(fnames, fname) + } + + return srcDir, fnames, rmSrcDir +} + func TestS3Upload(t *testing.T) { // Through the use of a mocked S3 service, this test verifies that sending // 10000 files to an S3Upload results in 10000 S3 Upload API calls. @@ -104,7 +131,6 @@ func TestS3Upload(t *testing.T) { Region: "us-west-2", Bucket: "my-bucket", Prefix: "my-prefix", - Concurrency: 16, Interval: 1 * time.Millisecond, }, }, @@ -115,10 +141,9 @@ func TestS3Upload(t *testing.T) { } // Replace S3Upload.manager with a mocked s3 service. - s, ops, params := mockS3Service() + s, ops, params := mockS3Service(false) u := iu.(*S3) u.uploader = s3manager.NewUploaderWithClient(s) - u.uploader.Concurrency = 10 // Fill the uploader channel with 10k files. upch := make(chan string, len(paths)) @@ -130,9 +155,6 @@ func TestS3Upload(t *testing.T) { // Wait for the uploader to exit. u.Run(upch) - if len(*ops) != nfiles { - t.Fatalf("S3 operations count = %d, want %d", len(*ops), nfiles) - } if len(*ops) != nfiles { t.Fatalf("S3 operation params count = %d, want %d", len(*ops), nfiles) } @@ -174,53 +196,38 @@ func Test_uploadDirectory(t *testing.T) { srcDir, _, rmSrcDir := prepareUploadS3TestFolder(t, numFiles) defer rmSrcDir() - var total int64 - mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - atomic.AddInt64(&total, 1) - return nil - } - - s3 := &S3{ - Cfg: &S3Config{ - StagingPath: srcDir, - Concurrency: 5, - Retries: 3, + cfg := baker.UploadParams{ + ComponentParams: baker.ComponentParams{ + DecodedConfig: &S3Config{ + SourceBasePath: srcDir, + StagingPath: srcDir, + Bucket: "my-bucket", + Concurrency: 5, + Retries: 3, + }, }, - uploadFn: mockUploadFn, } + iu, err := newS3(cfg) + if err != nil { + t.Fatalf("NewS3Upload(%+v) = %q", cfg, err) + } + s, ops, _ := mockS3Service(false) + u := iu.(*S3) + u.uploader = s3manager.NewUploaderWithClient(s) + u.uploader.Concurrency = 5 - if err := s3.uploadDirectory(); err != nil { + if err := u.uploadDirectory(); err != nil { log.Fatal(err) } - if int(total) != numFiles { - t.Fatalf("uploaded: want: %d, got: %d", numFiles, int(total)) - } - - if s3.totalerr != 0 { - t.Fatalf("errors: want: %d, got: %d", 0, s3.totalerr) + if len(*ops) != numFiles { + t.Fatalf("S3 operations count = %d, want %d", len(*ops), numFiles) } -} - -// prepareUploadS3TestFolder creates a temp forlder and the selected number of files in it -func prepareUploadS3TestFolder(t *testing.T, numFiles int) (string, []string, func()) { - t.Helper() - - // Create a folder to store files to be uploaded - srcDir, rmSrcDir := testutil.TempDir(t) - - // Write a bunch of files - var fnames []string - for i := 0; i < numFiles; i++ { - fname := filepath.Join(srcDir, fmt.Sprintf("test_file_%d", i)) - if err := ioutil.WriteFile(fname, []byte("abc"), 0644); err != nil { - t.Fatalf("can't create temp file: %v", err) + for i := range *ops { + if (*ops)[i] != "PutObject" { + t.Fatalf("ops[%d] = %q, want PutObject", i, (*ops)[i]) } - - fnames = append(fnames, fname) } - - return srcDir, fnames, rmSrcDir } func Test_uploadDirectoryError(t *testing.T) { @@ -230,49 +237,68 @@ func Test_uploadDirectoryError(t *testing.T) { srcDir, _, rmSrcDir := prepareUploadS3TestFolder(t, numFiles) defer rmSrcDir() - mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - return errors.New("Fake error") - } - t.Run("ExitOnError: false", func(t *testing.T) { - s3Comp := &S3{ - Cfg: &S3Config{ - StagingPath: srcDir, - Concurrency: 5, - Retries: 3, + cfg := baker.UploadParams{ + ComponentParams: baker.ComponentParams{ + DecodedConfig: &S3Config{ + SourceBasePath: srcDir, + StagingPath: srcDir, + Bucket: "my-bucket", + Concurrency: 5, + Retries: 3, + ExitOnError: false, + }, }, - uploadFn: mockUploadFn, } - if err := s3Comp.uploadDirectory(); err != nil { + iu, err := newS3(cfg) + if err != nil { + t.Fatalf("NewS3Upload(%+v) = %q", cfg, err) + } + s, _, _ := mockS3Service(true) + u := iu.(*S3) + u.uploader = s3manager.NewUploaderWithClient(s) + + if err := u.uploadDirectory(); err != nil { log.Fatal(err) } - if int(s3Comp.totaln) != 0 { - t.Fatalf("uploaded: want: %d, got: %d", 0, int(s3Comp.totaln)) + if int(u.totaln) != 0 { + t.Fatalf("uploaded: want: %d, got: %d", 0, int(u.totaln)) } - if int(s3Comp.totalerr) != numFiles*s3Comp.Cfg.Retries { - t.Fatalf("errors: want: %d, got: %d", numFiles*s3Comp.Cfg.Retries, int(s3Comp.totalerr)) + if int(u.totalerr) != numFiles*u.Cfg.Retries { + t.Fatalf("errors: want: %d, got: %d", numFiles*u.Cfg.Retries, int(u.totalerr)) } }) t.Run("ExitOnError: true", func(t *testing.T) { - s3Comp := &S3{ - Cfg: &S3Config{ - StagingPath: srcDir, - Concurrency: 5, - Retries: 3, - ExitOnError: true, + cfg := baker.UploadParams{ + ComponentParams: baker.ComponentParams{ + DecodedConfig: &S3Config{ + SourceBasePath: srcDir, + StagingPath: srcDir, + Bucket: "my-bucket", + Concurrency: 5, + Retries: 3, + ExitOnError: true, + }, }, - uploadFn: mockUploadFn, } - if err := s3Comp.uploadDirectory(); err == nil { + iu, err := newS3(cfg) + if err != nil { + t.Fatalf("NewS3Upload(%+v) = %q", cfg, err) + } + s, _, _ := mockS3Service(true) + u := iu.(*S3) + u.uploader = s3manager.NewUploaderWithClient(s) + + if err := u.uploadDirectory(); err == nil { t.Fatalf("expected error") } // Uploads run parallelized so we can't expect that only 1 error will happen // before returning, but for sure they can't be more than the number of concurrency - if int(s3Comp.totalerr) > s3Comp.Cfg.Concurrency { - t.Fatalf("errors: want: <=%d, got: %d", s3Comp.Cfg.Concurrency, int(s3Comp.totalerr)) + if int(u.totalerr) > u.Cfg.Concurrency { + t.Fatalf("errors: want: <=%d, got: %d", u.Cfg.Concurrency, int(u.totalerr)) } }) } @@ -290,43 +316,45 @@ func TestRun(t *testing.T) { } defer os.RemoveAll(stagingDir) - mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - os.Remove(fpath) - return nil - } - - s3Comp := &S3{ - Cfg: &S3Config{ - StagingPath: stagingDir, - SourceBasePath: tmpDir, - Concurrency: 5, - Retries: 3, - Interval: 1 * time.Second, + cfg := baker.UploadParams{ + ComponentParams: baker.ComponentParams{ + DecodedConfig: &S3Config{ + SourceBasePath: stagingDir, + StagingPath: tmpDir, + Bucket: "my-bucket", + Concurrency: 5, + Retries: 3, + }, }, - quit: make(chan struct{}), - uploadFn: mockUploadFn, } + iu, err := newS3(cfg) + if err != nil { + t.Fatalf("NewS3Upload(%+v) = %q", cfg, err) + } + s, _, _ := mockS3Service(false) + u := iu.(*S3) + u.uploader = s3manager.NewUploaderWithClient(s) upCh := make(chan string) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - if err := s3Comp.Run(upCh); err != nil { + if err := u.Run(upCh); err != nil { t.Fatal(err) } }() upCh <- fname - s3Comp.Stop() + u.Stop() wg.Wait() - if int(s3Comp.totalerr) != 0 { - t.Fatalf("totalerr: want: %d, got: %d", 0, int(s3Comp.totalerr)) + if int(u.totalerr) != 0 { + t.Fatalf("totalerr: want: %d, got: %d", 0, int(u.totalerr)) } - if int(s3Comp.totaln) != 1 { - t.Fatalf("totaln: want: %d, got: %d", 1, int(s3Comp.totaln)) + if int(u.totaln) != 1 { + t.Fatalf("totaln: want: %d, got: %d", 1, int(u.totaln)) } } @@ -343,44 +371,44 @@ func TestRunExitOnError(t *testing.T) { } defer os.RemoveAll(stagingDir) - mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - return errors.New("Fake error") - } - - s3Comp := &S3{ - Cfg: &S3Config{ - StagingPath: stagingDir, - SourceBasePath: tmpDir, - ExitOnError: true, - Concurrency: 5, - Retries: 3, - Interval: 1 * time.Second, + cfg := baker.UploadParams{ + ComponentParams: baker.ComponentParams{ + DecodedConfig: &S3Config{ + SourceBasePath: stagingDir, + StagingPath: tmpDir, + Bucket: "my-bucket", + ExitOnError: true, + }, }, - quit: make(chan struct{}), - uploadFn: mockUploadFn, } + iu, err := newS3(cfg) + if err != nil { + t.Fatalf("NewS3Upload(%+v) = %q", cfg, err) + } + s, _, _ := mockS3Service(true) + u := iu.(*S3) + u.uploader = s3manager.NewUploaderWithClient(s) upCh := make(chan string) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - if err := s3Comp.Run(upCh); err != nil { + if err := u.Run(upCh); err != nil { t.Fatal(err) } }() upCh <- fname - time.Sleep(100 * time.Millisecond) - s3Comp.Stop() + u.Stop() wg.Wait() - if int(s3Comp.totalerr) != 1 { - t.Fatalf("totalerr: want: %d, got: %d", 1, int(s3Comp.totalerr)) + if int(u.totalerr) != 1 { + t.Fatalf("totalerr: want: %d, got: %d", 1, int(u.totalerr)) } - if int(s3Comp.totaln) != 1 { - t.Fatalf("totaln: want: %d, got: %d", 1, int(s3Comp.totaln)) + if int(u.totaln) != 1 { + t.Fatalf("totaln: want: %d, got: %d", 1, int(u.totaln)) } } @@ -397,43 +425,46 @@ func TestRunNotExitOnError(t *testing.T) { } defer os.RemoveAll(stagingDir) - mockUploadFn := func(uploader *s3manager.Uploader, bucket, prefix, localPath, fpath string) error { - return errors.New("Fake error") - } - - s3Comp := &S3{ - Cfg: &S3Config{ - StagingPath: stagingDir, - SourceBasePath: tmpDir, - ExitOnError: false, - Concurrency: 5, - Retries: 3, - Interval: 1 * time.Second, + cfg := baker.UploadParams{ + ComponentParams: baker.ComponentParams{ + DecodedConfig: &S3Config{ + SourceBasePath: stagingDir, + StagingPath: tmpDir, + Bucket: "my-bucket", + Concurrency: 5, + Retries: 3, + ExitOnError: false, + }, }, - quit: make(chan struct{}), - uploadFn: mockUploadFn, } + iu, err := newS3(cfg) + if err != nil { + t.Fatalf("NewS3Upload(%+v) = %q", cfg, err) + } + s, _, _ := mockS3Service(true) + u := iu.(*S3) + u.uploader = s3manager.NewUploaderWithClient(s) upCh := make(chan string) wg := sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() - if err := s3Comp.Run(upCh); err != nil { + if err := u.Run(upCh); err != nil { t.Fatal(err) } }() upCh <- fname - s3Comp.Stop() + u.Stop() wg.Wait() - if int(s3Comp.totalerr) > 1*s3Comp.Cfg.Retries { - t.Fatalf("totalerr: want: <=%d, got: %d", 1*s3Comp.Cfg.Retries, int(s3Comp.totalerr)) + if int(u.totalerr) > 1*u.Cfg.Retries { + t.Fatalf("totalerr: want: <=%d, got: %d", 1*u.Cfg.Retries, int(u.totalerr)) } - if int(s3Comp.totaln) != 1 { - t.Fatalf("totaln: want: %d, got: %d", 1, int(s3Comp.totaln)) + if int(u.totaln) != 1 { + t.Fatalf("totaln: want: %d, got: %d", 1, int(u.totaln)) } } From 240dd8fd9d7b1e380cacc2719f34f1c8d8760cda Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Tue, 8 Sep 2020 15:25:23 +0200 Subject: [PATCH 11/12] nitting --- upload/s3_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/upload/s3_test.go b/upload/s3_test.go index b85643d2..2d412635 100644 --- a/upload/s3_test.go +++ b/upload/s3_test.go @@ -23,7 +23,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" ) -// mockS3Service(false) returns a mocked s3.S3 service which records all operations +// mockS3Service returns a mocked s3.S3 service which records all operations // related to Upload S3 API calls. // // Once all interactions with the returned service have ended, and not before From 9b30a7d0e9911d2e22dfad003cec1684df26077a Mon Sep 17 00:00:00 2001 From: Tommaso Visconti Date: Tue, 8 Sep 2020 15:35:11 +0200 Subject: [PATCH 12/12] fix TestS3Upload test --- upload/s3_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/upload/s3_test.go b/upload/s3_test.go index 2d412635..504417ab 100644 --- a/upload/s3_test.go +++ b/upload/s3_test.go @@ -131,6 +131,7 @@ func TestS3Upload(t *testing.T) { Region: "us-west-2", Bucket: "my-bucket", Prefix: "my-prefix", + Concurrency: 16, Interval: 1 * time.Millisecond, }, }, @@ -144,6 +145,7 @@ func TestS3Upload(t *testing.T) { s, ops, params := mockS3Service(false) u := iu.(*S3) u.uploader = s3manager.NewUploaderWithClient(s) + u.uploader.Concurrency = 10 // Fill the uploader channel with 10k files. upch := make(chan string, len(paths))