diff --git a/CHANGELOG.md b/CHANGELOG.md
index 055f90631c..5252047e29 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@
 * [CHANGE] Query-frontend: use protobuf internal query result payload format by default. This feature is no longer considered experimental. #4557
 * [CHANGE] Ruler: reject creating federated rule groups while tenant federation is disabled. Previously the rule groups would be silently dropped during bucket sync. #4555
 * [CHANGE] Compactor: the `/api/v1/upload/block/{block}/finish` endpoint now returns a `429` status code when the compactor has reached the limit specified by `-compactor.max-block-upload-validation-concurrency`. #4598
+* [CHANGE] Compactor: when starting a block upload the maximum byte size of the block metadata provided in the request body is now limited to 1 MiB. If this limit is exceeded a `413` status code is returned. #4683
 * [CHANGE] Store-gateway: cache key format for expanded postings has changed. This will invalidate the expanded postings in the index cache when deployed. #4667
 * [FEATURE] Cache: Introduce experimental support for using Redis for results, chunks, index, and metadata caches. #4371
 * [FEATURE] Vault: Introduce experimental integration with Vault to fetch secrets used to configure TLS for clients. Server TLS secrets will still be read from a file. `tls-ca-path`, `tls-cert-path` and `tls-key-path` will denote the path in Vault for the following CLI flags when `-vault.enabled` is true: #4446.
diff --git a/pkg/compactor/block_upload.go b/pkg/compactor/block_upload.go
index cfe61bde10..68b0725cbe 100644
--- a/pkg/compactor/block_upload.go
+++ b/pkg/compactor/block_upload.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
 	"os"
 	"path"
@@ -38,6 +39,7 @@ const (
 	validationFilename          = "validation.json" // Name of the file that stores a heartbeat time and possibly an error message
 	validationHeartbeatInterval = 1 * time.Minute   // Duration of time between heartbeats of an in-progress block upload validation
 	validationHeartbeatTimeout  = 5 * time.Minute   // Maximum duration of time to wait until a validation is able to be restarted
+	maximumMetaSizeBytes        = 1 * 1024 * 1024   // 1 MiB, maximum allowed size of an uploaded block's meta.json file
 )
 
 var rePath = regexp.MustCompile(`^(index|chunks/\d{6})$`)
@@ -65,7 +67,29 @@ func (c *MultitenantCompactor) StartBlockUpload(w http.ResponseWriter, r *http.R
 		return
 	}
 
-	if err := c.createBlockUpload(ctx, r, logger, userBkt, tenantID, blockID); err != nil {
+	content, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maximumMetaSizeBytes))
+	if err != nil {
+		if errors.As(err, new(*http.MaxBytesError)) {
+			err = httpError{
+				message:    fmt.Sprintf("The block metadata was too large (maximum size allowed is %d bytes)", maximumMetaSizeBytes),
+				statusCode: http.StatusRequestEntityTooLarge,
+			}
+		}
+		writeBlockUploadError(err, op, "", logger, w)
+		return
+	}
+
+	var meta metadata.Meta
+	if err := json.Unmarshal(content, &meta); err != nil {
+		err = httpError{
+			message:    "malformed request body",
+			statusCode: http.StatusBadRequest,
+		}
+		writeBlockUploadError(err, op, "", logger, w)
+		return
+	}
+
+	if err := c.createBlockUpload(ctx, &meta, logger, userBkt, tenantID, blockID); err != nil {
 		writeBlockUploadError(err, op, "", logger, w)
 		return
 	}
@@ -176,20 +200,11 @@ func writeBlockUploadError(err error, op, extra string, logger log.Logger, w htt
 	http.Error(w, "internal server error", http.StatusInternalServerError)
 }
 
-func (c *MultitenantCompactor) createBlockUpload(ctx context.Context, r *http.Request,
+func (c *MultitenantCompactor) createBlockUpload(ctx context.Context, meta *metadata.Meta,
 	logger log.Logger, userBkt objstore.Bucket, tenantID string, blockID ulid.ULID) error {
 	level.Debug(logger).Log("msg", "starting block upload")
 
-	var meta metadata.Meta
-	dec := json.NewDecoder(r.Body)
-	if err := dec.Decode(&meta); err != nil {
-		return httpError{
-			message:    "malformed request body",
-			statusCode: http.StatusBadRequest,
-		}
-	}
-
-	if msg := c.sanitizeMeta(logger, blockID, &meta); msg != "" {
+	if msg := c.sanitizeMeta(logger, blockID, meta); msg != "" {
 		return httpError{
 			message:    msg,
 			statusCode: http.StatusBadRequest,
@@ -209,7 +224,7 @@ func (c *MultitenantCompactor) createBlockUpload(ctx context.Context, r *http.Re
 		}
 	}
 
-	return c.uploadMeta(ctx, logger, &meta, blockID, uploadingMetaFilename, userBkt)
+	return c.uploadMeta(ctx, logger, meta, blockID, uploadingMetaFilename, userBkt)
 }
 
 // UploadBlockFile handles requests for uploading block files.
@@ -359,6 +374,10 @@ func (c *MultitenantCompactor) markBlockComplete(ctx context.Context, logger log
 // sanitizeMeta sanitizes and validates a metadata.Meta object. If a validation error occurs, an error
 // message gets returned, otherwise an empty string.
 func (c *MultitenantCompactor) sanitizeMeta(logger log.Logger, blockID ulid.ULID, meta *metadata.Meta) string {
+	if meta == nil {
+		return "missing block metadata"
+	}
+
 	// check that the blocks doesn't contain down-sampled data
 	if meta.Thanos.Downsample.Resolution > 0 {
 		return "block contains downsampled data"
diff --git a/pkg/compactor/block_upload_test.go b/pkg/compactor/block_upload_test.go
index 909c20e14b..5e62ad7f72 100644
--- a/pkg/compactor/block_upload_test.go
+++ b/pkg/compactor/block_upload_test.go
@@ -121,6 +121,7 @@ func TestMultitenantCompactor_StartBlockUpload(t *testing.T) {
 		expBadRequest          string
 		expConflict            string
 		expUnprocessableEntity string
+		expEntityTooLarge      string
 		expInternalServerError bool
 		setUpBucketMock        func(bkt *bucket.ClientMock)
 		verifyUpload           func(*testing.T, *bucket.ClientMock)
@@ -411,6 +412,14 @@ func TestMultitenantCompactor_StartBlockUpload(t *testing.T) {
 			meta:                   &validMeta,
 			expInternalServerError: true,
 		},
+		{
+			name:              "too large of a request body",
+			tenantID:          tenantID,
+			blockID:           blockID,
+			setUpBucketMock:   setUpPartialBlock,
+			body:              strings.Repeat("A", maximumMetaSizeBytes+1),
+			expEntityTooLarge: fmt.Sprintf("The block metadata was too large (maximum size allowed is %d bytes)", maximumMetaSizeBytes),
+		},
 		{
 			name:            "block upload disabled",
 			tenantID:        tenantID,
@@ -580,6 +589,9 @@ func TestMultitenantCompactor_StartBlockUpload(t *testing.T) {
 		case tc.expUnprocessableEntity != "":
 			assert.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode)
 			assert.Equal(t, fmt.Sprintf("%s\n", tc.expUnprocessableEntity), string(body))
+		case tc.expEntityTooLarge != "":
+			assert.Equal(t, http.StatusRequestEntityTooLarge, resp.StatusCode)
+			assert.Equal(t, fmt.Sprintf("%s\n", tc.expEntityTooLarge), string(body))
 		default:
 			assert.Equal(t, http.StatusOK, resp.StatusCode)
 			assert.Empty(t, string(body))
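
For context only, outside the patch above: a minimal, standalone sketch of the http.MaxBytesReader pattern the new StartBlockUpload code relies on. The handler name, route, port, and limit constant below are illustrative assumptions, not Mimir code.

// Sketch (assumption, not part of this patch): shows how a handler caps the
// request body size and maps the overflow to a 413 response.
package main

import (
	"errors"
	"fmt"
	"io"
	"net/http"
)

const maxBodyBytes = 1 * 1024 * 1024 // 1 MiB, mirroring maximumMetaSizeBytes

func limitedBodyHandler(w http.ResponseWriter, r *http.Request) {
	// MaxBytesReader wraps the body; reads fail once the limit is exceeded.
	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxBodyBytes))
	if err != nil {
		// Since Go 1.19 the limit violation surfaces as *http.MaxBytesError,
		// which lets us distinguish it from other read errors.
		if errors.As(err, new(*http.MaxBytesError)) {
			http.Error(w, fmt.Sprintf("request body exceeds %d bytes", maxBodyBytes), http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, "failed to read request body", http.StatusInternalServerError)
		return
	}
	fmt.Fprintf(w, "read %d bytes\n", len(body))
}

func main() {
	http.HandleFunc("/upload", limitedBodyHandler)
	_ = http.ListenAndServe(":8080", nil)
}

Unlike a plain io.LimitReader, http.MaxBytesReader reports the distinct *http.MaxBytesError type and tells the server to stop reading the oversized body, which is what allows the compactor change to translate the overflow into the 413 status documented in the CHANGELOG entry.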