Skip to content

Commit

Permalink
worker: add alerts on upload and download failures
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Dec 1, 2023
1 parent 37ed745 commit dae4687
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 20 deletions.
59 changes: 59 additions & 0 deletions worker/alerts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package worker

import (
"time"

"go.sia.tech/core/types"
"go.sia.tech/renterd/alerts"
"lukechampine.com/frand"
)

// randomAlertID returns a fresh alert identifier built from 256 bits drawn
// from frand.
func randomAlertID() types.Hash256 {
	return types.Hash256(frand.Entropy256())
}

// newDownloadFailedAlert builds an error-severity alert with the message
// "Download failed". The alert gets a random ID, is timestamped with the
// current time, and carries the request details (bucket, path, prefix, marker,
// offset, length), the number of contracts, and the failure reason in its Data
// map.
//
// The failure reason is stored under the "err" key as the error's text. A nil
// err is recorded as an empty string.
func newDownloadFailedAlert(bucket, path, prefix, marker string, offset, length, contracts int64, err error) alerts.Alert {
	// Record the error as a string rather than as an error value: when a
	// map[string]any is JSON-encoded, an error whose concrete type has no
	// exported fields is emitted as "{}", which would drop the reason.
	var reason string
	if err != nil {
		reason = err.Error()
	}

	return alerts.Alert{
		ID:       randomAlertID(),
		Severity: alerts.SeverityError,
		Message:  "Download failed",
		Data: map[string]any{
			"bucket":    bucket,
			"path":      path,
			"prefix":    prefix,
			"marker":    marker,
			"offset":    offset,
			"length":    length,
			"contracts": contracts,
			"err":       reason,
		},
		Timestamp: time.Now(),
	}
}

// newUploadFailedAlert builds an error-severity alert with the message
// "Upload failed". The alert gets a random ID, is timestamped with the current
// time, and carries the upload details (bucket, path, contract set, redundancy
// settings, packing flag, number of contracts) and the failure reason in its
// Data map.
//
// The "mimeType" entry is only present when mimeType is non-empty, and the
// "multipart" entry is only present (and true) when multipart is set. The
// failure reason is stored under the "err" key as the error's text. A nil err
// is recorded as an empty string.
func newUploadFailedAlert(bucket, path, contractSet, mimeType string, minShards, totalShards, contracts int, packing, multipart bool, err error) alerts.Alert {
	// Record the error as a string rather than as an error value: when a
	// map[string]any is JSON-encoded, an error whose concrete type has no
	// exported fields is emitted as "{}", which would drop the reason.
	var reason string
	if err != nil {
		reason = err.Error()
	}

	data := map[string]any{
		"bucket":      bucket,
		"path":        path,
		"contractSet": contractSet,
		"minShards":   minShards,
		"totalShards": totalShards,
		"packing":     packing,
		"contracts":   contracts,
		"err":         reason,
	}
	// optional fields are omitted entirely rather than stored as zero values
	if mimeType != "" {
		data["mimeType"] = mimeType
	}
	if multipart {
		data["multipart"] = true
	}

	return alerts.Alert{
		ID:        randomAlertID(),
		Severity:  alerts.SeverityError,
		Message:   "Upload failed",
		Data:      data,
		Timestamp: time.Now(),
	}
}
6 changes: 5 additions & 1 deletion worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const (
maxConcurrentSlabsPerDownload = 3
)

var (
// errDownloadManagerStopped is the sentinel error returned by the download
// loop when the manager's stop channel fires. Callers can match it with
// errors.Is to tell a shutdown apart from a genuine download failure.
errDownloadManagerStopped = errors.New("download manager stopped")
)

