Skip to content

Commit

Permalink
[Server] fix data source access port
Browse files Browse the repository at this point in the history
  • Loading branch information
roryye authored and SongZhen0704 committed Jun 21, 2024
1 parent ee16128 commit a8a8cb9
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 25 deletions.
6 changes: 6 additions & 0 deletions server/controller/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,9 @@ type FPermit struct {
Port int `default:"20823" yaml:"port"`
Timeout int `default:"30" yaml:"timeout"`
}

type IngesterApi struct {
Port int `default:"20106" yaml:"port"`
NodePort int `default:"30106" yaml:"node-port"`
Timeout int `default:"60" yaml:"timeout"`
}
11 changes: 3 additions & 8 deletions server/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ import (

var log = logging.MustGetLogger("config")

type IngesterApi struct {
Port int `default:"30106" yaml:"port"`
Timeout int `default:"60" yaml:"timeout"`
}

type Specification struct {
VTapGroupMax int `default:"1000" yaml:"vtap_group_max"`
VTapMaxPerGroup int `default:"10000" yaml:"vtap_max_per_group"`
Expand Down Expand Up @@ -92,8 +87,8 @@ type ControllerConfig struct {
RedisCfg redis.Config `yaml:"redis"`
ClickHouseCfg clickhouse.ClickHouseConfig `yaml:"clickhouse"`

IngesterApi IngesterApi `yaml:"ingester-api"`
Spec Specification `yaml:"spec"`
IngesterApi common.IngesterApi `yaml:"ingester-api"`
Spec Specification `yaml:"spec"`

MonitorCfg monitor.MonitorConfig `yaml:"monitor"`
ManagerCfg manager.ManagerConfig `yaml:"manager"`
Expand Down Expand Up @@ -135,7 +130,7 @@ func (c *Config) Load(path string) {
c.ControllerConfig.TrisolarisCfg.SetGrpcMaxMessageLength(c.ControllerConfig.GrpcMaxMessageLength)
c.ControllerConfig.TrisolarisCfg.SetNoTeamIDRefused(c.ControllerConfig.NoTeamIDRefused)
c.ControllerConfig.TrisolarisCfg.SetFPermitConfig(c.ControllerConfig.FPermit)
c.ControllerConfig.TrisolarisCfg.SetIngesterAPIPort(c.ControllerConfig.IngesterApi.Port)
c.ControllerConfig.TrisolarisCfg.SetIngesterAPI(c.ControllerConfig.IngesterApi) // for data source
c.ControllerConfig.TrisolarisCfg.SetAllAgentConnectToNatIP(c.ControllerConfig.AllAgentConnectToNatIP)
c.ControllerConfig.TrisolarisCfg.SetNoIPOverlapping(c.ControllerConfig.NoIPOverlapping)
grpcPort, err := strconv.Atoi(c.ControllerConfig.GrpcPort)
Expand Down
73 changes: 63 additions & 10 deletions server/controller/http/service/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,24 +38,47 @@ type DataSource struct {
cfg *config.ControllerConfig

resourceAccess *ResourceAccess
ipToController map[string]*mysql.Controller
}

func NewDataSource(userInfo *httpcommon.UserInfo, cfg *config.ControllerConfig) *DataSource {
return &DataSource{
dataSource := &DataSource{
cfg: cfg,
resourceAccess: &ResourceAccess{fpermit: cfg.FPermit, userInfo: userInfo},
}

dataSource.generateIPToController()
return dataSource
}

func NewDataSourceWithoutUserInfo(port int) *DataSource {
return &DataSource{
func NewDataSourceWithIngesterAPIConfig(userInfo *httpcommon.UserInfo, cfg common.IngesterApi) *DataSource {
dataSource := &DataSource{
cfg: &config.ControllerConfig{
IngesterApi: config.IngesterApi{
Port: port,
},
IngesterApi: cfg,
},
resourceAccess: &ResourceAccess{},
resourceAccess: &ResourceAccess{userInfo: userInfo},
}
if err := dataSource.generateIPToController(); err != nil {
log.Warning(err)
}
return dataSource
}

func (d *DataSource) generateIPToController() error {
db, err := mysql.GetDB(d.resourceAccess.userInfo.ORGID)
if err != nil {
return err
}
var controllers []mysql.Controller
if err = db.Find(&controllers).Error; err != nil {
return err
}
ipToController := make(map[string]*mysql.Controller)
for i, controller := range controllers {
ipToController[controller.IP] = &controllers[i]
}
d.ipToController = ipToController
return nil
}

var DEFAULT_DATA_SOURCE_DISPLAY_NAMES = []string{
Expand Down Expand Up @@ -532,7 +555,17 @@ func (d *DataSource) CallIngesterAPIAddRP(orgID int, ip string, dataSource, base
"interval": dataSource.Interval / common.INTERVAL_1MINUTE,
"retention-time": dataSource.RetentionTime,
}
url := fmt.Sprintf("http://%s:%d/v1/rpadd/", common.GetCURLIP(ip), d.cfg.IngesterApi.Port)
if len(d.ipToController) == 0 {
log.Warningf("ORGID-%d get ip to controller nil", orgID)
}
port := d.cfg.IngesterApi.NodePort
if controller, ok := d.ipToController[ip]; ok {
if controller.NodeType == common.CONTROLLER_NODE_TYPE_MASTER && len(controller.PodIP) != 0 {
ip = controller.PodIP
port = d.cfg.IngesterApi.Port
}
}
url := fmt.Sprintf("http://%s:%d/v1/rpadd/", common.GetCURLIP(ip), port)
log.Infof("call add data_source, url: %s, body: %v", url, body)
_, err = common.CURLPerform("POST", url, body, common.WithORGHeader(strconv.Itoa(orgID)))
if err != nil && !(errors.Is(err, httpcommon.ErrorPending) || errors.Is(err, httpcommon.ErrorFail)) {
Expand All @@ -552,7 +585,17 @@ func (d *DataSource) CallIngesterAPIModRP(orgID int, ip string, dataSource mysql
"db": getTableName(dataSource.DataTableCollection),
"retention-time": dataSource.RetentionTime,
}
url := fmt.Sprintf("http://%s:%d/v1/rpmod/", common.GetCURLIP(ip), d.cfg.IngesterApi.Port)
if len(d.ipToController) == 0 {
log.Warningf("ORGID-%d get ip to controller nil", orgID)
}
port := d.cfg.IngesterApi.NodePort
if controller, ok := d.ipToController[ip]; ok {
if controller.NodeType == common.CONTROLLER_NODE_TYPE_MASTER && len(controller.PodIP) != 0 {
ip = controller.PodIP
port = d.cfg.IngesterApi.Port
}
}
url := fmt.Sprintf("http://%s:%d/v1/rpmod/", common.GetCURLIP(ip), port)
log.Infof("call mod data_source, url: %s, body: %v", url, body)
_, err = common.CURLPerform("PATCH", url, body, common.WithORGHeader(strconv.Itoa(orgID)))
if err != nil && !(errors.Is(err, httpcommon.ErrorPending) || errors.Is(err, httpcommon.ErrorFail)) {
Expand All @@ -571,7 +614,17 @@ func (d *DataSource) CallIngesterAPIDelRP(orgID int, ip string, dataSource mysql
"name": name,
"db": getTableName(dataSource.DataTableCollection),
}
url := fmt.Sprintf("http://%s:%d/v1/rpdel/", common.GetCURLIP(ip), d.cfg.IngesterApi.Port)
if len(d.ipToController) == 0 {
log.Warningf("ORGID-%d get ip to controller nil", orgID)
}
port := d.cfg.IngesterApi.NodePort
if controller, ok := d.ipToController[ip]; ok {
if controller.NodeType == common.CONTROLLER_NODE_TYPE_MASTER && len(controller.PodIP) != 0 {
ip = controller.PodIP
port = d.cfg.IngesterApi.Port
}
}
url := fmt.Sprintf("http://%s:%d/v1/rpdel/", common.GetCURLIP(ip), port)
log.Infof("call del data_source, url: %s, body: %v", url, body)
_, err = common.CURLPerform("DELETE", url, body, common.WithORGHeader(strconv.Itoa(orgID)))
if err != nil && !(errors.Is(err, httpcommon.ErrorPending) || errors.Is(err, httpcommon.ErrorFail)) {
Expand Down
10 changes: 5 additions & 5 deletions server/controller/trisolaris/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Config struct {
PlatformDataRefreshDelayTime int `default:"1" yaml:"platform-data-refresh-delay-time"`
NoTeamIDRefused bool
FPermit common.FPermit
IngesterAPIPort int // data source
IngesterAPI common.IngesterApi // data source
AllAgentConnectToNatIP bool
NoIPOverlapping bool
}
Expand Down Expand Up @@ -119,12 +119,12 @@ func (c *Config) GetIngesterPort() int {
return c.IngesterPort
}

func (c *Config) SetIngesterAPIPort(port int) {
c.IngesterAPIPort = port
func (c *Config) SetIngesterAPI(ingesterAPI common.IngesterApi) {
c.IngesterAPI = ingesterAPI
}

func (c *Config) GetIngesterAPIPort() int {
return c.IngesterAPIPort
func (c *Config) GetIngesterAPI() common.IngesterApi {
return c.IngesterAPI
}

func (c *Config) SetLogLevel(logLevel string) {
Expand Down
3 changes: 2 additions & 1 deletion server/controller/trisolaris/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/deepflowio/deepflow/message/trident"
. "github.com/deepflowio/deepflow/server/controller/common"
models "github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/http/common"
"github.com/deepflowio/deepflow/server/controller/http/service"
. "github.com/deepflowio/deepflow/server/controller/trisolaris/common"
"github.com/deepflowio/deepflow/server/controller/trisolaris/config"
Expand Down Expand Up @@ -582,7 +583,7 @@ func (n *NodeInfo) registerTSDBToDB(tsdb *models.Analyzer) {
}
}

dataSourceService := service.NewDataSourceWithoutUserInfo(n.config.GetIngesterAPIPort())
dataSourceService := service.NewDataSourceWithIngesterAPIConfig(&common.UserInfo{ORGID: n.GetORGID()}, n.config.GetIngesterAPI())
if IsStandaloneRunningMode() {
// in standalone mode, since all in one deployment and analyzer communication use 127.0.0.1
err = dataSourceService.ConfigAnalyzerDataSource(n.GetORGID(), "127.0.0.1")
Expand Down
3 changes: 2 additions & 1 deletion server/server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ controller:

# datasource-api from ingester
ingester-api:
port: 30106
port: 20106
node-port: 30106
timeout: 60

# 规格相关定义
Expand Down

0 comments on commit a8a8cb9

Please sign in to comment.