Skip to content

Commit

Permalink
feat: Add distribute Schedule Tracer & Refactor scheduler (#537)
Browse files Browse the repository at this point in the history
* check same cdn list avoid updating client too frequent

Signed-off-by: santong <weipeng.swp@alibaba-inc.com>
  • Loading branch information
244372610 committed Aug 12, 2021
1 parent 25d58d4 commit 0cc52f5
Show file tree
Hide file tree
Showing 40 changed files with 674 additions and 536 deletions.
2 changes: 1 addition & 1 deletion cdnsystem/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (

// DefaultTaskExpireTime: when a task is not accessed within the taskExpireTime,
// it will be treated as expired.
DefaultTaskExpireTime = 3 * time.Minute
DefaultTaskExpireTime = 30 * time.Minute
)

const (
Expand Down
7 changes: 6 additions & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
)

type Daemon interface {
Expand Down Expand Up @@ -96,7 +97,11 @@ func New(opt *config.DaemonOption) (Daemon, error) {
NetTopology: opt.Host.NetTopology,
}

sched, err := schedulerclient.GetClientByAddr(opt.Scheduler.NetAddrs)
var opts []grpc.DialOption
if opt.Options.Telemetry.Jaeger != "" {
opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
}
sched, err := schedulerclient.GetClientByAddr(opt.Scheduler.NetAddrs, opts...)
if err != nil {
return nil, errors.Wrap(err, "failed to get schedulers")
}
Expand Down
7 changes: 3 additions & 4 deletions client/daemon/peer/peertask_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ func newFilePeerTask(ctx context.Context,
logger.Infof("request overview, url: %s, filter: %s, meta: %s, biz: %s, peer: %s", request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag, request.PeerId)
// trace register
regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
regSpan.RecordError(err)
regSpan.End()

var needBackSource bool
if err != nil {
logger.Errorf("step 1: peer %s register failed: err", request.PeerId, err)
logger.Errorf("step 1: peer %s register failed: %v", request.PeerId, err)
if schedulerOption.DisableAutoBackSource {
logger.Errorf("register peer task failed: %s, peer id: %s, auto back source disabled", err, request.PeerId)
span.RecordError(err)
Expand Down Expand Up @@ -148,9 +148,8 @@ func newFilePeerTask(ctx context.Context,
logger.Infof("%s/%s size scope: normal", result.TaskId, request.PeerId)
}
}

peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
logger.Infof("step 2: start report peer %s piece result", request.PeerId)
peerPacketStream, err := schedulerClient.ReportPieceResult(ctx, result.TaskId, request)
if err != nil {
logger.Errorf("step 2: peer %s report piece failed: err", request.PeerId, err)
defer span.End()
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/peer/peertask_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func newStreamPeerTask(ctx context.Context,
request.PeerId, request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag)
// trace register
regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
logger.Infof("step 1: peer %s start to register", request.PeerId)
result, err := schedulerClient.RegisterPeerTask(regCtx, request)
regSpan.RecordError(err)
regSpan.End()

Expand Down
2 changes: 1 addition & 1 deletion cmd/cdn/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func runCdnSystem() error {
s, _ := yaml.Marshal(cfg)
logger.Infof("cdn system configuration:\n%s", string(s))

ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry.Jaeger)
ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry)
defer ff()

svr, err := server.New(cfg)
Expand Down
3 changes: 2 additions & 1 deletion cmd/dependency/base/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@ type Options struct {

// TelemetryOption is the option for telemetry
type TelemetryOption struct {
Jaeger string `yaml:"jaeger" mapstructure:"jaeger"`
Jaeger string `yaml:"jaeger" mapstructure:"jaeger"`
ServiceName string `yaml:"serviceName" mapstructure:"serviceName"`
}
14 changes: 8 additions & 6 deletions cmd/dependency/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"syscall"
"time"

"d7y.io/dragonfly/v2/cmd/dependency/base"
"github.com/go-echarts/statsview"
"github.com/go-echarts/statsview/viewer"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -67,6 +68,7 @@ func InitCobra(cmd *cobra.Command, useConfigFile bool, config interface{}) {
flags.Bool("verbose", false, "whether logger use debug level")
flags.Int("pprof-port", -1, "listen port for pprof, 0 represents random port")
flags.String("jaeger", "", "jaeger endpoint url, like: http://localhost:14250/api/traces")
flags.String("service-name", fmt.Sprintf("%s-%s", "dragonfly", cmd.Name()), "name of the service for tracer")
flags.String("config", "", fmt.Sprintf("the path of configuration file with yaml extension name, default is %s, it can also be set by env var: %s", filepath.Join(dfpath.DefaultConfigDir, rootName+".yaml"), strings.ToUpper(rootName+"_config")))

// Bind common flags
Expand All @@ -86,7 +88,7 @@ func InitCobra(cmd *cobra.Command, useConfigFile bool, config interface{}) {
}

// InitMonitor initializes the monitor and returns the final handler
func InitMonitor(verbose bool, pprofPort int, jaeger string) func() {
func InitMonitor(verbose bool, pprofPort int, otelOption base.TelemetryOption) func() {
var fc = make(chan func(), 5)

if verbose {
Expand Down Expand Up @@ -116,8 +118,8 @@ func InitMonitor(verbose bool, pprofPort int, jaeger string) func() {
}()
}

if jaeger != "" {
ff, err := initJaegerTracer(jaeger)
if otelOption.Jaeger != "" {
ff, err := initJaegerTracer(otelOption)
if err != nil {
logger.Warnf("init jaeger tracer error: %v", err)
}
Expand Down Expand Up @@ -227,8 +229,8 @@ func initDecoderConfig(dc *mapstructure.DecoderConfig) {
}

// initJaegerTracer creates a new trace provider instance and registers it as the global trace provider.
func initJaegerTracer(url string) (func(), error) {
exp, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
func initJaegerTracer(otelOption base.TelemetryOption) (func(), error) {
exp, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(otelOption.Jaeger)))
if err != nil {
return nil, err
}
Expand All @@ -239,7 +241,7 @@ func initJaegerTracer(url string) (func(), error) {
sdktrace.WithSampler(sdktrace.AlwaysSample()),
// Record information about this application in a Resource.
sdktrace.WithResource(resource.NewWithAttributes(
semconv.ServiceNameKey.String("dragonfly"),
semconv.ServiceNameKey.String(otelOption.ServiceName),
semconv.ServiceInstanceIDKey.String(fmt.Sprintf("%s|%s", iputils.HostName, iputils.HostIP)),
semconv.ServiceVersionKey.String(version.GitVersion))),
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/dfget/cmd/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func runDaemon() error {
s, _ := yaml.Marshal(cfg)
logger.Infof("client daemon configuration:\n%s", string(s))

ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry.Jaeger)
ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry)
defer ff()

svr, err := server.New(cfg)
Expand Down
2 changes: 1 addition & 1 deletion cmd/dfget/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func runDfget() error {
s, _ := yaml.Marshal(dfgetConfig)
logger.Infof("client dfget configuration:\n%s", string(s))

ff := dependency.InitMonitor(dfgetConfig.Verbose, dfgetConfig.PProfPort, dfgetConfig.Telemetry.Jaeger)
ff := dependency.InitMonitor(dfgetConfig.Verbose, dfgetConfig.PProfPort, dfgetConfig.Telemetry)
defer ff()

var (
Expand Down
2 changes: 1 addition & 1 deletion cmd/manager/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func runManager() error {

logger.Infof("manager configuration:\n%s", string(s))

ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry.Jaeger)
ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry)
defer ff()

svr, err := server.New(cfg)
Expand Down
6 changes: 3 additions & 3 deletions cmd/scheduler/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (
"d7y.io/dragonfly/v2/cmd/dependency"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/internal/dflog/logcore"
"d7y.io/dragonfly/v2/scheduler"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/server"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -83,10 +83,10 @@ func runScheduler() error {
s, _ := yaml.Marshal(cfg)
logger.Infof("scheduler configuration:\n%s", string(s))

ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry.Jaeger)
ff := dependency.InitMonitor(cfg.Verbose, cfg.PProfPort, cfg.Telemetry)
defer ff()

svr, err := server.New(cfg)
svr, err := scheduler.New(cfg)
if err != nil {
logger.Errorf("get scheduler server error: %s", err)
return err
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -718,9 +718,11 @@ github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go v1.1.13 h1:nB3O5kBSQGjEQAcfe1aLUYuxmXdFKmYgBZhY32rQb6Q=
github.com/ugorji/go v1.1.13/go.mod h1:jxau1n+/wyTGLQoCkjok9r5zFa/FxT6eI5HiHKQszjc=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/ugorji/go/codec v1.1.13 h1:013LbFhocBoIqgHeIHKlV4JWYhqogATYWZhIcH0WHn4=
github.com/ugorji/go/codec v1.1.13/go.mod h1:oNVt3Dq+FO91WNQ/9JnHKQP2QJxTzoN7wCBFCq1OeuU=
Expand Down
1 change: 0 additions & 1 deletion internal/dfcodes/rpc_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ const (
SchedPeerPieceResultReportFail base.Code = 5006
SchedCDNSeedFail base.Code = 5007
SchedTaskStatusError base.Code = 5008
SchedWithoutParentPeer base.Code = 5009

// cdnsystem response error 6000-6999
CdnError base.Code = 6000
Expand Down
8 changes: 2 additions & 6 deletions pkg/rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/pkg/errors"
"github.com/serialx/hashring"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"

Expand Down Expand Up @@ -94,11 +93,8 @@ var defaultClientOpts = []grpc.DialOption{
Time: 2 * time.Minute,
Timeout: 10 * time.Second,
}),
// TODO make grpc interceptor optional
grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor(),
streamClientInterceptor),
grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor(),
unaryClientInterceptor),
grpc.WithStreamInterceptor(streamClientInterceptor),
grpc.WithUnaryInterceptor(unaryClientInterceptor),
}

type ConnOption interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/rpc/scheduler/client/peer_packet_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newPeerPacketStream(ctx context.Context, sc *schedulerClient, hashKey strin

pps := &peerPacketStream{
sc: sc,
ctx: context.Background(),
ctx: ctx,
hashKey: hashKey,
ptr: ptr,
opts: opts,
Expand Down
8 changes: 2 additions & 6 deletions pkg/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"syscall"
"time"

"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -76,11 +75,8 @@ var serverOpts = []grpc.ServerOption{
MaxConnectionIdle: 5 * time.Minute,
}),
grpc.MaxConcurrentStreams(100),
// TODO make grpc interceptor optional
grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor(),
streamServerInterceptor),
grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor(),
unaryServerInterceptor),
grpc.StreamInterceptor(streamServerInterceptor),
grpc.UnaryInterceptor(unaryServerInterceptor),
}

var sp = struct {
Expand Down
1 change: 0 additions & 1 deletion pkg/structure/sortedlist/sorted_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ func (l *SortedList) Update(data Item) (err error) {

l.deleteItem(oldKey1, oldKey2, data)
l.addItem(key1, key2, data)

return
}

Expand Down
28 changes: 17 additions & 11 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func NewDefaultSchedulerConfig() *SchedulerConfig {
AScheduler: "",
BScheduler: "",
WorkerNum: runtime.GOMAXPROCS(0),
BackSourceCount: 3,
AccessWindow: 3 * time.Minute,
CandidateParentCount: 10,
Scheduler: "basic",
Expand All @@ -130,8 +131,10 @@ func NewDefaultGCConfig() *GCConfig {
return &GCConfig{
PeerGCInterval: 5 * time.Minute,
TaskGCInterval: 5 * time.Minute,
PeerTTL: 5 * time.Minute,
TaskTTL: 1 * time.Hour,
PeerTTL: 10 * time.Minute,
PeerTTI: 3 * time.Minute,
TaskTTL: 10 * time.Minute,
TaskTTI: 3 * time.Minute,
}
}

Expand Down Expand Up @@ -214,17 +217,18 @@ type DynConfig struct {
}

type SchedulerConfig struct {
DisableCDN bool `yaml:"disableCDN" mapstructure:"disableCDN"`
ABTest bool `yaml:"abtest" mapstructure:"abtest"`
AScheduler string `yaml:"ascheduler" mapstructure:"ascheduler"`
BScheduler string `yaml:"bscheduler" mapstructure:"bscheduler"`
WorkerNum int `yaml:"workerNum" mapstructure:"workerNum"`
DisableCDN bool `yaml:"disableCDN" mapstructure:"disableCDN"`
ABTest bool `yaml:"abtest" mapstructure:"abtest"`
AScheduler string `yaml:"ascheduler" mapstructure:"ascheduler"`
BScheduler string `yaml:"bscheduler" mapstructure:"bscheduler"`
WorkerNum int `yaml:"workerNum" mapstructure:"workerNum"`
BackSourceCount int `yaml:"backSourceCount" mapstructure:"backSourceCount"`
// AccessWindow should be less than CDN task expireTime
AccessWindow time.Duration `yaml:"accessWindow" mapstructure:"accessWindow"`
CandidateParentCount int `yaml:"candidateParentCount" mapstructure:"candidateParentCount"`
Scheduler string `yaml:"scheduler" mapstructure:"scheduler"`
CDNLoad int `yaml:"cDNLoad" mapstructure:"cDNLoad"`
ClientLoad int `yaml:"clientLoad" mapstructure:"clientLoad"`
CDNLoad int `yaml:"cdnLoad" mapstructure:"cdnLoad"`
ClientLoad int32 `yaml:"clientLoad" mapstructure:"clientLoad"`
OpenMonitor bool `yaml:"openMonitor" mapstructure:"openMonitor"`
GC *GCConfig `yaml:"gc" mapstructure:"gc"`
}
Expand All @@ -237,9 +241,11 @@ type ServerConfig struct {

type GCConfig struct {
PeerGCInterval time.Duration `yaml:"peerGCInterval" mapstructure:"peerGCInterval"`
PeerTTL time.Duration `yaml:"peerTTL" mapstructure:"peerTTL"`
PeerTTI time.Duration `yaml:"peerTTI" mapstructure:"peerTTI"`
TaskGCInterval time.Duration `yaml:"taskGCInterval" mapstructure:"taskGCInterval"`
PeerTTL time.Duration
TaskTTL time.Duration
TaskTTL time.Duration `yaml:"taskTTL" mapstructure:"taskTTL"`
TaskTTI time.Duration `yaml:"taskTTI" mapstructure:"taskTTI"`
}

type HostConfig struct {
Expand Down
54 changes: 54 additions & 0 deletions scheduler/config/constants_otel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package config

import "go.opentelemetry.io/otel/attribute"

// Attribute keys, all namespaced under the "d7y." prefix.
//
// Each constant is an attribute.Key. Presumably these are attached to
// scheduler trace spans; the call sites are not visible from this file, so
// confirm against the callers before relying on that.
const (
	// Keys under "d7y.peer.*".
	AttributePeerRegisterRequest = attribute.Key("d7y.peer.register.request")
	// Key under "d7y.task.*".
	AttributeTaskSizeScope = attribute.Key("d7y.task.size.scope")
	AttributeSinglePiece   = attribute.Key("d7y.peer.single.piece")
	AttributePieceReceived = attribute.Key("d7y.peer.piece.received")

	// Keys under "d7y.leave.*" and "d7y.report.*".
	AttributeLeavePeerID  = attribute.Key("d7y.leave.peer.id")
	AttributeLeaveTaskID  = attribute.Key("d7y.leave.task.id")
	AttributeReportPeerID = attribute.Key("d7y.report.peer.id")

	// Download-related keys.
	AttributePeerDownloadSuccess = attribute.Key("d7y.peer.download.success")
	AttributeDownloadFileURL     = attribute.Key("d7y.file.url")
	AttributeContentLength       = attribute.Key("d7y.source.content.length")
	AttributePeerDownloadResult  = attribute.Key("d7y.peer.download.result")

	// Scheduling, identity and CDN keys.
	AttributeSchedulePacket = attribute.Key("d7y.schedule.packet")
	AttributeTaskID         = attribute.Key("d7y.peer.task.id")
	AttributePeerID         = attribute.Key("d7y.peer.id")
	AttributeCDNSeedRequest = attribute.Key("d7y.cdn.seed.request")
)

// Span name strings.
//
// NOTE(review): presumably passed as the span name when the scheduler starts
// a trace span for the matching operation; usage is not visible here.
const (
	// SpanPeerRegister is the name "peer-register".
	SpanPeerRegister = "peer-register"
	// SpanTriggerCDN is the name "trigger-cdn".
	SpanTriggerCDN = "trigger-cdn"
	// SpanReportPieceResult is the name "report-piece-result".
	SpanReportPieceResult = "report-piece-result"
	// SpanReportPeerResult is the name "report-peer-result".
	SpanReportPeerResult = "report-peer-result"
	// SpanPeerLeave is the name "peer-leave".
	SpanPeerLeave = "peer-leave"
)

// Event name strings.
//
// NOTE(review): presumably recorded as events on trace spans; the call sites
// are not visible from this file, so confirm against the callers.
const (
	// EventScheduleParentFail is the name "fail-schedule-parent".
	EventScheduleParentFail = "fail-schedule-parent"
	// EventPeerNotFound is the name "peer-not-found".
	EventPeerNotFound = "peer-not-found"
	// EventHostNotFound is the name "host-not-found".
	EventHostNotFound = "host-not-found"
	// EventCreatePeer is the name "create-peer".
	EventCreatePeer = "create-peer"
	// EventPieceReceived is the name "receive-piece".
	EventPieceReceived = "receive-piece"
	// EventPeerDownloaded is the name "downloaded".
	EventPeerDownloaded = "downloaded"
)

0 comments on commit 0cc52f5

Please sign in to comment.