diff --git a/server/controller/genesis/common/utils.go b/server/controller/genesis/common/utils.go index 341de88ebdc..758d67eb684 100644 --- a/server/controller/genesis/common/utils.go +++ b/server/controller/genesis/common/utils.go @@ -34,15 +34,15 @@ import ( "time" simplejson "github.com/bitly/go-simplejson" - "github.com/op/go-logging" "gopkg.in/yaml.v3" "inet.af/netaddr" "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/controller/db/mysql" + "github.com/deepflowio/deepflow/server/controller/logger" ) -var log = logging.MustGetLogger("genesis.common") +var log = logger.MustGetLogger("genesis.common") type TeamInfo struct { OrgID int diff --git a/server/controller/genesis/genesis.go b/server/controller/genesis/genesis.go index 87544316b45..f64ee9c78cf 100644 --- a/server/controller/genesis/genesis.go +++ b/server/controller/genesis/genesis.go @@ -27,7 +27,6 @@ import ( "sync/atomic" "time" - "github.com/op/go-logging" "google.golang.org/grpc" api "github.com/deepflowio/deepflow/message/controller" @@ -37,12 +36,13 @@ import ( "github.com/deepflowio/deepflow/server/controller/db/mysql" genesiscommon "github.com/deepflowio/deepflow/server/controller/genesis/common" gconfig "github.com/deepflowio/deepflow/server/controller/genesis/config" + "github.com/deepflowio/deepflow/server/controller/logger" "github.com/deepflowio/deepflow/server/controller/model" "github.com/deepflowio/deepflow/server/controller/statsd" "github.com/deepflowio/deepflow/server/libs/queue" ) -var log = logging.MustGetLogger("genesis") +var log = logger.MustGetLogger("genesis") var GenesisService *Genesis var Synchronizer *SynchronizerServer @@ -203,7 +203,7 @@ func (g *Genesis) GetGenesisSyncResponse(orgID int) (GenesisSyncDataResponse, er conn, err := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithMaxMsgSize(g.grpcMaxMSGLength)) if err != nil { msg := "create grpc connection faild:" + err.Error() - log.Error(msg) + log.Error(msg, logger.NewORGPrefix(orgID)) return retGenesisSyncData, errors.New(msg) } defer conn.Close() @@ -216,7 +216,7 @@ func (g *Genesis) GetGenesisSyncResponse(orgID int) (GenesisSyncDataResponse, er ret, err := client.GenesisSharingSync(context.Background(), req) if err != nil { msg := fmt.Sprintf("get genesis sharing sync faild (%s)", err.Error()) - log.Warning(msg) + log.Warning(msg, logger.NewORGPrefix(orgID)) return retGenesisSyncData, errors.New(msg) } @@ -468,12 +468,12 @@ func (g *Genesis) getServerIPs(orgID int) ([]string, error) { nodeIP := os.Getenv(common.NODE_IP_KEY) err = db.Find(&azControllerConns).Error if err != nil { - log.Warningf("query az_controller_connection failed (%s)", err.Error()) + log.Warningf("query az_controller_connection failed (%s)", err.Error(), logger.NewORGPrefix(orgID)) return []string{}, err } err = db.Where("ip <> ? AND state <> ?", nodeIP, common.CONTROLLER_STATE_EXCEPTION).Find(&controllers).Error if err != nil { - log.Warningf("query controller failed (%s)", err.Error()) + log.Warningf("query controller failed (%s)", err.Error(), logger.NewORGPrefix(orgID)) return []string{}, err } @@ -513,12 +513,12 @@ func (g *Genesis) receiveKubernetesData(kChan chan KubernetesInfo) { func (g *Genesis) GetKubernetesData(orgID int, clusterID string) (KubernetesInfo, bool) { k8sDataInterface, ok := g.kubernetesData.Load(fmt.Sprintf("%d-%s", orgID, clusterID)) if !ok { - log.Warningf("kubernetes data not found org_id (%d) cluster id (%s)", orgID, clusterID) + log.Warningf("kubernetes data not found org_id (%d) cluster id (%s)", orgID, clusterID, logger.NewORGPrefix(orgID)) return KubernetesInfo{}, false } k8sData, ok := k8sDataInterface.(KubernetesInfo) if !ok { - log.Error("kubernetes data interface assert failed") + log.Error("kubernetes data interface assert failed", logger.NewORGPrefix(orgID)) return KubernetesInfo{}, false } return k8sData, true @@ -539,7 +539,7 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string conn, err := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithMaxMsgSize(g.grpcMaxMSGLength)) if err != nil { msg := "create grpc connection faild:" + err.Error() - log.Error(msg) + log.Error(msg, logger.NewORGPrefix(orgID)) return k8sResp, errors.New(msg) } defer conn.Close() @@ -553,18 +553,18 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string ret, err := client.GenesisSharingK8S(context.Background(), req) if err != nil { msg := fmt.Sprintf("get (%s) genesis sharing k8s failed (%s) ", serverIP, err.Error()) - log.Error(msg) + log.Error(msg, logger.NewORGPrefix(orgID), logger.NewORGPrefix(orgID)) return k8sResp, errors.New(msg) } entries := ret.GetEntries() if len(entries) == 0 { - log.Debugf("genesis sharing k8s node (%s) entries length is 0", serverIP) + log.Debugf("genesis sharing k8s node (%s) entries length is 0", serverIP, logger.NewORGPrefix(orgID)) continue } epochStr := ret.GetEpoch() epoch, err := time.ParseInLocation(common.GO_BIRTHDAY, epochStr, time.Local) if err != nil { - log.Error("genesis api sharing k8s format timestr faild:" + err.Error()) + log.Error("genesis api sharing k8s format timestr faild:"+err.Error(), logger.NewORGPrefix(orgID)) return k8sResp, err } if !epoch.After(k8sInfo.Epoch) { @@ -582,7 +582,7 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string return k8sResp, errors.New("no vtap report cluster id:" + clusterID) } if k8sInfo.ErrorMSG != "" { - log.Errorf("cluster id (%s) k8s info grpc Error: %s", clusterID, k8sInfo.ErrorMSG) + log.Errorf("cluster id (%s) k8s info grpc Error: %s", clusterID, k8sInfo.ErrorMSG, logger.NewORGPrefix(orgID)) return k8sResp, errors.New(k8sInfo.ErrorMSG) } if len(k8sInfo.Entries) == 0 { @@ -600,7 +600,7 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string eType := e.GetType() out, err := genesiscommon.ParseCompressedInfo(e.GetCompressedInfo()) if err != nil { - log.Warningf("decode decompress error: %s", err.Error()) + log.Warningf("decode decompress error: %s", err.Error(), logger.NewORGPrefix(orgID)) return map[string][]string{}, err } k8sResp[eType] = append(k8sResp[eType], string(out.Bytes())) diff --git a/server/controller/genesis/grpc_server.go b/server/controller/genesis/grpc_server.go index 3020af7deda..54df9cdcafd 100644 --- a/server/controller/genesis/grpc_server.go +++ b/server/controller/genesis/grpc_server.go @@ -33,6 +33,7 @@ import ( mysqlcommon "github.com/deepflowio/deepflow/server/controller/db/mysql/common" "github.com/deepflowio/deepflow/server/controller/genesis/common" "github.com/deepflowio/deepflow/server/controller/genesis/config" + "github.com/deepflowio/deepflow/server/controller/logger" "github.com/deepflowio/deepflow/server/libs/queue" ) @@ -161,7 +162,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G var localVersion uint64 = 0 if vtapID == 0 { - log.Infof("genesis sync received message with org_id %d vtap_id 0 from %s", orgID, remote) + log.Infof("genesis sync received message with vtap_id 0 from %s", remote, logger.NewORGPrefix(orgID)) } else { now := time.Now() if lTime, ok := g.vtapToLastSeen.Load(vtap); ok { @@ -185,7 +186,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G platformData := request.GetPlatformData() if version == localVersion || platformData == nil { - log.Debugf("genesis sync renew version %v from ip %s org_id %d vtap_id %v", version, remote, orgID, vtapID) + log.Debugf("genesis sync renew version %v from ip %s vtap_id %v", version, remote, vtapID, logger.NewORGPrefix(orgID)) g.genesisSyncQueue.Put( VIFRPCMessage{ peer: remote, @@ -199,7 +200,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G return &trident.GenesisSyncResponse{Version: &localVersion}, nil } - log.Infof("genesis sync received version %v -> %v from ip %s org_id %d vtap_id %v", localVersion, version, remote, orgID, vtapID) + log.Infof("genesis sync received version %v -> %v from ip %s vtap_id %v", localVersion, version, remote, vtapID, logger.NewORGPrefix(orgID)) g.genesisSyncQueue.Put( VIFRPCMessage{ peer: remote, @@ -320,7 +321,7 @@ func (g *SynchronizerServer) KubernetesAPISync(ctx context.Context, request *tri if ok { localVersion = lVersion.(uint64) } - log.Infof("kubernetes api sync received version %v -> %v from ip %s org_id %d vtap_id %v len %v", localVersion, version, remote, orgID, vtapID, len(entries)) + log.Infof("kubernetes api sync received version %v -> %v from ip %s vtap_id %v len %v", localVersion, version, remote, vtapID, len(entries), logger.NewORGPrefix(orgID)) // 如果version有更新,但消息中没有任何kubernetes数据,触发trident重新上报数据 if localVersion != version && len(entries) == 0 { @@ -341,7 +342,7 @@ func (g *SynchronizerServer) KubernetesAPISync(ctx context.Context, request *tri g.clusterIDToVersion.Store(clusterID, version) return &trident.KubernetesAPISyncResponse{Version: &version}, nil } else { - log.Infof("kubernetes api sync received version %v from ip %s org_id %d no vtap_id", version, remote, orgID) + log.Infof("kubernetes api sync received version %v from ip %s no vtap_id", version, remote, logger.NewORGPrefix(orgID)) //正常上报数据,才推送消息到队列中 if len(entries) > 0 { g.k8sQueue.Put(K8SRPCMessage{ diff --git a/server/controller/genesis/store.go b/server/controller/genesis/store.go index 190d3f9257e..ac3a156ef15 100644 --- a/server/controller/genesis/store.go +++ b/server/controller/genesis/store.go @@ -33,6 +33,7 @@ import ( mcommon "github.com/deepflowio/deepflow/server/controller/db/mysql/common" gcommon "github.com/deepflowio/deepflow/server/controller/genesis/common" "github.com/deepflowio/deepflow/server/controller/genesis/config" + "github.com/deepflowio/deepflow/server/controller/logger" "github.com/deepflowio/deepflow/server/controller/model" ) @@ -278,9 +279,9 @@ func (s *SyncStorage) refreshDatabase() { if len(invalidStorages) > 0 { err := db.Delete(&invalidStorages).Error if err != nil { - log.Errorf("node (%s) clean org (%d) genesis storage invalid data failed: %s", nodeIP, orgID, err) + log.Errorf("node (%s) clean genesis storage invalid data failed: %s", nodeIP, err, logger.NewORGPrefix(orgID)) } else { - log.Infof("node (%s) clean org (%d) genesis storage invalid data success", nodeIP, orgID) + log.Infof("node (%s) clean genesis storage invalid data success", nodeIP, logger.NewORGPrefix(orgID)) } } } @@ -385,7 +386,7 @@ func (k *KubernetesStorage) Add(orgID int, newInfo KubernetesInfo) { if !unTriggerFlag { err := k.triggerCloudRrefresh(orgID, newInfo.ClusterID, newInfo.Version) if err != nil { - log.Warning(fmt.Sprintf("trigger cloud kubernetes refresh failed: (%s)", err.Error())) + log.Warning(fmt.Sprintf("trigger cloud kubernetes refresh failed: (%s)", err.Error()), logger.NewORGPrefix(orgID)) } } } @@ -453,7 +454,7 @@ func (k *KubernetesStorage) triggerCloudRrefresh(orgID int, clusterID string, ve "version": strconv.Itoa(int(version)), } - log.Debugf("trigger cloud (%s) org (%d) kubernetes (%s) refresh version (%d)", requestUrl, orgID, clusterID, version) + log.Debugf("trigger cloud (%s) kubernetes (%s) refresh version (%d)", requestUrl, clusterID, version, logger.NewORGPrefix(orgID)) return gcommon.RequestGet(requestUrl, 30, queryStrings) } diff --git a/server/controller/genesis/updater.go b/server/controller/genesis/updater.go index 5e20ce002d2..477be536fcc 100644 --- a/server/controller/genesis/updater.go +++ b/server/controller/genesis/updater.go @@ -35,6 +35,7 @@ import ( "github.com/deepflowio/deepflow/server/controller/common" genesiscommon "github.com/deepflowio/deepflow/server/controller/genesis/common" "github.com/deepflowio/deepflow/server/controller/genesis/config" + "github.com/deepflowio/deepflow/server/controller/logger" "github.com/deepflowio/deepflow/server/controller/model" "github.com/deepflowio/deepflow/server/libs/queue" ) @@ -157,7 +158,7 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(info VIFRPCMessage, peer str VIFs := []model.GenesisVinterface{} ipAddrs := info.message.GetPlatformData().GetRawIpAddrs() if len(ipAddrs) == 0 { - log.Errorf("get sync data (raw ip addrs) empty") + log.Errorf("get sync data (raw ip addrs) empty", logger.NewORGPrefix(info.orgID)) return []model.GenesisVinterface{} } netNSs := info.message.GetPlatformData().GetRawIpNetns() @@ -166,7 +167,7 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(info VIFRPCMessage, peer str ipAddrs = ipAddrs[:1] } if len(ipAddrs) != len(netNSs) { - log.Error("the quantities of (raw ip addrs) and (raw ip netns) do not match") + log.Error("the quantities of (raw ip addrs) and (raw ip netns) do not match", logger.NewORGPrefix(info.orgID)) return []model.GenesisVinterface{} } rootNSMacs := map[string]bool{} @@ -176,7 +177,7 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(info VIFRPCMessage, peer str nsName := netNSs[i] parsedGlobalIPs, err := genesiscommon.ParseIPOutput(strings.Trim(ipAddr, " ")) if err != nil { - log.Errorf("parse ip output error: (%s)", err) + log.Errorf("parse ip output error: (%s)", err, logger.NewORGPrefix(info.orgID)) return []model.GenesisVinterface{} } @@ -256,14 +257,14 @@ func (v *GenesisSyncRpcUpdater) ParseVinterfaceInfo(info VIFRPCMessage, peer str if hasNetMask { ipPrefix, err := netaddr.ParseIPPrefix(addr) if err != nil { - log.Error(err.Error()) + log.Error(err.Error(), logger.NewORGPrefix(info.orgID)) continue } netIP = ipPrefix.IP() } else { ipAddr, err := netaddr.ParseIP(addr) if err != nil { - log.Error(err.Error()) + log.Error(err.Error(), logger.NewORGPrefix(info.orgID)) continue } netIP = ipAddr @@ -329,13 +330,13 @@ func (v *GenesisSyncRpcUpdater) ParseVIP(info VIFRPCMessage, vtapID uint32) []mo ipAddrs := info.message.GetPlatformData().GetRawIpAddrs() if len(ipAddrs) == 0 { - log.Errorf("get sync data (raw ip addrs) empty") + log.Errorf("get sync data (raw ip addrs) empty", logger.NewORGPrefix(info.orgID)) return []model.GenesisVIP{} } for _, ipAddr := range ipAddrs { parsedGlobalIPs, err := genesiscommon.ParseIPOutput(strings.Trim(ipAddr, " ")) if err != nil { - log.Errorf("parse ip output error: (%s)", err) + log.Errorf("parse ip output error: (%s)", err, logger.NewORGPrefix(info.orgID)) return []model.GenesisVIP{} } @@ -346,7 +347,7 @@ func (v *GenesisSyncRpcUpdater) ParseVIP(info VIFRPCMessage, vtapID uint32) []mo for _, ip := range item.IPs { ipObj, err := netaddr.ParseIP(ip.Address) if err != nil { - log.Warningf("parse lo vip (%s) field: (%s)", ip.Address, err) + log.Warningf("parse lo vip (%s) field: (%s)", ip.Address, err, logger.NewORGPrefix(info.orgID)) continue } if ipObj.IsLoopback() { @@ -366,22 +367,22 @@ func (v *GenesisSyncRpcUpdater) ParseVIP(info VIFRPCMessage, vtapID uint32) []mo func (v *GenesisSyncRpcUpdater) ParseHostAsVmPlatformInfo(info VIFRPCMessage, peer, natIP string, vtapID uint32) GenesisSyncDataOperation { hostName := strings.Trim(info.message.GetPlatformData().GetRawHostname(), " \n") if hostName == "" { - log.Error("get sync data (raw hostname) empty") + log.Error("get sync data (raw hostname) empty", logger.NewORGPrefix(info.orgID)) return GenesisSyncDataOperation{} } ipAddrs := info.message.GetPlatformData().GetRawIpAddrs() if len(ipAddrs) == 0 { - log.Error("get sync data (raw ip addrs) empty") + log.Error("get sync data (raw ip addrs) empty", logger.NewORGPrefix(info.orgID)) return GenesisSyncDataOperation{} } interfaces, err := genesiscommon.ParseIPOutput(strings.Trim(ipAddrs[0], " ")) if err != nil { - log.Error(err.Error()) + log.Error(err.Error(), logger.NewORGPrefix(info.orgID)) return GenesisSyncDataOperation{} } // check if vm is behind NAT behindNat := peer != natIP - log.Infof("host (%s) nat ip is (%s) peer ip is (%s), behind nat: (%t), single vpc mode: (%t)", hostName, natIP, peer, behindNat, v.singleVPCMode) + log.Infof("host (%s) nat ip is (%s) peer ip is (%s), behind nat: (%t), single vpc mode: (%t)", hostName, natIP, peer, behindNat, v.singleVPCMode, logger.NewORGPrefix(info.orgID)) vpc := model.GenesisVpc{ Name: "default-public-cloud-vpc", Lcuuid: common.GetUUIDByOrgID(info.orgID, "default-public-cloud-vpc"), @@ -413,7 +414,7 @@ func (v *GenesisSyncRpcUpdater) ParseHostAsVmPlatformInfo(info VIFRPCMessage, pe ipLastSeens := []model.GenesisIP{} for _, iface := range interfaces { if iface.MAC == "" || iface.Name == "lo" { - log.Debugf("not found mac or netcard is loopback (%#v)", iface) + log.Debugf("not found mac or netcard is loopback (%#v)", iface, logger.NewORGPrefix(info.orgID)) continue } ips := iface.IPs @@ -433,7 +434,7 @@ func (v *GenesisSyncRpcUpdater) ParseHostAsVmPlatformInfo(info VIFRPCMessage, pe for _, ipItem := range ips { pIP, err := netaddr.ParseIP(ipItem.Address) if err != nil { - log.Error(err.Error()) + log.Error(err.Error(), logger.NewORGPrefix(info.orgID)) return GenesisSyncDataOperation{} } @@ -487,7 +488,7 @@ func (v *GenesisSyncRpcUpdater) ParseHostAsVmPlatformInfo(info VIFRPCMessage, pe } oIP, err := netaddr.ParseIP(p.Address) if err != nil { - log.Warning(err.Error()) + log.Warning(err.Error(), logger.NewORGPrefix(info.orgID)) continue } ipLastSeen := model.GenesisIP{ @@ -585,12 +586,12 @@ func (v *GenesisSyncRpcUpdater) ParseKVMPlatformInfo(info VIFRPCMessage, peer st pCSVkeys := []string{"_uuid", "other_config", "tag"} nameToOvsPort, err := genesiscommon.ParseCSVWithKey(rawOVSPorts, "name", pCSVkeys...) if err != nil { - log.Warning("parse csv with key failed: " + err.Error()) + log.Warning("parse csv with key failed: "+err.Error(), logger.NewORGPrefix(info.orgID)) } pCSVs := []string{"name", "external_ids"} nameToOvsIfs, err := genesiscommon.ParseCSV(rawOVSInterface, pCSVs...) if err != nil { - log.Warning("parse csv failed: " + err.Error()) + log.Warning("parse csv failed: "+err.Error(), logger.NewORGPrefix(info.orgID)) } for _, nameToOvsIf := range nameToOvsIfs { name, ok := nameToOvsIf["name"] @@ -599,11 +600,11 @@ func (v *GenesisSyncRpcUpdater) ParseKVMPlatformInfo(info VIFRPCMessage, peer st } eIDs, err := genesiscommon.ParseKVString(nameToOvsIf["external_ids"]) if err != nil { - log.Warning("parse kvstring failed: " + err.Error()) + log.Warning("parse kvstring failed: "+err.Error(), logger.NewORGPrefix(info.orgID)) } mac, ok := eIDs["attached-mac"] if !ok { - log.Debugf("ovs interface %s does not have external_ids:attached-mac", name) + log.Debugf("ovs interface %s does not have external_ids:attached-mac", name, logger.NewORGPrefix(info.orgID)) continue } if ovsPort, ok := nameToOvsPort[name]; ok { @@ -617,7 +618,7 @@ func (v *GenesisSyncRpcUpdater) ParseKVMPlatformInfo(info VIFRPCMessage, peer st } vlanConfig, err := genesiscommon.ParseVLANConfig(rawVlanConfig) if err != nil { - log.Warning("parse vlan config failed: " + err.Error()) + log.Warning("parse vlan config failed: "+err.Error(), logger.NewORGPrefix(info.orgID)) } for br, ifaces := range bridges { vlan := 1 @@ -645,7 +646,7 @@ func (v *GenesisSyncRpcUpdater) ParseKVMPlatformInfo(info VIFRPCMessage, peer st ipObj := net.IP(tIP.GetIp()) nIPObj, ok := netaddr.FromStdIP(ipObj) if !ok { - log.Warningf("ip (%s) invalid", ipObj.String()) + log.Warningf("ip (%s) invalid", ipObj.String(), logger.NewORGPrefix(info.orgID)) continue } ip.IP = ipObj.String() @@ -667,11 +668,11 @@ func (v *GenesisSyncRpcUpdater) ParseKVMPlatformInfo(info VIFRPCMessage, peer st ips := []model.GenesisIP{} vmStates, err := genesiscommon.ParseVMStates(rawVMStates) if err != nil { - log.Warning("parse vm states failed: " + err.Error()) + log.Warning("parse vm states failed: "+err.Error(), logger.NewORGPrefix(info.orgID)) } xmlVMs, err := genesiscommon.ParseVMXml(rawVM, v.vmNameField) if err != nil { - log.Warning("parse vm xml failed: " + err.Error()) + log.Warning("parse vm xml failed: "+err.Error(), logger.NewORGPrefix(info.orgID)) } for _, xmlVM := range xmlVMs { vm := model.GenesisVM{} @@ -710,13 +711,13 @@ func (v *GenesisSyncRpcUpdater) ParseKVMPlatformInfo(info VIFRPCMessage, peer st } else if oP, ok := nameToOvsPort[ifName]; ok { portMap = oP } else { - log.Debugf("vm %s interface %s mac %s not found in ovs ports", vm.Label, ifName, mac) + log.Debugf("vm %s interface %s mac %s not found in ovs ports", vm.Label, ifName, mac, logger.NewORGPrefix(info.orgID)) continue } port.Lcuuid = portMap["_uuid"] options, err := genesiscommon.ParseKVString(portMap["other_config"]) if err != nil { - log.Warning("parse kv string failed: " + err.Error()) + log.Warning("parse kv string failed: "+err.Error(), logger.NewORGPrefix(info.orgID)) } if nLcuuid, ok := options["net_uuid"]; ok { network.Lcuuid = nLcuuid @@ -880,11 +881,11 @@ func (v *GenesisSyncRpcUpdater) run() { genesisSyncDataOper := GenesisSyncDataOperation{} info := v.outputQueue.Get().(VIFRPCMessage) if info.msgType == genesiscommon.TYPE_EXIT { - log.Warningf("sync from (%s) vtap_id (%v) type (%v)", info.peer, info.vtapID, info.msgType) + log.Warningf("sync from (%s) vtap_id (%v) type (%v)", info.peer, info.vtapID, info.msgType, logger.NewORGPrefix(info.orgID)) continue } - log.Debugf("sync received (%s) vtap_id (%v) type (%v) received (%s)", info.peer, info.vtapID, info.msgType, info.message) + log.Debugf("sync received (%s) vtap_id (%v) type (%v) received (%s)", info.peer, info.vtapID, info.msgType, info.message, logger.NewORGPrefix(info.orgID)) vtap := fmt.Sprintf("%d%d", info.orgID, info.vtapID) if info.msgType == genesiscommon.TYPE_RENEW { @@ -945,10 +946,10 @@ func (k *KubernetesRpcUpdater) run() { for { info := k.outputQueue.Get().(K8SRPCMessage) if info.msgType == genesiscommon.TYPE_EXIT { - log.Warningf("k8s from (%s) vtap_id (%v) type (%v) exit", info.peer, info.vtapID, info.msgType) + log.Warningf("k8s from (%s) vtap_id (%v) type (%v) exit", info.peer, info.vtapID, info.msgType, logger.NewORGPrefix(info.orgID)) break } - log.Debugf("k8s from %s vtap_id %v received cluster_id %s version %v", info.peer, info.vtapID, info.message.GetClusterId(), info.message.GetVersion()) + log.Debugf("k8s from %s vtap_id %v received cluster_id %s version %v", info.peer, info.vtapID, info.message.GetClusterId(), info.message.GetVersion(), logger.NewORGPrefix(info.orgID)) // 更新和保存内存数据 k.storage.Add(info.orgID, KubernetesInfo{ ORGID: info.orgID, diff --git a/server/controller/manager/manager.go b/server/controller/manager/manager.go index 5f0b30ba631..6180ab49f6a 100644 --- a/server/controller/manager/manager.go +++ b/server/controller/manager/manager.go @@ -24,20 +24,20 @@ import ( "time" mapset "github.com/deckarep/golang-set" - logging "github.com/op/go-logging" cloudcfg "github.com/deepflowio/deepflow/server/controller/cloud/config" gathermodel "github.com/deepflowio/deepflow/server/controller/cloud/kubernetes_gather/model" "github.com/deepflowio/deepflow/server/controller/cloud/model" "github.com/deepflowio/deepflow/server/controller/common" "github.com/deepflowio/deepflow/server/controller/db/mysql" + "github.com/deepflowio/deepflow/server/controller/logger" "github.com/deepflowio/deepflow/server/controller/manager/config" "github.com/deepflowio/deepflow/server/controller/recorder" recordercfg "github.com/deepflowio/deepflow/server/controller/recorder/config" "github.com/deepflowio/deepflow/server/libs/queue" ) -var log = logging.MustGetLogger("manager") +var log = logger.MustGetLogger("manager") type Manager struct { cfg config.ManagerConfig @@ -187,10 +187,10 @@ func (m *Manager) run(ctx context.Context) { } var domains []mysql.Domain - var oldDomains = mapset.NewSet() - var newDomains = mapset.NewSet() - var delDomains = mapset.NewSet() - var addDomains = mapset.NewSet() + var oldDomainLcuuids = mapset.NewSet() + var newDomainLcuuids = mapset.NewSet() + var delDomainLcuuids = mapset.NewSet() + var addDomainLcuuids = mapset.NewSet() var intersectDomains = mapset.NewSet() for lcuuid, task := range m.taskMap { @@ -203,7 +203,7 @@ func (m *Manager) run(ctx context.Context) { if task.Cloud.GetOrgID() != orgID { continue } - oldDomains.Add(lcuuid) + oldDomainLcuuids.Add(lcuuid) } db.Where( "enabled = ? AND controller_ip = ?", common.DOMAIN_ENABLED_TRUE, controller.IP, @@ -211,76 +211,76 @@ func (m *Manager) run(ctx context.Context) { lcuuidToDomain := make(map[string]mysql.Domain) for _, domain := range domains { lcuuidToDomain[domain.Lcuuid] = domain - newDomains.Add(domain.Lcuuid) + newDomainLcuuids.Add(domain.Lcuuid) } // 对于删除的domain,停止Task,并移除管理 - delDomains = oldDomains.Difference(newDomains) - for _, domain := range delDomains.ToSlice() { - lcuuid := domain.(string) - m.taskMap[lcuuid].Stop() + delDomainLcuuids = oldDomainLcuuids.Difference(newDomainLcuuids) + for _, lcuuid := range delDomainLcuuids.ToSlice() { + deletedLcuuid := lcuuid.(string) + m.taskMap[deletedLcuuid].Stop() m.mutex.Lock() - delete(m.taskMap, lcuuid) + delete(m.taskMap, deletedLcuuid) m.mutex.Unlock() } // 对于新增的domain,启动Task,并纳入Manager管理 - addDomains = newDomains.Difference(oldDomains) - for _, domain := range addDomains.ToSlice() { - lcuuid := domain.(string) - task := NewTask(orgID, lcuuidToDomain[lcuuid], m.cfg.TaskCfg, ctx, m.resourceEventQueue) + addDomainLcuuids = newDomainLcuuids.Difference(oldDomainLcuuids) + for _, lcuuid := range addDomainLcuuids.ToSlice() { + addedLcuuid := lcuuid.(string) + domain := lcuuidToDomain[addedLcuuid] + task := NewTask(orgID, domain, m.cfg.TaskCfg, ctx, m.resourceEventQueue) if task == nil || task.Cloud == nil { - log.Errorf("org (%d) domain (%s) init failed", orgID, lcuuidToDomain[lcuuid].Name) + log.Errorf("domain (%s) init failed", domain.Name, logger.NewORGPrefix(orgID)) continue } m.mutex.Lock() - m.taskMap[lcuuid] = task - m.taskMap[lcuuid].Start() + m.taskMap[addedLcuuid] = task + m.taskMap[addedLcuuid].Start() m.mutex.Unlock() } // 检查已有domain是否存在配置/名称修改 // 如果存在配置修改,则停止已有Task,并移除管理;同时启动新的Task,并纳入Manager管理 // 如果没有配置修改,判断是否存在名称修改更新Task信息 - intersectDomains = newDomains.Intersect(oldDomains) - for _, domain := range intersectDomains.ToSlice() { - lcuuid := domain.(string) - oldDomainConfig := m.taskMap[lcuuid].DomainConfig - newDomainConfig := lcuuidToDomain[lcuuid].Config - if oldDomainConfig != newDomainConfig { - log.Infof("org (%d) oldDomainConfig: %s", orgID, oldDomainConfig) - log.Infof("org (%d) newDomainConfig: %s", orgID, newDomainConfig) - m.taskMap[lcuuid].Stop() - task := NewTask(orgID, lcuuidToDomain[lcuuid], m.cfg.TaskCfg, ctx, m.resourceEventQueue) + intersectDomains = newDomainLcuuids.Intersect(oldDomainLcuuids) + for _, lcuuid := range intersectDomains.ToSlice() { + domainLcuuid := lcuuid.(string) + newDomain := lcuuidToDomain[domainLcuuid] + oldDomainConfig := m.taskMap[domainLcuuid].DomainConfig + if oldDomainConfig != newDomain.Config { + log.Infof("domain (%s) oldDomainConfig: %s", newDomain.Name, oldDomainConfig, logger.NewORGPrefix(orgID)) + log.Infof("domain (%s) newDomainConfig: %s", newDomain.Name, newDomain.Config, logger.NewORGPrefix(orgID)) + m.taskMap[domainLcuuid].Stop() + task := NewTask(orgID, newDomain, m.cfg.TaskCfg, ctx, m.resourceEventQueue) if task == nil || task.Cloud == nil { - log.Errorf("org (%d) domain (%s) init failed", orgID, lcuuidToDomain[lcuuid].Name) + log.Errorf("domain (%s) init failed", newDomain.Name, logger.NewORGPrefix(orgID)) continue } m.mutex.Lock() - delete(m.taskMap, lcuuid) - m.taskMap[lcuuid] = task - m.taskMap[lcuuid].Start() + delete(m.taskMap, domainLcuuid) + m.taskMap[domainLcuuid] = task + m.taskMap[domainLcuuid].Start() m.mutex.Unlock() } else { - oldDomainName := m.taskMap[lcuuid].DomainName - newDomainName := lcuuidToDomain[lcuuid].Name - if oldDomainName != newDomainName { - if m.taskMap[lcuuid].Cloud.GetBasicInfo().Type == common.KUBERNETES { - m.taskMap[lcuuid].Stop() - task := NewTask(orgID, lcuuidToDomain[lcuuid], m.cfg.TaskCfg, ctx, m.resourceEventQueue) + oldDomainName := m.taskMap[domainLcuuid].DomainName + if oldDomainName != newDomain.Name { + if m.taskMap[domainLcuuid].Cloud.GetBasicInfo().Type == common.KUBERNETES { + m.taskMap[domainLcuuid].Stop() + task := NewTask(orgID, newDomain, m.cfg.TaskCfg, ctx, m.resourceEventQueue) if task == nil || task.Cloud == nil { - log.Errorf("org (%d) domain (%s) init failed", orgID, lcuuidToDomain[lcuuid].Name) + log.Errorf("domain (%s) init failed", orgID, newDomain.Name, logger.NewORGPrefix(orgID)) continue } m.mutex.Lock() - delete(m.taskMap, lcuuid) - m.taskMap[lcuuid] = task - m.taskMap[lcuuid].Start() + delete(m.taskMap, domainLcuuid) + m.taskMap[domainLcuuid] = task + m.taskMap[domainLcuuid].Start() m.mutex.Unlock() } else { - m.taskMap[lcuuid].UpdateDomainName(newDomainName) + m.taskMap[domainLcuuid].UpdateDomainName(newDomain.Name) } } }