diff --git a/adapter/s3.go b/adapter/s3.go index c2bdd75a..b467f876 100644 --- a/adapter/s3.go +++ b/adapter/s3.go @@ -48,6 +48,7 @@ const ( s3ManifestCleanupWorkers = 16 s3PathSplitParts = 2 + s3RangeSplitParts = 2 s3GenerationBytes = 8 s3HLCPhysicalShift = 16 @@ -213,8 +214,8 @@ type s3CompleteMultipartUploadResult struct { } type s3CompleteMultipartUploadRequest struct { - XMLName xml.Name `xml:"CompleteMultipartUpload"` - Parts []s3CompleteMultipartUploadPart `xml:"Part"` + XMLName xml.Name `xml:"CompleteMultipartUpload"` + Parts []s3CompleteMultipartUploadPart `xml:"Part"` } type s3CompleteMultipartUploadPart struct { @@ -347,6 +348,7 @@ func (s *S3Server) handleBucket(w http.ResponseWriter, r *http.Request, bucket s } } +//nolint:cyclop // handleObject routes to sub-handlers based on method+operation; branching is by design. func (s *S3Server) handleObject(w http.ResponseWriter, r *http.Request, bucket string, objectKey string) { query := r.URL.Query() uploadID := query.Get("uploadId") @@ -788,6 +790,7 @@ func (s *S3Server) getObject(w http.ResponseWriter, r *http.Request, bucket stri s.streamObjectChunks(w, r, bucket, meta.Generation, objectKey, manifest, readTS, rangeStart, contentLength) } +//nolint:cyclop // streamObjectChunks iterates nested part/chunk loops with necessary error-handling branches. func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bucket string, generation uint64, objectKey string, manifest *s3ObjectManifest, readTS uint64, offset int64, length int64) { remaining := length pos := int64(0) @@ -799,7 +802,7 @@ func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bu if remaining <= 0 { break } - cs := int64(chunkSize) + cs := int64(chunkSize) //nolint:gosec // G115: chunkSize is bounded by s3ChunkSize which fits in int64. chunkEnd := pos + cs if chunkEnd <= offset { pos = chunkEnd @@ -807,11 +810,22 @@ func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bu } chunkIndex, err := uint64FromInt(chunkIdx) if err != nil { + slog.ErrorContext(r.Context(), "streamObjectChunks: uint64FromInt failed", + "bucket", bucket, + "object_key", objectKey, + "err", err, + ) return } chunkKey := s3keys.BlobKey(bucket, generation, objectKey, manifest.UploadID, part.PartNo, chunkIndex) chunk, err := s.store.GetAt(r.Context(), chunkKey, readTS) if err != nil { + slog.ErrorContext(r.Context(), "streamObjectChunks: GetAt failed", + "bucket", bucket, + "object_key", objectKey, + "chunk_key", string(chunkKey), + "err", err, + ) return } start := int64(0) @@ -833,6 +847,7 @@ func (s *S3Server) streamObjectChunks(w http.ResponseWriter, r *http.Request, bu } } +//nolint:cyclop // parseS3RangeHeader handles all RFC-compliant byte-range forms; each branch is a distinct syntax. func parseS3RangeHeader(header string, totalSize int64) (start int64, end int64, ok bool) { if !strings.HasPrefix(header, "bytes=") { return 0, 0, false @@ -841,8 +856,8 @@ func parseS3RangeHeader(header string, totalSize int64) (start int64, end int64, if strings.Contains(spec, ",") { return 0, 0, false // multi-range not supported } - parts := strings.SplitN(spec, "-", 2) - if len(parts) != 2 { + parts := strings.SplitN(spec, "-", s3RangeSplitParts) + if len(parts) != s3RangeSplitParts { return 0, 0, false } left := strings.TrimSpace(parts[0]) @@ -854,6 +869,10 @@ func parseS3RangeHeader(header string, totalSize int64) (start int64, end int64, if err != nil || n <= 0 { return 0, 0, false } + if totalSize <= 0 { + // Nothing to serve from an empty object; caller should return 416. + return 0, 0, false + } if n > totalSize { n = totalSize } @@ -989,7 +1008,7 @@ func (s *S3Server) createMultipartUpload(w http.ResponseWriter, r *http.Request, }) } -//nolint:cyclop // Upload part is intentionally linear and maps directly to protocol steps. +//nolint:cyclop,gocognit // Upload part is intentionally linear and maps directly to protocol steps. func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket string, objectKey string, uploadID string, partNumberStr string) { partNumber, err := strconv.Atoi(partNumberStr) if err != nil || partNumber < s3MinPartNumber || partNumber > s3MaxPartNumber { @@ -1045,29 +1064,42 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str for { n, readErr := r.Body.Read(buf) - if n > 0 { - chunk := append([]byte(nil), buf[:n]...) - if _, err := hasher.Write(chunk); err != nil { - writeS3InternalError(w, err) + if n == 0 { + if errors.Is(readErr, io.EOF) { + break + } + if readErr != nil { + var maxBytesErr *http.MaxBytesError + if errors.As(readErr, &maxBytesErr) { + writeS3Error(w, http.StatusRequestEntityTooLarge, "EntityTooLarge", "part exceeds maximum allowed size", bucket, objectKey) + return + } + writeS3InternalError(w, readErr) return } - chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo) - pendingBatch = append(pendingBatch, &kv.Elem[kv.OP]{Op: kv.Put, Key: chunkKey, Value: chunk}) - cs, err := uint64FromInt(n) - if err != nil { + continue + } + chunk := append([]byte(nil), buf[:n]...) + if _, err := hasher.Write(chunk); err != nil { + writeS3InternalError(w, err) + return + } + chunkKey := s3keys.BlobKey(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo) + pendingBatch = append(pendingBatch, &kv.Elem[kv.OP]{Op: kv.Put, Key: chunkKey, Value: chunk}) + cs, err := uint64FromInt(n) + if err != nil { + writeS3InternalError(w, err) + return + } + chunkSizes = append(chunkSizes, cs) + if len(pendingBatch) >= s3ChunkBatchOps { + if err := flushBatch(); err != nil { writeS3InternalError(w, err) return } - chunkSizes = append(chunkSizes, cs) - if len(pendingBatch) >= s3ChunkBatchOps { - if err := flushBatch(); err != nil { - writeS3InternalError(w, err) - return - } - } - sizeBytes += int64(n) - chunkNo++ } + sizeBytes += int64(n) + chunkNo++ if errors.Is(readErr, io.EOF) { break } @@ -1114,6 +1146,8 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str {Op: kv.Put, Key: partKey, Value: descBody}, }, }); err != nil { + // Clean up orphaned blob chunks so they don't accumulate in the store. + s.cleanupPartBlobsAsync(bucket, meta.Generation, objectKey, uploadID, partNo, chunkNo) writeS3InternalError(w, err) return } @@ -1122,9 +1156,9 @@ func (s *S3Server) uploadPart(w http.ResponseWriter, r *http.Request, bucket str w.WriteHeader(http.StatusOK) } -//nolint:cyclop,gocognit // CompleteMultipartUpload validates parts, computes composite ETag, and commits atomically. +//nolint:cyclop,gocognit,gocyclo // CompleteMultipartUpload validates parts, computes composite ETag, and commits atomically. func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Request, bucket string, objectKey string, uploadID string) { - bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, 1<<20)) + bodyBytes, err := io.ReadAll(io.LimitReader(r.Body, s3ChunkSize)) if err != nil { writeS3InternalError(w, err) return @@ -1138,10 +1172,19 @@ func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Reques writeS3Error(w, http.StatusBadRequest, "InvalidArgument", "at least one part is required", bucket, objectKey) return } + if len(completionReq.Parts) > s3MaxPartsPerUpload { + writeS3Error(w, http.StatusBadRequest, "InvalidArgument", + fmt.Sprintf("too many parts in CompleteMultipartUpload request (maximum %d)", s3MaxPartsPerUpload), bucket, objectKey) + return + } - // Parts must be in ascending order. - for i := 1; i < len(completionReq.Parts); i++ { - if completionReq.Parts[i].PartNumber <= completionReq.Parts[i-1].PartNumber { + // Parts must be in ascending order, within allowed part number range. + for i, part := range completionReq.Parts { + if part.PartNumber < s3MinPartNumber || part.PartNumber > s3MaxPartNumber { + writeS3Error(w, http.StatusBadRequest, "InvalidArgument", "part number out of allowed range", bucket, objectKey) + return + } + if i > 0 && part.PartNumber <= completionReq.Parts[i-1].PartNumber { writeS3Error(w, http.StatusBadRequest, "InvalidPartOrder", "parts must be in ascending order", bucket, objectKey) return } @@ -1187,7 +1230,7 @@ func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Reques totalSize := int64(0) for i, reqPart := range completionReq.Parts { - partKey := s3keys.UploadPartKey(bucket, meta.Generation, objectKey, uploadID, uint64(reqPart.PartNumber)) + partKey := s3keys.UploadPartKey(bucket, meta.Generation, objectKey, uploadID, uint64(reqPart.PartNumber)) //nolint:gosec // G115: PartNumber validated in [1,10000]. raw, err := s.store.GetAt(r.Context(), partKey, readTS) if err != nil { readPin.Release() @@ -1230,13 +1273,7 @@ func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Reques return } - manifestParts = append(manifestParts, s3ObjectPart{ - PartNo: desc.PartNo, - ETag: desc.ETag, - SizeBytes: desc.SizeBytes, - ChunkCount: desc.ChunkCount, - ChunkSizes: desc.ChunkSizes, - }) + manifestParts = append(manifestParts, s3ObjectPart(desc)) totalSize += desc.SizeBytes } readPin.Release() @@ -1291,12 +1328,19 @@ func (s *S3Server) completeMultipartUpload(w http.ResponseWriter, r *http.Reques return errors.WithStack(err) } - // Atomically: write manifest, delete UploadMeta + GCUpload (fence against Abort). + bucketFence, err := encodeS3BucketMeta(meta) + if err != nil { + return errors.WithStack(err) + } + + // Atomically: fence bucket (conflict with DELETE bucket), write manifest, + // delete UploadMeta + GCUpload (fence against Abort). _, err = s.coordinator.Dispatch(r.Context(), &kv.OperationGroup[kv.OP]{ IsTxn: true, StartTS: startTS, CommitTS: commitTS, Elems: []*kv.Elem[kv.OP]{ + {Op: kv.Put, Key: s3keys.BucketMetaKey(bucket), Value: bucketFence}, {Op: kv.Put, Key: headKey, Value: manifestBody}, {Op: kv.Del, Key: uploadMetaKey}, {Op: kv.Del, Key: s3keys.GCUploadKey(bucket, meta.Generation, objectKey, uploadID)}, @@ -1381,6 +1425,7 @@ func (s *S3Server) abortMultipartUpload(w http.ResponseWriter, r *http.Request, w.WriteHeader(http.StatusNoContent) } +//nolint:cyclop // listParts validates upload, parses pagination params, and iterates parts; branches are inherent. func (s *S3Server) listParts(w http.ResponseWriter, r *http.Request, bucket string, objectKey string, uploadID string) { readTS := s.readTS() readPin := s.pinReadTS(readTS) @@ -1444,19 +1489,24 @@ func (s *S3Server) listParts(w http.ResponseWriter, r *http.Request, bucket stri return } result.Parts = append(result.Parts, s3ListPartEntry{ - PartNumber: int(desc.PartNo), + PartNumber: int(desc.PartNo), //nolint:gosec // G115: PartNo is in [1,10000], safe for int. ETag: quoteS3ETag(desc.ETag), Size: desc.SizeBytes, }) - result.NextPartNumberMarker = int(desc.PartNo) + result.NextPartNumberMarker = int(desc.PartNo) //nolint:gosec // G115: PartNo is in [1,10000], safe for int. } writeS3XML(w, http.StatusOK, result) } func (s *S3Server) cleanupUploadPartsAsync(bucket string, generation uint64, objectKey string, uploadID string) { + select { + case s.cleanupSem <- struct{}{}: + default: + // Semaphore saturated; skip to avoid unbounded goroutine accumulation. + return + } go func() { - s.cleanupSem <- struct{}{} defer func() { <-s.cleanupSem }() ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout) defer cancel() @@ -1469,9 +1519,57 @@ func (s *S3Server) cleanupUploadParts(ctx context.Context, bucket string, genera s.deleteByPrefix(ctx, partPrefix, bucket, generation, objectKey, uploadID) } +// cleanupPartBlobsAsync asynchronously deletes the blob chunk keys for a single +// upload part. It is used to garbage-collect orphaned chunks when a part +// descriptor write fails after the chunks have already been committed. +func (s *S3Server) cleanupPartBlobsAsync(bucket string, generation uint64, objectKey string, uploadID string, partNo uint64, chunkCount uint64) { + select { + case s.cleanupSem <- struct{}{}: + default: + // Semaphore saturated; skip to avoid unbounded goroutine accumulation. + return + } + go func() { + defer func() { <-s.cleanupSem }() + ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout) + defer cancel() + pending := make([]*kv.Elem[kv.OP], 0, s3ChunkBatchOps) + flush := func() { + if len(pending) == 0 { + return + } + if _, err := s.coordinator.Dispatch(ctx, &kv.OperationGroup[kv.OP]{Elems: pending}); err != nil { + slog.ErrorContext(ctx, "cleanupPartBlobsAsync: coordinator dispatch failed", + "bucket", bucket, + "object_key", objectKey, + "upload_id", uploadID, + "part_no", partNo, + "err", err, + ) + } + pending = pending[:0] + } + for i := uint64(0); i < chunkCount; i++ { + pending = append(pending, &kv.Elem[kv.OP]{ + Op: kv.Del, + Key: s3keys.BlobKey(bucket, generation, objectKey, uploadID, partNo, i), + }) + if len(pending) >= s3ChunkBatchOps { + flush() + } + } + flush() + }() +} + func (s *S3Server) cleanupUploadDataAsync(bucket string, generation uint64, objectKey string, uploadID string) { + select { + case s.cleanupSem <- struct{}{}: + default: + // Semaphore saturated; skip to avoid unbounded goroutine accumulation. + return + } go func() { - s.cleanupSem <- struct{}{} defer func() { <-s.cleanupSem }() ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout) defer cancel() @@ -1710,8 +1808,13 @@ func (s *S3Server) cleanupManifestBlobsAsync(bucket string, generation uint64, o if manifest == nil { return } + select { + case s.cleanupSem <- struct{}{}: + default: + // Semaphore saturated; skip to avoid unbounded goroutine accumulation. + return + } go func() { - s.cleanupSem <- struct{}{} defer func() { <-s.cleanupSem }() ctx, cancel := context.WithTimeout(context.Background(), s3ManifestCleanupTimeout) defer cancel() diff --git a/adapter/s3_auth.go b/adapter/s3_auth.go index 8f261896..02c79c42 100644 --- a/adapter/s3_auth.go +++ b/adapter/s3_auth.go @@ -263,6 +263,34 @@ func normalizeS3PayloadHash(raw string) string { const s3PresignMaxExpiry = 7 * 24 * 60 * 60 // 604800 seconds (7 days) +// checkPresignExpiry validates the expiry of a presigned request. +// It returns an *s3AuthError if the request has expired or the expiry is invalid. +func checkPresignExpiry(expiresStr string, signingTime time.Time) *s3AuthError { + if expiresStr == "" { + // No explicit expiry: fall back to clock skew check. + skew := time.Now().UTC().Sub(signingTime.UTC()) + if skew < 0 { + skew = -skew + } + if skew > s3RequestTimeMaxSkew { + return &s3AuthError{Status: http.StatusForbidden, Code: "AccessDenied", Message: "presigned URL has expired"} + } + return nil + } + expires, err := strconv.Atoi(expiresStr) + if err != nil || expires <= 0 || expires > s3PresignMaxExpiry { + return &s3AuthError{ + Status: http.StatusForbidden, + Code: "AuthorizationQueryParametersError", + Message: "X-Amz-Expires must be between 1 and 604800", + } + } + if time.Now().UTC().After(signingTime.UTC().Add(time.Duration(expires) * time.Second)) { + return &s3AuthError{Status: http.StatusForbidden, Code: "AccessDenied", Message: "presigned URL has expired"} + } + return nil +} + //nolint:cyclop // Presigned URL validation must check multiple parameters precisely. func (s *S3Server) authorizePresignedRequest(r *http.Request) *s3AuthError { query := r.URL.Query() @@ -332,35 +360,8 @@ func (s *S3Server) authorizePresignedRequest(r *http.Request) *s3AuthError { } } - if expiresStr != "" { - expires, err := strconv.Atoi(expiresStr) - if err != nil || expires <= 0 || expires > s3PresignMaxExpiry { - return &s3AuthError{ - Status: http.StatusForbidden, - Code: "AuthorizationQueryParametersError", - Message: "X-Amz-Expires must be between 1 and 604800", - } - } - if time.Now().UTC().After(signingTime.UTC().Add(time.Duration(expires) * time.Second)) { - return &s3AuthError{ - Status: http.StatusForbidden, - Code: "AccessDenied", - Message: "presigned URL has expired", - } - } - } else { - // No explicit expiry: fall back to clock skew check. - skew := time.Now().UTC().Sub(signingTime.UTC()) - if skew < 0 { - skew = -skew - } - if skew > s3RequestTimeMaxSkew { - return &s3AuthError{ - Status: http.StatusForbidden, - Code: "AccessDenied", - Message: "presigned URL has expired", - } - } + if authErr := checkPresignExpiry(expiresStr, signingTime); authErr != nil { + return authErr } // Use the SDK's PresignHTTP to rebuild the expected presigned URL. @@ -373,7 +374,7 @@ func (s *S3Server) authorizePresignedRequest(r *http.Request) *s3AuthError { } verifyURL.RawQuery = q.Encode() - verifyReq, err := http.NewRequestWithContext(context.Background(), r.Method, verifyURL.String(), nil) + verifyReq, err := http.NewRequestWithContext(context.Background(), r.Method, verifyURL.String(), nil) //nolint:gosec // G704: URL is derived from the incoming request's own URL to rebuild and verify its signature; no outbound network request is made. if err != nil { return &s3AuthError{ Status: http.StatusForbidden, @@ -391,6 +392,14 @@ func (s *S3Server) authorizePresignedRequest(r *http.Request) *s3AuthError { verifyReq.Header.Set(h, r.Header.Get(h)) } + // Re-add X-Amz-Expires so PresignHTTP includes it in the canonical request, + // matching the client's original signature computation. + if expiresStr != "" { + vq := verifyReq.URL.Query() + vq.Set("X-Amz-Expires", expiresStr) + verifyReq.URL.RawQuery = vq.Encode() + } + signer := v4.NewSigner(func(opts *v4.SignerOptions) { opts.DisableURIPathEscaping = true }) diff --git a/adapter/s3_test.go b/adapter/s3_test.go index 89573729..04aabdfe 100644 --- a/adapter/s3_test.go +++ b/adapter/s3_test.go @@ -855,6 +855,29 @@ func TestS3Server_RangeReadInvalidRange(t *testing.T) { require.Equal(t, http.StatusRequestedRangeNotSatisfiable, rec.Code) } +func TestS3Server_RangeReadEmptyObject(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + rec := httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPut, "/bucket-empty-range", nil)) + require.Equal(t, http.StatusOK, rec.Code) + + // Upload an empty object. + rec = httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPut, "/bucket-empty-range/empty.txt", strings.NewReader(""))) + require.Equal(t, http.StatusOK, rec.Code) + + // Suffix range on empty object must return 416. + rec = httptest.NewRecorder() + req := newS3TestRequest(http.MethodGet, "/bucket-empty-range/empty.txt", nil) + req.Header.Set("Range", "bytes=-4") + server.handle(rec, req) + require.Equal(t, http.StatusRequestedRangeNotSatisfiable, rec.Code) +} + func TestS3Server_MultipartUploadETagComputation(t *testing.T) { t.Parallel() @@ -951,6 +974,68 @@ func TestS3Server_MultipartUploadRejectsInvalidPartOrder(t *testing.T) { require.Contains(t, rec.Body.String(), "InvalidPartOrder") } +func TestS3Server_CompleteMultipartUploadTooManyParts(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + rec := httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPut, "/bucket-toomany", nil)) + require.Equal(t, http.StatusOK, rec.Code) + + rec = httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPost, "/bucket-toomany/obj?uploads=", nil)) + require.Equal(t, http.StatusOK, rec.Code) + var initResult s3InitiateMultipartUploadResult + require.NoError(t, xml.Unmarshal(rec.Body.Bytes(), &initResult)) + uploadID := initResult.UploadId + + // Build a CompleteMultipartUpload request with too many parts (> 10000). + var sb strings.Builder + sb.WriteString("") + for i := 1; i <= s3MaxPartsPerUpload+1; i++ { + fmt.Fprintf(&sb, "%d\"abc\"", i) + } + sb.WriteString("") + + rec = httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPost, + fmt.Sprintf("/bucket-toomany/obj?uploadId=%s", uploadID), + strings.NewReader(sb.String()))) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "InvalidArgument") +} + +func TestS3Server_CompleteMultipartUploadOutOfRangePartNumber(t *testing.T) { + t.Parallel() + + st := store.NewMVCCStore() + server := NewS3Server(nil, "", st, newLocalAdapterCoordinator(st), nil) + + rec := httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPut, "/bucket-partrange", nil)) + require.Equal(t, http.StatusOK, rec.Code) + + rec = httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPost, "/bucket-partrange/obj?uploads=", nil)) + require.Equal(t, http.StatusOK, rec.Code) + var initResult s3InitiateMultipartUploadResult + require.NoError(t, xml.Unmarshal(rec.Body.Bytes(), &initResult)) + uploadID := initResult.UploadId + + // Use part number 0 (below s3MinPartNumber=1). + completeBody := ` + 0"abc" + ` + rec = httptest.NewRecorder() + server.handle(rec, newS3TestRequest(http.MethodPost, + fmt.Sprintf("/bucket-partrange/obj?uploadId=%s", uploadID), + strings.NewReader(completeBody))) + require.Equal(t, http.StatusBadRequest, rec.Code) + require.Contains(t, rec.Body.String(), "InvalidArgument") +} + func TestS3Server_MultipartNoSuchUpload(t *testing.T) { t.Parallel() @@ -1243,9 +1328,9 @@ func TestS3Server_CompleteMultipartUploadETagMismatch(t *testing.T) { require.Equal(t, http.StatusOK, rec.Code) // Complete with wrong ETag. - completeBody := fmt.Sprintf(` + completeBody := ` 1"0000000000000000deadbeef00000000" - `) + ` rec = httptest.NewRecorder() server.handle(rec, newS3TestRequest(http.MethodPost, fmt.Sprintf("/bucket-etag-mm/obj?uploadId=%s", uploadID),