Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions adapter/dynamodb_migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/bootjp/elastickv/kv"
"github.com/bootjp/elastickv/store"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand All @@ -27,6 +28,9 @@ func (c *localAdapterCoordinator) Dispatch(ctx context.Context, req *kv.Operatio
if req == nil {
return &kv.CoordinateResponse{}, nil
}
if err := c.validateDispatchShape(req); err != nil {
return nil, err
}
commitTS, err := c.commitTSForRequest(req)
if err != nil {
return nil, err
Expand All @@ -37,6 +41,45 @@ func (c *localAdapterCoordinator) Dispatch(ctx context.Context, req *kv.Operatio
return &kv.CoordinateResponse{}, nil
}

// validateDispatchShape mirrors the production coordinator's
// dispatch-time rejection rules so tests catch the same class of
// bug that real clusters would reject. Specifically:
//
// - DEL_PREFIX cannot be in a transactional OperationGroup
// (kv/sharded_coordinator.go: dispatchDelPrefixBroadcast
// refuses IsTxn=true).
// - DEL_PREFIX cannot be mixed with Put or Del in the same
// OperationGroup (validateDelPrefixOnly enforces all-or-none).
//
// Without these checks, a regression that ships
// `IsTxn:true with [Del, DelPrefix...]` (Codex P1 on PR #695)
// would silently pass the local coordinator while production
// rejected every bucket delete with ErrInvalidRequest.
func (c *localAdapterCoordinator) validateDispatchShape(req *kv.OperationGroup[kv.OP]) error {
hasDelPrefix := false
hasOther := false
for _, elem := range req.Elems {
if elem == nil {
continue
}
if elem.Op == kv.DelPrefix {
hasDelPrefix = true
} else {
hasOther = true
}
}
if !hasDelPrefix {
return nil
}
if req.IsTxn {
return errors.Wrap(kv.ErrInvalidRequest, "DEL_PREFIX not supported in transactions")
}
if hasOther {
return errors.Wrap(kv.ErrInvalidRequest, "DEL_PREFIX cannot be mixed with other operations")
}
return nil
}

func (c *localAdapterCoordinator) commitTSForRequest(req *kv.OperationGroup[kv.OP]) (uint64, error) {
if req == nil {
return 0, nil
Expand Down
22 changes: 18 additions & 4 deletions adapter/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,7 @@ func (s *S3Server) headBucket(w http.ResponseWriter, r *http.Request, bucket str
}

func (s *S3Server) deleteBucket(w http.ResponseWriter, r *http.Request, bucket string) {
var deletedGeneration uint64
err := s.retryS3Mutation(r.Context(), func() error {
readTS := s.readTS()
startTS := s.txnStartTS(readTS)
Expand Down Expand Up @@ -669,19 +670,32 @@ func (s *S3Server) deleteBucket(w http.ResponseWriter, r *http.Request, bucket s
}
}

// Phase 1: Del BucketMetaKey in a txn (OCC-protected
// against concurrent createBucket racing this delete).
// Phase 2 (DEL_PREFIX safety net) runs outside the txn
// because the production coordinator rejects DEL_PREFIX
// inside transactions and rejects mixed Del+DelPrefix
// groups (kv/sharded_coordinator.go: dispatchDelPrefixBroadcast).
// See AdminDeleteBucket's doc comment for the full
// rationale.
_, err = s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: startTS,
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Del, Key: s3keys.BucketMetaKey(bucket)},
},
Elems: []*kv.Elem[kv.OP]{{Op: kv.Del, Key: s3keys.BucketMetaKey(bucket)}},
})
return errors.WithStack(err)
if err != nil {
return errors.WithStack(err)
}
deletedGeneration = meta.Generation
return nil
})
if err != nil {
writeS3MutationError(w, err, bucket, "")
return
}
// Phase 2: best-effort DEL_PREFIX safety net. See
// AdminDeleteBucket / runBucketDeleteSafetyNet for the contract.
s.runBucketDeleteSafetyNet(r.Context(), bucket, deletedGeneration)
w.WriteHeader(http.StatusNoContent)
}

