Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate storage driver context to S3 API calls #4036

Merged
merged 1 commit into from
Sep 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 15 additions & 15 deletions registry/storage/driver/s3-aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {

// PutContent stores the []byte content at a location designated by "path".
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
_, err := d.S3.PutObject(&s3.PutObjectInput{
_, err := d.S3.PutObjectWithContext(ctx, &s3.PutObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
ContentType: d.getContentType(),
Expand All @@ -624,7 +624,7 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
// given byte offset.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
resp, err := d.S3.GetObject(&s3.GetObjectInput{
resp, err := d.S3.GetObjectWithContext(ctx, &s3.GetObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(path)),
Range: aws.String("bytes=" + strconv.FormatInt(offset, 10) + "-"),
Expand All @@ -645,7 +645,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
key := d.s3Path(path)
if !appendParam {
// TODO (brianbland): cancel other uploads at this path
resp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
resp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
ContentType: d.getContentType(),
Expand All @@ -665,7 +665,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
Prefix: aws.String(key),
}
for {
resp, err := d.S3.ListMultipartUploads(listMultipartUploadsInput)
resp, err := d.S3.ListMultipartUploadsWithContext(ctx, listMultipartUploadsInput)
if err != nil {
return nil, parseError(path, err)
}
Expand All @@ -683,7 +683,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
continue
}

partsList, err := d.S3.ListParts(&s3.ListPartsInput{
partsList, err := d.S3.ListPartsWithContext(ctx, &s3.ListPartsInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
UploadId: multi.UploadId,
Expand All @@ -693,7 +693,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
}
allParts = append(allParts, partsList.Parts...)
for *partsList.IsTruncated {
partsList, err = d.S3.ListParts(&s3.ListPartsInput{
partsList, err = d.S3.ListPartsWithContext(ctx, &s3.ListPartsInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(key),
UploadId: multi.UploadId,
Expand Down Expand Up @@ -722,7 +722,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendParam bool) (sto
// Stat retrieves the FileInfo for the given path, including the current size
// in bytes and the creation time.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
resp, err := d.S3.ListObjectsV2(&s3.ListObjectsV2Input{
resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
MaxKeys: aws.Int64(1),
Expand Down Expand Up @@ -767,7 +767,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
prefix = "/"
}

resp, err := d.S3.ListObjectsV2(&s3.ListObjectsV2Input{
resp, err := d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
Delimiter: aws.String("/"),
Expand All @@ -791,7 +791,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
}

if *resp.IsTruncated {
resp, err = d.S3.ListObjectsV2(&s3.ListObjectsV2Input{
resp, err = d.S3.ListObjectsV2WithContext(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(d.Bucket),
Prefix: aws.String(d.s3Path(path)),
Delimiter: aws.String("/"),
Expand Down Expand Up @@ -841,7 +841,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
}

if fileInfo.Size() <= d.MultipartCopyThresholdSize {
_, err := d.S3.CopyObject(&s3.CopyObjectInput{
_, err := d.S3.CopyObjectWithContext(ctx, &s3.CopyObjectInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(),
Expand All @@ -857,7 +857,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
return nil
}

createResp, err := d.S3.CreateMultipartUpload(&s3.CreateMultipartUploadInput{
createResp, err := d.S3.CreateMultipartUploadWithContext(ctx, &s3.CreateMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
ContentType: d.getContentType(),
Expand All @@ -884,7 +884,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
if lastByte >= fileInfo.Size() {
lastByte = fileInfo.Size() - 1
}
uploadResp, err := d.S3.UploadPartCopy(&s3.UploadPartCopyInput{
uploadResp, err := d.S3.UploadPartCopyWithContext(ctx, &s3.UploadPartCopyInput{
Bucket: aws.String(d.Bucket),
CopySource: aws.String(d.Bucket + "/" + d.s3Path(sourcePath)),
Key: aws.String(d.s3Path(destPath)),
Expand All @@ -910,7 +910,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
}
}

_, err = d.S3.CompleteMultipartUpload(&s3.CompleteMultipartUploadInput{
_, err = d.S3.CompleteMultipartUploadWithContext(ctx, &s3.CompleteMultipartUploadInput{
Bucket: aws.String(d.Bucket),
Key: aws.String(d.s3Path(destPath)),
UploadId: createResp.UploadId,
Expand All @@ -931,7 +931,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {

for {
// list all the objects
resp, err := d.S3.ListObjectsV2(listObjectsInput)
resp, err := d.S3.ListObjectsV2WithContext(ctx, listObjectsInput)

// resp.Contents can only be empty on the first call
// if there were no more results to return after the first call, resp.IsTruncated would have been false
Expand All @@ -956,7 +956,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
// by default the response returns up to 1,000 key names. The response _might_ contain fewer keys but it will never contain more.
// 10000 keys is coincidentally (?) also the max number of keys that can be deleted in a single Delete operation, so we'll just smack
// Delete here straight away and reset the object slice when successful.
resp, err := d.S3.DeleteObjects(&s3.DeleteObjectsInput{
resp, err := d.S3.DeleteObjectsWithContext(ctx, &s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects,
Expand Down