Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions cmd/storage/storage_purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,21 +80,26 @@ var storagePurgeCmd = &cobra.Command{

deletedChan, errChan := storage.DeleteObjectVersions(exocmd.GContext, bucket, prefix)

for {
for deletedChan != nil || errChan != nil {
select {
case err, ok := <-errChan:
if ok {
fmt.Printf("Error happened: %v\n", err)
} else {
fmt.Println("Purge completed")
return nil
if !ok {
errChan = nil
continue
}
fmt.Printf("Error: %v\n", err)
case deletedElt, ok := <-deletedChan:
if !ok {
deletedChan = nil
continue
}
case deletedElt := <-deletedChan:
if verbose {
fmt.Println("deleted:", aws.ToString(deletedElt.Key))
}
}
}
fmt.Println("Purge completed")
return nil
},
}

Expand Down
33 changes: 30 additions & 3 deletions pkg/storage/sos/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,21 @@ func (c *Client) DeleteObjectVersions(ctx context.Context, bucket, prefix string
defer close(errChan)

batchSize := int32(1000)
var keyMarker, versionIDMarker string
for {
list, err := c.S3Client.ListObjectVersions(ctx, &s3.ListObjectVersionsInput{
input := &s3.ListObjectVersionsInput{
Bucket: &bucket,
MaxKeys: batchSize,
Prefix: &prefix,
})
}
if keyMarker != "" {
input.KeyMarker = &keyMarker
}
if versionIDMarker != "" {
input.VersionIdMarker = &versionIDMarker
}

list, err := c.S3Client.ListObjectVersions(ctx, input)
if err != nil {
errChan <- err
return
Expand Down Expand Up @@ -88,7 +97,25 @@ func (c *Client) DeleteObjectVersions(ctx context.Context, bucket, prefix string
deletedChan <- deleted
}
for _, derror := range deleteResult.Errors {
errChan <- fmt.Errorf("delete error: %v", derror)
errChan <- fmt.Errorf("failed to delete %s (version %s): %s (%s)",
aws.ToString(derror.Key),
aws.ToString(derror.VersionId),
aws.ToString(derror.Message),
aws.ToString(derror.Code),
)
}

// No progress means all objects are protected; stop to avoid looping forever.
if len(deleteResult.Deleted) == 0 {
return
}

if list.IsTruncated {
keyMarker = aws.ToString(list.NextKeyMarker)
versionIDMarker = aws.ToString(list.NextVersionIdMarker)
} else {
keyMarker = ""
versionIDMarker = ""
}
}
}()
Expand Down
149 changes: 149 additions & 0 deletions pkg/storage/sos/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,3 +612,152 @@ func Test_IsTraversalPath(t *testing.T) {
})
}
}

func drainDeleteObjectVersions(deletedChan <-chan types.DeletedObject, errChan <-chan error) ([]types.DeletedObject, []error) {
var deleted []types.DeletedObject
var errs []error
for deletedChan != nil || errChan != nil {
select {
case d, ok := <-deletedChan:
if !ok {
deletedChan = nil
continue
}
deleted = append(deleted, d)
case e, ok := <-errChan:
if !ok {
errChan = nil
continue
}
errs = append(errs, e)
}
}
return deleted, errs
}

func TestDeleteObjectVersions_HappyPath(t *testing.T) {
bucket := "test-bucket"
listCalls := 0

client := sos.Client{
S3Client: &MockS3API{
mockListObjectVersions: func(_ context.Context, params *s3.ListObjectVersionsInput, _ ...func(*s3.Options)) (*s3.ListObjectVersionsOutput, error) {
listCalls++
if listCalls > 1 {
// Second call: nothing left.
return &s3.ListObjectVersionsOutput{}, nil
}
return &s3.ListObjectVersionsOutput{
IsTruncated: false,
Versions: []types.ObjectVersion{
{Key: aws.String("obj1"), VersionId: aws.String("v1")},
{Key: aws.String("obj2"), VersionId: aws.String("v2")},
},
}, nil
},
mockDeleteObjects: func(_ context.Context, params *s3.DeleteObjectsInput, _ ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) {
return &s3.DeleteObjectsOutput{
Deleted: []types.DeletedObject{
{Key: aws.String("obj1"), VersionId: aws.String("v1")},
{Key: aws.String("obj2"), VersionId: aws.String("v2")},
},
}, nil
},
},
}

deleted, errs := drainDeleteObjectVersions(client.DeleteObjectVersions(context.Background(), bucket, "/"))
assert.Empty(t, errs)
assert.Len(t, deleted, 2)
assert.Equal(t, 2, listCalls, "should list twice: once for the batch, once to confirm empty")
}

