Skip to content

Commit

Permalink
feat: update log format
Browse files Browse the repository at this point in the history
  • Loading branch information
askyrie committed Jul 26, 2024
1 parent 91a012a commit e4beee1
Showing 1 changed file with 45 additions and 45 deletions.
90 changes: 45 additions & 45 deletions server/controller/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ import (
"time"

mapset "github.com/deckarep/golang-set"
logging "github.com/op/go-logging"

cloudcfg "github.com/deepflowio/deepflow/server/controller/cloud/config"
gathermodel "github.com/deepflowio/deepflow/server/controller/cloud/kubernetes_gather/model"
"github.com/deepflowio/deepflow/server/controller/cloud/model"
"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/logger"
"github.com/deepflowio/deepflow/server/controller/manager/config"
"github.com/deepflowio/deepflow/server/controller/recorder"
recordercfg "github.com/deepflowio/deepflow/server/controller/recorder/config"
"github.com/deepflowio/deepflow/server/libs/queue"
)

var log = logging.MustGetLogger("manager")
var log = logger.MustGetLogger("manager")

type Manager struct {
cfg config.ManagerConfig
Expand Down Expand Up @@ -187,10 +187,10 @@ func (m *Manager) run(ctx context.Context) {
}

var domains []mysql.Domain
var oldDomains = mapset.NewSet()
var newDomains = mapset.NewSet()
var delDomains = mapset.NewSet()
var addDomains = mapset.NewSet()
var oldDomainLcuuids = mapset.NewSet()
var newDomainLcuuids = mapset.NewSet()
var delDomainLcuuids = mapset.NewSet()
var addDomainLcuuids = mapset.NewSet()
var intersectDomains = mapset.NewSet()

for lcuuid, task := range m.taskMap {
Expand All @@ -203,84 +203,84 @@ func (m *Manager) run(ctx context.Context) {
if task.Cloud.GetOrgID() != orgID {
continue
}
oldDomains.Add(lcuuid)
oldDomainLcuuids.Add(lcuuid)
}
db.Where(
"enabled = ? AND controller_ip = ?", common.DOMAIN_ENABLED_TRUE, controller.IP,
).Find(&domains)
lcuuidToDomain := make(map[string]mysql.Domain)
for _, domain := range domains {
lcuuidToDomain[domain.Lcuuid] = domain
newDomains.Add(domain.Lcuuid)
newDomainLcuuids.Add(domain.Lcuuid)
}

// 对于删除的domain,停止Task,并移除管理
delDomains = oldDomains.Difference(newDomains)
for _, domain := range delDomains.ToSlice() {
lcuuid := domain.(string)
m.taskMap[lcuuid].Stop()
delDomainLcuuids = oldDomainLcuuids.Difference(newDomainLcuuids)
for _, lcuuid := range delDomainLcuuids.ToSlice() {
deletedLcuuid := lcuuid.(string)
m.taskMap[deletedLcuuid].Stop()
m.mutex.Lock()
delete(m.taskMap, lcuuid)
delete(m.taskMap, deletedLcuuid)
m.mutex.Unlock()
}

// 对于新增的domain,启动Task,并纳入Manager管理
addDomains = newDomains.Difference(oldDomains)
for _, domain := range addDomains.ToSlice() {
lcuuid := domain.(string)
task := NewTask(orgID, lcuuidToDomain[lcuuid], m.cfg.TaskCfg, ctx, m.resourceEventQueue)
addDomainLcuuids = newDomainLcuuids.Difference(oldDomainLcuuids)
for _, lcuuid := range addDomainLcuuids.ToSlice() {
addedLcuuid := lcuuid.(string)
domain := lcuuidToDomain[addedLcuuid]
task := NewTask(orgID, domain, m.cfg.TaskCfg, ctx, m.resourceEventQueue)
if task == nil || task.Cloud == nil {
log.Errorf("org (%d) domain (%s) init failed", orgID, lcuuidToDomain[lcuuid].Name)
log.Errorf("domain (%s) init failed", domain.Name, logger.NewORGPrefix(orgID))
continue
}
m.mutex.Lock()
m.taskMap[lcuuid] = task
m.taskMap[lcuuid].Start()
m.taskMap[addedLcuuid] = task
m.taskMap[addedLcuuid].Start()
m.mutex.Unlock()
}

// 检查已有domain是否存在配置/名称修改
// 如果存在配置修改,则停止已有Task,并移除管理;同时启动新的Task,并纳入Manager管理
// 如果没有配置修改,判断是否存在名称修改更新Task信息
intersectDomains = newDomains.Intersect(oldDomains)
for _, domain := range intersectDomains.ToSlice() {
lcuuid := domain.(string)
oldDomainConfig := m.taskMap[lcuuid].DomainConfig
newDomainConfig := lcuuidToDomain[lcuuid].Config
if oldDomainConfig != newDomainConfig {
log.Infof("org (%d) oldDomainConfig: %s", orgID, oldDomainConfig)
log.Infof("org (%d) newDomainConfig: %s", orgID, newDomainConfig)
m.taskMap[lcuuid].Stop()
task := NewTask(orgID, lcuuidToDomain[lcuuid], m.cfg.TaskCfg, ctx, m.resourceEventQueue)
intersectDomains = newDomainLcuuids.Intersect(oldDomainLcuuids)
for _, lcuuid := range intersectDomains.ToSlice() {
domainLcuuid := lcuuid.(string)
newDomain := lcuuidToDomain[domainLcuuid]
oldDomainConfig := m.taskMap[domainLcuuid].DomainConfig
if oldDomainConfig != newDomain.Config {
log.Infof("domain (%s) oldDomainConfig: %s", newDomain.Name, oldDomainConfig, logger.NewORGPrefix(orgID))
log.Infof("domain (%s) newDomainConfig: %s", newDomain.Name, newDomain.Config, logger.NewORGPrefix(orgID))
m.taskMap[domainLcuuid].Stop()
task := NewTask(orgID, newDomain, m.cfg.TaskCfg, ctx, m.resourceEventQueue)
if task == nil || task.Cloud == nil {
log.Errorf("org (%d) domain (%s) init failed", orgID, lcuuidToDomain[lcuuid].Name)
log.Errorf("domain (%s) init failed", newDomain.Name, logger.NewORGPrefix(orgID))
continue
}

m.mutex.Lock()
delete(m.taskMap, lcuuid)
m.taskMap[lcuuid] = task
m.taskMap[lcuuid].Start()
delete(m.taskMap, domainLcuuid)
m.taskMap[domainLcuuid] = task
m.taskMap[domainLcuuid].Start()
m.mutex.Unlock()
} else {
oldDomainName := m.taskMap[lcuuid].DomainName
newDomainName := lcuuidToDomain[lcuuid].Name
if oldDomainName != newDomainName {
if m.taskMap[lcuuid].Cloud.GetBasicInfo().Type == common.KUBERNETES {
m.taskMap[lcuuid].Stop()
task := NewTask(orgID, lcuuidToDomain[lcuuid], m.cfg.TaskCfg, ctx, m.resourceEventQueue)
oldDomainName := m.taskMap[domainLcuuid].DomainName
if oldDomainName != newDomain.Name {
if m.taskMap[domainLcuuid].Cloud.GetBasicInfo().Type == common.KUBERNETES {
m.taskMap[domainLcuuid].Stop()
task := NewTask(orgID, newDomain, m.cfg.TaskCfg, ctx, m.resourceEventQueue)
if task == nil || task.Cloud == nil {
log.Errorf("org (%d) domain (%s) init failed", orgID, lcuuidToDomain[lcuuid].Name)
log.Errorf("domain (%s) init failed", orgID, newDomain.Name, logger.NewORGPrefix(orgID))
continue
}

m.mutex.Lock()
delete(m.taskMap, lcuuid)
m.taskMap[lcuuid] = task
m.taskMap[lcuuid].Start()
delete(m.taskMap, domainLcuuid)
m.taskMap[domainLcuuid] = task
m.taskMap[domainLcuuid].Start()
m.mutex.Unlock()
} else {
m.taskMap[lcuuid].UpdateDomainName(newDomainName)
m.taskMap[domainLcuuid].UpdateDomainName(newDomain.Name)
}
}
}
Expand Down

0 comments on commit e4beee1

Please sign in to comment.