Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split start and finish block upload endpoints. #2486

Merged
merged 5 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [CHANGE] Add new `-*.consul.cas-retry-delay` flags. They have a default value of `1s`, while previously there was no delay between retries. #2309
* [CHANGE] Store-gateway: Remove the experimental ability to run requests in a dedicated OS thread pool and associated CLI flag `-store-gateway.thread-pool-size`. #2423
* [CHANGE] Memberlist: disabled TCP-based ping fallback, because Mimir already uses a custom transport based on TCP. #2456
* [CHANGE] Experimental block upload API exposed by compactor has changed: Previous `/api/v1/upload/block/{block}` endpoint for starting block upload is now `/api/v1/upload/block/{block}/start`, and previous endpoint `/api/v1/upload/block/{block}?uploadComplete=true` for finishing block upload is now `/api/v1/upload/block/{block}/finish`. #2486
* [FEATURE] Compactor: Adds the ability to delete partial blocks after a configurable delay. This option can be configured per tenant. #2285
- `-compactor.partial-block-deletion-delay`, as a duration string, allows you to set the delay since a partial block has been modified before marking it for deletion. A value of `0`, the default, disables this feature.
- The metric `cortex_compactor_blocks_marked_for_deletion_total` has a new value for the `reason` label `reason="partial"`, when a block deletion marker is triggered by the partial block deletion delay.
Expand Down
6 changes: 3 additions & 3 deletions docs/sources/operators-guide/reference-http-api/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ This document groups API endpoints by service. Note that the API endpoints are e
| [Store-gateway tenants](#store-gateway-tenants) | Store-gateway | `GET /store-gateway/tenants` |
| [Store-gateway tenant blocks](#store-gateway-tenant-blocks) | Store-gateway | `GET /store-gateway/tenant/{tenant}/blocks` |
| [Compactor ring status](#compactor-ring-status) | Compactor | `GET /compactor/ring` |
| [Start block upload](#start-block-upload) | Compactor | `POST /api/v1/upload/block/{block}` |
| [Start block upload](#start-block-upload) | Compactor | `POST /api/v1/upload/block/{block}/start` |
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
| [Upload block file](#upload-block-file) | Compactor | `POST /api/v1/upload/block/{block}/files?path={path}` |
| [Complete block upload](#complete-block-upload) | Compactor | `POST /api/v1/upload/block/{block}?uploadComplete=true` |
| [Complete block upload](#complete-block-upload) | Compactor | `POST /api/v1/upload/block/{block}/finish` |

### Path prefixes

Expand Down Expand Up @@ -953,7 +953,7 @@ Requires [authentication](#authentication).
### Complete block upload

```
POST /api/v1/upload/block/{block}?uploadComplete=true
POST /api/v1/upload/block/{block}/finish
```

Completes the uploading of a TSDB block with a given ID to object storage. If the complete block already
Expand Down
7 changes: 3 additions & 4 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,9 @@ func (a *API) RegisterCompactor(c *compactor.MultitenantCompactor) {
{Desc: "Ring status", Path: "/compactor/ring"},
})
a.RegisterRoute("/compactor/ring", http.HandlerFunc(c.RingHandler), false, true, "GET", "POST")
a.RegisterRoute("/api/v1/upload/block/{block}", http.HandlerFunc(c.HandleBlockUpload), true,
false, http.MethodPost)
a.RegisterRoute("/api/v1/upload/block/{block}/files", http.HandlerFunc(c.UploadBlockFile),
true, false, http.MethodPost)
a.RegisterRoute("/api/v1/upload/block/{block}/start", http.HandlerFunc(c.StartBlockUpload), true, false, http.MethodPost)
a.RegisterRoute("/api/v1/upload/block/{block}/files", http.HandlerFunc(c.UploadBlockFile), true, false, http.MethodPost)
a.RegisterRoute("/api/v1/upload/block/{block}/finish", http.HandlerFunc(c.FinishBlockUpload), true, false, http.MethodPost)
}

type Distributor interface {
Expand Down
137 changes: 73 additions & 64 deletions pkg/compactor/block_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,62 +36,91 @@ const uploadingMetaFilename = "uploading-" + block.MetaFilename

var rePath = regexp.MustCompile(`^(index|chunks/\d{6})$`)

// HandleBlockUpload handles requests for starting or completing block uploads.
//
// The query parameter uploadComplete (true or false, default false) controls whether the
// upload should be completed or not.
// StartBlockUpload handles requests for starting a block upload.
//
// Starting the uploading of a block means to upload a meta file and verify that the upload can
// go ahead. In practice this means to check that the (complete) block isn't already in block
// storage, and that the meta file is valid.
func (c *MultitenantCompactor) HandleBlockUpload(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
blockID := vars["block"]
bULID, err := ulid.Parse(blockID)
if err != nil {
http.Error(w, "invalid block ID", http.StatusBadRequest)
func (c *MultitenantCompactor) StartBlockUpload(w http.ResponseWriter, r *http.Request) {
blockID, tenantID, userBkt, ok := c.parseBlockUploadParameters(w, r)
if !ok {
return
}

ctx := r.Context()
tenantID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, "invalid tenant ID", http.StatusBadRequest)
logger := log.With(util_log.WithContext(ctx, c.logger), "block", blockID)

const op = "start block upload"

if err := checkForCompleteBlock(ctx, blockID, userBkt); err != nil {
writeBlockUploadError(err, op, "while checking for complete block", logger, w)
return
}
if !c.cfgProvider.CompactorBlockUploadEnabled(tenantID) {
http.Error(w, "block upload is disabled", http.StatusBadRequest)

if err := c.createBlockUpload(ctx, r, logger, userBkt, tenantID, blockID); err != nil {
writeBlockUploadError(err, op, "", logger, w)
return
}

logger := log.With(util_log.WithContext(ctx, c.logger), "block", blockID)
w.WriteHeader(http.StatusOK)
}

shouldComplete := r.URL.Query().Get("uploadComplete") == "true"
var op string
if shouldComplete {
op = "complete block upload"
} else {
op = "start block upload"
// FinishBlockUpload handles requests for finishing a block upload.
//
// Finishing block upload performs block validation, and if all checks pass, marks the block as
// finished by uploading its meta.json file.
func (c *MultitenantCompactor) FinishBlockUpload(w http.ResponseWriter, r *http.Request) {
blockID, _, userBkt, ok := c.parseBlockUploadParameters(w, r)
if !ok {
return
}

userBkt := bucket.NewUserBucketClient(tenantID, c.bucketClient, c.cfgProvider)
if err := checkForCompleteBlock(ctx, bULID, userBkt); err != nil {
ctx := r.Context()
logger := log.With(util_log.WithContext(ctx, c.logger), "block", blockID)

const op = "complete block upload"

if err := checkForCompleteBlock(ctx, blockID, userBkt); err != nil {
writeBlockUploadError(err, op, "while checking for complete block", logger, w)
return
}

if shouldComplete {
err = c.completeBlockUpload(ctx, r, logger, userBkt, bULID)
} else {
err = c.createBlockUpload(ctx, r, logger, userBkt, tenantID, bULID)
}
if err != nil {
if err := c.completeBlockUpload(ctx, r, logger, userBkt, blockID); err != nil {
writeBlockUploadError(err, op, "", logger, w)
return
}

w.WriteHeader(http.StatusOK)
}

// parseBlockUploadParameters parses common parameters from the request (block ID and tenant ID) and checks whether the tenant has block uploads enabled.
// If it returns ok==false, then HTTP response has been sent.
func (c *MultitenantCompactor) parseBlockUploadParameters(w http.ResponseWriter, r *http.Request) (blockULID ulid.ULID, tenantID string, userBucket objstore.InstrumentedBucket, ok bool) {
pstibrany marked this conversation as resolved.
Show resolved Hide resolved
var err error
blockULID, err = ulid.Parse(mux.Vars(r)["block"])
if err != nil {
http.Error(w, "invalid block ID", http.StatusBadRequest)
return
}

ctx := r.Context()
tenantID, err = tenant.TenantID(ctx)
if err != nil {
http.Error(w, "invalid tenant ID", http.StatusBadRequest)
return
}

if !c.cfgProvider.CompactorBlockUploadEnabled(tenantID) {
http.Error(w, "block upload is disabled", http.StatusBadRequest)
return
}

userBucket = bucket.NewUserBucketClient(tenantID, c.bucketClient, c.cfgProvider)
pstibrany marked this conversation as resolved.
Show resolved Hide resolved

ok = true
return
}

func writeBlockUploadError(err error, op, extra string, logger log.Logger, w http.ResponseWriter) {
var httpErr httpError
if errors.As(err, &httpErr) {
Expand Down Expand Up @@ -163,35 +192,17 @@ func (c *MultitenantCompactor) createBlockUpload(ctx context.Context, r *http.Re
//
// It takes the mandatory query parameter "path", specifying the file's destination path.
func (c *MultitenantCompactor) UploadBlockFile(w http.ResponseWriter, r *http.Request) {
const op = "block file upload"

vars := mux.Vars(r)
blockID := vars["block"]
bULID, err := ulid.Parse(blockID)
if err != nil {
http.Error(w, "invalid block ID", http.StatusBadRequest)
blockID, _, userBkt, ok := c.parseBlockUploadParameters(w, r)
if !ok {
return
}

pth := r.URL.Query().Get("path")
if pth == "" {
http.Error(w, "missing or invalid file path", http.StatusBadRequest)
return
}

ctx := r.Context()
tenantID, err := tenant.TenantID(ctx)
if err != nil {
http.Error(w, "invalid tenant ID", http.StatusBadRequest)
return
}
if !c.cfgProvider.CompactorBlockUploadEnabled(tenantID) {
http.Error(w, "block upload is disabled", http.StatusBadRequest)
return
}

logger := util_log.WithContext(ctx, c.logger)
logger = log.With(logger, "block", blockID)

if path.Base(pth) == block.MetaFilename {
http.Error(w, fmt.Sprintf("%s is not allowed", block.MetaFilename), http.StatusBadRequest)
return
Expand All @@ -207,14 +218,17 @@ func (c *MultitenantCompactor) UploadBlockFile(w http.ResponseWriter, r *http.Re
return
}

userBkt := bucket.NewUserBucketClient(tenantID, c.bucketClient, c.cfgProvider)
const op = "block file upload"

ctx := r.Context()
logger := log.With(util_log.WithContext(ctx, c.logger), "block", blockID)

if err := checkForCompleteBlock(ctx, bULID, userBkt); err != nil {
if err := checkForCompleteBlock(ctx, blockID, userBkt); err != nil {
writeBlockUploadError(err, op, "while checking for complete block", logger, w)
return
}

metaPath := path.Join(blockID, uploadingMetaFilename)
metaPath := path.Join(blockID.String(), uploadingMetaFilename)
exists, err := userBkt.Exists(ctx, metaPath)
if err != nil {
level.Error(logger).Log("msg", "failed to check existence in object storage",
Expand All @@ -229,24 +243,19 @@ func (c *MultitenantCompactor) UploadBlockFile(w http.ResponseWriter, r *http.Re

// TODO: Verify that upload path and length correspond to file index

dst := path.Join(blockID, pth)
dst := path.Join(blockID.String(), pth)

level.Debug(logger).Log("msg", "uploading block file to bucket", "destination", dst,
"size", r.ContentLength)
reader := bodyReader{
r: r,
}
level.Debug(logger).Log("msg", "uploading block file to bucket", "destination", dst, "size", r.ContentLength)
reader := bodyReader{r: r}
if err := userBkt.Upload(ctx, dst, reader); err != nil {
level.Error(logger).Log("msg", "failed uploading block file to bucket",
"operation", op, "destination", dst, "err", err)
level.Error(logger).Log("msg", "failed uploading block file to bucket", "operation", op, "destination", dst, "err", err)
// We don't know what caused the error; it could be the client's fault (e.g. killed
// connection), but internal server error is the safe choice here.
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}

level.Debug(logger).Log("msg", "finished uploading block file to bucket",
"path", pth)
level.Debug(logger).Log("msg", "finished uploading block file to bucket", "path", pth)

w.WriteHeader(http.StatusOK)
}
Expand Down
19 changes: 8 additions & 11 deletions pkg/compactor/block_upload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,15 +519,15 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
require.NoError(t, json.NewEncoder(buf).Encode(tc.meta))
rdr = buf
}
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/upload/block/%s", tc.blockID), rdr)
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/upload/block/%s/start", tc.blockID), rdr)
if tc.tenantID != "" {
r = r.WithContext(user.InjectOrgID(r.Context(), tc.tenantID))
}
if tc.blockID != "" {
r = mux.SetURLVars(r, map[string]string{"block": tc.blockID})
}
w := httptest.NewRecorder()
c.HandleBlockUpload(w, r)
c.StartBlockUpload(w, r)

resp := w.Result()
body, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -655,11 +655,11 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
bucketClient: bkt,
cfgProvider: cfgProvider,
}
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/upload/block/%s", blockID), bytes.NewReader(metaJSON))
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/upload/block/%s/start", blockID), bytes.NewReader(metaJSON))
r = r.WithContext(user.InjectOrgID(r.Context(), tenantID))
r = mux.SetURLVars(r, map[string]string{"block": blockID})
w := httptest.NewRecorder()
c.HandleBlockUpload(w, r)
c.StartBlockUpload(w, r)

resp := w.Result()
body, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -888,8 +888,7 @@ func TestMultitenantCompactor_UploadBlockFile(t *testing.T) {
if tc.body != "" {
rdr = strings.NewReader(tc.body)
}
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf(
"/api/v1/upload/block/%s/files?path=%s", blockID, url.QueryEscape(tc.path)), rdr)
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/upload/block/%s/files?path=%s", blockID, url.QueryEscape(tc.path)), rdr)
if tc.tenantID != "" {
r = r.WithContext(user.InjectOrgID(r.Context(), tenantID))
}
Expand Down Expand Up @@ -986,8 +985,7 @@ func TestMultitenantCompactor_UploadBlockFile(t *testing.T) {

for _, f := range tc.files {
rdr := strings.NewReader(f.content)
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf(
"/api/v1/upload/block/%s/files?path=%s", blockID, url.QueryEscape(f.path)), rdr)
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/upload/block/%s/files?path=%s", blockID, url.QueryEscape(f.path)), rdr)
urlVars := map[string]string{
"block": blockID,
}
Expand Down Expand Up @@ -1183,16 +1181,15 @@ func TestMultitenantCompactor_HandleBlockUpload_Complete(t *testing.T) {
bucketClient: &bkt,
cfgProvider: cfgProvider,
}
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf(
"/api/v1/upload/block/%s?uploadComplete=true", tc.blockID), nil)
r := httptest.NewRequest(http.MethodPost, fmt.Sprintf("/api/v1/upload/block/%s/finish", tc.blockID), nil)
if tc.tenantID != "" {
r = r.WithContext(user.InjectOrgID(r.Context(), tenantID))
}
if tc.blockID != "" {
r = mux.SetURLVars(r, map[string]string{"block": tc.blockID})
}
w := httptest.NewRecorder()
c.HandleBlockUpload(w, r)
c.FinishBlockUpload(w, r)

resp := w.Result()
body, err := io.ReadAll(resp.Body)
Expand Down
18 changes: 11 additions & 7 deletions pkg/mimirtool/client/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,18 @@ func (c *MimirClient) backfillBlock(blockDir string, logctx *logrus.Entry) error

logctx.WithField("file", "meta.json").Info("making request to start block upload")

blockUploadEndpointPrefix := path.Join("/api/v1/upload/block", url.PathEscape(blockID))
const (
endpointPrefix = "/api/v1/upload/block"
startBlockUpload = "start"
uploadFile = "files"
finishBlockUpload = "finish"
)

buf := bytes.NewBuffer(nil)
if err := json.NewEncoder(buf).Encode(blockMeta); err != nil {
return errors.Wrap(err, "failed to JSON encode payload")
}
resp, err := c.doRequest(blockUploadEndpointPrefix, http.MethodPost, buf, int64(buf.Len()))
resp, err := c.doRequest(path.Join(endpointPrefix, url.PathEscape(blockID), startBlockUpload), http.MethodPost, buf, int64(buf.Len()))
if err != nil {
return errors.Wrap(err, "request to start block upload failed")
}
Expand All @@ -86,12 +91,12 @@ func (c *MimirClient) backfillBlock(blockDir string, logctx *logrus.Entry) error
continue
}

if err := c.uploadBlockFile(tf, blockDir, blockUploadEndpointPrefix, logctx); err != nil {
if err := c.uploadBlockFile(tf, blockDir, path.Join(endpointPrefix, url.PathEscape(blockID), uploadFile), logctx); err != nil {
return err
}
}

resp, err = c.doRequest(fmt.Sprintf("%s?uploadComplete=true", blockUploadEndpointPrefix), http.MethodPost, nil, -1)
resp, err = c.doRequest(path.Join(endpointPrefix, url.PathEscape(blockID), finishBlockUpload), http.MethodPost, nil, -1)
if err != nil {
return errors.Wrap(err, "request to finish block upload failed")
}
Expand All @@ -102,7 +107,7 @@ func (c *MimirClient) backfillBlock(blockDir string, logctx *logrus.Entry) error
return nil
}

func (c *MimirClient) uploadBlockFile(tf metadata.File, blockDir, blockUploadEndpointPrefix string, logctx *logrus.Entry) error {
func (c *MimirClient) uploadBlockFile(tf metadata.File, blockDir, fileUploadEndpoint string, logctx *logrus.Entry) error {
pth := filepath.Join(blockDir, filepath.FromSlash(tf.RelPath))
f, err := os.Open(pth)
if err != nil {
Expand All @@ -112,10 +117,9 @@ func (c *MimirClient) uploadBlockFile(tf metadata.File, blockDir, blockUploadEnd
_ = f.Close()
}()

escapedPath := url.QueryEscape(tf.RelPath)
logctx.WithFields(logrus.Fields{"file": tf.RelPath, "size": tf.SizeBytes}).Info("uploading block file")

resp, err := c.doRequest(path.Join(blockUploadEndpointPrefix, fmt.Sprintf("files?path=%s", escapedPath)), http.MethodPost, f, tf.SizeBytes)
resp, err := c.doRequest(fmt.Sprintf("%s?path=%s", fileUploadEndpoint, url.QueryEscape(tf.RelPath)), http.MethodPost, f, tf.SizeBytes)
if err != nil {
return errors.Wrapf(err, "request to upload file %q failed", pth)
}
Expand Down