Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 145 additions & 42 deletions adapter/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
s3ManifestCleanupWorkers = 16

s3PathSplitParts = 2
s3RangeSplitParts = 2
s3GenerationBytes = 8
s3HLCPhysicalShift = 16

Expand Down Expand Up @@ -213,8 +214,8 @@ type s3CompleteMultipartUploadResult struct {
}

// s3CompleteMultipartUploadRequest is the XML body of a
// CompleteMultipartUpload request: the client-supplied list of parts
// (number + ETag) to assemble into the final object.
type s3CompleteMultipartUploadRequest struct {
	XMLName xml.Name                        `xml:"CompleteMultipartUpload"`
	Parts   []s3CompleteMultipartUploadPart `xml:"Part"`
}

type s3CompleteMultipartUploadPart struct {
Expand Down Expand Up @@ -347,6 +348,7 @@ func (s *S3Server) handleBucket(w http.ResponseWriter, r *http.Request, bucket s
}
}

//nolint:cyclop // handleObject routes to sub-handlers based on method+operation; branching is by design.
func (s *S3Server) handleObject(w http.ResponseWriter, r *http.Request, bucket string, objectKey string) {
query := r.URL.Query()
uploadID := query.Get("uploadId")
Expand Down Expand Up @@ -788,6 +790,7 @@ func (s *S3Server) getObject(w http.ResponseWriter, r *http.Request, bucket stri
s.streamObjectChunks(w, r, bucket, meta.Generation, objectKey, manifest, readTS, rangeStart, contentLength)
}

//nolint:cyclop // streamObjectChunks iterates nested part/chunk loops with necessary error-handling branches.
func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bucket string, generation uint64, objectKey string, manifest *s3ObjectManifest, readTS uint64, offset int64, length int64) {
remaining := length
pos := int64(0)
Expand All @@ -799,19 +802,30 @@ func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bu
if remaining <= 0 {
break
}
cs := int64(chunkSize)
cs := int64(chunkSize) //nolint:gosec // G115: chunkSize is bounded by s3ChunkSize which fits in int64.
chunkEnd := pos + cs
if chunkEnd <= offset {
pos = chunkEnd
continue
}
chunkIndex, err := uint64FromInt(chunkIdx)
if err != nil {
slog.ErrorContext(r.Context(), "streamObjectChunks: uint64FromInt failed",
"bucket", bucket,
"object_key", objectKey,
"err", err,
)
return
}
chunkKey := s3keys.BlobKey(bucket, generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex)
chunk, err := s.store.GetAt(r.Context(), chunkKey, readTS)
if err != nil {
slog.ErrorContext(r.Context(), "streamObjectChunks: GetAt failed",
"bucket", bucket,
"object_key", objectKey,
"chunk_key", string(chunkKey),
"err", err,
)
return
}
start := int64(0)
Expand All @@ -833,6 +847,7 @@ func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bu
}
}

