Skip to content

Commit

Permalink
[tagrecorder] Subscribe ch_os_app_tag
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochaoren1 committed Feb 26, 2024
1 parent fbeeda5 commit cac7171
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 94 deletions.
113 changes: 70 additions & 43 deletions server/controller/tagrecorder/ch_os_app_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,65 +17,92 @@
package tagrecorder

import (
"strings"

"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/db/mysql/query"
"github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message"
)

type ChOSAppTag struct {
UpdaterComponent[mysql.ChOSAppTag, OSAPPTagKey]
SubscriberComponent[*message.ProcessFieldsUpdate, message.ProcessFieldsUpdate, mysql.Process, mysql.ChOSAppTag, OSAPPTagKey]
}

func NewChOSAppTag() *ChOSAppTag {
updater := &ChOSAppTag{
newUpdaterComponent[mysql.ChOSAppTag, OSAPPTagKey](
RESOURCE_TYPE_CH_OS_APP_TAG,
mng := &ChOSAppTag{
newSubscriberComponent[*message.ProcessFieldsUpdate, message.ProcessFieldsUpdate, mysql.Process, mysql.ChOSAppTag, OSAPPTagKey](
common.RESOURCE_TYPE_PROCESS_EN, RESOURCE_TYPE_CH_OS_APP_TAG,
),
}
updater.updaterDG = updater
return updater
mng.subscriberDG = mng
return mng
}

func (o *ChOSAppTag) generateNewData() (map[OSAPPTagKey]mysql.ChOSAppTag, bool) {
processes, err := query.FindInBatches[mysql.Process](mysql.Db.Unscoped())
if err != nil {
log.Errorf(dbQueryResourceFailed(o.resourceTypeName, err))
return nil, false
}

keyToItem := make(map[OSAPPTagKey]mysql.ChOSAppTag)
for _, process := range processes {
splitTags := strings.Split(process.OSAPPTags, ", ")
for _, singleTag := range splitTags {
splitSingleTag := strings.Split(singleTag, ":")
if len(splitSingleTag) == 2 {
key := OSAPPTagKey{
PID: process.ID,
Key: strings.Trim(splitSingleTag[0], " "),
}
keyToItem[key] = mysql.ChOSAppTag{
PID: process.ID,
Key: strings.Trim(splitSingleTag[0], " "),
Value: strings.Trim(splitSingleTag[1], " "),
// onResourceUpdated implements SubscriberDataGenerator
func (c *ChOSAppTag) onResourceUpdated(sourceID int, fieldsUpdate *message.ProcessFieldsUpdate) {
keysToAdd := make([]OSAPPTagKey, 0)
targetsToAdd := make([]mysql.ChOSAppTag, 0)
keysToDelete := make([]OSAPPTagKey, 0)
targetsToDelete := make([]mysql.ChOSAppTag, 0)
var chItem mysql.ChOSAppTag
var updateKey OSAPPTagKey
updateInfo := make(map[string]interface{})
if fieldsUpdate.OSAPPTags.IsDifferent() {
new := fieldsUpdate.OSAPPTags.GetNew()
old := fieldsUpdate.OSAPPTags.GetOld()
for k, v := range new {
oldV, ok := old[k]
if !ok {
keysToAdd = append(keysToAdd, OSAPPTagKey{PID: sourceID, Key: k})
targetsToAdd = append(targetsToAdd, mysql.ChOSAppTag{
PID: sourceID,
Key: k,
Value: v,
})
} else {
if oldV != v {
updateKey = OSAPPTagKey{PID: sourceID, Key: k}
updateInfo[k] = v
mysql.Db.Where("pid = ? and `key` = ?", sourceID, k).First(&chItem) // TODO common
if chItem.PID == 0 {
keysToAdd = append(keysToAdd, OSAPPTagKey{PID: sourceID, Key: k})
targetsToAdd = append(targetsToAdd, mysql.ChOSAppTag{
PID: sourceID,
Key: k,
Value: v,
})
}
}
}
}
for k := range old {
if _, ok := new[k]; !ok {
keysToDelete = append(keysToDelete, OSAPPTagKey{PID: sourceID, Key: k})
targetsToDelete = append(targetsToDelete, mysql.ChOSAppTag{
PID: sourceID,
Key: k,
})
}
}
}
return keyToItem, true
}

func (o *ChOSAppTag) generateKey(dbItem mysql.ChOSAppTag) OSAPPTagKey {
return OSAPPTagKey{PID: dbItem.PID, Key: dbItem.Key}
}

func (o *ChOSAppTag) generateUpdateInfo(oldItem, newItem mysql.ChOSAppTag) (map[string]interface{}, bool) {
updateInfo := make(map[string]interface{})
if oldItem.Value != newItem.Value {
updateInfo["value"] = newItem.Value
if len(keysToAdd) > 0 {
c.SubscriberComponent.dbOperator.add(keysToAdd, targetsToAdd)
}
if len(keysToDelete) > 0 {
c.SubscriberComponent.dbOperator.delete(keysToDelete, targetsToDelete)
}
if len(updateInfo) > 0 {
return updateInfo, true
c.SubscriberComponent.dbOperator.update(chItem, updateInfo, updateKey)
}
}

// onResourceUpdated implements SubscriberDataGenerator
func (c *ChOSAppTag) sourceToTarget(source *mysql.Process) (keys []OSAPPTagKey, targets []mysql.ChOSAppTag) {
for k, v := range source.OSAPPTags {
keys = append(keys, OSAPPTagKey{PID: source.ID, Key: k})
targets = append(targets, mysql.ChOSAppTag{
PID: source.ID,
Key: k,
Value: v,
})
}
return nil, false
return
}
86 changes: 37 additions & 49 deletions server/controller/tagrecorder/ch_os_app_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,72 +18,60 @@ package tagrecorder

import (
"encoding/json"
"strings"

"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/db/mysql/query"
"github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message"
)

type ChOSAppTags struct {
UpdaterComponent[mysql.ChOSAppTags, OSAPPTagsKey]
SubscriberComponent[*message.ProcessFieldsUpdate, message.ProcessFieldsUpdate, mysql.Process, mysql.ChOSAppTags, OSAPPTagsKey]
}

func NewChOSAppTags() *ChOSAppTags {
updater := &ChOSAppTags{
newUpdaterComponent[mysql.ChOSAppTags, OSAPPTagsKey](
RESOURCE_TYPE_CH_OS_APP_TAGS,
mng := &ChOSAppTags{
newSubscriberComponent[*message.ProcessFieldsUpdate, message.ProcessFieldsUpdate, mysql.Process, mysql.ChOSAppTags, OSAPPTagsKey](
common.RESOURCE_TYPE_PROCESS_EN, RESOURCE_TYPE_CH_OS_APP_TAGS,
),
}
updater.updaterDG = updater
return updater
mng.subscriberDG = mng
return mng
}

func (o *ChOSAppTags) generateNewData() (map[OSAPPTagsKey]mysql.ChOSAppTags, bool) {
processes, err := query.FindInBatches[mysql.Process](mysql.Db.Unscoped())
if err != nil {
log.Errorf(dbQueryResourceFailed(o.resourceTypeName, err))
return nil, false
}

keyToItem := make(map[OSAPPTagsKey]mysql.ChOSAppTags)
for _, process := range processes {
osAppTagsMap := map[string]string{}
splitOsAppTags := strings.Split(process.OSAPPTags, ", ")
for _, singleOsAppTag := range splitOsAppTags {
splitSingleTag := strings.Split(singleOsAppTag, ":")
if len(splitSingleTag) == 2 {
osAppTagsMap[strings.Trim(splitSingleTag[0], " ")] = strings.Trim(splitSingleTag[1], " ")
}
// onResourceUpdated implements SubscriberDataGenerator
func (c *ChOSAppTags) onResourceUpdated(sourceID int, fieldsUpdate *message.ProcessFieldsUpdate) {
updateInfo := make(map[string]interface{})
if fieldsUpdate.OSAPPTags.IsDifferent() {
bytes, err := json.Marshal(fieldsUpdate.OSAPPTags.GetNew())
if err != nil {
log.Error(err)
return
}
if len(osAppTagsMap) > 0 {
osAppTagsStr, err := json.Marshal(osAppTagsMap)
if err != nil {
log.Error(err)
return nil, false
}
key := OSAPPTagsKey{
PID: process.ID,
}
keyToItem[key] = mysql.ChOSAppTags{
PID: process.ID,
OSAPPTags: string(osAppTagsStr),
}
updateInfo["os_app_tags"] = string(bytes)
}
if len(updateInfo) > 0 {
var chItem mysql.ChOSAppTags
mysql.Db.Where("pid = ?", sourceID).First(&chItem)
if chItem.PID == 0 {
c.SubscriberComponent.dbOperator.add(
[]OSAPPTagsKey{{PID: sourceID}},
[]mysql.ChOSAppTags{{PID: sourceID, OSAPPTags: updateInfo["os_app_tags"].(string)}},
)
} else {
c.SubscriberComponent.dbOperator.update(chItem, updateInfo, OSAPPTagsKey{PID: sourceID})
}
}
return keyToItem, true
}

func (o *ChOSAppTags) generateKey(dbItem mysql.ChOSAppTags) OSAPPTagsKey {
return OSAPPTagsKey{PID: dbItem.PID}
}

func (o *ChOSAppTags) generateUpdateInfo(oldItem, newItem mysql.ChOSAppTags) (map[string]interface{}, bool) {
updateInfo := make(map[string]interface{})
if oldItem.OSAPPTags != newItem.OSAPPTags {
updateInfo["os_app_tags"] = newItem.OSAPPTags
// onResourceUpdated implements SubscriberDataGenerator
func (c *ChOSAppTags) sourceToTarget(item *mysql.Process) (keys []OSAPPTagsKey, targets []mysql.ChOSAppTags) {
if len(item.OSAPPTags) == 0 {
return
}
if len(updateInfo) > 0 {
return updateInfo, true
bytes, err := json.Marshal(item.OSAPPTags)
if err != nil {
log.Error(err)
return
}
return nil, false
return []OSAPPTagsKey{{PID: item.ID}}, []mysql.ChOSAppTags{{PID: item.ID, OSAPPTags: string(bytes)}}
}
2 changes: 2 additions & 0 deletions server/controller/tagrecorder/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func (c *SubscriberManager) Start() {
c.domainLcuuidToIconID, c.resourceTypeToIconID, _ = UpdateIconInfo(c.cfg) // TODO adds icon cache and refresh by timer?
subscribers := []Subscriber{
NewChAZ(c.domainLcuuidToIconID, c.resourceTypeToIconID),
NewChOSAppTag(),
NewChOSAppTags(),
NewChChostCloudTag(),
NewChChostCloudTags(),
}
Expand Down
2 changes: 0 additions & 2 deletions server/controller/tagrecorder/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ func (c *UpdaterManager) refresh() {
NewChPodServiceK8sLabels(),
NewChPodNSCloudTag(),
NewChPodNSCloudTags(),
NewChOSAppTag(),
NewChOSAppTags(),
NewChVTapPort(),
NewChStringEnum(),
NewChIntEnum(),
Expand Down

0 comments on commit cac7171

Please sign in to comment.