Skip to content

Commit

Permalink
Reload CDN client (#566)
Browse files Browse the repository at this point in the history
* optimaze dynamic cdn hosts

Signed-off-by: santong <244372610@qq.com>

* feat: set default cdn loadLimit

Signed-off-by: santong <244372610@qq.com>
  • Loading branch information
244372610 committed Aug 19, 2021
1 parent f73fc50 commit fb54969
Show file tree
Hide file tree
Showing 13 changed files with 88 additions and 177 deletions.
1 change: 1 addition & 0 deletions scheduler/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ func New() *Config {
Location: "",
IDC: "",
NetTopology: "",
LoadLimit: 100,
},
},
},
Expand Down
1 change: 1 addition & 0 deletions scheduler/config/dynconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type CDN struct {
Location string `yaml:"location" mapstructure:"location" json:"location"`
IDC string `yaml:"idc" mapstructure:"idc" json:"idc"`
NetTopology string `yaml:"netTopology" mapstructure:"netTopology" json:"net_topology"`
LoadLimit int32 `yaml:"loadLimit" mapstructure:"loadLimit" json:"load_limit"`
}

type DynconfigInterface interface {
Expand Down
37 changes: 12 additions & 25 deletions scheduler/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/core/scheduler"
"d7y.io/dragonfly/v2/scheduler/supervisor"
"d7y.io/dragonfly/v2/scheduler/supervisor/cdn"
"d7y.io/dragonfly/v2/scheduler/supervisor/cdn/d7y"
"d7y.io/dragonfly/v2/scheduler/supervisor/cdn/source"
"d7y.io/dragonfly/v2/scheduler/supervisor/host"
"d7y.io/dragonfly/v2/scheduler/supervisor/peer"
"d7y.io/dragonfly/v2/scheduler/supervisor/task"
Expand Down Expand Up @@ -65,31 +63,20 @@ type SchedulerService struct {
}

func NewSchedulerService(cfg *config.SchedulerConfig, dynConfig config.DynconfigInterface, openTel bool) (*SchedulerService, error) {
dynConfigData, err := dynConfig.Get()
if err != nil {
return nil, err
}
hostManager := host.NewManager()
peerManager := peer.NewManager(cfg.GC, hostManager)
var cdnManager supervisor.CDNMgr
if cfg.DisableCDN {
if cdnManager, err = source.NewManager(peerManager, hostManager); err != nil {
return nil, errors.Wrap(err, "new back source cdn manager")
}
} else {
var opts []grpc.DialOption
if openTel {
opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
}
cdnClient, err := cdn.NewRefreshableCDNClient(dynConfig, opts)
if err != nil {
return nil, errors.Wrap(err, "new refreshable cdn client")
}
if cdnManager, err = d7y.NewManager(cdnClient, peerManager, hostManager); err != nil {
return nil, errors.Wrap(err, "new cdn manager")
}
hostManager.OnNotify(dynConfigData)
dynConfig.Register(hostManager)

var opts []grpc.DialOption
if openTel {
opts = append(opts, grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()))
}
cdnClient, err := cdn.NewRefreshableCDNClient(dynConfig, opts)
if err != nil {
return nil, errors.Wrap(err, "new refreshable cdn client")
}
cdnManager, err := cdn.NewManager(cdnClient, peerManager, hostManager)
if err != nil {
return nil, errors.Wrap(err, "new cdn manager")
}
taskManager := task.NewManager(cfg.GC, peerManager)
sched, err := scheduler.Get(cfg.Scheduler).Build(cfg, &scheduler.BuildOptions{
Expand Down
31 changes: 0 additions & 31 deletions scheduler/supervisor/cdn/error.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package d7y
package cdn

import (
"context"
Expand All @@ -31,26 +31,37 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/supervisor"
"d7y.io/dragonfly/v2/scheduler/supervisor/cdn"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

var (
ErrCDNRegisterFail = errors.New("cdn task register failed")

ErrCDNDownloadFail = errors.New("cdn task download failed")

ErrCDNUnknown = errors.New("cdn obtain seed encounter unknown err")

ErrCDNInvokeFail = errors.New("invoke cdn interface failed")

ErrInitCDNPeerFail = errors.New("init cdn peer failed")
)

var tracer trace.Tracer

func init() {
tracer = otel.Tracer("scheduler-cdn")
}

type manager struct {
client client.CdnClient
client RefreshableCDNClient
peerManager supervisor.PeerMgr
hostManager supervisor.HostMgr
lock sync.RWMutex
}

func NewManager(cdnClient client.CdnClient, peerManager supervisor.PeerMgr, hostManager supervisor.HostMgr) (supervisor.CDNMgr, error) {
func NewManager(cdnClient RefreshableCDNClient, peerManager supervisor.PeerMgr, hostManager supervisor.HostMgr) (supervisor.CDNMgr, error) {
mgr := &manager{
client: cdnClient,
peerManager: peerManager,
Expand All @@ -70,7 +81,7 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *supervisor.Task) (*s
}
seedSpan.SetAttributes(config.AttributeCDNSeedRequest.String(seedRequest.String()))
if cm.client == nil {
err := cdn.ErrCDNRegisterFail
err := ErrCDNRegisterFail
seedSpan.RecordError(err)
seedSpan.SetAttributes(config.AttributePeerDownloadSuccess.Bool(false))
return nil, err
Expand All @@ -83,14 +94,14 @@ func (cm *manager) StartSeedTask(ctx context.Context, task *supervisor.Task) (*s
logger.Errorf("failed to obtain cdn seed: %v", cdnErr)
switch cdnErr.Code {
case dfcodes.CdnTaskRegistryFail:
return nil, errors.Wrap(cdn.ErrCDNRegisterFail, "obtain seeds")
return nil, errors.Wrap(ErrCDNRegisterFail, "obtain seeds")
case dfcodes.CdnTaskDownloadFail:
return nil, errors.Wrapf(cdn.ErrCDNDownloadFail, "obtain seeds")
return nil, errors.Wrapf(ErrCDNDownloadFail, "obtain seeds")
default:
return nil, errors.Wrapf(cdn.ErrCDNUnknown, "obtain seeds")
return nil, errors.Wrapf(ErrCDNUnknown, "obtain seeds")
}
}
return nil, errors.Wrapf(cdn.ErrCDNInvokeFail, "obtain seeds from cdn: %v", err)
return nil, errors.Wrapf(ErrCDNInvokeFail, "obtain seeds from cdn: %v", err)
}
return cm.receivePiece(ctx, task, stream)
}
Expand All @@ -115,14 +126,14 @@ func (cm *manager) receivePiece(ctx context.Context, task *supervisor.Task, stre
span.RecordError(recvErr)
switch recvErr.Code {
case dfcodes.CdnTaskRegistryFail:
return cdnPeer, errors.Wrapf(cdn.ErrCDNRegisterFail, "receive piece")
return cdnPeer, errors.Wrapf(ErrCDNRegisterFail, "receive piece")
case dfcodes.CdnTaskDownloadFail:
return cdnPeer, errors.Wrapf(cdn.ErrCDNDownloadFail, "receive piece")
return cdnPeer, errors.Wrapf(ErrCDNDownloadFail, "receive piece")
default:
return cdnPeer, errors.Wrapf(cdn.ErrCDNUnknown, "recive piece")
return cdnPeer, errors.Wrapf(ErrCDNUnknown, "recive piece")
}
}
return cdnPeer, errors.Wrapf(cdn.ErrCDNInvokeFail, "receive piece from cdn: %v", err)
return cdnPeer, errors.Wrapf(ErrCDNInvokeFail, "receive piece from cdn: %v", err)
}
if piece != nil {
span.AddEvent(config.EventPieceReceived, trace.WithAttributes(config.AttributePieceReceived.String(piece.String())))
Expand Down Expand Up @@ -170,10 +181,12 @@ func (cm *manager) initCdnPeer(ctx context.Context, task *supervisor.Task, ps *c
var cdnHost *supervisor.PeerHost
cdnPeer, ok := cm.peerManager.Get(ps.PeerId)
if !ok {
logger.Debugf("first seed cdn task for taskID %s", task.TaskID)
if cdnHost, ok = cm.hostManager.Get(ps.HostUuid); !ok {
logger.Errorf("cannot find host %s", ps.HostUuid)
return nil, errors.Wrapf(cdn.ErrInitCDNPeerFail, "cannot find host %s", ps.HostUuid)
if cdnHost, ok = cm.client.GetCDNHost(ps.HostUuid); !ok {
logger.Errorf("cannot find cdn host %s", ps.HostUuid)
return nil, errors.Wrapf(ErrInitCDNPeerFail, "cannot find host %s", ps.HostUuid)
}
cm.hostManager.Add(cdnHost)
}
cdnPeer = supervisor.NewPeer(ps.PeerId, task, cdnHost)
}
Expand Down
51 changes: 36 additions & 15 deletions scheduler/supervisor/cdn/reloadable_cdn_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,31 @@ import (
"reflect"
"sync"

"d7y.io/dragonfly/v2/internal/idgen"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/rpc/cdnsystem"
cdnclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/supervisor"
"google.golang.org/grpc"
)

type RefreshableCDNClient interface {
cdnclient.CdnClient
config.Observer
GetCDNHost(hostID string) (*supervisor.PeerHost, bool)
}

type refreshableCDNClient struct {
mu sync.RWMutex
cdnClient cdnclient.CdnClient
cdnAddrs []dfnet.NetAddr
cdnHosts map[string]*supervisor.PeerHost
cdnConfig []*config.CDN
}

func (rcc *refreshableCDNClient) UpdateState(addrs []dfnet.NetAddr) {
rcc.cdnClient.UpdateState(addrs)
}

func (rcc *refreshableCDNClient) ObtainSeeds(ctx context.Context, sr *cdnsystem.SeedRequest, opts ...grpc.CallOption) (*cdnclient.PieceSeedStream, error) {
Expand All @@ -49,8 +57,13 @@ func (rcc *refreshableCDNClient) GetPieceTasks(ctx context.Context, addr dfnet.N
return rcc.cdnClient.GetPieceTasks(ctx, addr, req, opts...)
}

func (rcc *refreshableCDNClient) UpdateState(addrs []dfnet.NetAddr) {
rcc.cdnClient.UpdateState(addrs)
func (rcc *refreshableCDNClient) GetCDNHost(hostID string) (*supervisor.PeerHost, bool) {
rcc.mu.RLock()
defer rcc.mu.RUnlock()
if cdnHost, ok := rcc.cdnHosts[hostID]; ok {
return cdnHost, true
}
return nil, false
}

func (rcc *refreshableCDNClient) Close() error {
Expand All @@ -62,43 +75,51 @@ func NewRefreshableCDNClient(dynConfig config.DynconfigInterface, opts []grpc.Di
if err != nil {
return nil, err
}
cdnAddrs := cdnHostsToNetAddrs(dynConfigData.CDNs)
cdnHosts, cdnAddrs := cdnHostsToNetAddrs(dynConfigData.CDNs)
cdnClient, err := cdnclient.GetClientByAddr(cdnAddrs, opts...)
if err != nil {
return nil, err
}
rcc := &refreshableCDNClient{
cdnClient: cdnClient,
cdnAddrs: cdnAddrs,
cdnHosts: cdnHosts,
cdnConfig: dynConfigData.CDNs,
}
dynConfig.Register(rcc)
return rcc, nil
}

func (rcc *refreshableCDNClient) OnNotify(c *config.DynconfigData) {
netAddrs := cdnHostsToNetAddrs(c.CDNs)
rcc.refresh(netAddrs)
rcc.refresh(c.CDNs)
}

func (rcc *refreshableCDNClient) refresh(netAddrs []dfnet.NetAddr) {
func (rcc *refreshableCDNClient) refresh(cdns []*config.CDN) {
rcc.mu.Lock()
defer rcc.mu.Unlock()

if reflect.DeepEqual(netAddrs, rcc.cdnAddrs) {
if reflect.DeepEqual(rcc.cdnConfig, cdns) {
return
}
cdnHosts, netAddrs := cdnHostsToNetAddrs(cdns)
rcc.cdnHosts = cdnHosts
// Sync CDNManager client netAddrs
rcc.cdnClient.UpdateState(netAddrs)
}

// cdnHostsToNetAddrs coverts manager.CdnHosts to []dfnet.NetAddr.
func cdnHostsToNetAddrs(hosts []*config.CDN) []dfnet.NetAddr {
var netAddrs []dfnet.NetAddr
for i := range hosts {
func cdnHostsToNetAddrs(hosts []*config.CDN) (map[string]*supervisor.PeerHost, []dfnet.NetAddr) {
cdnHostMap := make(map[string]*supervisor.PeerHost, len(hosts))
netAddrs := make([]dfnet.NetAddr, 0, len(hosts))
for _, host := range hosts {
hostID := idgen.CDNUUID(host.HostName, host.Port)
if host.LoadLimit == 0 {
host.LoadLimit = 100
}
cdnHostMap[hostID] = supervisor.NewCDNPeerHost(hostID, host.IP, host.HostName, host.Port, host.DownloadPort, host.SecurityGroup, host.Location,
host.IDC, host.NetTopology, host.LoadLimit)
netAddrs = append(netAddrs, dfnet.NetAddr{
Type: dfnet.TCP,
Addr: fmt.Sprintf("%s:%d", hosts[i].IP, hosts[i].Port),
Addr: fmt.Sprintf("%s:%d", host.IP, host.Port),
})
}
return netAddrs
return cdnHostMap, netAddrs
}
70 changes: 0 additions & 70 deletions scheduler/supervisor/cdn/source/manager.go

This file was deleted.

0 comments on commit fb54969

Please sign in to comment.