Skip to content

Commit

Permalink
Add CDN Tracer (#555)
Browse files Browse the repository at this point in the history
* feat: add cdn grpc interceptor tracer

Signed-off-by: santong <244372610@qq.com>

* feat: cdn tracer

Signed-off-by: santong <weipeng.swp@alibaba-inc.com>
  • Loading branch information
244372610 committed Aug 18, 2021
1 parent 7a3088e commit d74f830
Show file tree
Hide file tree
Showing 31 changed files with 445 additions and 171 deletions.
8 changes: 6 additions & 2 deletions cdnsystem/cdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/manager"
"d7y.io/dragonfly/v2/pkg/util/net/iputils"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
)

Expand Down Expand Up @@ -144,8 +145,11 @@ func (s *Server) Serve() (err error) {
nil,
)
}

err = rpc.StartTCPServer(s.config.ListenPort, s.config.ListenPort, s.seedServer)
var opts []grpc.ServerOption
if s.config.Options.Telemetry.Jaeger != "" {
opts = append(opts, grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()))
}
err = rpc.StartTCPServer(s.config.ListenPort, s.config.ListenPort, s.seedServer, opts...)
if err != nil {
return errors.Wrap(err, "start tcp server")
}
Expand Down
57 changes: 57 additions & 0 deletions cdnsystem/config/constants_otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import "go.opentelemetry.io/otel/attribute"

const (
AttributeObtainSeedsRequest = attribute.Key("d7y.obtain.seeds.request")
AttributeGetPieceTasksRequest = attribute.Key("d7y.get.piece.tasks.request")
AttributePiecePacketResult = attribute.Key("d7y.piece.packet.result")
AttributeTaskID = attribute.Key("d7y.task.id")
AttributeTaskStatus = attribute.Key("d7y.task.status")
AttributeTaskURL = attribute.Key("d7y.task.url")
AttributeTaskInfo = attribute.Key("d7y.taskInfo")
AttributeIfReuseTask = attribute.Key("d7y.task.already.exist")
AttributeSeedPiece = attribute.Key("d7y.seed.piece")
AttributeSeedTask = attribute.Key("d7y.seed.task")
AttributeCacheResult = attribute.Key("d7y.cache.result")
AttributeWriteGoroutineCount = attribute.Key("d7y.write.goroutine.count")
AttributeDownloadFileInfo = attribute.Key("d7y.download.file.info")
)

const (
SpanObtainSeeds = "cdn-obtain-seeds"
SpanGetPieceTasks = "get-piece-tasks"
SpanTaskRegister = "task-register"
SpanAndOrUpdateTask = "add-or-update-task"
SpanTriggerCDNSyncAction = "trigger-cdn-sync-action"
SpanTriggerCDN = "trigger-cdn"
SpanDetectCache = "detect-cache"
SpanDownloadSource = "download-source"
SpanWriteData = "write-data"
)

