Skip to content

Commit

Permalink
[Controller] pod associated service
Browse files Browse the repository at this point in the history
  • Loading branch information
askyrie authored and SongZhen0704 committed Mar 5, 2024
1 parent bc34baa commit 03c6b39
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type KubernetesGather struct {
rsLcuuidToPodGroupLcuuid map[string]string
serviceLcuuidToIngressLcuuid map[string]string
k8sInfo map[string][]string
pgLcuuidToPSLcuuids map[string][]string
nsLabelToGroupLcuuids map[string]mapset.Set
pgLcuuidTopodTargetPorts map[string]map[string]int
namespaceToExLabels map[string]map[string]interface{}
Expand Down Expand Up @@ -197,6 +198,7 @@ func NewKubernetesGather(domain *mysql.Domain, subDomain *mysql.SubDomain, cfg c
rsLcuuidToPodGroupLcuuid: map[string]string{},
serviceLcuuidToIngressLcuuid: map[string]string{},
k8sInfo: map[string][]string{},
pgLcuuidToPSLcuuids: map[string][]string{},
nsLabelToGroupLcuuids: map[string]mapset.Set{},
pgLcuuidTopodTargetPorts: map[string]map[string]int{},
namespaceToExLabels: map[string]map[string]interface{}{},
Expand Down Expand Up @@ -249,6 +251,7 @@ func (k *KubernetesGather) GetKubernetesGatherData() (model.KubernetesGatherReso
k.rsLcuuidToPodGroupLcuuid = map[string]string{}
k.serviceLcuuidToIngressLcuuid = map[string]string{}
k.nsLabelToGroupLcuuids = map[string]mapset.Set{}
k.pgLcuuidToPSLcuuids = map[string][]string{}
k.pgLcuuidTopodTargetPorts = map[string]map[string]int{}
k.namespaceToExLabels = map[string]map[string]interface{}{}
k.nsServiceNameToService = map[string]map[string]map[string]int{}
Expand Down
8 changes: 8 additions & 0 deletions server/controller/cloud/kubernetes_gather/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,13 @@ func (k *KubernetesGather) getPods() (pods []model.Pod, err error) {
}
sort.Strings(containerIDs)

var podServiceLcuuid string
podServiceLcuuids, ok := k.pgLcuuidToPSLcuuids[podGroupLcuuid]
if ok && len(podServiceLcuuids) > 0 {
sort.Strings(podServiceLcuuids)
podServiceLcuuid = podServiceLcuuids[0]
}

pod := model.Pod{
Lcuuid: podLcuuid,
Name: name,
Expand All @@ -194,6 +201,7 @@ func (k *KubernetesGather) getPods() (pods []model.Pod, err error) {
PodReplicaSetLcuuid: podRSLcuuid,
PodNodeLcuuid: k.nodeIPToLcuuid[hostIP],
PodGroupLcuuid: podGroupLcuuid,
PodServiceLcuuid: podServiceLcuuid,
PodNamespaceLcuuid: namespaceLcuuid,
CreatedAt: created,
AZLcuuid: k.azLcuuid,
Expand Down
10 changes: 8 additions & 2 deletions server/controller/cloud/kubernetes_gather/pod_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,22 @@ func (k *KubernetesGather) getPodServices() (services []model.PodService, servic
// 在service确定有pod group的时候添加pod service port
servicePorts = append(servicePorts, servicePort)
for _, Lcuuid := range podGroupLcuuids.ToSlice() {
pgLcuuid, ok := Lcuuid.(string)
if !ok {
log.Warningf("pod group lcuuid (%v) assert failed", Lcuuid)
continue
}
key := ports.Get("protocol").MustString() + strconv.Itoa(targetPort)
podGroupPort := model.PodGroupPort{
Lcuuid: common.GetUUID(uID+Lcuuid.(string)+key, uuid.Nil),
Lcuuid: common.GetUUID(uID+pgLcuuid+key, uuid.Nil),
Name: ports.Get("name").MustString(),
Port: targetPort,
Protocol: strings.ToUpper(ports.Get("protocol").MustString()),
PodGroupLcuuid: Lcuuid.(string),
PodGroupLcuuid: pgLcuuid,
PodServiceLcuuid: uID,
}
podGroupPorts = append(podGroupPorts, podGroupPort)
k.pgLcuuidToPSLcuuids[pgLcuuid] = append(k.pgLcuuidToPSLcuuids[pgLcuuid], uID)
}
}
if !hasPodGroup {
Expand Down
1 change: 1 addition & 0 deletions server/controller/cloud/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ type Pod struct {
PodReplicaSetLcuuid string `json:"pod_replica_set_lcuuid"`
PodNodeLcuuid string `json:"pod_node_lcuuid" binding:"required"`
PodGroupLcuuid string `json:"pod_group_lcuuid" binding:"required"`
PodServiceLcuuid string `json:"pod_service_lcuuid" binding:"required"`
PodNamespaceLcuuid string `json:"pod_namespace_lcuuid" binding:"required"`
PodClusterLcuuid string `json:"pod_cluster_lcuuid" binding:"required"`
VPCLcuuid string `json:"vpc_lcuuid" binding:"required"`
Expand Down
1 change: 1 addition & 0 deletions server/controller/db/mysql/migration/rawsql/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,7 @@ CREATE TABLE IF NOT EXISTS pod (
state INTEGER NOT NULL COMMENT '0.Exception 1.Running',
pod_rs_id INTEGER DEFAULT NULL,
pod_group_id INTEGER DEFAULT NULL,
pod_service_id INTEGER DEFAULT 0,
pod_namespace_id INTEGER DEFAULT NULL,
pod_node_id INTEGER DEFAULT NULL,
pod_cluster_id INTEGER DEFAULT NULL,
Expand Down
6 changes: 6 additions & 0 deletions server/controller/db/mysql/migration/rawsql/issu/6.5.1.11.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- modify start, add upgrade sql
ALTER TABLE `pod` ADD COLUMN `pod_service_id` INTEGER DEFAULT 0 AFTER `pod_group_id`;

-- update db_version to latest, remeber update DB_VERSION_EXPECT in migrate/init.go
UPDATE db_version SET version='6.5.1.11';
-- modify end
2 changes: 1 addition & 1 deletion server/controller/db/mysql/migration/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ package migration

const (
DB_VERSION_TABLE = "db_version"
DB_VERSION_EXPECTED = "6.5.1.10"
DB_VERSION_EXPECTED = "6.5.1.11"
)
1 change: 1 addition & 0 deletions server/controller/db/mysql/platform_rsc_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,7 @@ type Pod struct {
ContainerIDs string `gorm:"column:container_ids;type:text;default:''" json:"CONTAINER_IDS" mapstructure:"CONTAINER_IDS"` // separated by ,
PodReplicaSetID int `gorm:"column:pod_rs_id;type:int;default:null" json:"POD_RS_ID" mapstructure:"POD_RS_ID"`
PodGroupID int `gorm:"column:pod_group_id;type:int;default:null" json:"POD_GROUP_ID" mapstructure:"POD_GROUP_ID"`
PodServiceID int `gorm:"column:pod_service_id;type:int;default:0" json:"POD_SERVICE_ID" mapstructure:"POD_SERVICE_ID"`
PodNamespaceID int `gorm:"column:pod_namespace_id;type:int;default:null" json:"POD_NAMESPACE_ID" mapstructure:"POD_NAMESPACE_ID"`
PodNodeID int `gorm:"column:pod_node_id;type:int;default:null" json:"POD_NODE_ID" mapstructure:"POD_NODE_ID"`
PodClusterID int `gorm:"column:pod_cluster_id;type:int;default:null" json:"POD_CLUSTER_ID" mapstructure:"POD_CLUSTER_ID"`
Expand Down
7 changes: 7 additions & 0 deletions server/controller/recorder/cache/diffbase/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (b *DataSet) AddPod(dbItem *mysql.Pod, seq int, toolDataSet *tool.DataSet)
if dbItem.PodGroupID != 0 {
podGroupLcuuid, _ = toolDataSet.GetPodGroupLcuuidByID(dbItem.PodGroupID)
}
var podServiceLcuuid string
if dbItem.PodServiceID != 0 {
podServiceLcuuid, _ = toolDataSet.GetPodServiceLcuuidByID(dbItem.PodServiceID)
}
vpcLcuuid, _ := toolDataSet.GetVPCLcuuidByID(dbItem.VPCID)
b.Pods[dbItem.Lcuuid] = &Pod{
DiffBase: DiffBase{
Expand All @@ -51,6 +55,7 @@ func (b *DataSet) AddPod(dbItem *mysql.Pod, seq int, toolDataSet *tool.DataSet)
PodNodeLcuuid: podNodeLcuuid,
PodReplicaSetLcuuid: podReplicaSetLcuuid,
PodGroupLcuuid: podGroupLcuuid,
PodServiceLcuuid: podServiceLcuuid,
VPCLcuuid: vpcLcuuid,
RegionLcuuid: dbItem.Region,
AZLcuuid: dbItem.AZ,
Expand All @@ -76,6 +81,7 @@ type Pod struct {
PodNodeLcuuid string `json:"pod_node_lcuuid"`
PodReplicaSetLcuuid string `json:"pod_replica_set_lcuuid"`
PodGroupLcuuid string `json:"pod_group_lcuuid"`
PodServiceLcuuid string `json:"pod_service_lcuuid"`
VPCLcuuid string `json:"vpc_lcuuid"`
RegionLcuuid string `json:"region_lcuuid"`
AZLcuuid string `json:"az_lcuuid"`
Expand All @@ -93,6 +99,7 @@ func (p *Pod) Update(cloudItem *cloudmodel.Pod) {
p.PodNodeLcuuid = cloudItem.PodNodeLcuuid
p.PodReplicaSetLcuuid = cloudItem.PodReplicaSetLcuuid
p.PodGroupLcuuid = cloudItem.PodGroupLcuuid
p.PodServiceLcuuid = cloudItem.PodServiceLcuuid
p.VPCLcuuid = cloudItem.VPCLcuuid
p.RegionLcuuid = cloudItem.RegionLcuuid
p.AZLcuuid = cloudItem.AZLcuuid
Expand Down
21 changes: 21 additions & 0 deletions server/controller/recorder/cache/tool/data_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type DataSet struct {
podIngressRuleLcuuidToID map[string]int

podServiceLcuuidToID map[string]int
podServiceIDToLcuuid map[int]string

podGroupLcuuidToID map[string]int
podGroupIDToLcuuid map[int]string
Expand Down Expand Up @@ -159,6 +160,7 @@ func NewDataSet() *DataSet {
podIngressRuleLcuuidToID: make(map[string]int),

podServiceLcuuidToID: make(map[string]int),
podServiceIDToLcuuid: make(map[int]string),

podGroupLcuuidToID: make(map[string]int),
podGroupIDToLcuuid: make(map[int]string),
Expand Down Expand Up @@ -803,6 +805,7 @@ func (t *DataSet) DeletePodIngressRule(lcuuid string) {

func (t *DataSet) AddPodService(item *mysql.PodService) {
t.podServiceLcuuidToID[item.Lcuuid] = item.ID
t.podServiceIDToLcuuid[item.ID] = item.Lcuuid
t.podServiceIDToInfo[item.ID] = &podServiceInfo{
Name: item.Name,
VPCID: item.VPCID,
Expand Down Expand Up @@ -848,6 +851,7 @@ func (t *DataSet) DeletePodService(lcuuid string) {
id, _ := t.GetPodServiceIDByLcuuid(lcuuid)
delete(t.podServiceIDToInfo, id)
delete(t.podServiceLcuuidToID, lcuuid)
delete(t.podServiceIDToLcuuid, id)
log.Info(deleteFromToolMap(ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, lcuuid))
}

Expand Down Expand Up @@ -1578,6 +1582,23 @@ func (t *DataSet) GetPodServiceIDByLcuuid(lcuuid string) (int, bool) {
}
}

func (t *DataSet) GetPodServiceLcuuidByID(id int) (string, bool) {
lcuuid, exists := t.podServiceIDToLcuuid[id]
if exists {
return lcuuid, true
}
log.Warning(cacheLcuuidByIDNotFound(ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, id))
var podService mysql.PodService
result := mysql.Db.Where("lcuuid = ?", id).Find(&podService)
if result.RowsAffected == 1 {
t.AddPodService(&podService)
return podService.Lcuuid, true
} else {
log.Error(dbResourceByIDNotFound(ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, id))
return lcuuid, false
}
}

func (t *DataSet) GetPodGroupIDByLcuuid(lcuuid string) (int, bool) {
id, exists := t.podGroupLcuuidToID[lcuuid]
if exists {
Expand Down
1 change: 1 addition & 0 deletions server/controller/recorder/event/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (p *Pod) ProduceByAdd(items []*mysql.Pod) {
eventapi.TagVPCID(item.VPCID),
eventapi.TagPodClusterID(item.PodClusterID),
eventapi.TagPodGroupID(item.PodGroupID),
eventapi.TagPodServiceID(item.PodServiceID),
eventapi.TagPodNodeID(item.PodNodeID),
eventapi.TagPodNSID(item.PodNamespaceID),
}...)
Expand Down
2 changes: 2 additions & 0 deletions server/controller/recorder/pubsub/message/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,9 @@ type PodFieldsUpdate struct {
ContainerIDs fieldDetail[string]
CreatedAt fieldDetail[time.Time]
PodGroupID fieldDetail[int]
PodServiceID fieldDetail[int]
PodGroupLcuuid fieldDetail[string]
PodServiceLcuuid fieldDetail[string]
PodReplicaSetID fieldDetail[int]
PodReplicaSetLcuuid fieldDetail[string]
PodNodeID fieldDetail[int]
Expand Down
20 changes: 20 additions & 0 deletions server/controller/recorder/updater/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ func (p *Pod) generateDBItemToAdd(cloudItem *cloudmodel.Pod) (*mysql.Pod, bool)
))
return nil, false
}
podServiceID, exists := p.cache.ToolDataSet.GetPodServiceIDByLcuuid(cloudItem.PodServiceLcuuid)
if !exists {
log.Infof(resourceAForResourceBNotFound(
ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, cloudItem.PodServiceLcuuid,
ctrlrcommon.RESOURCE_TYPE_POD_EN, cloudItem.Lcuuid,
))
}
var podReplicaSetID int
if cloudItem.PodReplicaSetLcuuid != "" {
podReplicaSetID, exists = p.cache.ToolDataSet.GetPodReplicaSetIDByLcuuid(cloudItem.PodReplicaSetLcuuid)
Expand All @@ -128,6 +135,7 @@ func (p *Pod) generateDBItemToAdd(cloudItem *cloudmodel.Pod) (*mysql.Pod, bool)
PodNodeID: p.cache.ToolDataSet.GetPodNodeIDByLcuuid(cloudItem.PodNodeLcuuid),
PodReplicaSetID: podReplicaSetID,
PodGroupID: podGroupID,
PodServiceID: podServiceID,
SubDomain: cloudItem.SubDomainLcuuid,
Domain: p.cache.DomainLcuuid,
Region: cloudItem.RegionLcuuid,
Expand Down Expand Up @@ -193,6 +201,18 @@ func (p *Pod) generateUpdateInfo(diffBase *diffbase.Pod, cloudItem *cloudmodel.P
structInfo.PodGroupID.SetNew(podGroupID)
structInfo.PodGroupLcuuid.Set(diffBase.PodGroupLcuuid, cloudItem.PodGroupLcuuid)
}
if diffBase.PodServiceLcuuid != cloudItem.PodServiceLcuuid {
podServiceID, exists := p.cache.ToolDataSet.GetPodServiceIDByLcuuid(cloudItem.PodServiceLcuuid)
if !exists {
log.Infof(resourceAForResourceBNotFound(
ctrlrcommon.RESOURCE_TYPE_POD_SERVICE_EN, cloudItem.PodServiceLcuuid,
ctrlrcommon.RESOURCE_TYPE_POD_EN, cloudItem.Lcuuid,
))
}
mapInfo["pod_service_id"] = podServiceID
structInfo.PodServiceID.SetNew(podServiceID)
structInfo.PodServiceLcuuid.Set(diffBase.PodServiceLcuuid, cloudItem.PodServiceLcuuid)
}
if diffBase.Name != cloudItem.Name {
mapInfo["name"] = cloudItem.Name
structInfo.Name.Set(diffBase.Name, cloudItem.Name)
Expand Down

0 comments on commit 03c6b39

Please sign in to comment.