2 changes: 2 additions & 0 deletions CHANGELOG.md

```diff
@@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - output: add Stats output [#23](https://github.com/AdRoll/baker/pull/23)
 - filter: add SetStringFromURL filter [#28](https://github.com/AdRoll/baker/pull/28)
 - output: add FileWriter output in replacement of Files output [#31](https://github.com/AdRoll/baker/pull/31)
+- upload: s3: add `ExitOnError` configuration [#27](https://github.com/AdRoll/baker/pull/27)
+- uploads now return an error instead of panicking and baker deals with it [#27](https://github.com/AdRoll/baker/pull/27)
 
 ### Changed
 
```
2 changes: 1 addition & 1 deletion api.go

```diff
@@ -139,7 +139,7 @@ type Upload interface {
     // Run processes the output result as it comes through the channel.
     // Run must block forever
     // upch will receive filenames that Output wants to see uploaded.
-    Run(upch <-chan string)
+    Run(upch <-chan string) error
 
     // Stop forces the upload to stop as cleanly as possible, which usually
     // means to finish up all the existing downloads.
```
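Under the new contract, an uploader surfaces failures to the topology instead of panicking. Below is a minimal sketch of what a conforming implementation could look like; the `nopUpload` type is hypothetical, and the real `baker.Upload` interface may require methods beyond `Run` and `Stop`:

```go
package main

import "fmt"

// nopUpload is a hypothetical uploader honoring the new contract:
// Run blocks until upch is closed or Stop is called, and returns an
// error instead of panicking on failure.
type nopUpload struct {
	quit chan struct{}
}

func (u *nopUpload) Run(upch <-chan string) error {
	for {
		select {
		case path, more := <-upch:
			if !more {
				return nil // upch closed: clean shutdown
			}
			if path == "" {
				return fmt.Errorf("upload: got an empty filename")
			}
			// ... actually upload path here ...
		case <-u.quit:
			return nil
		}
	}
}

// Stop asks Run to exit as cleanly as possible.
func (u *nopUpload) Stop() { close(u.quit) }

func main() {
	upch := make(chan string, 2)
	upch <- "/tmp/out-000.log.gz"
	close(upch)

	u := &nopUpload{quit: make(chan struct{})}
	if err := u.Run(upch); err != nil {
		fmt.Println("upload failed:", err)
	}
}
```

Returning nil when upch is closed gives the topology a clean-shutdown signal, mirroring how the S3 uploader below behaves after this PR.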
4 changes: 3 additions & 1 deletion topology.go

```diff
@@ -199,7 +199,9 @@ func (t *Topology) Start() {
     t.wgupl.Add(1)
     go func() {
         if t.Upload != nil {
-            t.Upload.Run(t.upch)
+            if err := t.Upload.Run(t.upch); err != nil {
+                log.WithError(err).Fatal("Upload returned an error")
+            }
         } else {
             // Just consume t.upch if there's no uploader available
             for range t.upch {
```
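Worth noting: baker's logging calls match logrus, whose `Fatal` logs the entry and then calls `os.Exit(1)`, so a returned upload error now terminates the whole process, not just the upload goroutine. A self-contained sketch of the same handling, with everything except the log call stubbed out:

```go
package main

import (
	"errors"

	log "github.com/sirupsen/logrus"
)

// run stands in for Upload.Run: it consumes filenames and fails fast.
func run(upch <-chan string) error {
	for range upch {
		return errors.New("no such bucket")
	}
	return nil
}

func main() {
	upch := make(chan string, 1)
	upch <- "/tmp/part-000.gz"
	close(upch)

	if err := run(upch); err != nil {
		// Fatal logs the error, then calls os.Exit(1);
		// deferred functions do not run after this point.
		log.WithError(err).Fatal("Upload returned an error")
	}
}
```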
74 changes: 55 additions & 19 deletions upload/s3.go

```diff
@@ -48,6 +48,7 @@ type S3Config struct {
     Retries     int           `help:"Number of retries before a failed upload" default:"3"`
     Concurrency int           `help:"Number of concurrent workers" default:"5"`
     Interval    time.Duration `help:"Period at which the source path is scanned" default:"15s"`
+    ExitOnError bool          `help:"Exit at first error, instead of logging all errors" default:"false"`
 }
 
 func (cfg *S3Config) fillDefaults() error {
```
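For illustration, a hedged sketch of an S3Config with the new knob enabled. The field values are made up, and `Bucket`, `Prefix` and `StagingPath` are inferred from their uses later in this diff; in a real topology the struct is filled from baker's TOML configuration rather than built by hand:

```go
package upload

import "time"

// exampleConfig is illustrative only: in a real baker topology the
// S3Config is populated from the TOML configuration, not built by hand.
func exampleConfig() S3Config {
	return S3Config{
		Bucket:      "my-bucket",    // hypothetical destination bucket
		Prefix:      "logs/",        // hypothetical key prefix
		StagingPath: "/tmp/staging", // local directory scanned for files
		Retries:     3,              // per-file upload attempts
		Concurrency: 5,              // parallel upload workers
		Interval:    15 * time.Second,
		ExitOnError: true, // new: first upload error aborts Run
	}
}
```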
Expand Down Expand Up @@ -123,7 +124,12 @@ func newS3(cfg baker.UploadParams) (baker.Upload, error) {
}, nil
}

func (u *S3) Run(upch <-chan string) {
func (u *S3) Run(upch <-chan string) error {
// Stop blocks until the upload goroutine has exited.
defer u.Stop()

errCh := make(chan error)

// Start a goroutine in which we periodically look at the source
// path for files and upload the ones we find.
u.wgUpload.Add(1)
Expand All @@ -132,6 +138,9 @@ func (u *S3) Run(upch <-chan string) {
defer func() {
ticker.Stop()
if err := u.uploadDirectory(); err != nil {
if u.Cfg.ExitOnError {
errCh <- err
}
log.Error(err)
}
u.wgUpload.Done()
Expand All @@ -141,25 +150,38 @@ func (u *S3) Run(upch <-chan string) {
select {
case <-ticker.C:
if err := u.uploadDirectory(); err != nil {
if u.Cfg.ExitOnError {
errCh <- err
return
}
log.Error(err)
}
case <-u.quit:
return
}
}
}()

for sourceFilePath := range upch {
err := u.move(sourceFilePath)
atomic.AddInt64(&u.totaln, int64(1))
atomic.AddInt64(&u.queuedn, int64(1))
if err != nil {
log.WithFields(log.Fields{"filepath": sourceFilePath}).WithError(err).Error("Couldn't move")
for {
select {
case err := <-errCh:
return err
case sourceFilePath, more := <-upch:
if !more {
return nil
}
err := u.move(sourceFilePath)
atomic.AddInt64(&u.totaln, int64(1))
atomic.AddInt64(&u.queuedn, int64(1))
if err != nil {
if u.Cfg.ExitOnError {
return fmt.Errorf("couldn't move: %v", err)
}
log.WithFields(log.Fields{"filepath": sourceFilePath}).WithError(err).Error("couldn't move")
}
case <-u.quit:
return nil
}
}

// Stop blocks until the upload goroutine has exited.
u.Stop()
}

func (u *S3) move(sourceFilePath string) error {
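The reshaped Run is a fan-in: the scanner goroutine reports fatal errors over `errCh` while the main loop multiplexes that channel with `upch` and `quit`. A standalone sketch of the pattern, detached from the S3 type (all names are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// run multiplexes work items with errors reported by a background
// goroutine, returning the first fatal error it sees.
func run(work <-chan string, quit <-chan struct{}) error {
	errCh := make(chan error, 1) // buffered: the sender never blocks

	go func() {
		time.Sleep(10 * time.Millisecond) // pretend to scan a directory
		errCh <- errors.New("scan failed")
	}()

	for {
		select {
		case err := <-errCh:
			return err // fatal: propagate to the caller
		case item, more := <-work:
			if !more {
				return nil // input closed: clean shutdown
			}
			fmt.Println("processing", item)
		case <-quit:
			return nil
		}
	}
}

func main() {
	work := make(chan string, 1)
	work <- "file-000.gz"
	fmt.Println("run returned:", run(work, nil))
}
```

The sketch buffers errCh so the background sender can never block; the PR's errCh is unbuffered, which relies on the main select loop still being around to receive.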
Expand Down Expand Up @@ -214,10 +236,17 @@ func (u *S3) uploadDirectory() error {
ctx.Info("Uploading")
sem := make(sem, u.Cfg.Concurrency)
ctx.Info("Starting to walk...")
err := filepath.Walk(u.Cfg.StagingPath, func(fpath string, info os.FileInfo, err error) error {
if err != nil {
return err
exitErr := atomic.Value{}
err := filepath.Walk(u.Cfg.StagingPath, func(fpath string, info os.FileInfo, walkErr error) error {
if walkErr != nil {
return walkErr
}
// If a fatal error happened in any of the goroutines, then exit immediately
e := exitErr.Load()
if e != nil {
return e.(error)
}

if info.IsDir() {
return nil
}
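The `atomic.Value` acts as a sticky error flag shared between the Walk callback and the upload goroutines: a worker stores the first fatal error, and later callback invocations load it to cut the walk short. A standalone sketch of that pattern (names are illustrative):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var exitErr atomic.Value // holds the first fatal error, if any
	var wg sync.WaitGroup

	for i := 0; i < 5; i++ {
		// Skip spawning more work once any worker has failed.
		if e := exitErr.Load(); e != nil {
			break
		}
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if i == 2 { // pretend worker 2 hits a fatal error
				exitErr.Store(errors.New("upload failed"))
			}
		}(i)
	}
	wg.Wait()

	if e := exitErr.Load(); e != nil {
		fmt.Println("aborted:", e.(error))
	}
}
```

One subtlety: atomic.Value panics if successive Store calls use inconsistent concrete types, which is worth keeping in mind when several error types can flow into the same flag.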
```diff
@@ -228,14 +257,21 @@ func (u *S3) uploadDirectory() error {
             defer func() { sem.decr(); wg.Done() }()
 
             for i := 0; i < u.Cfg.Retries; i++ {
-                if err := s3UploadFile(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath); err == nil {
-                    atomic.AddInt64(&u.totaln, int64(1))
+                if exitErr.Load() != nil {
+                    return
+                }
+                err := s3UploadFile(u.uploader, u.Cfg.Bucket, u.Cfg.Prefix, u.Cfg.StagingPath, fpath)
+                if err == nil {
                     atomic.AddInt64(&u.queuedn, int64(-1))
                     break
-                } else {
-                    atomic.AddInt64(&u.totalerr, int64(1))
-                    log.WithError(err).WithFields(log.Fields{"retry#": i + 1}).Error("failed upload")
                 }
+
+                atomic.AddInt64(&u.totalerr, int64(1))
+                if u.Cfg.ExitOnError {
+                    exitErr.Store(err)
+                    return
+                }
+                log.WithError(err).WithFields(log.Fields{"retry#": i + 1}).Error("failed upload")
             }
         }(fpath)
         return nil
```
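The walk bounds its parallelism with a counting semaphore (`sem`, sized by `Concurrency`) and a WaitGroup; the hunk above shows only the worker body, so here is a hedged sketch of the full pattern using a plain buffered channel in place of baker's own `sem` type:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	files := []string{"a.gz", "b.gz", "c.gz", "d.gz"}

	const concurrency = 2
	sem := make(chan struct{}, concurrency) // counting semaphore
	var wg sync.WaitGroup

	for _, fpath := range files {
		sem <- struct{}{} // acquire a slot; blocks at the limit
		wg.Add(1)
		go func(fpath string) {
			defer func() { <-sem; wg.Done() }() // release the slot
			fmt.Println("uploading", fpath)     // stand-in for s3UploadFile
		}(fpath)
	}
	wg.Wait()
}
```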