Skip to content

Commit

Permalink
feat: update log format
Browse files Browse the repository at this point in the history
  • Loading branch information
askyrie committed Jul 26, 2024
1 parent bf37330 commit 4a08c31
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 99 deletions.
4 changes: 2 additions & 2 deletions server/controller/genesis/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ import (
"time"

simplejson "github.com/bitly/go-simplejson"
"github.com/op/go-logging"
"gopkg.in/yaml.v3"
"inet.af/netaddr"

"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/logger"
)

var log = logging.MustGetLogger("genesis.common")
var log = logger.MustGetLogger("genesis.common")

type TeamInfo struct {
OrgID int
Expand Down
28 changes: 14 additions & 14 deletions server/controller/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"sync/atomic"
"time"

"github.com/op/go-logging"
"google.golang.org/grpc"

api "github.com/deepflowio/deepflow/message/controller"
Expand All @@ -37,12 +36,13 @@ import (
"github.com/deepflowio/deepflow/server/controller/db/mysql"
genesiscommon "github.com/deepflowio/deepflow/server/controller/genesis/common"
gconfig "github.com/deepflowio/deepflow/server/controller/genesis/config"
"github.com/deepflowio/deepflow/server/controller/logger"
"github.com/deepflowio/deepflow/server/controller/model"
"github.com/deepflowio/deepflow/server/controller/statsd"
"github.com/deepflowio/deepflow/server/libs/queue"
)

var log = logging.MustGetLogger("genesis")
var log = logger.MustGetLogger("genesis")
var GenesisService *Genesis
var Synchronizer *SynchronizerServer

Expand Down Expand Up @@ -203,7 +203,7 @@ func (g *Genesis) GetGenesisSyncResponse(orgID int) (GenesisSyncDataResponse, er
conn, err := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithMaxMsgSize(g.grpcMaxMSGLength))
if err != nil {
msg := "create grpc connection faild:" + err.Error()
log.Error(msg)
log.Error(msg, logger.NewORGPrefix(orgID))
return retGenesisSyncData, errors.New(msg)
}
defer conn.Close()
Expand All @@ -216,7 +216,7 @@ func (g *Genesis) GetGenesisSyncResponse(orgID int) (GenesisSyncDataResponse, er
ret, err := client.GenesisSharingSync(context.Background(), req)
if err != nil {
msg := fmt.Sprintf("get genesis sharing sync faild (%s)", err.Error())
log.Warning(msg)
log.Warning(msg, logger.NewORGPrefix(orgID))
return retGenesisSyncData, errors.New(msg)
}

Expand Down Expand Up @@ -468,12 +468,12 @@ func (g *Genesis) getServerIPs(orgID int) ([]string, error) {
nodeIP := os.Getenv(common.NODE_IP_KEY)
err = db.Find(&azControllerConns).Error
if err != nil {
log.Warningf("query az_controller_connection failed (%s)", err.Error())
log.Warningf("query az_controller_connection failed (%s)", err.Error(), logger.NewORGPrefix(orgID))
return []string{}, err
}
err = db.Where("ip <> ? AND state <> ?", nodeIP, common.CONTROLLER_STATE_EXCEPTION).Find(&controllers).Error
if err != nil {
log.Warningf("query controller failed (%s)", err.Error())
log.Warningf("query controller failed (%s)", err.Error(), logger.NewORGPrefix(orgID))
return []string{}, err
}

Expand Down Expand Up @@ -513,12 +513,12 @@ func (g *Genesis) receiveKubernetesData(kChan chan KubernetesInfo) {
func (g *Genesis) GetKubernetesData(orgID int, clusterID string) (KubernetesInfo, bool) {
k8sDataInterface, ok := g.kubernetesData.Load(fmt.Sprintf("%d-%s", orgID, clusterID))
if !ok {
log.Warningf("kubernetes data not found org_id (%d) cluster id (%s)", orgID, clusterID)
log.Warningf("kubernetes data not found org_id (%d) cluster id (%s)", orgID, clusterID, logger.NewORGPrefix(orgID))
return KubernetesInfo{}, false
}
k8sData, ok := k8sDataInterface.(KubernetesInfo)
if !ok {
log.Error("kubernetes data interface assert failed")
log.Error("kubernetes data interface assert failed", logger.NewORGPrefix(orgID))
return KubernetesInfo{}, false
}
return k8sData, true
Expand All @@ -539,7 +539,7 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string
conn, err := grpc.Dial(grpcServer, grpc.WithInsecure(), grpc.WithMaxMsgSize(g.grpcMaxMSGLength))
if err != nil {
msg := "create grpc connection faild:" + err.Error()
log.Error(msg)
log.Error(msg, logger.NewORGPrefix(orgID))
return k8sResp, errors.New(msg)
}
defer conn.Close()
Expand All @@ -553,18 +553,18 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string
ret, err := client.GenesisSharingK8S(context.Background(), req)
if err != nil {
msg := fmt.Sprintf("get (%s) genesis sharing k8s failed (%s) ", serverIP, err.Error())
log.Error(msg)
log.Error(msg, logger.NewORGPrefix(orgID), logger.NewORGPrefix(orgID))
return k8sResp, errors.New(msg)
}
entries := ret.GetEntries()
if len(entries) == 0 {
log.Debugf("genesis sharing k8s node (%s) entries length is 0", serverIP)
log.Debugf("genesis sharing k8s node (%s) entries length is 0", serverIP, logger.NewORGPrefix(orgID))
continue
}
epochStr := ret.GetEpoch()
epoch, err := time.ParseInLocation(common.GO_BIRTHDAY, epochStr, time.Local)
if err != nil {
log.Error("genesis api sharing k8s format timestr faild:" + err.Error())
log.Error("genesis api sharing k8s format timestr faild:"+err.Error(), logger.NewORGPrefix(orgID))
return k8sResp, err
}
if !epoch.After(k8sInfo.Epoch) {
Expand All @@ -582,7 +582,7 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string
return k8sResp, errors.New("no vtap report cluster id:" + clusterID)
}
if k8sInfo.ErrorMSG != "" {
log.Errorf("cluster id (%s) k8s info grpc Error: %s", clusterID, k8sInfo.ErrorMSG)
log.Errorf("cluster id (%s) k8s info grpc Error: %s", clusterID, k8sInfo.ErrorMSG, logger.NewORGPrefix(orgID))
return k8sResp, errors.New(k8sInfo.ErrorMSG)
}
if len(k8sInfo.Entries) == 0 {
Expand All @@ -600,7 +600,7 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string
eType := e.GetType()
out, err := genesiscommon.ParseCompressedInfo(e.GetCompressedInfo())
if err != nil {
log.Warningf("decode decompress error: %s", err.Error())
log.Warningf("decode decompress error: %s", err.Error(), logger.NewORGPrefix(orgID))
return map[string][]string{}, err
}
k8sResp[eType] = append(k8sResp[eType], string(out.Bytes()))
Expand Down
11 changes: 6 additions & 5 deletions server/controller/genesis/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
mysqlcommon "github.com/deepflowio/deepflow/server/controller/db/mysql/common"
"github.com/deepflowio/deepflow/server/controller/genesis/common"
"github.com/deepflowio/deepflow/server/controller/genesis/config"
"github.com/deepflowio/deepflow/server/controller/logger"
"github.com/deepflowio/deepflow/server/libs/queue"
)

Expand Down Expand Up @@ -161,7 +162,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G

var localVersion uint64 = 0
if vtapID == 0 {
log.Infof("genesis sync received message with org_id %d vtap_id 0 from %s", orgID, remote)
log.Infof("genesis sync received message with vtap_id 0 from %s", remote, logger.NewORGPrefix(orgID))
} else {
now := time.Now()
if lTime, ok := g.vtapToLastSeen.Load(vtap); ok {
Expand All @@ -185,7 +186,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G

platformData := request.GetPlatformData()
if version == localVersion || platformData == nil {
log.Debugf("genesis sync renew version %v from ip %s org_id %d vtap_id %v", version, remote, orgID, vtapID)
log.Debugf("genesis sync renew version %v from ip %s vtap_id %v", version, remote, vtapID, logger.NewORGPrefix(orgID))
g.genesisSyncQueue.Put(
VIFRPCMessage{
peer: remote,
Expand All @@ -199,7 +200,7 @@ func (g *SynchronizerServer) GenesisSync(ctx context.Context, request *trident.G
return &trident.GenesisSyncResponse{Version: &localVersion}, nil
}

log.Infof("genesis sync received version %v -> %v from ip %s org_id %d vtap_id %v", localVersion, version, remote, orgID, vtapID)
log.Infof("genesis sync received version %v -> %v from ip %s vtap_id %v", localVersion, version, remote, vtapID, logger.NewORGPrefix(orgID))
g.genesisSyncQueue.Put(
VIFRPCMessage{
peer: remote,
Expand Down Expand Up @@ -320,7 +321,7 @@ func (g *SynchronizerServer) KubernetesAPISync(ctx context.Context, request *tri
if ok {
localVersion = lVersion.(uint64)
}
log.Infof("kubernetes api sync received version %v -> %v from ip %s org_id %d vtap_id %v len %v", localVersion, version, remote, orgID, vtapID, len(entries))
log.Infof("kubernetes api sync received version %v -> %v from ip %s vtap_id %v len %v", localVersion, version, remote, vtapID, len(entries), logger.NewORGPrefix(orgID))

// 如果version有更新,但消息中没有任何kubernetes数据,触发trident重新上报数据
if localVersion != version && len(entries) == 0 {
Expand All @@ -341,7 +342,7 @@ func (g *SynchronizerServer) KubernetesAPISync(ctx context.Context, request *tri
g.clusterIDToVersion.Store(clusterID, version)
return &trident.KubernetesAPISyncResponse{Version: &version}, nil
} else {
log.Infof("kubernetes api sync received version %v from ip %s org_id %d no vtap_id", version, remote, orgID)
log.Infof("kubernetes api sync received version %v from ip %s no vtap_id", version, remote, logger.NewORGPrefix(orgID))
//正常上报数据,才推送消息到队列中
if len(entries) > 0 {
g.k8sQueue.Put(K8SRPCMessage{
Expand Down
9 changes: 5 additions & 4 deletions server/controller/genesis/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
mcommon "github.com/deepflowio/deepflow/server/controller/db/mysql/common"
gcommon "github.com/deepflowio/deepflow/server/controller/genesis/common"
"github.com/deepflowio/deepflow/server/controller/genesis/config"
"github.com/deepflowio/deepflow/server/controller/logger"
"github.com/deepflowio/deepflow/server/controller/model"
)

Expand Down Expand Up @@ -278,9 +279,9 @@ func (s *SyncStorage) refreshDatabase() {
if len(invalidStorages) > 0 {
err := db.Delete(&invalidStorages).Error
if err != nil {
log.Errorf("node (%s) clean org (%d) genesis storage invalid data failed: %s", nodeIP, orgID, err)
log.Errorf("node (%s) clean genesis storage invalid data failed: %s", nodeIP, err, logger.NewORGPrefix(orgID))
} else {
log.Infof("node (%s) clean org (%d) genesis storage invalid data success", nodeIP, orgID)
log.Infof("node (%s) clean genesis storage invalid data success", nodeIP, logger.NewORGPrefix(orgID))
}
}
}
Expand Down Expand Up @@ -385,7 +386,7 @@ func (k *KubernetesStorage) Add(orgID int, newInfo KubernetesInfo) {
if !unTriggerFlag {
err := k.triggerCloudRrefresh(orgID, newInfo.ClusterID, newInfo.Version)
if err != nil {
log.Warning(fmt.Sprintf("trigger cloud kubernetes refresh failed: (%s)", err.Error()))
log.Warning(fmt.Sprintf("trigger cloud kubernetes refresh failed: (%s)", err.Error()), logger.NewORGPrefix(orgID))
}
}
}
Expand Down Expand Up @@ -453,7 +454,7 @@ func (k *KubernetesStorage) triggerCloudRrefresh(orgID int, clusterID string, ve
"version": strconv.Itoa(int(version)),
}

log.Debugf("trigger cloud (%s) org (%d) kubernetes (%s) refresh version (%d)", requestUrl, orgID, clusterID, version)
log.Debugf("trigger cloud (%s) kubernetes (%s) refresh version (%d)", requestUrl, clusterID, version, logger.NewORGPrefix(orgID))

return gcommon.RequestGet(requestUrl, 30, queryStrings)
}
Expand Down
Loading

0 comments on commit 4a08c31

Please sign in to comment.