type (
// id is a unique identifier used for debugging
id [8]byte
Expand Down Expand Up @@ -323,7 +327,7 @@ outer:
for {
select {
case <-mgr.stopChan:
return errors.New("manager was stopped")
return errDownloadManagerStopped
case <-ctx.Done():
return errors.New("download timed out")
case resp := <-responseChan:
Expand Down
24 changes: 10 additions & 14 deletions worker/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ const (
)

var (
errNoCandidateUploader = errors.New("no candidate uploader found")
errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
errUploadManagerStopped = errors.New("upload manager stopped")
errNoCandidateUploader = errors.New("no candidate uploader found")
errNotEnoughContracts = errors.New("not enough contracts to support requested redundancy")
)

type uploadParameters struct {
Expand Down Expand Up @@ -230,7 +231,7 @@ func (w *worker) initUploadManager(mm *memoryManager, maxOverdrive uint64, overd
w.uploadManager = newUploadManager(w.bus, w, w, mm, maxOverdrive, overdriveTimeout, logger)
}

func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, opts ...UploadOption) (string, error) {
func (w *worker) upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, bucket, path string, opts ...UploadOption) (string, error) {
// build upload parameters
up := defaultParameters()
for _, opt := range opts {
Expand All @@ -253,7 +254,7 @@ func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, o
}

// perform the upload
obj, partialSlabData, eTag, err := w.uploadManager.Upload(ctx, r, up, lockingPriorityUpload)
obj, partialSlabData, eTag, err := w.uploadManager.Upload(ctx, r, contracts, up, lockingPriorityUpload)
if err != nil {
return "", fmt.Errorf("couldn't upload object: %w", err)
}
Expand Down Expand Up @@ -284,15 +285,15 @@ func (w *worker) upload(ctx context.Context, r io.Reader, bucket, path string, o
return eTag, nil
}

func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, bucket, path, uploadID string, partNumber int, opts ...UploadOption) (string, error) {
func (w *worker) uploadMultiPart(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, bucket, path, uploadID string, partNumber int, opts ...UploadOption) (string, error) {
// build upload parameters
up := defaultParameters()
for _, opt := range opts {
opt(&up)
}

// upload the part
obj, partialSlabData, eTag, err := w.uploadManager.Upload(ctx, r, up, lockingPriorityUpload)
obj, partialSlabData, eTag, err := w.uploadManager.Upload(ctx, r, contracts, up, lockingPriorityUpload)
if err != nil {
return "", fmt.Errorf("couldn't upload object: %w", err)
}
Expand Down Expand Up @@ -535,7 +536,7 @@ func (mgr *uploadManager) Stop() {
}
}

func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadParameters, lockPriority int) (_ object.Object, partialSlab []byte, eTag string, err error) {
func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, contracts []api.ContractMetadata, up uploadParameters, lockPriority int) (_ object.Object, partialSlab []byte, eTag string, err error) {
// cancel all in-flight requests when the upload is done
ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand All @@ -559,12 +560,6 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara
return object.Object{}, nil, "", err
}

// fetch contracts
contracts, err := mgr.b.ContractSetContracts(ctx, up.contractSet)
if err != nil {
return object.Object{}, nil, "", fmt.Errorf("couldn't fetch contracts from bus: %w", err)
}

// create the upload
u, finishFn, err := mgr.newUpload(ctx, up.rs.TotalShards, contracts, up.bh, lockPriority)
if err != nil {
Expand Down Expand Up @@ -644,7 +639,7 @@ func (mgr *uploadManager) Upload(ctx context.Context, r io.Reader, up uploadPara
for len(responses) < numSlabs {
select {
case <-mgr.stopChan:
return object.Object{}, nil, "", errors.New("manager was stopped")
return object.Object{}, nil, "", errUploadManagerStopped
case numSlabs = <-numSlabsChan:
case res := <-respChan:
if res.err != nil {
Expand Down Expand Up @@ -687,6 +682,7 @@ func (mgr *uploadManager) UploadShards(ctx context.Context, shards [][]byte, con
}
return sectors, nil
}

func (mgr *uploadManager) launch(req *sectorUploadReq) error {
// recompute stats
mgr.tryRecomputeStats()
Expand Down
46 changes: 41 additions & 5 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,14 @@ func (w *worker) withRevision(ctx context.Context, fetchTimeout time.Duration, c
})
}

// registerAlert submits the given alert via w.alerts. The call is bounded by a
// one-minute timeout on a context derived from context.Background(), so it is
// not tied to the lifetime of any caller's context. A failure to register the
// alert is logged and otherwise ignored; nothing is returned to the caller.
func (w *worker) registerAlert(a alerts.Alert) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	// deferred so the timeout context is released on every exit path,
	// including a panic inside RegisterAlert
	defer cancel()

	if err := w.alerts.RegisterAlert(ctx, a); err != nil {
		w.logger.Error("failed to register alert", err)
	}
}

func (w *worker) rhpScanHandler(jc jape.Context) {
var rsr api.RHPScanRequest
if jc.Decode(&rsr) != nil {
Expand Down Expand Up @@ -1053,9 +1061,13 @@ func (w *worker) objectsHandlerGET(jc jape.Context) {
}

// create a download function
downloadFn := func(wr io.Writer, offset, length int64) error {
downloadFn := func(wr io.Writer, offset, length int64) (err error) {
ctx = WithGougingChecker(ctx, w.bus, gp)
return w.downloadManager.DownloadObject(ctx, wr, res.Object.Object, uint64(offset), uint64(length), contracts)
err = w.downloadManager.DownloadObject(ctx, wr, res.Object.Object, uint64(offset), uint64(length), contracts)
if err != nil && !errors.Is(err, errDownloadManagerStopped) {
w.registerAlert(newDownloadFailedAlert(bucket, path, prefix, marker, offset, length, int64(len(contracts)), err))
}
return
}

// serve the content
Expand All @@ -1073,6 +1085,9 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) {
jc.Custom((*[]byte)(nil), nil)
ctx := jc.Request.Context()

// grab the path
path := jc.PathParam("path")

// fetch the upload parameters
up, err := w.bus.UploadParams(ctx)
if jc.Check("couldn't fetch upload parameters from bus", err) != nil {
Expand Down Expand Up @@ -1143,9 +1158,18 @@ func (w *worker) objectsHandlerPUT(jc jape.Context) {
// attach gouging checker to the context
ctx = WithGougingChecker(ctx, w.bus, up.GougingParams)

// fetch contracts
contracts, err := w.bus.ContractSetContracts(ctx, up.ContractSet)
if jc.Check("couldn't fetch contracts from bus", err) != nil {
return
}

// upload the object
eTag, err := w.upload(ctx, jc.Request.Body, bucket, jc.PathParam("path"), opts...)
if jc.Check("couldn't upload object", err) != nil {
eTag, err := w.upload(ctx, jc.Request.Body, contracts, bucket, path, opts...)
if err := jc.Check("couldn't upload object", err); err != nil {
if !errors.Is(err, errUploadManagerStopped) {
w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, mimeType, rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, false, err))
}
return
}

Expand All @@ -1157,6 +1181,9 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) {
jc.Custom((*[]byte)(nil), nil)
ctx := jc.Request.Context()

// grab the path
path := jc.PathParam("path")

// fetch the upload parameters
up, err := w.bus.UploadParams(ctx)
if jc.Check("couldn't fetch upload parameters from bus", err) != nil {
Expand Down Expand Up @@ -1263,9 +1290,18 @@ func (w *worker) multipartUploadHandlerPUT(jc jape.Context) {
// attach gouging checker to the context
ctx = WithGougingChecker(ctx, w.bus, up.GougingParams)

// fetch contracts
contracts, err := w.bus.ContractSetContracts(ctx, up.ContractSet)
if jc.Check("couldn't fetch contracts from bus", err) != nil {
return
}

// upload the multipart
eTag, err := w.uploadMultiPart(ctx, jc.Request.Body, bucket, jc.PathParam("path"), uploadID, partNumber, opts...)
eTag, err := w.uploadMultiPart(ctx, jc.Request.Body, contracts, bucket, path, uploadID, partNumber, opts...)
if jc.Check("couldn't upload object", err) != nil {
if !errors.Is(err, errUploadManagerStopped) {
w.registerAlert(newUploadFailedAlert(bucket, path, up.ContractSet, "", rs.MinShards, rs.TotalShards, len(contracts), up.UploadPacking, true, err))
}
return
}

Expand Down

0 comments on commit dae4687

Please sign in to comment.