Expand Down
128 changes: 102 additions & 26 deletions adapter/s3_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package adapter
import (
"bytes"
"context"
"log/slog"
"sort"
"strings"

Expand Down Expand Up @@ -359,29 +360,48 @@ func (s *S3Server) AdminPutBucketAcl(ctx context.Context, principal AdminPrincip
// bucket-must-be-empty rule mirrors the SigV4 deleteBucket path —
// the dashboard cannot force a recursive delete, by design.
//
// Known orphan-race limitation (coderabbitai 🔴 / 🟠 on PR #669):
// the empty-bucket probe (ScanAt with limit=1 on
// ObjectManifestPrefixForBucket) reads at readTS but the
// subsequent BucketMetaKey delete only carries that single point
// key in its ReadKeys set. A concurrent PutObject that inserts a
// manifest key in the scanned prefix between readTS and the
// delete's commitTS will not conflict — the OCC validator only
// inspects keys that appear in ReadKeys, and there is no
// ReadRanges mechanism today. The object's manifest key survives
// under a now-deleted bucket meta and becomes orphaned.
// The dispatch happens in two phases because the production
// coordinator (kv/sharded_coordinator.go: dispatchDelPrefixBroadcast)
// rejects DEL_PREFIX inside a transaction and rejects DEL_PREFIX
// mixed with Del or Put in the same OperationGroup:
//
// This race exists pre-existing in the SigV4 path
// (adapter/s3.go:deleteBucket — same shape, same limitation), so
// AdminDeleteBucket inherits the contract; closing the gap
// requires either (a) bumping BucketGenerationKey on every
// PutObject so it can serve as an OCC token in this read set, or
// (b) extending OperationGroup with ReadRanges and teaching the
// FSM to validate range emptiness atomically with commit. Both
// are larger changes outside this PR's scope; tracked in
// docs/design/2026_04_24_partial_admin_dashboard.md under the
// Outstanding open items section. Operators concerned about the
// orphan window today should pause writes against the target
// bucket before issuing the admin delete.
// Phase 1: Del BucketMetaKey in a txn (OCC-protected against
// a concurrent AdminCreateBucket landing between our
// readTS and commitTS).
// Phase 2: DEL_PREFIX over every per-bucket key family in a
// non-txn broadcast — the safety net that sweeps
// orphans left by any PutObject that committed
// chunks/manifest between the empty-probe and the
// Phase-1 commit. See design doc
// 2026_04_28_proposed_admin_delete_bucket_safety_net.md
// §6.2 for the original single-OperationGroup design
// and the dispatch-shape rejection that forced the
// two-phase split.
//
// Phase 2 is best-effort: a Phase-2 failure leaves the bucket meta
// already deleted (Phase 1 succeeded) but per-bucket prefixes
// possibly still containing orphans. That state is no worse than
// the pre-fix behaviour on main and recovers on operator-driven
// re-cleanup. We log a warning rather than propagate the error so
// the operator-visible delete reports success — the bucket really
// is gone from the API surface, and a retry would 404 because
// loadBucketMetaAt no longer finds the meta.
//
// BucketGenerationKey is intentionally NOT deleted. Re-creating
// the bucket bumps the generation; orphan blobs that escaped this
// delete (e.g. on an older generation) stay isolated under the
// old generation prefix and never surface in the new bucket.
// Pinned by TestS3Server_AdminDeleteBucket_BucketGenerationKeySurvives.
//
// The contract change for clients: a PutObject that returned 200
// OK during the race window can have its data swept by the
// concurrent delete. Operators are advised to pause writes before
// AdminDeleteBucket; the alternative (orphan objects that no API
// can enumerate or remove) is strictly worse.
//
// The same shape is mirrored on the SigV4 path
// (adapter/s3.go:deleteBucket) so both delete entrypoints share
// the same race-window guarantees.
func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincipal, name string) error {
if !principal.Role.canWrite() {
return ErrAdminForbidden
Expand All @@ -390,6 +410,7 @@ func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincip
return ErrAdminNotLeader
}

var deletedGeneration uint64
err := s.retryS3Mutation(ctx, func() error {
readTS := s.readTS()
startTS := s.txnStartTS(readTS)
Expand All @@ -411,21 +432,76 @@ func (s *S3Server) AdminDeleteBucket(ctx context.Context, principal AdminPrincip
if len(kvs) > 0 {
return ErrAdminBucketNotEmpty
}
// Phase 1: Del BucketMetaKey in a txn so a concurrent
// AdminCreateBucket racing the delete is rejected by OCC.
// retryS3Mutation handles ErrWriteConflict / ErrTxnLocked
// by re-running this whole closure.
_, err = s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: startTS,
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Del, Key: s3keys.BucketMetaKey(name)},
},
Elems: []*kv.Elem[kv.OP]{{Op: kv.Del, Key: s3keys.BucketMetaKey(name)}},
})
return errors.WithStack(err)
if err != nil {
return errors.WithStack(err)
}
deletedGeneration = meta.Generation
return nil
})
if err != nil {
return err //nolint:wrapcheck // sentinel errors propagate as-is.
}
// Phase 2: best-effort safety-net DEL_PREFIX. Outside the
// retryS3Mutation closure because retrying after Phase 1
// committed would 404 at loadBucketMetaAt; we want the error
// (if any) logged but not propagated to the operator.
s.runBucketDeleteSafetyNet(ctx, name, deletedGeneration)
return nil
}

