diff --git a/uploader/s3.go b/uploader/s3.go
index 542827be1..ea3f37bac 100644
--- a/uploader/s3.go
+++ b/uploader/s3.go
@@ -24,6 +24,21 @@ const (
 	defaultS3PartSize = 64 * 1024 * 1024 // 64MB per part
 )
 
+// CountingReader wraps an io.Reader and counts the number of bytes read through it
+type CountingReader struct {
+	reader    io.Reader
+	BytesRead int64
+}
+
+// Read reads from the wrapped reader and accumulates the byte count
+func (r *CountingReader) Read(p []byte) (n int, err error) {
+	n, err = r.reader.Read(p)
+	// Count n even when err != nil: an io.Reader may return
+	// data together with io.EOF on the final read.
+	r.BytesRead += int64(n)
+	return
+}
+
 // S3Uploader uploads logs to S3
 type S3Uploader struct {
 	log logrus.FieldLogger
@@ -90,11 +105,6 @@ func (u *S3Uploader) Upload(ctx context.Context, local string, remote string, ct
 		}
 	}()
 
-	// Record number of bytes being uploaded. At this point, file must exist.
-	if stat, err := os.Stat(local); err == nil {
-		u.metrics.Counter("titus.executor.S3Uploader.uploadMB", int(stat.Size()/1024/1024), nil)
-	}
-
 	return u.uploadFile(ctx, f, remote, contentType)
 }
 
@@ -105,17 +115,21 @@ func (u *S3Uploader) uploadFile(ctx context.Context, local io.Reader, remote str
 		contentType = defaultS3ContentType
 	}
 
+	// Wrap the input io.Reader so we can count how many bytes are actually uploaded.
+	reader := &CountingReader{reader: local}
+
 	result, err := u.s3Uploader.UploadWithContext(ctx, &s3manager.UploadInput{
 		ACL:         aws.String(defaultS3ACL),
 		ContentType: aws.String(contentType),
 		Bucket:      aws.String(u.bucketName),
 		Key:         aws.String(remote),
-		Body:        local,
+		Body:        reader,
 	})
 	if err != nil {
 		return err
 	}
 
+	u.metrics.Counter("titus.executor.S3Uploader.uploadMB", int(reader.BytesRead/1024/1024), nil)
 	u.log.Printf("Successfully uploaded file from: %s to: %s", local, result.Location)
 	return nil
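
As a quick illustration of what the new wrapper does (a sketch, not part of the change itself): the snippet below drains a CountingReader the same way the S3 upload manager drains the upload body, then reads back the accumulated count. The Example function and the 3MB source buffer are assumptions made for the sketch; only the standard library is used.

package uploader

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
)

// ExampleCountingReader is a hypothetical in-package example; the 3MB source
// buffer is arbitrary and only serves to make the expected output obvious.
func ExampleCountingReader() {
	src := bytes.NewReader(make([]byte, 3*1024*1024)) // 3MB of zero bytes
	cr := &CountingReader{reader: src}

	// Drain the wrapper until EOF, as the S3 upload manager would.
	if _, err := io.Copy(ioutil.Discard, cr); err != nil {
		fmt.Println("copy failed:", err)
		return
	}

	// BytesRead now reflects everything that passed through the wrapper.
	fmt.Println(cr.BytesRead / 1024 / 1024)
	// Output: 3
}

Because the count is taken from bytes actually pulled through the reader, the uploadMB metric reflects what was handed to the uploader rather than the size reported by os.Stat before the upload started.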