//nolint:cyclop // parseS3RangeHeader handles all RFC-compliant byte-range forms; each branch is a distinct syntax.
func parseS3RangeHeader(header string, totalSize int64) (start int64, end int64, ok bool) {
if !strings.HasPrefix(header, "bytes=") {
return 0, 0, false
Expand All @@ -841,8 +856,8 @@ func parseS3RangeHeader(header string, totalSize int64) (start int64, end int64,
if strings.Contains(spec, ",") {
return 0, 0, false // multi-range not supported
}
parts := strings.SplitN(spec, "-", 2)
if len(parts) != 2 {
parts := strings.SplitN(spec, "-", s3RangeSplitParts)
if len(parts) != s3RangeSplitParts {
return 0, 0, false
}
left := strings.TrimSpace(parts[0])
Expand All @@ -854,6 +869,10 @@ func parseS3RangeHeader(header string, totalSize int64) (start int64, end int64,
if err != nil || n <= 0 {
return 0, 0, false
}
if totalSize <= 0 {
// Nothing to serve from an empty object; caller should return 416.
return 0, 0, false
}
if n > totalSize {
n = totalSize
}
Expand Down Expand Up @@ -989,7 +1008,7 @@ func (s *S3Server) createMultipartUpload(w http.ResponseWriter, r *http.Request,
})
}

//nolint:cyclop // Upload part is intentionally linear and maps directly to protocol steps.
//nolint:cyclop,gocognit // Upload part is intentionally linear and maps directly to protocol steps.
func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket string, objectKey string, uploadID string, partNumberStr string) {
partNumber, err := strconv.Atoi(partNumberStr)
if err != nil || partNumber < s3MinPartNumber || partNumber > s3MaxPartNumber {
Expand Down Expand Up @@ -1045,29 +1064,42 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str

for {
n, readErr := r.Body.Read(buf)
if n > 0 {
chunk := append([]byte(nil), buf[:n]...)
if _, err := hasher.Write(chunk); err != nil {
writeS3InternalError(w, err)
if n == 0 {
if errors.Is(readErr, io.EOF) {
break
}
if readErr != nil {
var maxBytesErr *http.MaxBytesError
if errors.As(readErr, &maxBytesErr) {
writeS3Error(w, http.StatusRequestEntityTooLarge, "EntityTooLarge", "part exceeds maximum allowed size", bucket, objectKey)
return
}
writeS3InternalError(w, readErr)
return
}
chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo)
pendingBatch = append(pendingBatch, &kv.Elem[kv.OP]{Op: kv.Put, Key: chunkKey, Value: chunk})
cs, err := uint64FromInt(n)
if err != nil {
continue
}
chunk := append([]byte(nil), buf[:n]...)
if _, err := hasher.Write(chunk); err != nil {
writeS3InternalError(w, err)
return
}
chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo)
pendingBatch = append(pendingBatch, &kv.Elem[kv.OP]{Op: kv.Put, Key: chunkKey, Value: chunk})
cs, err := uint64FromInt(n)
if err != nil {
writeS3InternalError(w, err)
return
}
chunkSizes = append(chunkSizes, cs)
if len(pendingBatch) >= s3ChunkBatchOps {
if err := flushBatch(); err != nil {
writeS3InternalError(w, err)
return
}
chunkSizes = append(chunkSizes, cs)
if len(pendingBatch) >= s3ChunkBatchOps {
if err := flushBatch(); err != nil {
writeS3InternalError(w, err)
return
}
}
sizeBytes += int64(n)
chunkNo++
}
sizeBytes += int64(n)
chunkNo++
if errors.Is(readErr, io.EOF) {
break
}
Expand Down Expand Up @@ -1114,6 +1146,8 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str
{Op: kv.Put, Key: partKey, Value: descBody},
},
}); err != nil {
// Clean up orphaned blob chunks so they don't accumulate in the store.
s.cleanupPartBlobsAsync(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo)
writeS3InternalError(w, err)
return
}
Expand All @@ -1122,9 +1156,9 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str
w.WriteHeader(http.StatusOK)
}

//nolint:cyclop,gocognit // CompleteMultipartUpload validates parts, computes composite ETag, and commits atomically.
//nolint:cyclop,gocognit,gocyclo // CompleteMultipartUpload validates parts, computes composite ETag, and commits atomically.
func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Request, bucket string, objectKey string, uploadID string) {
bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, 1<<20))
bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, s3ChunkSize))
if err != nil {
writeS3InternalError(w, err)
return
Expand All @@ -1138,10 +1172,19 @@ func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Reques
writeS3Error(w, http.StatusBadRequest, "InvalidArgument", "at least one part is required", bucket, objectKey)
return
}
if len(completionReq.Parts) > s3MaxPartsPerUpload {
writeS3Error(w, http.StatusBadRequest, "InvalidArgument",
fmt.Sprintf("too many parts in CompleteMultipartUpload request (maximum %d)", s3MaxPartsPerUpload), bucket, objectKey)
return
}

