Skip to content

Commit

Permalink
[Controller] fix concurrent map panic
Browse files Browse the repository at this point in the history
  • Loading branch information
askyrie authored and SongZhen0704 committed May 31, 2024
1 parent 12a297e commit f4ed72b
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 44 deletions.
2 changes: 2 additions & 0 deletions server/controller/genesis/datatypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type PrometheusMessage struct {
}

type KubernetesInfo struct {
ORGID int
ClusterID string
ErrorMSG string
Version uint64
Expand All @@ -65,6 +66,7 @@ type KubernetesInfo struct {
}

type PrometheusInfo struct {
ORGID int
ClusterID string
ErrorMSG string
Epoch time.Time
Expand Down
42 changes: 14 additions & 28 deletions server/controller/genesis/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ func NewGenesis(cfg *config.ControllerConfig) *Genesis {
func (g *Genesis) Start() {
ctx := context.Context(context.Background())
genesisSyncDataChan := make(chan GenesisSyncData)
kubernetesDataChan := make(chan map[int]map[string]KubernetesInfo)
prometheusDataChan := make(chan map[int]map[string]PrometheusInfo)
kubernetesDataChan := make(chan KubernetesInfo)
prometheusDataChan := make(chan PrometheusInfo)
sQueue := queue.NewOverwriteQueue("genesis-sync-data", g.cfg.QueueLengths)
kQueue := queue.NewOverwriteQueue("genesis-k8s-data", g.cfg.QueueLengths)
pQueue := queue.NewOverwriteQueue("genesis-prometheus-data", g.cfg.QueueLengths)
Expand Down Expand Up @@ -501,34 +501,27 @@ func (g *Genesis) getServerIPs(orgID int) ([]string, error) {
return serverIPs, nil
}

func (g *Genesis) receiveKubernetesData(kChan chan map[int]map[string]KubernetesInfo) {
func (g *Genesis) receiveKubernetesData(kChan chan KubernetesInfo) {
for {
select {
case k := <-kChan:
for key, value := range k {
g.kubernetesData.Store(key, value)
}
g.kubernetesData.Store(fmt.Sprintf("%d-%s", k.ORGID, k.ClusterID), k)
}
}
}

func (g *Genesis) GetKubernetesData(orgID int, clusterID string) (KubernetesInfo, bool) {
k8sDataInterface, ok := g.kubernetesData.Load(orgID)
k8sDataInterface, ok := g.kubernetesData.Load(fmt.Sprintf("%d-%s", orgID, clusterID))
if !ok {
log.Warningf("kubernetes data not found org_id (%d)", orgID)
log.Warningf("kubernetes data not found org_id (%d) cluster id (%s)", orgID, clusterID)
return KubernetesInfo{}, false
}
k8sData, ok := k8sDataInterface.(map[string]KubernetesInfo)
k8sData, ok := k8sDataInterface.(KubernetesInfo)
if !ok {
log.Error("kubernetes data interface assert failed")
return KubernetesInfo{}, false
}
k8sInfo, ok := k8sData[clusterID]
if !ok {
log.Warningf("kubernetes data not found org_id %d cluster id (%s)", orgID, clusterID)
return KubernetesInfo{}, false
}
return k8sInfo, true
return k8sData, true
}

func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string][]string, error) {
Expand Down Expand Up @@ -615,34 +608,27 @@ func (g *Genesis) GetKubernetesResponse(orgID int, clusterID string) (map[string
return k8sResp, nil
}

func (g *Genesis) receivePrometheusData(pChan chan map[int]map[string]PrometheusInfo) {
func (g *Genesis) receivePrometheusData(pChan chan PrometheusInfo) {
for {
select {
case p := <-pChan:
for k, v := range p {
g.prometheusData.Store(k, v)
}
g.prometheusData.Store(fmt.Sprintf("%d-%s", p.ORGID, p.ClusterID), p)
}
}
}

func (g *Genesis) GetPrometheusData(orgID int, clusterID string) (PrometheusInfo, bool) {
prometheusDataInterface, ok := g.prometheusData.Load(orgID)
prometheusDataInterface, ok := g.prometheusData.Load(fmt.Sprintf("%d-%s", orgID, clusterID))
if !ok {
log.Warningf("prometheus data not found org_id (%d)", orgID)
log.Warningf("prometheus data not found org_id (%d) cluster id (%s)", orgID, clusterID)
return PrometheusInfo{}, false
}
prometheusData, ok := prometheusDataInterface.(map[string]PrometheusInfo)
prometheusData, ok := prometheusDataInterface.(PrometheusInfo)
if !ok {
log.Error("prometheus data interface assert failed")
return PrometheusInfo{}, false
}
prometheusInfo, ok := prometheusData[clusterID]
if !ok {
log.Warningf("prometheus data not found cluster id (%s)", clusterID)
return PrometheusInfo{}, false
}
return prometheusInfo, true
return prometheusData, true
}

func (g *Genesis) GetPrometheusResponse(orgID int, clusterID string) ([]cloudmodel.PrometheusTarget, error) {
Expand Down
37 changes: 21 additions & 16 deletions server/controller/genesis/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,12 @@ type KubernetesStorage struct {
cfg config.GenesisConfig
kCtx context.Context
kCancel context.CancelFunc
channel chan map[int]map[string]KubernetesInfo
channel chan KubernetesInfo
kubernetesData map[int]map[string]KubernetesInfo
mutex sync.Mutex
}

func NewKubernetesStorage(port, nPort int, cfg config.GenesisConfig, kChan chan map[int]map[string]KubernetesInfo, ctx context.Context) *KubernetesStorage {
func NewKubernetesStorage(port, nPort int, cfg config.GenesisConfig, kChan chan KubernetesInfo, ctx context.Context) *KubernetesStorage {
kCtx, kCancel := context.WithCancel(ctx)
return &KubernetesStorage{
listenPort: port,
Expand Down Expand Up @@ -379,10 +379,9 @@ func (k *KubernetesStorage) Add(orgID int, newInfo KubernetesInfo) {
newInfo.ClusterID: newInfo,
}
}
k.fetch()
k.mutex.Unlock()

k.channel <- k.fetch()

if !unTriggerFlag {
err := k.triggerCloudRrefresh(orgID, newInfo.ClusterID, newInfo.Version)
if err != nil {
Expand All @@ -391,8 +390,12 @@ func (k *KubernetesStorage) Add(orgID int, newInfo KubernetesInfo) {
}
}

func (k *KubernetesStorage) fetch() map[int]map[string]KubernetesInfo {
return k.kubernetesData
func (k *KubernetesStorage) fetch() {
for _, k8sDatas := range k.kubernetesData {
for _, kData := range k8sDatas {
k.channel <- kData
}
}
}

func (k *KubernetesStorage) triggerCloudRrefresh(orgID int, clusterID string, version uint64) error {
Expand Down Expand Up @@ -468,9 +471,8 @@ func (k *KubernetesStorage) run() {
delete(kubernetesData, key)
}
}
k.fetch()
k.mutex.Unlock()

k.channel <- k.fetch()
}
}

Expand All @@ -488,12 +490,12 @@ type PrometheusStorage struct {
cfg config.GenesisConfig
kCtx context.Context
kCancel context.CancelFunc
channel chan map[int]map[string]PrometheusInfo
channel chan PrometheusInfo
prometheusData map[int]map[string]PrometheusInfo
mutex sync.Mutex
}

func NewPrometheusStorage(cfg config.GenesisConfig, pChan chan map[int]map[string]PrometheusInfo, ctx context.Context) *PrometheusStorage {
func NewPrometheusStorage(cfg config.GenesisConfig, pChan chan PrometheusInfo, ctx context.Context) *PrometheusStorage {
pCtx, pCancel := context.WithCancel(ctx)
return &PrometheusStorage{
cfg: cfg,
Expand All @@ -514,6 +516,7 @@ func (p *PrometheusStorage) Clear() {

func (p *PrometheusStorage) Add(orgID int, isUpdate bool, newInfo PrometheusInfo) {
p.mutex.Lock()
defer p.mutex.Unlock()
prometheusData, ok := p.prometheusData[orgID]
if ok {
if oldInfo, ok := prometheusData[newInfo.ClusterID]; ok && !isUpdate {
Expand All @@ -528,13 +531,15 @@ func (p *PrometheusStorage) Add(orgID int, isUpdate bool, newInfo PrometheusInfo
newInfo.ClusterID: newInfo,
}
}
p.mutex.Unlock()

p.channel <- p.fetch()
p.fetch()
}

func (p *PrometheusStorage) fetch() map[int]map[string]PrometheusInfo {
return p.prometheusData
func (p *PrometheusStorage) fetch() {
for _, prometheusDatas := range p.prometheusData {
for _, pData := range prometheusDatas {
p.channel <- pData
}
}
}

func (p *PrometheusStorage) run() {
Expand All @@ -550,8 +555,8 @@ func (p *PrometheusStorage) run() {
delete(prometheusData, key)
}
}
p.fetch()
p.mutex.Unlock()
p.channel <- p.fetch()
}
}

Expand Down
2 changes: 2 additions & 0 deletions server/controller/genesis/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -948,6 +948,7 @@ func (k *KubernetesRpcUpdater) run() {
log.Debugf("k8s from %s vtap_id %v received cluster_id %s version %v", info.peer, info.vtapID, info.message.GetClusterId(), info.message.GetVersion())
// Update and save the in-memory data
k.storage.Add(info.orgID, KubernetesInfo{
ORGID: info.orgID,
Epoch: time.Now(),
ClusterID: info.message.GetClusterId(),
ErrorMSG: info.message.GetErrorMsg(),
Expand Down Expand Up @@ -1103,6 +1104,7 @@ func (p *PrometheusRpcUpdater) run() {
log.Debugf("prometheus from %s vtap_id %v received cluster_id %s version %v, error message (%s)", info.peer, info.vtapID, clusterID, version, errMSG)

p.storage.Add(info.orgID, parseFlag, PrometheusInfo{
ORGID: info.orgID,
ClusterID: clusterID,
Entries: entries,
Epoch: time.Now(),
Expand Down

0 comments on commit f4ed72b

Please sign in to comment.