// bucketDeleteSafetyNetElems returns the DEL_PREFIX elem list for
// the Phase-2 safety-net dispatch shared between AdminDeleteBucket
// and the SigV4 deleteBucket path. One helper so a future
// per-bucket key family added to the data plane covers both delete
// entrypoints in lockstep.
//
// BucketGenerationKey is intentionally not in the list — see the
// AdminDeleteBucket doc comment for the orphan-isolation rationale.
//
// The 6 DEL_PREFIX ops broadcast across every shard
// (kv/sharded_coordinator.go: DEL_PREFIX cannot be routed to a
// single shard). Acceptable because (a) the empty-probe already
// confirmed the manifest prefix is empty in the common case, so
// per-shard scans return 0 keys, (b) bucket delete is operator-
// frequency, not data-plane.
func bucketDeleteSafetyNetElems(bucket string, generation uint64) []*kv.Elem[kv.OP] {
return []*kv.Elem[kv.OP]{
{Op: kv.DelPrefix, Key: s3keys.ObjectManifestPrefixForBucket(bucket, generation)},
{Op: kv.DelPrefix, Key: s3keys.UploadMetaPrefixForBucket(bucket, generation)},
{Op: kv.DelPrefix, Key: s3keys.UploadPartPrefixForBucket(bucket, generation)},
{Op: kv.DelPrefix, Key: s3keys.BlobPrefixForBucket(bucket, generation)},
{Op: kv.DelPrefix, Key: s3keys.GCUploadPrefixForBucket(bucket, generation)},
{Op: kv.DelPrefix, Key: s3keys.RoutePrefixForBucket(bucket, generation)},
}
}

// runBucketDeleteSafetyNet runs the Phase-2 DEL_PREFIX dispatch
// and swallows transport / cluster errors after logging — the
// caller has already deleted the bucket meta and the operator-
// visible state is consistent with that. Shared between admin and
// SigV4 paths.
func (s *S3Server) runBucketDeleteSafetyNet(ctx context.Context, bucket string, generation uint64) {
if _, err := s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{
Elems: bucketDeleteSafetyNetElems(bucket, generation),
}); err != nil {
slog.WarnContext(ctx,
"bucket delete safety-net DEL_PREFIX failed; bucket meta is gone but orphan sweep incomplete",
slog.String("bucket", bucket),
slog.Uint64("generation", generation),
slog.String("error", err.Error()),
)
}
}

// adminCanonicalACL normalises an empty input to the canned
// "private" default. The SigV4 createBucket / putBucketAcl paths
// apply the same default after trimming the x-amz-acl header.
Expand Down
Loading
Loading