Skip to content

Commit

Permalink
[tagrecorder] Subscribe ch_os_app_tag
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochaoren1 committed Mar 1, 2024
1 parent 14a4ba7 commit a3b3889
Show file tree
Hide file tree
Showing 10 changed files with 509 additions and 424 deletions.
128 changes: 91 additions & 37 deletions server/controller/tagrecorder/ch_os_app_tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,63 +19,117 @@ package tagrecorder
import (
"strings"

"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/db/mysql/query"
"github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message"
)

type ChOSAppTag struct {
UpdaterComponent[mysql.ChOSAppTag, OSAPPTagKey]
SubscriberComponent[*message.ProcessFieldsUpdate, message.ProcessFieldsUpdate, mysql.Process, mysql.ChOSAppTag, OSAPPTagKey]
}

func NewChOSAppTag() *ChOSAppTag {
updater := &ChOSAppTag{
newUpdaterComponent[mysql.ChOSAppTag, OSAPPTagKey](
RESOURCE_TYPE_CH_OS_APP_TAG,
mng := &ChOSAppTag{
newSubscriberComponent[*message.ProcessFieldsUpdate, message.ProcessFieldsUpdate, mysql.Process, mysql.ChOSAppTag, OSAPPTagKey](
common.RESOURCE_TYPE_PROCESS_EN, RESOURCE_TYPE_CH_OS_APP_TAG,
),
}
updater.updaterDG = updater
return updater
mng.subscriberDG = mng
return mng
}

func (o *ChOSAppTag) generateNewData() (map[OSAPPTagKey]mysql.ChOSAppTag, bool) {
processes, err := query.FindInBatches[mysql.Process](mysql.Db.Unscoped())
if err != nil {
log.Errorf(dbQueryResourceFailed(o.resourceTypeName, err))
return nil, false
}
// onResourceUpdated implements SubscriberDataGenerator
func (c *ChOSAppTag) onResourceUpdated(sourceID int, fieldsUpdate *message.ProcessFieldsUpdate) {
keysToAdd := make([]OSAPPTagKey, 0)
targetsToAdd := make([]mysql.ChOSAppTag, 0)
keysToDelete := make([]OSAPPTagKey, 0)
targetsToDelete := make([]mysql.ChOSAppTag, 0)
var chItem mysql.ChOSAppTag
var updateKey OSAPPTagKey
updateInfo := make(map[string]interface{})
if fieldsUpdate.OSAPPTags.IsDifferent() {
new := map[string]string{}
old := map[string]string{}
newStr := fieldsUpdate.OSAPPTags.GetNew()
oldStr := fieldsUpdate.OSAPPTags.GetOld()
splitNews := strings.Split(newStr, ", ")
splitOlds := strings.Split(oldStr, ", ")

keyToItem := make(map[OSAPPTagKey]mysql.ChOSAppTag)
for _, process := range processes {
splitTags := strings.Split(process.OSAPPTags, ", ")
for _, singleTag := range splitTags {
splitSingleTag := strings.Split(singleTag, ":")
for _, splitNew := range splitNews {
splitSingleTag := strings.Split(splitNew, ":")
if len(splitSingleTag) == 2 {
key := OSAPPTagKey{
PID: process.ID,
Key: strings.Trim(splitSingleTag[0], " "),
}
keyToItem[key] = mysql.ChOSAppTag{
PID: process.ID,
Key: strings.Trim(splitSingleTag[0], " "),
Value: strings.Trim(splitSingleTag[1], " "),
new[strings.Trim(splitSingleTag[0], " ")] = strings.Trim(splitSingleTag[1], " ")
}
}
for _, splitOld := range splitOlds {
splitSingleTag := strings.Split(splitOld, ":")
if len(splitSingleTag) == 2 {
old[strings.Trim(splitSingleTag[0], " ")] = strings.Trim(splitSingleTag[1], " ")
}
}
for k, v := range new {
oldV, ok := old[k]
if !ok {
keysToAdd = append(keysToAdd, OSAPPTagKey{PID: sourceID, Key: k})
targetsToAdd = append(targetsToAdd, mysql.ChOSAppTag{
PID: sourceID,
Key: k,
Value: v,
})
} else {
if oldV != v {
updateKey = OSAPPTagKey{PID: sourceID, Key: k}
updateInfo[k] = v
mysql.Db.Where("pid = ? and `key` = ?", sourceID, k).First(&chItem) // TODO common
if chItem.PID == 0 {
keysToAdd = append(keysToAdd, OSAPPTagKey{PID: sourceID, Key: k})
targetsToAdd = append(targetsToAdd, mysql.ChOSAppTag{
PID: sourceID,
Key: k,
Value: v,
})
} else if len(updateInfo) > 0 {
c.SubscriberComponent.dbOperator.update(chItem, updateInfo, updateKey)
}
}
}
}
for k := range old {
if _, ok := new[k]; !ok {
keysToDelete = append(keysToDelete, OSAPPTagKey{PID: sourceID, Key: k})
targetsToDelete = append(targetsToDelete, mysql.ChOSAppTag{
PID: sourceID,
Key: k,
})
}
}
}
if len(keysToAdd) > 0 {
c.SubscriberComponent.dbOperator.add(keysToAdd, targetsToAdd)
}
if len(keysToDelete) > 0 {
c.SubscriberComponent.dbOperator.delete(keysToDelete, targetsToDelete)
}
return keyToItem, true
}

func (o *ChOSAppTag) generateKey(dbItem mysql.ChOSAppTag) OSAPPTagKey {
return OSAPPTagKey{PID: dbItem.PID, Key: dbItem.Key}
}
// onResourceUpdated implements SubscriberDataGenerator
func (c *ChOSAppTag) sourceToTarget(source *mysql.Process) (keys []OSAPPTagKey, targets []mysql.ChOSAppTag) {
osAppTagsMap := map[string]string{}
splitTags := strings.Split(source.OSAPPTags, ", ")

func (o *ChOSAppTag) generateUpdateInfo(oldItem, newItem mysql.ChOSAppTag) (map[string]interface{}, bool) {
updateInfo := make(map[string]interface{})
if oldItem.Value != newItem.Value {
updateInfo["value"] = newItem.Value
for _, splitTag := range splitTags {
splitSingleTag := strings.Split(splitTag, ":")
if len(splitSingleTag) == 2 {
osAppTagsMap[strings.Trim(splitSingleTag[0], " ")] = strings.Trim(splitSingleTag[1], " ")
}
}
if len(updateInfo) > 0 {
return updateInfo, true
for k, v := range osAppTagsMap {
keys = append(keys, OSAPPTagKey{PID: source.ID, Key: k})
targets = append(targets, mysql.ChOSAppTag{
PID: source.ID,
Key: k,
Value: v,
})
}
return nil, false
return
}
93 changes: 50 additions & 43 deletions server/controller/tagrecorder/ch_os_app_tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,70 +20,77 @@ import (
"encoding/json"
"strings"

"github.com/deepflowio/deepflow/server/controller/common"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/db/mysql/query"
"github.com/deepflowio/deepflow/server/controller/recorder/pubsub/message"
)

type ChOSAppTags struct {
UpdaterComponent[mysql.ChOSAppTags, OSAPPTagsKey]
SubscriberComponent[*message.ProcessFieldsUpdate, message.ProcessFieldsUpdate, mysql.Process, mysql.ChOSAppTags, OSAPPTagsKey]
}

func NewChOSAppTags() *ChOSAppTags {
updater := &ChOSAppTags{
newUpdaterComponent[mysql.ChOSAppTags, OSAPPTagsKey](
RESOURCE_TYPE_CH_OS_APP_TAGS,
mng := &ChOSAppTags{
newSubscriberComponent[*message.ProcessFieldsUpdate, message.ProcessFieldsUpdate, mysql.Process, mysql.ChOSAppTags, OSAPPTagsKey](
common.RESOURCE_TYPE_PROCESS_EN, RESOURCE_TYPE_CH_OS_APP_TAGS,
),
}
updater.updaterDG = updater
return updater
mng.subscriberDG = mng
return mng
}

func (o *ChOSAppTags) generateNewData() (map[OSAPPTagsKey]mysql.ChOSAppTags, bool) {
processes, err := query.FindInBatches[mysql.Process](mysql.Db.Unscoped())
if err != nil {
log.Errorf(dbQueryResourceFailed(o.resourceTypeName, err))
return nil, false
}

keyToItem := make(map[OSAPPTagsKey]mysql.ChOSAppTags)
for _, process := range processes {
// onResourceUpdated implements SubscriberDataGenerator
func (c *ChOSAppTags) onResourceUpdated(sourceID int, fieldsUpdate *message.ProcessFieldsUpdate) {
updateInfo := make(map[string]interface{})
if fieldsUpdate.OSAPPTags.IsDifferent() {
osAppTagsMap := map[string]string{}
splitOsAppTags := strings.Split(process.OSAPPTags, ", ")
for _, singleOsAppTag := range splitOsAppTags {
splitSingleTag := strings.Split(singleOsAppTag, ":")
splitTags := strings.Split(fieldsUpdate.OSAPPTags.GetNew(), ", ")

for _, splitTag := range splitTags {
splitSingleTag := strings.Split(splitTag, ":")
if len(splitSingleTag) == 2 {
osAppTagsMap[strings.Trim(splitSingleTag[0], " ")] = strings.Trim(splitSingleTag[1], " ")
}
}
if len(osAppTagsMap) > 0 {
osAppTagsStr, err := json.Marshal(osAppTagsMap)
if err != nil {
log.Error(err)
return nil, false
}
key := OSAPPTagsKey{
PID: process.ID,
}
keyToItem[key] = mysql.ChOSAppTags{
PID: process.ID,
OSAPPTags: string(osAppTagsStr),
}
bytes, err := json.Marshal(osAppTagsMap)
if err != nil {
log.Error(err)
return
}
updateInfo["os_app_tags"] = string(bytes)
}
if len(updateInfo) > 0 {
var chItem mysql.ChOSAppTags
mysql.Db.Where("pid = ?", sourceID).First(&chItem)
if chItem.PID == 0 {
c.SubscriberComponent.dbOperator.add(
[]OSAPPTagsKey{{PID: sourceID}},
[]mysql.ChOSAppTags{{PID: sourceID, OSAPPTags: updateInfo["os_app_tags"].(string)}},
)
} else {
c.SubscriberComponent.dbOperator.update(chItem, updateInfo, OSAPPTagsKey{PID: sourceID})
}
}
return keyToItem, true
}

func (o *ChOSAppTags) generateKey(dbItem mysql.ChOSAppTags) OSAPPTagsKey {
return OSAPPTagsKey{PID: dbItem.PID}
}
// onResourceUpdated implements SubscriberDataGenerator
func (c *ChOSAppTags) sourceToTarget(item *mysql.Process) (keys []OSAPPTagsKey, targets []mysql.ChOSAppTags) {
if item.OSAPPTags == "" {
return
}
osAppTagsMap := map[string]string{}
splitTags := strings.Split(item.OSAPPTags, ", ")

func (o *ChOSAppTags) generateUpdateInfo(oldItem, newItem mysql.ChOSAppTags) (map[string]interface{}, bool) {
updateInfo := make(map[string]interface{})
if oldItem.OSAPPTags != newItem.OSAPPTags {
updateInfo["os_app_tags"] = newItem.OSAPPTags
for _, splitTag := range splitTags {
splitSingleTag := strings.Split(splitTag, ":")
if len(splitSingleTag) == 2 {
osAppTagsMap[strings.Trim(splitSingleTag[0], " ")] = strings.Trim(splitSingleTag[1], " ")
}
}
if len(updateInfo) > 0 {
return updateInfo, true
bytes, err := json.Marshal(osAppTagsMap)
if err != nil {
log.Error(err)
return
}
return nil, false
return []OSAPPTagsKey{{PID: item.ID}}, []mysql.ChOSAppTags{{PID: item.ID, OSAPPTags: string(bytes)}}
}
Loading

0 comments on commit a3b3889

Please sign in to comment.