// Parts must be in ascending order.
for i := 1; i < len(completionReq.Parts); i++ {
if completionReq.Parts[i].PartNumber <= completionReq.Parts[i-1].PartNumber {
// Parts must be in ascending order, within allowed part number range.
for i, part := range completionReq.Parts {
if part.PartNumber < s3MinPartNumber || part.PartNumber > s3MaxPartNumber {
writeS3Error(w, http.StatusBadRequest, "InvalidArgument", "part number out of allowed range", bucket, objectKey)
return
}
if i > 0 && part.PartNumber <= completionReq.Parts[i-1].PartNumber {
writeS3Error(w, http.StatusBadRequest, "InvalidPartOrder", "parts must be in ascending order", bucket, objectKey)
return
}
Expand Down Expand Up @@ -1187,7 +1230,7 @@ func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Reques
totalSize := int64(0)

for i, reqPart := range completionReq.Parts {
partKey := s3keys.UploadPartKey(bucket, meta.Generation, objectKey, uploadID, uint64(reqPart.PartNumber))
partKey := s3keys.UploadPartKey(bucket, meta.Generation, objectKey, uploadID, uint64(reqPart.PartNumber)) //nolint:gosec // G115: PartNumber validated in [1,10000].
raw, err := s.store.GetAt(r.Context(), partKey, readTS)
if err != nil {
readPin.Release()
Expand Down Expand Up @@ -1230,13 +1273,7 @@ func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Reques
return
}

manifestParts = append(manifestParts, s3ObjectPart{
PartNo: desc.PartNo,
ETag: desc.ETag,
SizeBytes: desc.SizeBytes,
ChunkCount: desc.ChunkCount,
ChunkSizes: desc.ChunkSizes,
})
manifestParts = append(manifestParts, s3ObjectPart(desc))
totalSize += desc.SizeBytes
}
readPin.Release()
Expand Down Expand Up @@ -1291,12 +1328,19 @@ func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Reques
return errors.WithStack(err)
}

// Atomically: write manifest, delete UploadMeta + GCUpload (fence against Abort).
bucketFence, err := encodeS3BucketMeta(meta)
if err != nil {
return errors.WithStack(err)
}

// Atomically: fence bucket (conflict with DELETE bucket), write manifest,
// delete UploadMeta + GCUpload (fence against Abort).
_, err = s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{
IsTxn: true,
StartTS: startTS,
CommitTS: commitTS,
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Put, Key: s3keys.BucketMetaKey(bucket), Value: bucketFence},
{Op: kv.Put, Key: headKey, Value: manifestBody},
{Op: kv.Del, Key: uploadMetaKey},
{Op: kv.Del, Key: s3keys.GCUploadKey(bucket, meta.Generation, objectKey, uploadID)},
Expand Down Expand Up @@ -1381,6 +1425,7 @@ func (s *S3Server) abortMultipartUpload(w http.ResponseWriter, r *http.Request,
w.WriteHeader(http.StatusNoContent)
}

//nolint:cyclop // listParts validates upload, parses pagination params, and iterates parts; branches are inherent.
func (s *S3Server) listParts(w http.ResponseWriter, r *http.Request, bucket string, objectKey string, uploadID string) {
readTS := s.readTS()
readPin := s.pinReadTS(readTS)
Expand Down Expand Up @@ -1444,19 +1489,24 @@ func (s *S3Server) listParts(w http.ResponseWriter, r *http.Request, bucket stri
return
}
result.Parts = append(result.Parts, s3ListPartEntry{
PartNumber: int(desc.PartNo),
PartNumber: int(desc.PartNo), //nolint:gosec // G115: PartNo is in [1,10000], safe for int.
ETag: quoteS3ETag(desc.ETag),
Size: desc.SizeBytes,
})
result.NextPartNumberMarker = int(desc.PartNo)
result.NextPartNumberMarker = int(desc.PartNo) //nolint:gosec // G115: PartNo is in [1,10000], safe for int.
}

writeS3XML(w, http.StatusOK, result)
}

func (s *S3Server) cleanupUploadPartsAsync(bucket string, generation uint64, objectKey string, uploadID string) {
select {
case s.cleanupSem <- struct{}{}:
default:
// Semaphore saturated; skip to avoid unbounded goroutine accumulation.
return
}
go func() {
s.cleanupSem <- struct{}{}
defer func() { <-s.cleanupSem }()
ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
defer cancel()
Expand All @@ -1469,9 +1519,57 @@ func (s *S3Server) cleanupUploadParts(ctx context.Context, bucket string, genera
s.deleteByPrefix(ctx, partPrefix, bucket, generation, objectKey, uploadID)
}

// cleanupPartBlobsAsync deletes, in the background, every blob chunk key
// belonging to a single upload part. It garbage-collects orphaned chunks
// left behind when the part-descriptor write fails after the chunks
// themselves were already committed.
func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objectKey string, uploadID string, partNo uint64, chunkCount uint64) {
	// Non-blocking semaphore acquire: when every cleanup slot is busy, drop
	// the work rather than letting goroutines accumulate without bound.
	select {
	case s.cleanupSem <- struct{}{}:
	default:
		return
	}
	go func() {
		defer func() { <-s.cleanupSem }()
		ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
		defer cancel()

		batch := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps)
		// submit dispatches the accumulated deletes (if any) and resets the
		// batch, keeping its backing array for reuse. Dispatch failures are
		// logged and otherwise ignored: this path is best-effort GC.
		submit := func() {
			if len(batch) == 0 {
				return
			}
			_, err := s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{Elems: batch})
			if err != nil {
				slog.ErrorContext(ctx, "cleanupPartBlobsAsync: coordinator dispatch failed",
					"bucket", bucket,
					"object_key", objectKey,
					"upload_id", uploadID,
					"part_no", partNo,
					"err", err,
				)
			}
			batch = batch[:0]
		}

		for chunkIdx := uint64(0); chunkIdx < chunkCount; chunkIdx++ {
			key := s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, chunkIdx)
			batch = append(batch, &kv.Elem[kv.OP]{Op: kv.Del, Key: key})
			if len(batch) >= s3ChunkBatchOps {
				submit()
			}
		}
		submit()
	}()
}

func (s *S3Server) cleanupUploadDataAsync(bucket string, generation uint64, objectKey string, uploadID string) {
select {
case s.cleanupSem <- struct{}{}:
default:
// Semaphore saturated; skip to avoid unbounded goroutine accumulation.
return
}
go func() {
s.cleanupSem <- struct{}{}
defer func() { <-s.cleanupSem }()
ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
defer cancel()
Expand Down Expand Up @@ -1710,8 +1808,13 @@ func (s *S3Server) cleanupManifestBlobsAsync(bucket string, generation uint64, o
if manifest == nil {
return
}
select {
case s.cleanupSem <- struct{}{}:
default:
// Semaphore saturated; skip to avoid unbounded goroutine accumulation.
return
}
go func() {
s.cleanupSem <- struct{}{}
defer func() { <-s.cleanupSem }()
ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout)
defer cancel()
Expand Down
Loading
Loading