Skip to content

Commit

Permalink
Multiple SessionManagers started out out of the same engine, configur…
Browse files Browse the repository at this point in the history
…ation refactoring completed for SessionManagers
  • Loading branch information
danbogos committed Mar 6, 2015
1 parent a0d4956 commit 96d4b6c
Show file tree
Hide file tree
Showing 12 changed files with 251 additions and 451 deletions.
154 changes: 122 additions & 32 deletions cmd/cgr-engine/cgr-engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,16 @@ func startCdrc(cdrsChan chan struct{}, cdrcCfgs map[string]*config.CdrcConfig, h
exitChan <- true // If run stopped, something is bad, stop the application
}

func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
func startSmFreeSWITCH(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var client *rpcclient.RpcClient
if cfg.SMRater == utils.INTERNAL {
if cfg.SmFsConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
for i := 0; i < cfg.SMReconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SMRater, 0, cfg.SMReconnects, utils.GOB)
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmFsConfig.Rater, 0, cfg.SmFsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
Expand All @@ -166,14 +166,14 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
}
raterConn = &engine.RPCClientConnector{Client: client}
}
if cfg.SMCdrS == cfg.SMRater {
if cfg.SmFsConfig.Cdrs == cfg.SmFsConfig.Rater {
cdrsConn = raterConn
} else if cfg.SMCdrS == utils.INTERNAL {
} else if cfg.SmFsConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SMCdrS) != 0 {
for i := 0; i < cfg.SMReconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SMCdrS, 0, cfg.SMReconnects, utils.GOB)
} else if len(cfg.SmFsConfig.Cdrs) != 0 {
for i := 0; i < cfg.SmFsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmFsConfig.Cdrs, 0, cfg.SmFsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
Expand All @@ -186,23 +186,109 @@ func startSessionManager(responder *engine.Responder, loggerDb engine.LogStorage
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
switch cfg.SMSwitchType {
case FS:
sm = sessionmanager.NewFSSessionManager(cfg.SmFsConfig, loggerDb, raterConn, cdrsConn)
case KAMAILIO:
sm, _ = sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn, loggerDb)
case OSIPS:
sm, _ = sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn)
default:
engine.Logger.Err(fmt.Sprintf("<SessionManager> Unsupported session manger type: %s!", cfg.SMSwitchType))
exitChan <- true
sm := sessionmanager.NewFSSessionManager(cfg.SmFsConfig, loggerDb, raterConn, cdrsConn)
if err = sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> error: %s!", err))
}
exitChan <- true
}

func startSmKamailio(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var client *rpcclient.RpcClient
if cfg.SmKamConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
for i := 0; i < cfg.SmKamConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmKamConfig.Rater, 0, cfg.SmKamConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to engine: %v", err))
exitChan <- true
}
raterConn = &engine.RPCClientConnector{Client: client}
}
if cfg.SmKamConfig.Cdrs == cfg.SmKamConfig.Rater {
cdrsConn = raterConn
} else if cfg.SmKamConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SmKamConfig.Cdrs) != 0 {
for i := 0; i < cfg.SmKamConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmKamConfig.Cdrs, 0, cfg.SmKamConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewKamailioSessionManager(cfg.SmKamConfig, raterConn, cdrsConn, loggerDb)
if err = sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SessionManager> error: %s!", err))
}
exitChan <- true
}

func startSmOpenSIPS(responder *engine.Responder, loggerDb engine.LogStorage, cacheChan chan struct{}) {
var raterConn, cdrsConn engine.Connector
var client *rpcclient.RpcClient
if cfg.SmOsipsConfig.Rater == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
raterConn = responder
} else {
var err error
for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmOsipsConfig.Rater, 0, cfg.SmOsipsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SessionManager> Could not connect to engine: %v", err))
exitChan <- true
}
raterConn = &engine.RPCClientConnector{Client: client}
}
if cfg.SmOsipsConfig.Cdrs == cfg.SmOsipsConfig.Rater {
cdrsConn = raterConn
} else if cfg.SmOsipsConfig.Cdrs == utils.INTERNAL {
<-cacheChan // Wait for the cache to init before start doing queries
cdrsConn = responder
} else if len(cfg.SmOsipsConfig.Cdrs) != 0 {
for i := 0; i < cfg.SmOsipsConfig.Reconnects; i++ {
client, err = rpcclient.NewRpcClient("tcp", cfg.SmOsipsConfig.Cdrs, 0, cfg.SmOsipsConfig.Reconnects, utils.GOB)
if err == nil { //Connected so no need to reiterate
break
}
time.Sleep(time.Duration(i+1) * time.Second)
}
if err != nil {
engine.Logger.Crit(fmt.Sprintf("<SM-OpenSIPS> Could not connect to CDRS via RPC: %v", err))
exitChan <- true
return
}
cdrsConn = &engine.RPCClientConnector{Client: client}
}
sm, _ := sessionmanager.NewOSipsSessionManager(cfg.SmOsipsConfig, raterConn, cdrsConn)
if err = sm.Connect(); err != nil {
engine.Logger.Err(fmt.Sprintf("<SM-OpenSIPS> error: %s!", err))
}
exitChan <- true
}

func startCDRS(responder *engine.Responder, cdrDb engine.CdrStorage, mediChan, doneChan chan struct{}) {
if cfg.CDRSMediator == utils.INTERNAL {
<-mediChan // Deadlock if mediator not started
Expand Down Expand Up @@ -280,10 +366,12 @@ func serveHttp(httpWaitChans []chan struct{}) {
}

func checkConfigSanity() error {
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!")
return errors.New("SessionManager on Worker")
}
/*
if cfg.SMEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The session manager must not be enabled on a worker engine (change [engine]/balancer to disabled)!")
return errors.New("SessionManager on Worker")
}
*/
if cfg.BalancerEnabled && cfg.RaterEnabled && cfg.RaterBalancer != "" {
engine.Logger.Crit("The balancer is enabled so it cannot connect to another balancer (change rater/balancer to disabled)!")
return errors.New("Improperly configured balancer")
Expand Down Expand Up @@ -389,12 +477,6 @@ func main() {
}

engine.SetRoundingDecimals(cfg.RoundingDecimals)
if cfg.SMDebitInterval > 0 {
if dp, err := time.ParseDuration(fmt.Sprintf("%vs", cfg.SMDebitInterval)); err == nil {
engine.SetDebitPeriod(dp)
}
}

stopHandled := false

// Async starts here
Expand Down Expand Up @@ -492,12 +574,20 @@ func main() {
go startCDRS(responder, cdrDb, medChan, cdrsChan)
}

if cfg.SMEnabled {
engine.Logger.Info("Starting CGRateS SessionManager service.")
go startSessionManager(responder, logDb, cacheChan)
if cfg.SmFsConfig.Enabled {
engine.Logger.Info("Starting CGRateS SM-FreeSWITCH service.")
go startSmFreeSWITCH(responder, logDb, cacheChan)
// close all sessions on shutdown
go shutdownSessionmanagerSingnalHandler()
}
if cfg.SmKamConfig.Enabled {
engine.Logger.Info("Starting CGRateS SM-Kamailio service.")
go startSmKamailio(responder, logDb, cacheChan)
}
if cfg.SmOsipsConfig.Enabled {
engine.Logger.Info("Starting CGRateS SM-OpenSIPS service.")
go startSmOpenSIPS(responder, logDb, cacheChan)
}
var cdrcEnabled bool
for _, cdrcCfgs := range cfg.CdrcProfiles {
var cdrcCfg *config.CdrcConfig
Expand Down

0 comments on commit 96d4b6c

Please sign in to comment.