Skip to content

Commit

Permalink
feat: register service to manager (#475)
Browse files Browse the repository at this point in the history
* feat: register service to manager

Signed-off-by: Gaius <gaius.qi@gmail.com>
  • Loading branch information
gaius-qi committed Jul 20, 2021
1 parent 9af0bab commit cffe414
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 1,096 deletions.
24 changes: 5 additions & 19 deletions cdnsystem/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,7 @@ func (s *Server) register(ctx context.Context) error {
location := s.config.Host.Location
downloadPort := int32(s.config.DownloadPort)

var cdn *manager.CDN
var err error
cdn, err = s.managerClient.CreateCDN(ctx, &manager.CreateCDNRequest{
cdn, err := s.managerClient.UpdateCDN(ctx, &manager.UpdateCDNRequest{
SourceType: manager.SourceType_CDN_SOURCE,
HostName: iputils.HostName,
Ip: ip,
Expand All @@ -176,30 +174,18 @@ func (s *Server) register(ctx context.Context) error {
DownloadPort: downloadPort,
})
if err != nil {
cdn, err = s.managerClient.UpdateCDN(ctx, &manager.UpdateCDNRequest{
SourceType: manager.SourceType_CDN_SOURCE,
HostName: iputils.HostName,
Ip: ip,
Port: port,
Idc: idc,
Location: location,
DownloadPort: downloadPort,
})
if err != nil {
logger.Errorf("update cdn to manager failed %v", err)
return err
}
logger.Infof("update cdn %s successfully", cdn.HostName)
logger.Errorf("update cdn %s to manager failed %v", cdn.HostName, err)
return err
}
logger.Infof("create cdn %s successfully", cdn.HostName)
logger.Infof("update cdn %s to manager successfully", cdn.HostName)

cdnClusterID := s.config.Manager.CDNClusterID
if cdnClusterID != 0 {
if _, err := s.managerClient.AddCDNToCDNCluster(ctx, &manager.AddCDNToCDNClusterRequest{
CdnId: cdn.Id,
CdnClusterId: cdnClusterID,
}); err != nil {
logger.Warnf("add cdn to cdn cluster failed %v", err)
logger.Warnf("add cdn %s to cdn cluster %s failed %v", cdn.HostName, cdnClusterID, err)
return err
}
logger.Infof("add cdn %s to cdn cluster %s successfully", cdn.HostName, cdnClusterID)
Expand Down
57 changes: 36 additions & 21 deletions manager/service/service_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"errors"
"io"

logger "d7y.io/dragonfly/v2/internal/dflog"
Expand Down Expand Up @@ -101,7 +102,7 @@ func (s *GRPC) GetCDN(ctx context.Context, req *manager.GetCDNRequest) (*manager
return &pbCDN, nil
}

func (s *GRPC) CreateCDN(ctx context.Context, req *manager.CreateCDNRequest) (*manager.CDN, error) {
func (s *GRPC) createCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*manager.CDN, error) {
if err := req.Validate(); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand All @@ -116,7 +117,7 @@ func (s *GRPC) CreateCDN(ctx context.Context, req *manager.CreateCDNRequest) (*m
}

if err := s.db.Create(&cdn).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

return &manager.CDN{
Expand All @@ -136,14 +137,21 @@ func (s *GRPC) UpdateCDN(ctx context.Context, req *manager.UpdateCDNRequest) (*m
}

cdn := model.CDN{}
if err := s.db.First(&cdn, model.CDN{HostName: req.HostName}).Updates(model.CDN{
if err := s.db.First(&cdn, model.CDN{HostName: req.HostName}).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createCDN(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.db.Model(&cdn).Updates(model.CDN{
IDC: req.Idc,
Location: req.Location,
IP: req.Ip,
Port: req.Port,
DownloadPort: req.DownloadPort,
}).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand Down Expand Up @@ -171,16 +179,16 @@ func (s *GRPC) AddCDNToCDNCluster(ctx context.Context, req *manager.AddCDNToCDNC

cdnCluster := model.CDNCluster{}
if err := s.db.First(&cdnCluster, req.CdnClusterId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

cdn := model.CDN{}
if err := s.db.First(&cdn, req.CdnId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.db.Model(&cdnCluster).Association("CDNs").Append(&cdn); err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand Down Expand Up @@ -288,7 +296,7 @@ func (s *GRPC) GetScheduler(ctx context.Context, req *manager.GetSchedulerReques
return &pbScheduler, nil
}

func (s *GRPC) CreateScheduler(ctx context.Context, req *manager.CreateSchedulerRequest) (*manager.Scheduler, error) {
func (s *GRPC) createScheduler(ctx context.Context, req *manager.UpdateSchedulerRequest) (*manager.Scheduler, error) {
if err := req.Validate(); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand All @@ -311,7 +319,7 @@ func (s *GRPC) CreateScheduler(ctx context.Context, req *manager.CreateScheduler
}

if err := s.db.Create(&scheduler).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

return &manager.Scheduler{
Expand All @@ -332,23 +340,30 @@ func (s *GRPC) UpdateScheduler(ctx context.Context, req *manager.UpdateScheduler
return nil, status.Error(codes.InvalidArgument, err.Error())
}

scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{HostName: req.HostName}).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return s.createScheduler(ctx, req)
}
return nil, status.Error(codes.Unknown, err.Error())
}

var netConfig datatypes.JSONMap
if len(req.NetConfig) > 0 {
if err := netConfig.UnmarshalJSON(req.NetConfig); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}

scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, model.Scheduler{HostName: req.HostName}).Updates(model.Scheduler{
if err := s.db.Model(&scheduler).Updates(model.Scheduler{
VIPs: req.Vips,
IDC: req.Idc,
Location: req.Location,
NetConfig: netConfig,
IP: req.Ip,
Port: req.Port,
}).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand Down Expand Up @@ -378,16 +393,16 @@ func (s *GRPC) AddSchedulerClusterToSchedulerCluster(ctx context.Context, req *m

schedulerCluster := model.SchedulerCluster{}
if err := s.db.First(&schedulerCluster, req.SchedulerClusterId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

scheduler := model.Scheduler{}
if err := s.db.First(&scheduler, req.SchedulerId).Error; err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.db.Model(&schedulerCluster).Association("Schedulers").Append(&scheduler); err != nil {
return nil, err
return nil, status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand Down Expand Up @@ -474,7 +489,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
req, err := m.Recv()
if err != nil {
logger.Errorf("keepalive failed for the first time: %v", err)
return err
return status.Error(codes.Unknown, err.Error())
}
if err := req.Validate(); err != nil {
return status.Error(codes.InvalidArgument, err.Error())
Expand All @@ -492,7 +507,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.Scheduler{
Status: model.SchedulerStatusActive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand All @@ -511,7 +526,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.CDN{
Status: model.CDNStatusActive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand All @@ -533,7 +548,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.Scheduler{
Status: model.SchedulerStatusInactive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand All @@ -552,7 +567,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
}).Updates(model.CDN{
Status: model.CDNStatusInactive,
}).Error; err != nil {
return err
return status.Error(codes.Unknown, err.Error())
}

if err := s.cache.Delete(
Expand All @@ -568,7 +583,7 @@ func (s *GRPC) KeepAlive(m manager.Manager_KeepAliveServer) error {
return nil
}
logger.Errorf("%s keepalive failed: %v", hostName, err)
return err
return status.Error(codes.Unknown, err.Error())
}

logger.Debugf("%s type of %s send keepalive request", sourceType, hostName)
Expand Down
10 changes: 7 additions & 3 deletions pkg/rpc/cdnsystem/cdnsystem_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 7 additions & 3 deletions pkg/rpc/dfdaemon/dfdaemon_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit cffe414

Please sign in to comment.