From 1fcd86a2cacca6bb7d64d4f32f6a66132961f860 Mon Sep 17 00:00:00 2001 From: Starnop Date: Tue, 6 Aug 2019 16:30:57 +0800 Subject: [PATCH] feature: add piece error API Signed-off-by: Starnop --- apis/swagger.yml | 86 +++++++- apis/types/piece_error_request.go | 132 ++++++++++++ pkg/fileutils/fileutils.go | 8 +- pkg/limitreader/limit_reader.go | 4 +- supernode/daemon/mgr/cdn/cache_detector.go | 8 +- supernode/daemon/mgr/cdn/manager.go | 71 +++++- supernode/daemon/mgr/cdn/reporter.go | 4 +- supernode/daemon/mgr/cdn/super_reader.go | 28 ++- supernode/daemon/mgr/cdn/super_reader_test.go | 54 +++++ supernode/daemon/mgr/cdn/super_writer_util.go | 4 +- supernode/daemon/mgr/cdn_mgr.go | 6 + supernode/daemon/mgr/gc/gc_manager.go | 4 +- supernode/daemon/mgr/gc/gc_task.go | 10 +- supernode/daemon/mgr/gc_mgr.go | 3 +- supernode/daemon/mgr/mock/mock_cdn_mgr.go | 29 +++ supernode/daemon/mgr/piece_error_mgr.go | 34 +++ .../daemon/mgr/pieceerror/file_not_exist.go | 55 +++++ supernode/daemon/mgr/pieceerror/manager.go | 203 ++++++++++++++++++ .../daemon/mgr/pieceerror/md5_not_match.go | 73 +++++++ supernode/daemon/mgr/task/manager.go | 10 +- supernode/daemon/mgr/task/manager_util.go | 10 +- supernode/server/0.3_bridge.go | 2 +- supernode/server/piece_error_bridge.go | 65 ++++++ supernode/server/router.go | 6 + supernode/server/server.go | 44 ++-- supernode/server/task_bridge.go | 36 ++++ supernode/util/range_util.go | 41 ++-- 27 files changed, 958 insertions(+), 72 deletions(-) create mode 100644 apis/types/piece_error_request.go create mode 100644 supernode/daemon/mgr/cdn/super_reader_test.go create mode 100644 supernode/daemon/mgr/piece_error_mgr.go create mode 100644 supernode/daemon/mgr/pieceerror/file_not_exist.go create mode 100644 supernode/daemon/mgr/pieceerror/manager.go create mode 100644 supernode/daemon/mgr/pieceerror/md5_not_match.go create mode 100644 supernode/server/piece_error_bridge.go create mode 100644 supernode/server/task_bridge.go diff --git a/apis/swagger.yml b/apis/swagger.yml index 625891613..418fcdb39 100644 --- a/apis/swagger.yml +++ b/apis/swagger.yml @@ -414,6 +414,13 @@ paths: required: true description: "ID of task" type: string + - name: full + in: query + type: "boolean" + default: false + description: | + supernode will also delete the cdn files when the value of full equals true. + responses: 204: description: "no error" @@ -513,6 +520,43 @@ paths: 500: $ref: "#/responses/500ErrorResponse" + /tasks/{id}/pieces/{pieceRange}/error: + post: + summary: "report a piece error" + description: | + When a peer failed to download a piece from supernode or + failed to validate the pieceMD5, + and then dfget should report the error info to supernode. + consumes: + - "application/json" + produces: + - "application/json" + parameters: + - name: id + in: path + required: true + description: "ID of task" + type: string + - name: pieceRange + in: path + required: true + description: | + the range of specific piece in the task, example "0-45565". + type: string + - name: "PieceErrorRequest" + in: "body" + description: | + request body which contains piece error information. + schema: + $ref: "#/definitions/PieceErrorRequest" + responses: + 200: + description: "no error" + 404: + $ref: "#/responses/404ErrorResponse" + 500: + $ref: "#/responses/500ErrorResponse" + /preheats: post: summary: "Create a Preheat Task" @@ -1095,6 +1139,47 @@ definitions: description: | the range of specific piece in the task, example "0-45565". + PieceErrorRequest: + type: "object" + description: "Peer's detailed information in supernode." + properties: + taskId: + type: "string" + description: | + the taskID of the piece. + srcCid: + type: "string" + description: | + the CID of the src Peer. + dstPid: + type: "string" + description: | + the peer ID of the target Peer. + dstIP: + type: "string" + description: | + the peer ID of the target Peer. + range: + type: "string" + description: | + the range of specific piece in the task, example "0-45565". + realMd5: + type: "string" + description: | + the MD5 information of piece which calculated by the piece content + which downloaded from the target peer. + expectedMd5: + type: "string" + description: | + the MD5 value of piece which returned by the supernode that + in order to verify the correctness of the piece content which + downloaded from the other peers. + errorType: + type: "string" + description: | + the error type when failed to download from supernode that dfget will report to supernode + enum: ["FILE_NOT_EXIST", "FILE_MD5_NOT_MATCH"] + PreheatInfo: type: "object" description: | @@ -1283,7 +1368,6 @@ definitions: message: type: "string" description: "detailed error message" - responses: 401ErrorResponse: diff --git a/apis/types/piece_error_request.go b/apis/types/piece_error_request.go new file mode 100644 index 000000000..380db1009 --- /dev/null +++ b/apis/types/piece_error_request.go @@ -0,0 +1,132 @@ +// Code generated by go-swagger; DO NOT EDIT. + +package types + +// This file was generated by the swagger tool. +// Editing this file might prove futile when you re-run the swagger generate command + +import ( + "encoding/json" + + strfmt "github.com/go-openapi/strfmt" + + "github.com/go-openapi/errors" + "github.com/go-openapi/swag" + "github.com/go-openapi/validate" +) + +// PieceErrorRequest Peer's detailed information in supernode. +// swagger:model PieceErrorRequest +type PieceErrorRequest struct { + + // the peer ID of the target Peer. + // + DstIP string `json:"dstIP,omitempty"` + + // the peer ID of the target Peer. + // + DstPid string `json:"dstPid,omitempty"` + + // the error type when failed to download from supernode that dfget will report to supernode + // + // Enum: [FILE_NOT_EXIST FILE_MD5_NOT_MATCH] + ErrorType string `json:"errorType,omitempty"` + + // the MD5 value of piece which returned by the supernode that + // in order to verify the correctness of the piece content which + // downloaded from the other peers. + // + ExpectedMd5 string `json:"expectedMd5,omitempty"` + + // the range of specific piece in the task, example "0-45565". + // + Range string `json:"range,omitempty"` + + // the MD5 information of piece which calculated by the piece content + // which downloaded from the target peer. + // + RealMd5 string `json:"realMd5,omitempty"` + + // the CID of the src Peer. + // + SrcCid string `json:"srcCid,omitempty"` + + // the taskID of the piece. + // + TaskID string `json:"taskId,omitempty"` +} + +// Validate validates this piece error request +func (m *PieceErrorRequest) Validate(formats strfmt.Registry) error { + var res []error + + if err := m.validateErrorType(formats); err != nil { + res = append(res, err) + } + + if len(res) > 0 { + return errors.CompositeValidationError(res...) + } + return nil +} + +var pieceErrorRequestTypeErrorTypePropEnum []interface{} + +func init() { + var res []string + if err := json.Unmarshal([]byte(`["FILE_NOT_EXIST","FILE_MD5_NOT_MATCH"]`), &res); err != nil { + panic(err) + } + for _, v := range res { + pieceErrorRequestTypeErrorTypePropEnum = append(pieceErrorRequestTypeErrorTypePropEnum, v) + } +} + +const ( + + // PieceErrorRequestErrorTypeFILENOTEXIST captures enum value "FILE_NOT_EXIST" + PieceErrorRequestErrorTypeFILENOTEXIST string = "FILE_NOT_EXIST" + + // PieceErrorRequestErrorTypeFILEMD5NOTMATCH captures enum value "FILE_MD5_NOT_MATCH" + PieceErrorRequestErrorTypeFILEMD5NOTMATCH string = "FILE_MD5_NOT_MATCH" +) + +// prop value enum +func (m *PieceErrorRequest) validateErrorTypeEnum(path, location string, value string) error { + if err := validate.Enum(path, location, value, pieceErrorRequestTypeErrorTypePropEnum); err != nil { + return err + } + return nil +} + +func (m *PieceErrorRequest) validateErrorType(formats strfmt.Registry) error { + + if swag.IsZero(m.ErrorType) { // not required + return nil + } + + // value enum + if err := m.validateErrorTypeEnum("errorType", "body", m.ErrorType); err != nil { + return err + } + + return nil +} + +// MarshalBinary interface implementation +func (m *PieceErrorRequest) MarshalBinary() ([]byte, error) { + if m == nil { + return nil, nil + } + return swag.WriteJSON(m) +} + +// UnmarshalBinary interface implementation +func (m *PieceErrorRequest) UnmarshalBinary(b []byte) error { + var res PieceErrorRequest + if err := swag.ReadJSON(b, &res); err != nil { + return err + } + *m = res + return nil +} diff --git a/pkg/fileutils/fileutils.go b/pkg/fileutils/fileutils.go index 73774f943..308f5161b 100644 --- a/pkg/fileutils/fileutils.go +++ b/pkg/fileutils/fileutils.go @@ -20,6 +20,7 @@ import ( "bufio" "crypto/md5" "fmt" + "hash" "io" "io/ioutil" "os" @@ -208,7 +209,12 @@ func Md5Sum(name string) string { return "" } - return fmt.Sprintf("%x", h.Sum(nil)) + return GetMd5Sum(h, nil) +} + +// GetMd5Sum gets md5 sum as a string and appends the current hash to b. +func GetMd5Sum(md5 hash.Hash, b []byte) string { + return fmt.Sprintf("%x", md5.Sum(b)) } // GetSys returns the underlying data source of the os.FileInfo. diff --git a/pkg/limitreader/limit_reader.go b/pkg/limitreader/limit_reader.go index 3a07a2556..cabc70e84 100644 --- a/pkg/limitreader/limit_reader.go +++ b/pkg/limitreader/limit_reader.go @@ -18,10 +18,10 @@ package limitreader import ( "crypto/md5" - "fmt" "hash" "io" + "github.com/dragonflyoss/Dragonfly/pkg/fileutils" "github.com/dragonflyoss/Dragonfly/pkg/ratelimiter" ) @@ -93,7 +93,7 @@ func (lr *LimitReader) Read(p []byte) (n int, err error) { // Md5 calculates the md5 of all contents read func (lr *LimitReader) Md5() string { if lr.md5sum != nil { - return fmt.Sprintf("%x", lr.md5sum.Sum(nil)) + return fileutils.GetMd5Sum(lr.md5sum, nil) } return "" } diff --git a/supernode/daemon/mgr/cdn/cache_detector.go b/supernode/daemon/mgr/cdn/cache_detector.go index 7c6c44060..c2243f4fb 100644 --- a/supernode/daemon/mgr/cdn/cache_detector.go +++ b/supernode/daemon/mgr/cdn/cache_detector.go @@ -30,14 +30,14 @@ import ( type cacheDetector struct { cacheStore *store.Store metaDataManager *fileMetaDataManager - OriginClient httpclient.OriginHTTPClient + originClient httpclient.OriginHTTPClient } func newCacheDetector(cacheStore *store.Store, metaDataManager *fileMetaDataManager, originClient httpclient.OriginHTTPClient) *cacheDetector { return &cacheDetector{ cacheStore: cacheStore, metaDataManager: metaDataManager, - OriginClient: originClient, + originClient: originClient, } } @@ -68,7 +68,7 @@ func (cd *cacheDetector) detectCache(ctx context.Context, task *types.TaskInfo) } func (cd *cacheDetector) parseBreakNum(ctx context.Context, task *types.TaskInfo, metaData *fileMetaData) int { - expired, err := cd.OriginClient.IsExpired(task.RawURL, task.Headers, metaData.LastModified, metaData.ETag) + expired, err := cd.originClient.IsExpired(task.RawURL, task.Headers, metaData.LastModified, metaData.ETag) if err != nil { logrus.Errorf("failed to check whether the task(%s) has expired: %v", task.ID, err) } @@ -85,7 +85,7 @@ func (cd *cacheDetector) parseBreakNum(ctx context.Context, task *types.TaskInfo return 0 } - supportRange, err := cd.OriginClient.IsSupportRange(task.TaskURL, task.Headers) + supportRange, err := cd.originClient.IsSupportRange(task.TaskURL, task.Headers) if err != nil { logrus.Errorf("failed to check whether the task(%s) supports partial requests: %v", task.ID, err) } diff --git a/supernode/daemon/mgr/cdn/manager.go b/supernode/daemon/mgr/cdn/manager.go index 0ffb7703d..fc81a15e7 100644 --- a/supernode/daemon/mgr/cdn/manager.go +++ b/supernode/daemon/mgr/cdn/manager.go @@ -19,6 +19,7 @@ package cdn import ( "context" "crypto/md5" + "fmt" "path" "github.com/dragonflyoss/Dragonfly/apis/types" @@ -33,10 +34,18 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/store" "github.com/dragonflyoss/Dragonfly/supernode/util" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" ) +const ( + PieceMd5SourceDefault = "default" + PieceMd5SourceMemory = "memory" + PieceMd5SourceMeta = "meta" + PieceMd5SourceFile = "file" +) + var _ mgr.CDNMgr = &Manager{} type metrics struct { @@ -168,7 +177,67 @@ func (cm *Manager) GetStatus(ctx context.Context, taskID string) (cdnStatus stri return "", nil } -// Delete deletes the cdn meta with specified taskID. +// GetPieceMD5 gets the piece Md5 accorrding to the specified taskID and pieceNum. +func (cm *Manager) GetPieceMD5(ctx context.Context, taskID string, pieceNum int, pieceRange, source string) (pieceMd5 string, err error) { + if stringutils.IsEmptyStr(source) || + source == PieceMd5SourceDefault || + source == PieceMd5SourceMemory { + return cm.pieceMD5Manager.getPieceMD5(taskID, pieceNum) + } + + if source == PieceMd5SourceMeta { + // get file meta data + fileMeta, err := cm.metaDataManager.readFileMetaData(ctx, taskID) + if err != nil { + return "", errors.Wrapf(err, "failed to get file meta data taskID(%s)", taskID) + } + + // get piece md5s from meta data file + pieceMD5s, err := cm.metaDataManager.readPieceMD5s(ctx, taskID, fileMeta.RealMd5) + if err != nil { + return "", errors.Wrapf(err, "failed to get piece MD5s from meta data taskID(%s)", taskID) + } + + if len(pieceMD5s) < pieceNum { + return "", fmt.Errorf("not enough piece MD5 for pieceNum(%d)", pieceNum) + } + + return pieceMD5s[pieceNum], nil + } + + if source == PieceMd5SourceFile { + // get piece length + start, end, err := util.ParsePieceIndex(pieceRange) + if err != nil { + return "", errors.Wrapf(err, "failed to parse piece range(%s)", pieceRange) + } + pieceLength := end - start + 1 + + // get piece content reader + pieceRaw := getDownloadRawFunc(taskID) + pieceRaw.Offset = start + pieceRaw.Length = pieceLength + reader, err := cm.cacheStore.Get(ctx, pieceRaw) + if err != nil { + return "", errors.Wrapf(err, "failed to get file reader taskID(%s)", taskID) + } + + // get piece Md5 by read source file + return getMD5ByReadFile(reader, int32(pieceLength)) + } + + return "", nil +} + +// CheckFile checks the file whether exists. +func (cm *Manager) CheckFile(ctx context.Context, taskID string) bool { + if _, err := cm.cacheStore.Stat(ctx, getDownloadRaw(taskID)); err != nil { + return false + } + return true +} + +// Delete the cdn meta with specified taskID. func (cm *Manager) Delete(ctx context.Context, taskID string, force bool) error { if !force { return cm.pieceMD5Manager.removePieceMD5sByTaskID(taskID) diff --git a/supernode/daemon/mgr/cdn/reporter.go b/supernode/daemon/mgr/cdn/reporter.go index 3e03db718..929cd6376 100644 --- a/supernode/daemon/mgr/cdn/reporter.go +++ b/supernode/daemon/mgr/cdn/reporter.go @@ -18,10 +18,10 @@ package cdn import ( "context" - "fmt" "hash" "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/pkg/fileutils" "github.com/dragonflyoss/Dragonfly/pkg/stringutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" @@ -132,7 +132,7 @@ func (re *reporter) processCacheByReadFile(ctx context.Context, taskID string, m fileMd5Value := metaData.RealMd5 if stringutils.IsEmptyStr(fileMd5Value) { - fileMd5Value = fmt.Sprintf("%x", result.fileMd5.Sum(nil)) + fileMd5Value = fileutils.GetMd5Sum(result.fileMd5, nil) } fmd := &fileMetaData{ diff --git a/supernode/daemon/mgr/cdn/super_reader.go b/supernode/daemon/mgr/cdn/super_reader.go index 385d9504b..b2312f3c1 100644 --- a/supernode/daemon/mgr/cdn/super_reader.go +++ b/supernode/daemon/mgr/cdn/super_reader.go @@ -24,6 +24,7 @@ import ( "hash" "io" + "github.com/dragonflyoss/Dragonfly/pkg/fileutils" "github.com/dragonflyoss/Dragonfly/pkg/util" "github.com/dragonflyoss/Dragonfly/supernode/config" @@ -57,7 +58,7 @@ func (sr *superReader) readFile(ctx context.Context, reader io.Reader, calculate for { // read header and get piece content length - ret, err := readHeader(ctx, reader, pieceMd5) + ret, err := readHeader(reader, pieceMd5) if err != nil { if err == io.EOF { return result, nil @@ -70,14 +71,14 @@ func (sr *superReader) readFile(ctx context.Context, reader io.Reader, calculate logrus.Debugf("get piece length: %d with count: %d from header", pieceLen, result.pieceCount) // read content - if err := readContent(ctx, reader, pieceLen, pieceMd5, result.fileMd5); err != nil { + if err := readContent(reader, pieceLen, pieceMd5, result.fileMd5); err != nil { logrus.Errorf("failed to read content for count %d: %v", result.pieceCount, err) return result, err } result.fileLength += int64(pieceLen) // read tailer - if err := readTailer(ctx, reader, pieceMd5); err != nil { + if err := readTailer(reader, pieceMd5); err != nil { return result, errors.Wrapf(err, "failed to read tailer for count %d", result.pieceCount) } result.fileLength++ @@ -85,7 +86,7 @@ func (sr *superReader) readFile(ctx context.Context, reader io.Reader, calculate result.pieceCount++ if calculatePieceMd5 { - pieceSum := fmt.Sprintf("%x", pieceMd5.Sum(nil)) + pieceSum := fileutils.GetMd5Sum(pieceMd5, nil) pieceLength := pieceLen + config.PieceWrapSize result.pieceMd5s = append(result.pieceMd5s, getPieceMd5Value(pieceSum, pieceLength)) pieceMd5.Reset() @@ -93,7 +94,7 @@ func (sr *superReader) readFile(ctx context.Context, reader io.Reader, calculate } } -func readHeader(ctx context.Context, reader io.Reader, pieceMd5 hash.Hash) (uint32, error) { +func readHeader(reader io.Reader, pieceMd5 hash.Hash) (uint32, error) { header := make([]byte, 4) n, err := reader.Read(header) @@ -111,7 +112,7 @@ func readHeader(ctx context.Context, reader io.Reader, pieceMd5 hash.Hash) (uint return binary.BigEndian.Uint32(header), nil } -func readContent(ctx context.Context, reader io.Reader, pieceLen int32, pieceMd5 hash.Hash, fileMd5 hash.Hash) error { +func readContent(reader io.Reader, pieceLen int32, pieceMd5 hash.Hash, fileMd5 hash.Hash) error { bufSize := int32(256 * 1024) if pieceLen < bufSize { bufSize = pieceLen @@ -157,7 +158,7 @@ func readContent(ctx context.Context, reader io.Reader, pieceLen int32, pieceMd5 return nil } -func readTailer(ctx context.Context, reader io.Reader, pieceMd5 hash.Hash) error { +func readTailer(reader io.Reader, pieceMd5 hash.Hash) error { tailer := make([]byte, 1) if err := binary.Read(reader, binary.BigEndian, tailer); err != nil { return err @@ -171,3 +172,16 @@ func readTailer(ctx context.Context, reader io.Reader, pieceMd5 hash.Hash) error } return nil } + +func getMD5ByReadFile(reader io.Reader, pieceLen int32) (string, error) { + if pieceLen <= 0 { + return fileutils.GetMd5Sum(md5.New(), nil), nil + } + + pieceMd5 := md5.New() + if err := readContent(reader, pieceLen, pieceMd5, nil); err != nil { + return "", err + } + + return fileutils.GetMd5Sum(pieceMd5, nil), nil +} diff --git a/supernode/daemon/mgr/cdn/super_reader_test.go b/supernode/daemon/mgr/cdn/super_reader_test.go new file mode 100644 index 000000000..d5fa95be7 --- /dev/null +++ b/supernode/daemon/mgr/cdn/super_reader_test.go @@ -0,0 +1,54 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cdn + +import ( + "bytes" + "crypto/md5" + "encoding/binary" + + "github.com/dragonflyoss/Dragonfly/pkg/fileutils" + + "github.com/go-check/check" +) + +type SuperReaderTestSuite struct { + workHome string + config string + writer *superWriter +} + +func init() { + check.Suite(&SuperReaderTestSuite{}) +} + +// TODO: add more unit tests + +func (s *SuperReaderTestSuite) TestGetMD5ByReadFile(c *check.C) { + testStr := []byte("hello dragonfly") + + for i := 0; i < len(testStr); i++ { + contentBuf := &bytes.Buffer{} + binary.Write(contentBuf, binary.BigEndian, testStr[:i]) + md5Init := md5.New() + md5Init.Write([]byte(testStr[:i])) + expectedMD5 := fileutils.GetMd5Sum(md5Init, nil) + realMD5, err := getMD5ByReadFile(contentBuf, int32(i)) + c.Check(err, check.IsNil) + c.Check(expectedMD5, check.Equals, realMD5) + } +} diff --git a/supernode/daemon/mgr/cdn/super_writer_util.go b/supernode/daemon/mgr/cdn/super_writer_util.go index 1d7bd8bf3..9b3d551be 100644 --- a/supernode/daemon/mgr/cdn/super_writer_util.go +++ b/supernode/daemon/mgr/cdn/super_writer_util.go @@ -21,10 +21,10 @@ import ( "context" "crypto/md5" "encoding/binary" - "fmt" "hash" "sync" + "github.com/dragonflyoss/Dragonfly/pkg/fileutils" "github.com/dragonflyoss/Dragonfly/supernode/config" "github.com/dragonflyoss/Dragonfly/supernode/store" @@ -65,7 +65,7 @@ func (cw *superWriter) writerPool(ctx context.Context, wg *sync.WaitGroup, n int } // report piece status - pieceSum := fmt.Sprintf("%x", pieceMd5.Sum(nil)) + pieceSum := fileutils.GetMd5Sum(pieceMd5, nil) pieceMd5Value := getPieceMd5Value(pieceSum, job.pieceContentSize+config.PieceWrapSize) if cw.cdnReporter != nil { if err := cw.cdnReporter.reportPieceStatus(ctx, job.taskID, job.pieceNum, pieceMd5Value, config.PieceSUCCESS); err != nil { diff --git a/supernode/daemon/mgr/cdn_mgr.go b/supernode/daemon/mgr/cdn_mgr.go index d09313b85..3573099b2 100644 --- a/supernode/daemon/mgr/cdn_mgr.go +++ b/supernode/daemon/mgr/cdn_mgr.go @@ -41,6 +41,12 @@ type CDNMgr interface { // GetStatus gets the status of the file. GetStatus(ctx context.Context, taskID string) (cdnStatus string, err error) + // GetPieceMD5 gets the piece Md5 accorrding to the specified taskID and pieceNum. + GetPieceMD5(ctx context.Context, taskID string, pieceNum int, pieceRange, source string) (pieceMd5 string, err error) + + // CheckFile checks the file whether exists. + CheckFile(ctx context.Context, taskID string) bool + // Delete the cdn meta with specified taskID. // The file on the disk will be deleted when the force is true. Delete(ctx context.Context, taskID string, force bool) error diff --git a/supernode/daemon/mgr/gc/gc_manager.go b/supernode/daemon/mgr/gc/gc_manager.go index 43be9f8c4..d98745edd 100644 --- a/supernode/daemon/mgr/gc/gc_manager.go +++ b/supernode/daemon/mgr/gc/gc_manager.go @@ -71,8 +71,8 @@ func (gcm *Manager) StartGC(ctx context.Context) { } // GCTask to do the gc job with specified taskID. -func (gcm *Manager) GCTask(ctx context.Context, taskID string) { - gcm.gcTask(ctx, taskID) +func (gcm *Manager) GCTask(ctx context.Context, taskID string, full bool) { + gcm.gcTask(ctx, taskID, full) } // GCPeer to do the gc job when a peer offline. diff --git a/supernode/daemon/mgr/gc/gc_task.go b/supernode/daemon/mgr/gc/gc_task.go index 05a43be17..1f84dbb12 100644 --- a/supernode/daemon/mgr/gc/gc_task.go +++ b/supernode/daemon/mgr/gc/gc_task.go @@ -49,7 +49,7 @@ func (gcm *Manager) gcTasks(ctx context.Context) { continue } - if !gcm.gcTask(ctx, taskID) { + if !gcm.gcTask(ctx, taskID, false) { continue } removedTaskCount++ @@ -58,7 +58,7 @@ func (gcm *Manager) gcTasks(ctx context.Context) { logrus.Infof("gc tasks: success to full gc task count(%d), remainder count(%d)", removedTaskCount, totalTaskNums-removedTaskCount) } -func (gcm *Manager) gcTask(ctx context.Context, taskID string) bool { +func (gcm *Manager) gcTask(ctx context.Context, taskID string, full bool) bool { logrus.Infof("start to gc task: %s", taskID) util.GetLock(taskID, false) @@ -73,7 +73,7 @@ func (gcm *Manager) gcTask(ctx context.Context, taskID string) bool { }(&wg) go func(wg *sync.WaitGroup) { - gcm.gcCDNByTaskID(ctx, taskID) + gcm.gcCDNByTaskID(ctx, taskID, full) wg.Done() }(&wg) @@ -98,8 +98,8 @@ func (gcm *Manager) gcCIDsByTaskID(ctx context.Context, taskID string) { } } -func (gcm *Manager) gcCDNByTaskID(ctx context.Context, taskID string) { - if err := gcm.cdnMgr.Delete(ctx, taskID, false); err != nil { +func (gcm *Manager) gcCDNByTaskID(ctx context.Context, taskID string, full bool) { + if err := gcm.cdnMgr.Delete(ctx, taskID, full); err != nil { logrus.Errorf("gc task: failed to gc cdn meta taskID(%s): %v", taskID, err) } } diff --git a/supernode/daemon/mgr/gc_mgr.go b/supernode/daemon/mgr/gc_mgr.go index d6e58f51c..42b267747 100644 --- a/supernode/daemon/mgr/gc_mgr.go +++ b/supernode/daemon/mgr/gc_mgr.go @@ -26,7 +26,8 @@ type GCMgr interface { StartGC(ctx context.Context) // GCTask to do the gc task job with specified taskID. - GCTask(ctx context.Context, taskID string) + // The CDN file will be deleted when the full is true. + GCTask(ctx context.Context, taskID string, full bool) // GCPeer to do the gc peer job when a peer offline. GCPeer(ctx context.Context, peerID string) diff --git a/supernode/daemon/mgr/mock/mock_cdn_mgr.go b/supernode/daemon/mgr/mock/mock_cdn_mgr.go index deb60a799..3e2549c6d 100644 --- a/supernode/daemon/mgr/mock/mock_cdn_mgr.go +++ b/supernode/daemon/mgr/mock/mock_cdn_mgr.go @@ -81,6 +81,35 @@ func (mr *MockCDNMgrMockRecorder) GetStatus(ctx, taskID interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStatus", reflect.TypeOf((*MockCDNMgr)(nil).GetStatus), ctx, taskID) } +// GetPieceMD5 mocks base method +func (m *MockCDNMgr) GetPieceMD5(ctx context.Context, taskID string, pieceNum int, pieceRange, source string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPieceMD5", ctx, taskID, pieceNum, pieceRange, source) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetPieceMD5 indicates an expected call of GetPieceMD5 +func (mr *MockCDNMgrMockRecorder) GetPieceMD5(ctx, taskID, pieceNum, pieceRange, source interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceMD5", reflect.TypeOf((*MockCDNMgr)(nil).GetPieceMD5), ctx, taskID, pieceNum, pieceRange, source) +} + +// CheckFile mocks base method +func (m *MockCDNMgr) CheckFile(ctx context.Context, taskID string) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckFile", ctx, taskID) + ret0, _ := ret[0].(bool) + return ret0 +} + +// CheckFile indicates an expected call of CheckFile +func (mr *MockCDNMgrMockRecorder) CheckFile(ctx, taskID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckFile", reflect.TypeOf((*MockCDNMgr)(nil).CheckFile), ctx, taskID) +} + // Delete mocks base method func (m *MockCDNMgr) Delete(ctx context.Context, taskID string, force bool) error { m.ctrl.T.Helper() diff --git a/supernode/daemon/mgr/piece_error_mgr.go b/supernode/daemon/mgr/piece_error_mgr.go new file mode 100644 index 000000000..874573475 --- /dev/null +++ b/supernode/daemon/mgr/piece_error_mgr.go @@ -0,0 +1,34 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mgr + +import ( + "context" + + "github.com/dragonflyoss/Dragonfly/apis/types" +) + +// PieceErrorMgr as an interface defines all operations to handle piece errors. +type PieceErrorMgr interface { + // StartHandleError start a goroutine to handle the piece error. + StartHandleError(ctx context.Context) + + // HandlePieceError the peer should report the error with related info when + // it failed to download a piece from supernode. + // And the supernode should handle the piece Error and do some repair operations. + HandlePieceError(ctx context.Context, pieceErrorRequest *types.PieceErrorRequest) error +} diff --git a/supernode/daemon/mgr/pieceerror/file_not_exist.go b/supernode/daemon/mgr/pieceerror/file_not_exist.go new file mode 100644 index 000000000..3973eb8d0 --- /dev/null +++ b/supernode/daemon/mgr/pieceerror/file_not_exist.go @@ -0,0 +1,55 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pieceerror + +import ( + "context" + + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + + "github.com/sirupsen/logrus" +) + +var _ Handler = &FileNotExistHandler{} + +type FileNotExistHandler struct { + gcManager mgr.GCMgr + cdnManager mgr.CDNMgr +} + +func init() { + Register(types.PieceErrorRequestErrorTypeFILENOTEXIST, NewFileNotExistHandler) +} + +func NewFileNotExistHandler(gcManager mgr.GCMgr, cdnManager mgr.CDNMgr) (Handler, error) { + return &FileNotExistHandler{ + gcManager: gcManager, + cdnManager: cdnManager, + }, nil +} + +func (feh *FileNotExistHandler) Handle(ctx context.Context, pieceErrorRequest *types.PieceErrorRequest) error { + if feh.cdnManager.CheckFile(ctx, pieceErrorRequest.TaskID) { + return nil + } + + logrus.Warnf("taskID(%s) file data doesn't exist, start to gc this task", pieceErrorRequest.TaskID) + + feh.gcManager.GCTask(ctx, pieceErrorRequest.TaskID, true) + return nil +} diff --git a/supernode/daemon/mgr/pieceerror/manager.go b/supernode/daemon/mgr/pieceerror/manager.go new file mode 100644 index 000000000..943ee5007 --- /dev/null +++ b/supernode/daemon/mgr/pieceerror/manager.go @@ -0,0 +1,203 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pieceerror + +import ( + "context" + "fmt" + "time" + + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/syncmap" + "github.com/dragonflyoss/Dragonfly/supernode/config" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var _ mgr.PieceErrorMgr = &Manager{} + +const ( + ErrHandlerChanSize = 100 + HandleErrorPool = 4 + GCHandlingInterval = 1 * time.Second + GCHandlingDelay = 3 * time.Second +) + +// handlerStore stores all registered handler +var handlerStore = syncmap.NewSyncMap() + +type handlerInitFunc func(gcManager mgr.GCMgr, cdnManager mgr.CDNMgr) (handler Handler, err error) + +func Register(errType string, initer handlerInitFunc) { + handlerStore.Add(errType, initer) +} + +// TODO: make it pluggable +type Handler interface { + Handle(ctx context.Context, pieceErrorRequest *types.PieceErrorRequest) error +} + +type Manager struct { + cfg *config.Config + handlers map[string]Handler + + gcManager mgr.GCMgr + cdnManager mgr.CDNMgr + + // error handler + pieceErrChan chan *types.PieceErrorRequest + errorHandlingStore *syncmap.SyncMap + handledStore *syncmap.SyncMap +} + +func NewManager(cfg *config.Config, gcManager mgr.GCMgr, cdnManager mgr.CDNMgr) (*Manager, error) { + return &Manager{ + cfg: cfg, + handlers: make(map[string]Handler, 0), + gcManager: gcManager, + cdnManager: cdnManager, + pieceErrChan: make(chan *types.PieceErrorRequest, ErrHandlerChanSize), + errorHandlingStore: syncmap.NewSyncMap(), + handledStore: syncmap.NewSyncMap(), + }, nil +} + +// HandlePieceError the peer should report the error with related info when +// it failed to download a piece from supernode. +// And the supernode should handle the piece Error and do some repair operations. +func (em *Manager) HandlePieceError(ctx context.Context, pieceErrorRequest *types.PieceErrorRequest) error { + // ignore the error that isn't caused by downloading from supernode + if !em.cfg.IsSuperPID(pieceErrorRequest.DstPid) { + return nil + } + + // if the error is handling, we should ignore it. + _, err := em.errorHandlingStore.Get(pieceErrorRequest.TaskID) + if err == nil { + return nil + } + if !errortypes.IsDataNotFound(err) { + logrus.Errorf("failed to get taskID(%s) from errorHandlingStore: %v", pieceErrorRequest.TaskID, err) + return err + } + + select { + case em.pieceErrChan <- pieceErrorRequest: + em.errorHandlingStore.Add(pieceErrorRequest.TaskID, true) + return nil + default: + logrus.Warnf("drop piece error request: %+v", pieceErrorRequest) + return fmt.Errorf("%d piece errors are being processed already", ErrHandlerChanSize) + } +} + +// StartHandleError starts a goroutine to handle the piece error. +func (em *Manager) StartHandleError(ctx context.Context) { + em.initHandlers() + + go func() { + em.startHandleErrorPool(ctx) + }() + + go func() { + ticker := time.NewTicker(GCHandlingInterval) + for range ticker.C { + em.deleteHandling(ctx) + } + }() +} + +func (em *Manager) initHandlers() { + rangeFunc := func(key, value interface{}) bool { + initFunc, ok := value.(handlerInitFunc) + if !ok { + return true + } + + errType, ok := key.(string) + if !ok { + return true + } + + handler, err := initFunc(em.gcManager, em.cdnManager) + if err != nil { + logrus.Errorf("failed to init handler type %s: %v", errType, err) + return true + } + + em.handlers[errType] = handler + return true + } + + handlerStore.Range(rangeFunc) +} + +func (em *Manager) deleteHandling(ctx context.Context) { + rangeFunc := func(key, value interface{}) bool { + handledTime, ok := value.(time.Time) + if !ok { + return true + } + if time.Since(handledTime) < GCHandlingDelay { + return true + } + + if taskID, ok := key.(string); ok { + em.errorHandlingStore.Delete(taskID) + em.handledStore.Delete(taskID) + } + return true + } + + em.handledStore.Range(rangeFunc) +} +func (em *Manager) startHandleErrorPool(ctx context.Context) { + for i := 0; i < HandleErrorPool; i++ { + go func() { + for per := range em.pieceErrChan { + if err := em.handleError(ctx, per); err != nil { + logrus.Errorf("failed to handle error %+v:%v", per, err) + } + } + }() + } +} + +func (em *Manager) handleError(ctx context.Context, pieceError *types.PieceErrorRequest) error { + // add taskID to handledStore regardless of the result of handler + defer func() { + em.handledStore.Add(pieceError.TaskID, time.Now()) + }() + + handler, err := em.getHandler(ctx, pieceError.ErrorType) + if err != nil { + return errors.Wrapf(err, "failed to get handler") + } + + return handler.Handle(ctx, pieceError) +} + +func (em *Manager) getHandler(ctx context.Context, errType string) (Handler, error) { + if v, ok := em.handlers[errType]; ok { + return v, nil + } + + return nil, fmt.Errorf("unregistered error handler") +} diff --git a/supernode/daemon/mgr/pieceerror/md5_not_match.go b/supernode/daemon/mgr/pieceerror/md5_not_match.go new file mode 100644 index 000000000..b4eb52d9d --- /dev/null +++ b/supernode/daemon/mgr/pieceerror/md5_not_match.go @@ -0,0 +1,73 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pieceerror + +import ( + "context" + + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr" + "github.com/dragonflyoss/Dragonfly/supernode/util" + + "github.com/sirupsen/logrus" +) + +var _ Handler = &FileMd5NotMatchHandler{} + +type FileMd5NotMatchHandler struct { + gcManager mgr.GCMgr + cdnManager mgr.CDNMgr +} + +func init() { + Register(types.PieceErrorRequestErrorTypeFILEMD5NOTMATCH, NewFileMd5NotMatchHandler) +} + +func NewFileMd5NotMatchHandler(gcManager mgr.GCMgr, cdnManager mgr.CDNMgr) (Handler, error) { + return &FileMd5NotMatchHandler{ + gcManager: gcManager, + cdnManager: cdnManager, + }, nil +} + +func (fnmh *FileMd5NotMatchHandler) Handle(ctx context.Context, pieceErrorRequest *types.PieceErrorRequest) error { + pieceNum := util.CalculatePieceNum(pieceErrorRequest.Range) + + // get piece MD5 by reading the meta file + metaPieceMD5, err := fnmh.cdnManager.GetPieceMD5(ctx, pieceErrorRequest.TaskID, pieceNum, pieceErrorRequest.Range, "meta") + if err != nil { + logrus.Errorf("failed to get piece MD5 by read meta data taskID(%s) pieceRange(%s): %v", + pieceErrorRequest.TaskID, pieceErrorRequest.Range, err) + } + + // get piece Md5 by reading the source file on the local disk + filePieceMD5, err := fnmh.cdnManager.GetPieceMD5(ctx, pieceErrorRequest.TaskID, pieceNum, pieceErrorRequest.Range, "file") + if err != nil { + logrus.Errorf("failed to get piece MD5 by read source file directly taskID(%s) pieceRange(%s): %v", + pieceErrorRequest.TaskID, pieceErrorRequest.Range, err) + } + + logrus.Debugf("success to get taskID(%s) pieceRange(%s) metaPieceMD5(%s) filePieceMD5(%s) expectedMD5(%s)", + pieceErrorRequest.TaskID, pieceErrorRequest.Range, metaPieceMD5, filePieceMD5, pieceErrorRequest.ExpectedMd5) + + if filePieceMD5 != metaPieceMD5 && + filePieceMD5 != pieceErrorRequest.ExpectedMd5 { + fnmh.gcManager.GCTask(ctx, pieceErrorRequest.TaskID, true) + } + + return nil +} diff --git a/supernode/daemon/mgr/task/manager.go b/supernode/daemon/mgr/task/manager.go index 6b3f0d02b..c230f89c1 100644 --- a/supernode/daemon/mgr/task/manager.go +++ b/supernode/daemon/mgr/task/manager.go @@ -72,19 +72,21 @@ func newMetrics(register prometheus.Registerer) *metrics { // Manager is an implementation of the interface of TaskMgr. type Manager struct { - cfg *config.Config + cfg *config.Config + metrics *metrics + originClient httpclient.OriginHTTPClient + // store object taskStore *dutil.Store accessTimeMap *syncmap.SyncMap taskURLUnReachableStore *syncmap.SyncMap + // mgr object peerMgr mgr.PeerMgr dfgetTaskMgr mgr.DfgetTaskMgr progressMgr mgr.ProgressMgr cdnMgr mgr.CDNMgr schedulerMgr mgr.SchedulerMgr - OriginClient httpclient.OriginHTTPClient - metrics *metrics } // NewManager returns a new Manager Object. @@ -101,7 +103,7 @@ func NewManager(cfg *config.Config, peerMgr mgr.PeerMgr, dfgetTaskMgr mgr.DfgetT schedulerMgr: schedulerMgr, accessTimeMap: syncmap.NewSyncMap(), taskURLUnReachableStore: syncmap.NewSyncMap(), - OriginClient: originClient, + originClient: originClient, metrics: newMetrics(register), }, nil } diff --git a/supernode/daemon/mgr/task/manager_util.go b/supernode/daemon/mgr/task/manager_util.go index 19f56ca78..aa422bacd 100644 --- a/supernode/daemon/mgr/task/manager_util.go +++ b/supernode/daemon/mgr/task/manager_util.go @@ -328,7 +328,7 @@ func (tm *Manager) parseAvailablePeers(ctx context.Context, clientID string, tas "taskID(%s) clientID(%s) status(%s): %v", task.ID, clientID, types.DfGetTaskStatusSUCCESS, err) } finishInfo := make(map[string]interface{}) - finishInfo["md5"] = task.Md5 + finishInfo["md5"] = task.RealMd5 finishInfo["fileLength"] = task.FileLength return true, finishInfo, nil } @@ -382,11 +382,17 @@ func (tm *Manager) pieceResultToPieceInfo(ctx context.Context, pr *mgr.PieceResu return nil, err } + pieceMD5, err := tm.cdnMgr.GetPieceMD5(ctx, pr.TaskID, pr.PieceNum, "", "default") + if err != nil { + logrus.Warnf("failed to get piece MD5 taskID(%s) pieceNum(%d): %v", pr.TaskID, pr.PieceNum, err) + pieceMD5 = "" + } return &types.PieceInfo{ PID: pr.DstPID, Path: dfgetTask.Path, PeerIP: peer.IP.String(), PeerPort: peer.Port, + PieceMD5: pieceMD5, PieceRange: util.CalculatePieceRange(pr.PieceNum, pieceSize), PieceSize: pieceSize, }, nil @@ -519,7 +525,7 @@ func isWait(CDNStatus string) bool { } func (tm *Manager) getHTTPFileLength(taskID, url string, headers map[string]string) (int64, error) { - fileLength, code, err := tm.OriginClient.GetContentLength(url, headers) + fileLength, code, err := tm.originClient.GetContentLength(url, headers) if err != nil { return -1, errors.Wrapf(errortypes.ErrUnknowError, "failed to get http file Length: %v", err) } diff --git a/supernode/server/0.3_bridge.go b/supernode/server/0.3_bridge.go index 164a0288f..4f8b3497b 100644 --- a/supernode/server/0.3_bridge.go +++ b/supernode/server/0.3_bridge.go @@ -121,7 +121,7 @@ func (s *Server) registry(ctx context.Context, rw http.ResponseWriter, req *http TaskURL: request.TaskURL, SupernodeIP: request.SuperNodeIP, } - s.OriginClient.RegisterTLSConfig(taskCreateRequest.RawURL, request.Insecure, request.RootCAs) + s.originClient.RegisterTLSConfig(taskCreateRequest.RawURL, request.Insecure, request.RootCAs) resp, err := s.TaskMgr.Register(ctx, taskCreateRequest) if err != nil { logrus.Errorf("failed to register task %+v: %v", taskCreateRequest, err) diff --git a/supernode/server/piece_error_bridge.go b/supernode/server/piece_error_bridge.go new file mode 100644 index 000000000..477ee3f57 --- /dev/null +++ b/supernode/server/piece_error_bridge.go @@ -0,0 +1,65 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/dragonflyoss/Dragonfly/apis/types" + "github.com/dragonflyoss/Dragonfly/pkg/errortypes" + "github.com/dragonflyoss/Dragonfly/pkg/stringutils" + + "github.com/go-openapi/strfmt" + "github.com/gorilla/mux" + "github.com/pkg/errors" +) + +func (s *Server) handlePieceError(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) { + taskID := mux.Vars(req)["id"] + pieceRange := mux.Vars(req)["pieceRange"] + + reader := req.Body + request := &types.PieceErrorRequest{} + if err := json.NewDecoder(reader).Decode(request); err != nil { + return errors.Wrap(errortypes.ErrInvalidValue, err.Error()) + } + + if err := request.Validate(strfmt.NewFormats()); err != nil { + return errors.Wrap(errortypes.ErrInvalidValue, err.Error()) + } + + if stringutils.IsEmptyStr(request.DstPid) { + return errors.Wrap(errortypes.ErrEmptyValue, "dstPid") + } + + // fulfill the taskID and pieceRange if they are empty + if stringutils.IsEmptyStr(request.TaskID) { + request.TaskID = taskID + } + if stringutils.IsEmptyStr(request.Range) { + request.Range = pieceRange + } + + if err := s.PieceErrorMgr.HandlePieceError(ctx, request); err != nil { + return err + } + + rw.WriteHeader(http.StatusOK) + return nil +} diff --git a/supernode/server/router.go b/supernode/server/router.go index 675ec933b..0b275d74d 100644 --- a/supernode/server/router.go +++ b/supernode/server/router.go @@ -55,6 +55,12 @@ func initRoute(s *Server) *mux.Router { {Method: http.MethodGet, Path: "/peers/{id}", HandlerFunc: s.getPeer}, {Method: http.MethodGet, Path: "/peers", HandlerFunc: s.listPeers}, + // task + {Method: http.MethodDelete, Path: "/tasks/{id}", HandlerFunc: s.deleteTask}, + + // piece + {Method: http.MethodGet, Path: "/tasks/{id}/pieces/{pieceRange}/error", HandlerFunc: s.handlePieceError}, + // metrics {Method: http.MethodGet, Path: "/metrics", HandlerFunc: handleMetrics}, {Method: http.MethodPost, Path: "/task/metrics", HandlerFunc: m.handleMetricsReport}, diff --git a/supernode/server/server.go b/supernode/server/server.go index 855201dac..dd00cab68 100644 --- a/supernode/server/server.go +++ b/supernode/server/server.go @@ -29,6 +29,7 @@ import ( "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/dfgettask" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/gc" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/peer" + "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/pieceerror" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/progress" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/scheduler" "github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr/task" @@ -44,14 +45,15 @@ var dfgetLogger *logrus.Logger // Server is supernode server struct. type Server struct { - Config *config.Config - PeerMgr mgr.PeerMgr - TaskMgr mgr.TaskMgr - DfgetTaskMgr mgr.DfgetTaskMgr - ProgressMgr mgr.ProgressMgr - GCMgr mgr.GCMgr - - OriginClient httpclient.OriginHTTPClient + Config *config.Config + PeerMgr mgr.PeerMgr + TaskMgr mgr.TaskMgr + DfgetTaskMgr mgr.DfgetTaskMgr + ProgressMgr mgr.ProgressMgr + GCMgr mgr.GCMgr + PieceErrorMgr mgr.PieceErrorMgr + + originClient httpclient.OriginHTTPClient } // New creates a brand new server instance. @@ -103,19 +105,26 @@ func New(cfg *config.Config, logger *logrus.Logger, register prometheus.Register return nil, err } - GCMgr, err := gc.NewManager(cfg, taskMgr, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr) + gcMgr, err := gc.NewManager(cfg, taskMgr, peerMgr, dfgetTaskMgr, progressMgr, cdnMgr) + if err != nil { + return nil, err + } + + pieceErrorMgr, err := pieceerror.NewManager(cfg, gcMgr, cdnMgr) if err != nil { return nil, err } return &Server{ - Config: cfg, - PeerMgr: peerMgr, - TaskMgr: taskMgr, - DfgetTaskMgr: dfgetTaskMgr, - ProgressMgr: progressMgr, - GCMgr: GCMgr, - OriginClient: originClient, + Config: cfg, + PeerMgr: peerMgr, + TaskMgr: taskMgr, + DfgetTaskMgr: dfgetTaskMgr, + ProgressMgr: progressMgr, + GCMgr: gcMgr, + PieceErrorMgr: pieceErrorMgr, + + originClient: originClient, }, nil } @@ -131,7 +140,10 @@ func (s *Server) Start() error { return err } + // start to handle piece error + s.PieceErrorMgr.StartHandleError(context.Background()) s.GCMgr.StartGC(context.Background()) + server := &http.Server{ Handler: router, ReadTimeout: time.Minute * 10, diff --git a/supernode/server/task_bridge.go b/supernode/server/task_bridge.go new file mode 100644 index 000000000..23fe49414 --- /dev/null +++ b/supernode/server/task_bridge.go @@ -0,0 +1,36 @@ +/* + * Copyright The Dragonfly Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "context" + "net/http" + "strconv" + + "github.com/gorilla/mux" +) + +func (s *Server) deleteTask(ctx context.Context, rw http.ResponseWriter, req *http.Request) (err error) { + id := mux.Vars(req)["id"] + params := req.URL.Query() + full, _ := strconv.ParseBool(params.Get("full")) + + s.GCMgr.GCTask(ctx, id, full) + + rw.WriteHeader(http.StatusOK) + return nil +} diff --git a/supernode/util/range_util.go b/supernode/util/range_util.go index c67470cb3..ca15964de 100644 --- a/supernode/util/range_util.go +++ b/supernode/util/range_util.go @@ -29,49 +29,48 @@ const ( // CalculatePieceSize calculates the size of piece // according to the parameter range. func CalculatePieceSize(rangeStr string) int64 { - ranges := strings.Split(rangeStr, separator) - if len(ranges) != 2 { - return 0 - } - - startIndex, err := strconv.ParseInt(ranges[0], 10, 64) + startIndex, endIndex, err := ParsePieceIndex(rangeStr) if err != nil { return 0 } - endIndex, err := strconv.ParseInt(ranges[1], 10, 64) - if err != nil { - return 0 - } - if endIndex < startIndex { - return 0 - } - pieceSize := endIndex - startIndex + 1 - return pieceSize + return endIndex - startIndex + 1 } // CalculatePieceNum calculates the number of piece // according to the parameter range. func CalculatePieceNum(rangeStr string) int { + startIndex, endIndex, err := ParsePieceIndex(rangeStr) + if err != nil { + return -1 + } + + pieceSize := endIndex - startIndex + 1 + + return int(startIndex / pieceSize) +} + +// ParsePieceIndex parses the start and end index ​​according to range string. +func ParsePieceIndex(rangeStr string) (start, end int64, err error) { ranges := strings.Split(rangeStr, separator) if len(ranges) != 2 { - return -1 + return -1, -1, fmt.Errorf("range value(%s) is illegal which should be like 0-45535", rangeStr) } startIndex, err := strconv.ParseInt(ranges[0], 10, 64) if err != nil { - return -1 + return -1, -1, fmt.Errorf("range(%s) start is not a number", rangeStr) } endIndex, err := strconv.ParseInt(ranges[1], 10, 64) if err != nil { - return -1 + return -1, -1, fmt.Errorf("range(%s) end is not a number", rangeStr) } + if endIndex < startIndex { - return -1 + return -1, -1, fmt.Errorf("range(%s) start is larger than end", rangeStr) } - pieceSize := endIndex - startIndex + 1 - return int(startIndex / pieceSize) + return startIndex, endIndex, nil } // CalculateBreakRange calculates the start and end of piece