func TestDeleteObjectVersions_ComplianceLock(t *testing.T) {
bucket := "locked-bucket"
listCalls := 0
deleteCalls := 0

client := sos.Client{
S3Client: &MockS3API{
mockListObjectVersions: func(_ context.Context, _ *s3.ListObjectVersionsInput, _ ...func(*s3.Options)) (*s3.ListObjectVersionsOutput, error) {
listCalls++
return &s3.ListObjectVersionsOutput{
IsTruncated: false,
Versions: []types.ObjectVersion{
{Key: aws.String("locked-obj"), VersionId: aws.String("v1")},
},
}, nil
},
mockDeleteObjects: func(_ context.Context, _ *s3.DeleteObjectsInput, _ ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) {
deleteCalls++
return &s3.DeleteObjectsOutput{
Errors: []types.Error{
{
Key: aws.String("locked-obj"),
VersionId: aws.String("v1"),
Code: aws.String("ObjectLocked"),
Message: aws.String("Object is under compliance retention"),
},
},
}, nil
},
},
}

deleted, errs := drainDeleteObjectVersions(client.DeleteObjectVersions(context.Background(), bucket, "/"))
assert.Empty(t, deleted)
assert.Len(t, errs, 1)
assert.Contains(t, errs[0].Error(), "ObjectLocked")
assert.Contains(t, errs[0].Error(), "locked-obj")
assert.Equal(t, 1, listCalls, "should stop after first failed batch")
assert.Equal(t, 1, deleteCalls, "should stop after first failed batch")
}

func TestDeleteObjectVersions_Pagination(t *testing.T) {
bucket := "big-bucket"
listCalls := 0

client := sos.Client{
S3Client: &MockS3API{
mockListObjectVersions: func(_ context.Context, params *s3.ListObjectVersionsInput, _ ...func(*s3.Options)) (*s3.ListObjectVersionsOutput, error) {
listCalls++
switch listCalls {
case 1:
assert.Empty(t, aws.ToString(params.KeyMarker))
assert.Empty(t, aws.ToString(params.VersionIdMarker))
return &s3.ListObjectVersionsOutput{
IsTruncated: true,
NextKeyMarker: aws.String("obj1"),
NextVersionIdMarker: aws.String("v1"),
Versions: []types.ObjectVersion{
{Key: aws.String("obj1"), VersionId: aws.String("v1")},
},
}, nil
case 2:
assert.Equal(t, "obj1", aws.ToString(params.KeyMarker))
assert.Equal(t, "v1", aws.ToString(params.VersionIdMarker))
return &s3.ListObjectVersionsOutput{
IsTruncated: false,
Versions: []types.ObjectVersion{
{Key: aws.String("obj2"), VersionId: aws.String("v2")},
},
}, nil
default:
return &s3.ListObjectVersionsOutput{}, nil
}
},
mockDeleteObjects: func(_ context.Context, params *s3.DeleteObjectsInput, _ ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error) {
deleted := make([]types.DeletedObject, 0, len(params.Delete.Objects))
for _, o := range params.Delete.Objects {
deleted = append(deleted, types.DeletedObject{Key: o.Key, VersionId: o.VersionId})
}
return &s3.DeleteObjectsOutput{Deleted: deleted}, nil
},
},
}

deleted, errs := drainDeleteObjectVersions(client.DeleteObjectVersions(context.Background(), bucket, "/"))
assert.Empty(t, errs)
assert.Len(t, deleted, 2, "both pages should be processed")
assert.Equal(t, 3, listCalls, "should call list three times: page1, page2, confirm empty")
}
Loading
Loading