const (
EventHitUnReachableURL = "hit-unReachableURL"
EventRequestSourceFileLength = "request-source-file-length"
EventDeleteUnReachableTask = "downloaded"
EventInitSeedProgress = "init-seed-progress"
EventWatchSeedProgress = "watch-seed-progress"
EventPublishPiece = "publish-piece"
EventPublishTask = "publish-task"
)
96 changes: 75 additions & 21 deletions cdnsystem/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,35 @@ import (
"d7y.io/dragonfly/v2/pkg/util/net/urlutils"
"d7y.io/dragonfly/v2/pkg/util/stringutils"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var tracer trace.Tracer

func init() {
tracer = otel.Tracer("cdn-server")
}

type options struct {
tracer trace.Tracer
}

type Option func(*options)

func WithTracer(tracer trace.Tracer) Option {
return func(o *options) {
o.tracer = tracer
}
}

// CdnSeedServer is used to implement cdnsystem.SeederServer.
type CdnSeedServer struct {
taskMgr supervisor.SeedTaskMgr
cfg *config.Config
}

// NewManager returns a new Manager Object.
// NewCdnSeedServer returns a new Manager Object.
func NewCdnSeedServer(cfg *config.Config, taskMgr supervisor.SeedTaskMgr) (*CdnSeedServer, error) {
return &CdnSeedServer{
taskMgr: taskMgr,
Expand Down Expand Up @@ -95,30 +115,43 @@ func checkSeedRequestParams(req *cdnsystem.SeedRequest) error {
}

func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRequest, psc chan<- *cdnsystem.PieceSeed) (err error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanObtainSeeds, trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
span.SetAttributes(config.AttributeObtainSeedsRequest.String(req.String()))
span.SetAttributes(config.AttributeTaskID.String(req.TaskId))
defer func() {
if r := recover(); r != nil {
err = dferrors.Newf(dfcodes.UnknownError, "a panic error was encountered for obtain task(%s) seeds: %v", req.TaskId, r)
err = dferrors.Newf(dfcodes.UnknownError, "encounter an panic: %v", r)
span.RecordError(err)
logger.WithTaskID(req.TaskId).Errorf("failed to obtain task(%s) seeds, req=%+v: %v", req.TaskId, req, err)
}

if err != nil {
logger.WithTaskID(req.TaskId).Errorf("failed to obtain task(%s) seeds, request: %+v %v", req.TaskId, req, err)

span.RecordError(err)
logger.WithTaskID(req.TaskId).Errorf("failed to obtain task(%s) seeds, req=%+v: %v", req.TaskId, req, err)
}
}()
logger.Infof("obtain seeds request: %+v", req)

registerRequest, err := constructRegisterRequest(req)
if err != nil {
return dferrors.Newf(dfcodes.BadRequest, "bad seed request for task(%s): %v", req.TaskId, err)
err = dferrors.Newf(dfcodes.BadRequest, "bad seed request for task(%s): %v", req.TaskId, err)
span.RecordError(err)
return err
}
// register task
pieceChan, err := css.taskMgr.Register(ctx, registerRequest)

if err != nil {
return dferrors.Newf(dfcodes.CdnTaskRegistryFail, "failed to register seed task(%s): %v", req.TaskId, err)
err = dferrors.Newf(dfcodes.CdnTaskRegistryFail, "failed to register seed task(%s): %v", req.TaskId, err)
span.RecordError(err)
return err
}
task, err := css.taskMgr.Get(req.TaskId)
if err != nil {
return dferrors.Newf(dfcodes.CdnError, "failed to get task(%s): %v", req.TaskId, err)
err = dferrors.Newf(dfcodes.CdnError, "failed to get task(%s): %v", req.TaskId, err)
span.RecordError(err)
return err
}
peerID := cdnutil.GenCDNPeerID(req.TaskId)
for piece := range pieceChan {
Expand All @@ -136,10 +169,11 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe
Done: false,
ContentLength: task.SourceFileLength,
}

}
if task.CdnStatus != types.TaskInfoCdnStatusSuccess {
return dferrors.Newf(dfcodes.CdnTaskDownloadFail, "task(%s) status error , status: %s", req.TaskId, task.CdnStatus)
err = dferrors.Newf(dfcodes.CdnTaskDownloadFail, "task(%s) status error , status: %s", req.TaskId, task.CdnStatus)
span.RecordError(err)
return err
}
psc <- &cdnsystem.PieceSeed{
PeerId: peerID,
Expand All @@ -152,56 +186,76 @@ func (css *CdnSeedServer) ObtainSeeds(ctx context.Context, req *cdnsystem.SeedRe
}

func (css *CdnSeedServer) GetPieceTasks(ctx context.Context, req *base.PieceTaskRequest) (piecePacket *base.PiecePacket, err error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanGetPieceTasks, trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
span.SetAttributes(config.AttributeGetPieceTasksRequest.String(req.String()))
span.SetAttributes(config.AttributeTaskID.String(req.TaskId))
defer func() {
if r := recover(); r != nil {
logger.WithTaskID(req.TaskId).Errorf("failed to get piece tasks, req=%+v: %v", req, r)
err = errors.Errorf("encounter an panic: %v", r)
span.RecordError(err)
logger.WithTaskID(req.TaskId).Errorf("failed to get piece tasks, req=%+v: %v", req, err)
}
if err != nil {
span.RecordError(err)
logger.WithTaskID(req.TaskId).Errorf("failed to get piece tasks, req=%+v: %v", req, err)
}
}()
if err := checkPieceTasksRequestParams(req); err != nil {
return nil, dferrors.Newf(dfcodes.BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err)
err = dferrors.Newf(dfcodes.BadRequest, "failed to validate seed request for task(%s): %v", req.TaskId, err)
span.RecordError(err)
return nil, err
}
task, err := css.taskMgr.Get(req.TaskId)
if err != nil {
if cdnerrors.IsDataNotFound(err) {
return nil, dferrors.Newf(dfcodes.CdnTaskNotFound, "failed to get task(%s) from cdn: %v", req.TaskId, err)
err = dferrors.Newf(dfcodes.CdnTaskNotFound, "failed to get task(%s) from cdn: %v", req.TaskId, err)
span.RecordError(err)
return nil, err
}
return nil, dferrors.Newf(dfcodes.CdnError, "failed to get task(%s) from cdn: %v", req.TaskId, err)
err = dferrors.Newf(dfcodes.CdnError, "failed to get task(%s) from cdn: %v", req.TaskId, err)
span.RecordError(err)
return nil, err
}
if task.IsError() {
return nil, dferrors.Newf(dfcodes.CdnTaskDownloadFail, "fail to download task(%s), cdnStatus: %s", task.TaskID, task.CdnStatus)
err = dferrors.Newf(dfcodes.CdnTaskDownloadFail, "fail to download task(%s), cdnStatus: %s", task.TaskID, task.CdnStatus)
span.RecordError(err)
return nil, err
}
pieces, err := css.taskMgr.GetPieces(ctx, req.TaskId)
if err != nil {
return nil, dferrors.Newf(dfcodes.CdnError, "failed to get pieces of task(%s) from cdn: %v", task.TaskID, err)
err = dferrors.Newf(dfcodes.CdnError, "failed to get pieces of task(%s) from cdn: %v", task.TaskID, err)
span.RecordError(err)
return nil, err
}
pieceInfos := make([]*base.PieceInfo, 0)
var count int32 = 0
for _, piece := range pieces {
if piece.PieceNum >= req.StartNum && (count < req.Limit || req.Limit == 0) {
pieceInfos = append(pieceInfos, &base.PieceInfo{
p := &base.PieceInfo{
PieceNum: piece.PieceNum,
RangeStart: piece.PieceRange.StartIndex,
RangeSize: piece.PieceLen,
PieceMd5: piece.PieceMd5,
PieceOffset: piece.OriginRange.StartIndex,
PieceStyle: base.PieceStyle(piece.PieceStyle),
})
}
pieceInfos = append(pieceInfos, p)
count++
}
}

return &base.PiecePacket{
pp := &base.PiecePacket{
TaskId: req.TaskId,
DstPid: fmt.Sprintf("%s-%s_%s", iputils.HostName, req.TaskId, "CDN"),
DstAddr: fmt.Sprintf("%s:%d", css.cfg.AdvertiseIP, css.cfg.DownloadPort),
PieceInfos: pieceInfos,
TotalPiece: task.PieceTotal,
ContentLength: task.SourceFileLength,
PieceMd5Sign: task.PieceMd5Sign,
}, nil
}
span.SetAttributes(config.AttributePiecePacketResult.String(pp.String()))
return pp, nil
}

func checkPieceTasksRequestParams(req *base.PieceTaskRequest) error {
Expand Down
22 changes: 11 additions & 11 deletions cdnsystem/storedriver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,22 @@ type Config struct {
// Raw identifies a piece of data uniquely.
// If the length<=0, it represents all data.
type Raw struct {
Bucket string
Key string
Offset int64
Length int64
Trunc bool
TruncSize int64
Append bool
Bucket string `json:"bucket"`
Key string `json:"key"`
Offset int64 `json:"offset"`
Length int64 `json:"length"`
Trunc bool `json:"trunc"`
TruncSize int64 `json:"trunc_size"`
Append bool `json:"append"`
WalkFn filepath.WalkFunc
}

// StorageInfo includes partial meta information of the data.
type StorageInfo struct {
Path string // file path
Size int64 // file size
CreateTime time.Time // create time
ModTime time.Time // modified time
Path string `json:"path"` // file path
Size int64 `json:"size"` // file size
CreateTime time.Time `json:"create_time"` // create time
ModTime time.Time `json:"mod_time"` // modified time
}

// driverPlugin is a wrapper of the storage driver which implements the interface of Driver.
Expand Down
28 changes: 14 additions & 14 deletions cdnsystem/storedriver/mock_driver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit d74f830

Please sign in to comment.