Compactor block upload: Check block upload endpoint #2548

Merged
merged 6 commits into from
Jul 26, 2022
Changes from 5 commits
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -12,7 +12,7 @@
* [CHANGE] Store-gateway: Remove the experimental ability to run requests in a dedicated OS thread pool and associated CLI flag `-store-gateway.thread-pool-size`. #2423
* [CHANGE] Memberlist: disabled TCP-based ping fallback, because Mimir already uses a custom transport based on TCP. #2456
* [CHANGE] Change default value for `-distributor.ha-tracker.max-clusters` to `100` to provide a DoS protection. #2465
* [CHANGE] Experimental block upload API exposed by compactor has changed: Previous `/api/v1/upload/block/{block}` endpoint for starting block upload is now `/api/v1/upload/block/{block}/start`, and previous endpoint `/api/v1/upload/block/{block}?uploadComplete=true` for finishing block upload is now `/api/v1/upload/block/{block}/finish`. #2486
* [CHANGE] Experimental block upload API exposed by compactor has changed: Previous `/api/v1/upload/block/{block}` endpoint for starting block upload is now `/api/v1/upload/block/{block}/start`, and previous endpoint `/api/v1/upload/block/{block}?uploadComplete=true` for finishing block upload is now `/api/v1/upload/block/{block}/finish`. New API endpoint has been added: `/api/v1/upload/block/{block}/check`. #2486 #2548
* [CHANGE] Compactor: changed `-compactor.max-compaction-time` default from `0s` (disabled) to `1h`. When compacting blocks for a tenant, the compactor will move to compact blocks of another tenant or re-plan blocks to compact at least every 1h. #2514
* [FEATURE] Compactor: Adds the ability to delete partial blocks after a configurable delay. This option can be configured per tenant. #2285
- `-compactor.partial-block-deletion-delay`, as a duration string, allows you to set the delay since a partial block has been modified before marking it for deletion. A value of `0`, the default, disables this feature.
38 changes: 35 additions & 3 deletions docs/sources/operators-guide/reference-http-api/index.md
@@ -80,6 +80,7 @@ This document groups API endpoints by service. Note that the API endpoints are e
| [Start block upload](#start-block-upload) | Compactor | `POST /api/v1/upload/block/{block}/start` |
| [Upload block file](#upload-block-file) | Compactor | `POST /api/v1/upload/block/{block}/files?path={path}` |
| [Complete block upload](#complete-block-upload) | Compactor | `POST /api/v1/upload/block/{block}/finish` |
| [Check block upload](#check-block-upload) | Compactor | `GET /api/v1/upload/block/{block}/check` |

### Path prefixes

@@ -970,13 +971,44 @@ Requires [authentication](#authentication).
POST /api/v1/upload/block/{block}/finish
```

Completes the uploading of a TSDB block with a given ID to object storage. If the complete block already
Initiates the completion of the upload of a TSDB block with a given ID to object storage. If the complete block already
exists in object storage, a `409` (Conflict) status code gets returned. If an in-flight meta file
(`uploading-meta.json`) doesn't exist in object storage for the block in question, a `404` (Not Found)
status code gets returned.

If the API request succeeds, the in-flight meta file gets renamed to `meta.json` in the block's directory in
object storage, so the block is considered complete, and a `200` status code gets returned.
If the API request succeeds, the compactor starts validating the block in the background. If the background validation
passes, the block upload is finished by renaming the in-flight meta file to `meta.json` in the block's directory.

This API endpoint returns `200` (OK) at the beginning of the validation. To further check the state of the block upload,
use the [Check block upload](#check-block-upload) API endpoint.

Requires [authentication](#authentication).

This API endpoint is experimental and subject to change.
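
As an illustration only, a client might build the finish request as in the sketch below. The base URL, tenant ID, and helper names (`finishURL`, `newFinishRequest`) are hypothetical, and the sketch assumes the tenant is identified with the `X-Scope-OrgID` header; adjust this to match how authentication is set up in your deployment.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
)

// finishURL builds the URL of the finish endpoint for a block.
// baseURL is a hypothetical compactor address, not a value from this PR.
func finishURL(baseURL, blockID string) string {
	return fmt.Sprintf("%s/api/v1/upload/block/%s/finish", baseURL, url.PathEscape(blockID))
}

// newFinishRequest prepares (but does not send) the POST request that asks
// the compactor to start validating an uploaded block.
func newFinishRequest(baseURL, tenantID, blockID string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, finishURL(baseURL, blockID), nil)
	if err != nil {
		return nil, err
	}
	// Assumption: the tenant is passed in the X-Scope-OrgID header.
	req.Header.Set("X-Scope-OrgID", tenantID)
	return req, nil
}

func main() {
	req, err := newFinishRequest("http://localhost:8080", "tenant", "01G8X9GA8R6N8F75FW1J18G83N")
	if err != nil {
		panic(err)
	}
	// Only print the request here; sending it would use http.DefaultClient.Do(req).
	fmt.Println(req.Method, req.URL.String())
}
```

A `200` response to this request only means that validation has started, so a client would still need to query the check endpoint to learn the final outcome.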

### Check block upload

```
GET /api/v1/upload/block/{block}/check
```

Returns the state of the block upload. The state is returned as a JSON object with a `result` field, which has one of the following values:
- `complete` -- block validation is complete, and the block upload is now finished.
- `uploading` -- the block is still being uploaded, and [Complete block upload](#complete-block-upload) has not yet been called on the block.
- `validating` -- the block is being validated. Validation was started by a call to the [Complete block upload](#complete-block-upload) API endpoint.
- `failed` -- block validation has failed. The error message is available in the `error` field of the returned JSON object.

**Example response**

```json
{"result":"uploading"}
```

**Example response**

```json
{"result":"failed","error":"missing index file"}
```

Requires [authentication](#authentication).
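
The responses above could be consumed by a client along the following lines. This is a minimal sketch, not part of the PR: the `checkResult` type and the `parseCheckResponse` and `isTerminal` helpers are hypothetical names, and treating `complete` and `failed` as the states at which polling can stop is an assumption based on the state descriptions above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkResult mirrors the JSON object returned by the check endpoint:
// a "result" field and an optional "error" field.
type checkResult struct {
	Result string `json:"result"`
	Error  string `json:"error,omitempty"`
}

// parseCheckResponse decodes a response body from the check endpoint.
func parseCheckResponse(body []byte) (checkResult, error) {
	var res checkResult
	if err := json.Unmarshal(body, &res); err != nil {
		return checkResult{}, err
	}
	return res, nil
}

// isTerminal reports whether a polling client can stop asking for the state.
// Assumption: only "complete" and "failed" are final.
func isTerminal(res checkResult) bool {
	return res.Result == "complete" || res.Result == "failed"
}

func main() {
	for _, body := range []string{
		`{"result":"uploading"}`,
		`{"result":"failed","error":"missing index file"}`,
	} {
		res, err := parseCheckResponse([]byte(body))
		if err != nil {
			panic(err)
		}
		fmt.Printf("state=%s terminal=%t error=%q\n", res.Result, isTerminal(res), res.Error)
	}
}
```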

1 change: 1 addition & 0 deletions pkg/api/api.go
@@ -348,6 +348,7 @@ func (a *API) RegisterCompactor(c *compactor.MultitenantCompactor) {
a.RegisterRoute("/api/v1/upload/block/{block}/start", http.HandlerFunc(c.StartBlockUpload), true, false, http.MethodPost)
a.RegisterRoute("/api/v1/upload/block/{block}/files", http.HandlerFunc(c.UploadBlockFile), true, false, http.MethodPost)
a.RegisterRoute("/api/v1/upload/block/{block}/finish", http.HandlerFunc(c.FinishBlockUpload), true, false, http.MethodPost)
a.RegisterRoute("/api/v1/upload/block/{block}/check", http.HandlerFunc(c.GetBlockUploadStateHandler), true, false, http.MethodGet)
}

type Distributor interface {
70 changes: 50 additions & 20 deletions pkg/compactor/block_upload.go
@@ -397,6 +397,8 @@ type validationFile struct {
Error string // Error message if validation failed.
}

const validationFileStaleTimeout = 5 * time.Minute

type blockUploadState int

const (
@@ -409,22 +411,50 @@ const (
blockValidationStale
)

var blockStateMessages = map[blockUploadState]string{
blockStateUnknown: "unknown",
blockIsComplete: "block already exists",
blockUploadNotStarted: "block upload not started",
blockUploadInProgress: "block upload in progress",
blockValidationInProgress: "block validation in progress",
blockValidationFailed: "block validation failed",
blockValidationStale: "block validation stale",
}
func (c *MultitenantCompactor) GetBlockUploadStateHandler(w http.ResponseWriter, r *http.Request) {
blockID, tenantID, err := c.parseBlockUploadParameters(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

userBkt := bucket.NewUserBucketClient(tenantID, c.bucketClient, c.cfgProvider)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

s, _, v, err := c.getBlockUploadState(r.Context(), userBkt, blockID)
if err != nil {
writeBlockUploadError(err, "get block state", "", log.With(util_log.WithContext(r.Context(), c.logger), "block", blockID), w)
return
}

func (s blockUploadState) String() string {
msg, ok := blockStateMessages[s]
if ok {
return msg
type result struct {
State string `json:"result"`
Error string `json:"error,omitempty"`
}
return "invalid state"

res := result{}

switch s {
case blockIsComplete:
res.State = "complete"
case blockUploadNotStarted:
http.Error(w, "block doesn't exist", http.StatusNotFound)
return
case blockValidationStale:
fallthrough
case blockUploadInProgress:
res.State = "uploading"
case blockValidationInProgress:
res.State = "validating"
case blockValidationFailed:
res.State = "failed"
res.Error = v.Error
}

util.WriteJSONResponse(w, res)
}

// checkBlockState checks blocks state and returns various HTTP status codes for individual states if block
@@ -437,12 +467,12 @@ func (c *MultitenantCompactor) checkBlockState(ctx context.Context, userBkt objs

switch s {
case blockIsComplete:
return m, v, httpError{message: s.String(), statusCode: http.StatusConflict}
return m, v, httpError{message: "block already exists", statusCode: http.StatusConflict}
case blockValidationInProgress:
return m, v, httpError{message: s.String(), statusCode: http.StatusBadRequest}
return m, v, httpError{message: "block validation in progress", statusCode: http.StatusBadRequest}
case blockUploadNotStarted:
if requireUploadInProgress {
return m, v, httpError{message: s.String(), statusCode: http.StatusNotFound}
return m, v, httpError{message: "block upload not started", statusCode: http.StatusNotFound}
}
return m, v, nil
case blockValidationStale:
@@ -451,10 +481,10 @@ func (c *MultitenantCompactor) checkBlockState(ctx context.Context, userBkt objs
case blockUploadInProgress:
return m, v, nil
case blockValidationFailed:
return m, v, httpError{message: s.String(), statusCode: http.StatusBadRequest}
return m, v, httpError{message: "block validation failed", statusCode: http.StatusBadRequest}
}

return m, v, httpError{message: s.String(), statusCode: http.StatusInternalServerError}
return m, v, httpError{message: "unknown block upload state", statusCode: http.StatusInternalServerError}
}

// getBlockUploadState returns state of the block upload, and meta and validation objects, if they exist.
@@ -486,7 +516,7 @@ func (c *MultitenantCompactor) getBlockUploadState(ctx context.Context, userBkt
if v.Error != "" {
return blockValidationFailed, meta, v, err
}
if time.Since(time.UnixMilli(v.LastUpdate)) < 5*time.Minute {
if time.Since(time.UnixMilli(v.LastUpdate)) < validationFileStaleTimeout {
return blockValidationInProgress, meta, v, nil
}
return blockValidationStale, meta, v, nil
117 changes: 105 additions & 12 deletions pkg/compactor/block_upload_test.go
@@ -585,8 +585,8 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
{
name: "valid request when both in-flight meta file and complete meta file exist in object storage",
setUp: func(t *testing.T, bkt *objstore.InMemBucket) metadata.Meta {
uploadMeta(t, bkt, uploadingMetaPath, validMeta)
uploadMeta(t, bkt, metaPath, validMeta)
marshalAndUploadJSON(t, bkt, uploadingMetaPath, validMeta)
marshalAndUploadJSON(t, bkt, metaPath, validMeta)
return validMeta
},
verifyBucket: func(t *testing.T, bkt *objstore.InMemBucket) {
@@ -598,7 +598,7 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
{
name: "invalid request when in-flight meta file exists in object storage",
setUp: func(t *testing.T, bkt *objstore.InMemBucket) metadata.Meta {
uploadMeta(t, bkt, uploadingMetaPath, validMeta)
marshalAndUploadJSON(t, bkt, uploadingMetaPath, validMeta)

meta := validMeta
// Invalid version
@@ -613,7 +613,7 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
{
name: "valid request when same in-flight meta file exists in object storage",
setUp: func(t *testing.T, bkt *objstore.InMemBucket) metadata.Meta {
uploadMeta(t, bkt, uploadingMetaPath, validMeta)
marshalAndUploadJSON(t, bkt, uploadingMetaPath, validMeta)
return validMeta
},
verifyBucket: func(t *testing.T, bkt *objstore.InMemBucket) {
@@ -629,7 +629,7 @@ func TestMultitenantCompactor_HandleBlockUpload_Create(t *testing.T) {
meta := validMeta
meta.MinTime -= 1000
meta.MaxTime -= 1000
uploadMeta(t, bkt, uploadingMetaPath, meta)
marshalAndUploadJSON(t, bkt, uploadingMetaPath, meta)

// Return meta file that differs from the one in bucket
return validMeta
@@ -997,7 +997,7 @@ func TestMultitenantCompactor_UploadBlockFile(t *testing.T) {
},
},
setUpBucket: func(t *testing.T, bkt *objstore.InMemBucket) {
uploadMeta(t, bkt, uploadingMetaPath, validMeta)
marshalAndUploadJSON(t, bkt, uploadingMetaPath, validMeta)
},
verifyBucket: func(t *testing.T, bkt *objstore.InMemBucket, files []file) {
t.Helper()
@@ -1270,12 +1270,105 @@ func TestMultitenantCompactor_HandleBlockUpload_Complete(t *testing.T) {
}
}

// uploadMeta is a test helper for uploading a meta file to a certain path in a bucket.
func uploadMeta(t *testing.T, bkt objstore.Bucket, pth string, meta metadata.Meta) {
// marshalAndUploadJSON is a test helper that marshals a value to JSON and uploads it to a certain path in a bucket.
func marshalAndUploadJSON(t *testing.T, bkt objstore.Bucket, pth string, val interface{}) {
t.Helper()
buf, err := json.Marshal(val)
require.NoError(t, err)
require.NoError(t, bkt.Upload(context.Background(), pth, bytes.NewReader(buf)))
}

func TestMultitenantCompactor_GetBlockUploadStateHandler(t *testing.T) {
const (
tenantID = "tenant"
blockID = "01G8X9GA8R6N8F75FW1J18G83N"
)

type testcase struct {
setupBucket func(t *testing.T, bkt objstore.Bucket)
disableBlockUpload bool
expectedStatusCode int
expectedBody string
}

for name, tc := range map[string]testcase{
"block doesn't exist": {
expectedStatusCode: http.StatusNotFound,
expectedBody: "block doesn't exist",
},

"complete block": {
setupBucket: func(t *testing.T, bkt objstore.Bucket) {
marshalAndUploadJSON(t, bkt, path.Join(tenantID, blockID, block.MetaFilename), metadata.Meta{})
},
expectedStatusCode: http.StatusOK,
expectedBody: `{"result":"complete"}`,
},

"upload in progress": {
setupBucket: func(t *testing.T, bkt objstore.Bucket) {
marshalAndUploadJSON(t, bkt, path.Join(tenantID, blockID, uploadingMetaFilename), metadata.Meta{})
},
expectedStatusCode: http.StatusOK,
expectedBody: `{"result":"uploading"}`,
},

"validating": {
setupBucket: func(t *testing.T, bkt objstore.Bucket) {
marshalAndUploadJSON(t, bkt, path.Join(tenantID, blockID, uploadingMetaFilename), metadata.Meta{})
marshalAndUploadJSON(t, bkt, path.Join(tenantID, blockID, validationFilename), validationFile{LastUpdate: time.Now().UnixMilli()})
},
expectedStatusCode: http.StatusOK,
expectedBody: `{"result":"validating"}`,
},

"validation failed": {
setupBucket: func(t *testing.T, bkt objstore.Bucket) {
marshalAndUploadJSON(t, bkt, path.Join(tenantID, blockID, uploadingMetaFilename), metadata.Meta{})
marshalAndUploadJSON(t, bkt, path.Join(tenantID, blockID, validationFilename), validationFile{LastUpdate: time.Now().UnixMilli(), Error: "error during validation"})
},
expectedStatusCode: http.StatusOK,
expectedBody: `{"result":"failed","error":"error during validation"}`,
},

"stale validation file": {
setupBucket: func(t *testing.T, bkt objstore.Bucket) {
marshalAndUploadJSON(t, bkt, path.Join(tenantID, blockID, uploadingMetaFilename), metadata.Meta{})
marshalAndUploadJSON(t, bkt, path.Join(tenantID, blockID, validationFilename), validationFile{LastUpdate: time.Now().Add(-10 * time.Minute).UnixMilli()})
},
expectedStatusCode: http.StatusOK,
expectedBody: `{"result":"uploading"}`,
},
} {
t.Run(name, func(t *testing.T) {
bkt := objstore.NewInMemBucket()
if tc.setupBucket != nil {
tc.setupBucket(t, bkt)
}

cfgProvider := newMockConfigProvider()
cfgProvider.blockUploadEnabled[tenantID] = !tc.disableBlockUpload

c := &MultitenantCompactor{
logger: log.NewNopLogger(),
bucketClient: bkt,
cfgProvider: cfgProvider,
}

buf := bytes.NewBuffer(nil)
require.NoError(t, json.NewEncoder(buf).Encode(meta))
ctx := context.Background()
require.NoError(t, bkt.Upload(ctx, pth, buf))
r := httptest.NewRequest(http.MethodGet, fmt.Sprintf("/api/v1/upload/block/%s/check", blockID), nil)
urlVars := map[string]string{"block": blockID}
r = mux.SetURLVars(r, urlVars)
r = r.WithContext(user.InjectOrgID(r.Context(), tenantID))

w := httptest.NewRecorder()
c.GetBlockUploadStateHandler(w, r)
resp := w.Result()

body, err := io.ReadAll(resp.Body)

require.NoError(t, err)
require.Equal(t, tc.expectedStatusCode, resp.StatusCode)
require.Equal(t, tc.expectedBody, strings.TrimSpace(string(body)))
})
}
}