
Limit the byte size of block metadata when starting a block upload #4683

Merged · 6 commits · Apr 12, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -15,6 +15,7 @@
* [CHANGE] Query-frontend: use protobuf internal query result payload format by default. This feature is no longer considered experimental. #4557
* [CHANGE] Ruler: reject creating federated rule groups while tenant federation is disabled. Previously the rule groups would be silently dropped during bucket sync. #4555
* [CHANGE] Compactor: the `/api/v1/upload/block/{block}/finish` endpoint now returns a `429` status code when the compactor has reached the limit specified by `-compactor.max-block-upload-validation-concurrency`. #4598
* [CHANGE] Compactor: when starting a block upload the maximum byte size of the block metadata provided in the request body is now limited to 1 MiB. If this limit is exceeded a `413` status code is returned. #4683
* [CHANGE] Store-gateway: cache key format for expanded postings has changed. This will invalidate the expanded postings in the index cache when deployed. #4667
* [FEATURE] Cache: Introduce experimental support for using Redis for results, chunks, index, and metadata caches. #4371
* [FEATURE] Vault: Introduce experimental integration with Vault to fetch secrets used to configure TLS for clients. Server TLS secrets will still be read from a file. `tls-ca-path`, `tls-cert-path` and `tls-key-path` will denote the path in Vault for the following CLI flags when `-vault.enabled` is true: #4446.
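To illustrate the new changelog entry for this PR: a client that posts block metadata larger than 1 MiB when starting an upload should now receive a `413` response. The sketch below is illustrative only; the endpoint path, block ID, and tenant header are assumptions, not taken from this diff.

```go
package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Build a request body just over the 1 MiB limit described in the changelog entry.
	body := strings.Repeat("A", 1*1024*1024+1)

	// Hypothetical placeholders: endpoint path, block ID, and tenant header.
	url := "http://localhost:8080/api/v1/upload/block/01ARZ3NDEKTSV4RRFFQ69G5FAV/start"
	req, err := http.NewRequest(http.MethodPost, url, strings.NewReader(body))
	if err != nil {
		panic(err)
	}
	req.Header.Set("X-Scope-OrgID", "tenant-1") // hypothetical tenant header

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// An oversized metadata payload is expected to be rejected with 413 Request Entity Too Large.
	fmt.Println("status:", resp.StatusCode, "expected:", http.StatusRequestEntityTooLarge)
}
```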
45 changes: 32 additions & 13 deletions pkg/compactor/block_upload.go
@@ -7,6 +7,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path"
@@ -38,6 +39,7 @@ const (
validationFilename = "validation.json" // Name of the file that stores a heartbeat time and possibly an error message
validationHeartbeatInterval = 1 * time.Minute // Duration of time between heartbeats of an in-progress block upload validation
validationHeartbeatTimeout = 5 * time.Minute // Maximum duration of time to wait until a validation is able to be restarted
maximumMetaSizeBytes = 1 * 1024 * 1024 // 1 MiB, maximum allowed size of an uploaded block's meta.json file
)

var rePath = regexp.MustCompile(`^(index|chunks/\d{6})$`)
@@ -65,7 +67,29 @@ func (c *MultitenantCompactor) StartBlockUpload(w http.ResponseWriter, r *http.R
return
}

if err := c.createBlockUpload(ctx, r, logger, userBkt, tenantID, blockID); err != nil {
content, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maximumMetaSizeBytes))
if err != nil {
if errors.As(err, new(*http.MaxBytesError)) {
err = httpError{
message: fmt.Sprintf("The block metadata was too large (maximum size allowed is %d bytes)", maximumMetaSizeBytes),
statusCode: http.StatusRequestEntityTooLarge,
}
}
writeBlockUploadError(err, op, "", logger, w)
return
}

var meta metadata.Meta
if err := json.Unmarshal(content, &meta); err != nil {
err = httpError{
message: "malformed request body",
statusCode: http.StatusBadRequest,
}
writeBlockUploadError(err, op, "", logger, w)
return
}

if err := c.createBlockUpload(ctx, &meta, logger, userBkt, tenantID, blockID); err != nil {
writeBlockUploadError(err, op, "", logger, w)
return
}
@@ -176,20 +200,11 @@ func writeBlockUploadError(err error, op, extra string, logger log.Logger, w htt
http.Error(w, "internal server error", http.StatusInternalServerError)
}

func (c *MultitenantCompactor) createBlockUpload(ctx context.Context, r *http.Request,
func (c *MultitenantCompactor) createBlockUpload(ctx context.Context, meta *metadata.Meta,
logger log.Logger, userBkt objstore.Bucket, tenantID string, blockID ulid.ULID) error {
level.Debug(logger).Log("msg", "starting block upload")

var meta metadata.Meta
dec := json.NewDecoder(r.Body)
if err := dec.Decode(&meta); err != nil {
return httpError{
message: "malformed request body",
statusCode: http.StatusBadRequest,
}
}

if msg := c.sanitizeMeta(logger, blockID, &meta); msg != "" {
if msg := c.sanitizeMeta(logger, blockID, meta); msg != "" {
return httpError{
message: msg,
statusCode: http.StatusBadRequest,
@@ -209,7 +224,7 @@ }
}
}

return c.uploadMeta(ctx, logger, &meta, blockID, uploadingMetaFilename, userBkt)
return c.uploadMeta(ctx, logger, meta, blockID, uploadingMetaFilename, userBkt)
}

// UploadBlockFile handles requests for uploading block files.
@@ -359,6 +374,10 @@ func (c *MultitenantCompactor) markBlockComplete(ctx context.Context, logger log
// sanitizeMeta sanitizes and validates a metadata.Meta object. If a validation error occurs, an error
// message gets returned, otherwise an empty string.
func (c *MultitenantCompactor) sanitizeMeta(logger log.Logger, blockID ulid.ULID, meta *metadata.Meta) string {
if meta == nil {
return "missing block metadata"
}

// check that the block doesn't contain downsampled data
if meta.Thanos.Downsample.Resolution > 0 {
return "block contains downsampled data"
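For context on the handler change above: the decode-from-`r.Body` path was replaced by reading the body through `http.MaxBytesReader`, which caps the bytes read and surfaces `*http.MaxBytesError` (Go 1.19+) once the cap is exceeded. A minimal, self-contained sketch of that pattern follows; the handler and constant names are illustrative rather than the compactor's.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
)

const maxBodyBytes = 1 * 1024 * 1024 // 1 MiB, mirroring maximumMetaSizeBytes

func limitedJSONHandler(w http.ResponseWriter, r *http.Request) {
	// MaxBytesReader stops reading after maxBodyBytes; further reads fail
	// with *http.MaxBytesError, which we translate into a 413 response.
	content, err := io.ReadAll(http.MaxBytesReader(w, r.Body, maxBodyBytes))
	if err != nil {
		if errors.As(err, new(*http.MaxBytesError)) {
			http.Error(w, fmt.Sprintf("body too large (limit is %d bytes)", maxBodyBytes),
				http.StatusRequestEntityTooLarge)
			return
		}
		http.Error(w, "internal server error", http.StatusInternalServerError)
		return
	}

	// The full body is in memory at this point, so it is safe to unmarshal.
	var payload map[string]any
	if err := json.Unmarshal(content, &payload); err != nil {
		http.Error(w, "malformed request body", http.StatusBadRequest)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/upload", limitedJSONHandler)
	_ = http.ListenAndServe(":8080", nil)
}
```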
12 changes: 12 additions & 0 deletions pkg/compactor/block_upload_test.go
@@ -121,6 +121,7 @@ func TestMultitenantCompactor_StartBlockUpload(t *testing.T) {
expBadRequest string
expConflict string
expUnprocessableEntity string
expEntityTooLarge string
expInternalServerError bool
setUpBucketMock func(bkt *bucket.ClientMock)
verifyUpload func(*testing.T, *bucket.ClientMock)
@@ -411,6 +412,14 @@ func TestMultitenantCompactor_StartBlockUpload(t *testing.T) {
meta: &validMeta,
expInternalServerError: true,
},
{
name: "too large of a request body",
tenantID: tenantID,
blockID: blockID,
setUpBucketMock: setUpPartialBlock,
body: strings.Repeat("A", maximumMetaSizeBytes+1),
expEntityTooLarge: fmt.Sprintf("The block metadata was too large (maximum size allowed is %d bytes)", maximumMetaSizeBytes),
},
{
name: "block upload disabled",
tenantID: tenantID,
@@ -580,6 +589,9 @@ func TestMultitenantCompactor_StartBlockUpload(t *testing.T) {
case tc.expUnprocessableEntity != "":
assert.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode)
assert.Equal(t, fmt.Sprintf("%s\n", tc.expUnprocessableEntity), string(body))
case tc.expEntityTooLarge != "":
assert.Equal(t, http.StatusRequestEntityTooLarge, resp.StatusCode)
assert.Equal(t, fmt.Sprintf("%s\n", tc.expEntityTooLarge), string(body))
default:
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Empty(t, string(body))
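Finally, in the spirit of the new "too large of a request body" test case above, a standalone sketch of asserting the size-limit behavior; it exercises `http.MaxBytesReader` directly rather than the compactor handler, and redeclares the 1 MiB limit locally.

```go
package main

import (
	"errors"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"testing"
)

// TestBodyOverLimit checks that reading a body larger than the limit through
// http.MaxBytesReader surfaces *http.MaxBytesError, which is what a handler
// can translate into a 413 response.
func TestBodyOverLimit(t *testing.T) {
	const limit = 1 * 1024 * 1024 // mirrors maximumMetaSizeBytes, redeclared to stay self-contained
	body := strings.Repeat("A", limit+1)

	req := httptest.NewRequest(http.MethodPost, "/upload", strings.NewReader(body))
	rec := httptest.NewRecorder()

	_, err := io.ReadAll(http.MaxBytesReader(rec, req.Body, limit))
	if !errors.As(err, new(*http.MaxBytesError)) {
		t.Fatalf("expected *http.MaxBytesError, got %v", err)
	}
}
```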