diff --git a/cmd/run.go b/cmd/run.go
index 5d62f435..abef4099 100644
--- a/cmd/run.go
+++ b/cmd/run.go
@@ -9,7 +9,6 @@ import (
     "os"
     "os/signal"
     "strconv"
-    "strings"
     "syscall"
     "time"
 
@@ -32,11 +31,17 @@ import (
 )
 
 var (
-    enableSentry     bool
-    pluginConfigFile string
-    globalConfigFile string
-    conf             *config.Config
-    pluginRegistry   *plugin.Registry
+    enableSentry     bool
+    pluginConfigFile string
+    globalConfigFile string
+    conf             *config.Config
+    pluginRegistry   *plugin.Registry
+
+    loggers = make(map[string]zerolog.Logger)
+    pools   = make(map[string]*pool.Pool)
+    clients = make(map[string]config.Client)
+    proxies = make(map[string]*network.Proxy)
+    servers = make(map[string]*network.Server)
 
     healthCheckScheduler = gocron.NewScheduler(time.UTC)
 )
@@ -66,24 +71,28 @@ var runCmd = &cobra.Command{
         // Load global and plugin configuration.
         conf = config.NewConfig(globalConfigFile, pluginConfigFile)
 
-        // Create a new logger from the config.
-        loggerCfg := conf.Global.Loggers[config.Default]
-        logger := logging.NewLogger(logging.LoggerConfig{
-            Output:            loggerCfg.GetOutput(),
-            Level:             loggerCfg.GetLevel(),
-            TimeFormat:        loggerCfg.GetTimeFormat(),
-            ConsoleTimeFormat: loggerCfg.GetConsoleTimeFormat(),
-            NoColor:           loggerCfg.NoColor,
-            FileName:          loggerCfg.FileName,
-            MaxSize:           loggerCfg.MaxSize,
-            MaxBackups:        loggerCfg.MaxBackups,
-            MaxAge:            loggerCfg.MaxAge,
-            Compress:          loggerCfg.Compress,
-            LocalTime:         loggerCfg.LocalTime,
-            SyslogPriority:    loggerCfg.GetSyslogPriority(),
-            RSyslogNetwork:    loggerCfg.RSyslogNetwork,
-            RSyslogAddress:    loggerCfg.RSyslogAddress,
-        })
+        // Create and initialize loggers from the config.
+        for name, cfg := range conf.Global.Loggers {
+            loggers[name] = logging.NewLogger(logging.LoggerConfig{
+                Output:            cfg.GetOutput(),
+                Level:             cfg.GetLevel(),
+                TimeFormat:        cfg.GetTimeFormat(),
+                ConsoleTimeFormat: cfg.GetConsoleTimeFormat(),
+                NoColor:           cfg.NoColor,
+                FileName:          cfg.FileName,
+                MaxSize:           cfg.MaxSize,
+                MaxBackups:        cfg.MaxBackups,
+                MaxAge:            cfg.MaxAge,
+                Compress:          cfg.Compress,
+                LocalTime:         cfg.LocalTime,
+                SyslogPriority:    cfg.GetSyslogPriority(),
+                RSyslogNetwork:    cfg.RSyslogNetwork,
+                RSyslogAddress:    cfg.RSyslogAddress,
+            })
+        }
+
+        // Set the default logger.
+        logger := loggers[config.Default]
 
         // Create a new plugin registry.
         // The plugins are loaded and hooks registered before the configuration is loaded.
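Reviewer note: the loop above assumes the loggers section of gatewayd.yaml is now a map keyed by logger name, mirroring pools and metrics. A minimal sketch of a config that would exercise it — the audit entry and its values are hypothetical, not part of this diff:

loggers:
  default:
    output: ["console"]
    level: info
  audit:
    output: ["file"]
    fileName: audit.log
    level: warn

Each entry becomes one loggers[name]; the default entry must exist, since it seeds the top-level logger.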
@@ -148,6 +157,8 @@ var runCmd = &cobra.Command{
         }
 
         // Start the metrics server if enabled.
+        // TODO: Start multiple metrics servers. For now, only the default one is supported;
+        // a concrete use case for multiple metrics servers is needed first.
         go func(metricsConfig config.Metrics, logger zerolog.Logger) {
             // TODO: refactor this to a separate function.
             if !metricsConfig.Enabled {
@@ -206,176 +217,155 @@ var runCmd = &cobra.Command{
         }(conf.Global.Metrics[config.Default], logger)
 
         // This is a notification hook, so we don't care about the result.
-        data := map[string]interface{}{
-            "output":            strings.Join(loggerCfg.Output, ","),
-            "level":             loggerCfg.Level,
-            "timeFormat":        loggerCfg.TimeFormat,
-            "consoleTimeFormat": loggerCfg.ConsoleTimeFormat,
-            "noColor":           loggerCfg.NoColor,
-            "fileName":          loggerCfg.FileName,
-            "maxSize":           loggerCfg.MaxSize,
-            "maxBackups":        loggerCfg.MaxBackups,
-            "maxAge":            loggerCfg.MaxAge,
-            "compress":          loggerCfg.Compress,
-            "localTime":         loggerCfg.LocalTime,
-            "rsyslogNetwork":    loggerCfg.RSyslogNetwork,
-            "rsyslogAddress":    loggerCfg.RSyslogAddress,
-            "syslogPriority":    loggerCfg.SyslogPriority,
-        }
         // TODO: Use a context with a timeout
-        _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewLogger)
-        if err != nil {
-            logger.Error().Err(err).Msg("Failed to run OnNewLogger hooks")
+        if data, ok := conf.GlobalKoanf.Get("loggers").(map[string]interface{}); ok {
+            _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewLogger)
+            if err != nil {
+                logger.Error().Err(err).Msg("Failed to run OnNewLogger hooks")
+            }
+        } else {
+            logger.Error().Msg("Failed to get loggers from config")
         }
 
-        // Create and initialize a pool of connections.
-        poolSize := conf.Global.Pools[config.Default].GetSize()
-        pool := pool.NewPool(poolSize)
-
-        // Get client config from the config file.
-        clientConfig := conf.Global.Clients[config.Default]
-
-        // Add clients to the pool.
-        for i := 0; i < poolSize; i++ {
-            client := network.NewClient(&clientConfig, logger)
-
-            if client != nil {
-                clientCfg := map[string]interface{}{
-                    "id":                 client.ID,
-                    "network":            client.Network,
-                    "address":            client.Address,
-                    "receiveBufferSize":  client.ReceiveBufferSize,
-                    "receiveChunkSize":   client.ReceiveChunkSize,
-                    "receiveDeadline":    client.ReceiveDeadline.String(),
-                    "sendDeadline":       client.SendDeadline.String(),
-                    "tcpKeepAlive":       client.TCPKeepAlive,
-                    "tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(),
-                }
-                _, err := pluginRegistry.Run(context.Background(), clientCfg, sdkPlugin.OnNewClient)
-                if err != nil {
-                    logger.Error().Err(err).Msg("Failed to run OnNewClient hooks")
-                }
-
-                err = pool.Put(client.ID, client)
-                if err != nil {
-                    logger.Error().Err(err).Msg("Failed to add client to the pool")
-                }
-            }
-        }
+        // Create and initialize pools of connections.
+        for name, cfg := range conf.Global.Pools {
+            logger := loggers[name]
+            pools[name] = pool.NewPool(cfg.GetSize())
+
+            // Get client config from the config file.
+            if clientConfig, ok := conf.Global.Clients[name]; !ok {
+                // This ensures that the default client config is used if the pool name is not
+                // found in the clients section.
+                clients[name] = conf.Global.Clients[config.Default]
+            } else {
+                // Use the client config with the same name as the pool.
+                clients[name] = clientConfig
+            }
+
+            // Add clients to the pool.
+            for i := 0; i < cfg.GetSize(); i++ {
+                clientConfig := clients[name]
+                client := network.NewClient(&clientConfig, logger)
+
+                if client != nil {
+                    clientCfg := map[string]interface{}{
+                        "id":                 client.ID,
+                        "network":            client.Network,
+                        "address":            client.Address,
+                        "receiveBufferSize":  client.ReceiveBufferSize,
+                        "receiveChunkSize":   client.ReceiveChunkSize,
+                        "receiveDeadline":    client.ReceiveDeadline.String(),
+                        "sendDeadline":       client.SendDeadline.String(),
+                        "tcpKeepAlive":       client.TCPKeepAlive,
+                        "tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(),
+                    }
+                    _, err := pluginRegistry.Run(context.Background(), clientCfg, sdkPlugin.OnNewClient)
+                    if err != nil {
+                        logger.Error().Err(err).Msg("Failed to run OnNewClient hooks")
+                    }
+
+                    err = pools[name].Put(client.ID, client)
+                    if err != nil {
+                        logger.Error().Err(err).Msg("Failed to add client to the pool")
+                    }
+                }
+            }
 
-        // Verify that the pool is properly populated.
-        logger.Info().Str("count", fmt.Sprint(pool.Size())).Msg(
-            "There are clients available in the pool")
-        if pool.Size() != poolSize {
-            logger.Error().Msg(
-                "The pool size is incorrect, either because " +
-                    "the clients cannot connect due to no network connectivity " +
-                    "or the server is not running. exiting...")
-            pluginRegistry.Shutdown()
-            os.Exit(gerr.FailedToInitializePool)
-        }
-
-        _, err = pluginRegistry.Run(
-            context.Background(),
-            map[string]interface{}{"size": poolSize},
-            sdkPlugin.OnNewPool)
-        if err != nil {
-            logger.Error().Err(err).Msg("Failed to run OnNewPool hooks")
-        }
+            // Verify that the pool is properly populated.
+            logger.Info().Fields(map[string]interface{}{
+                "name":  name,
+                "count": fmt.Sprint(pools[name].Size()),
+            }).Msg(
+                "There are clients available in the pool")
+            if pools[name].Size() != cfg.GetSize() {
+                logger.Error().Msg(
+                    "The pool size is incorrect, either because " +
+                        "the clients cannot connect due to no network connectivity " +
+                        "or the server is not running. exiting...")
+                pluginRegistry.Shutdown()
+                os.Exit(gerr.FailedToInitializePool)
+            }
+
+            _, err = pluginRegistry.Run(
+                context.Background(),
+                map[string]interface{}{"name": name, "size": cfg.GetSize()},
+                sdkPlugin.OnNewPool)
+            if err != nil {
+                logger.Error().Err(err).Msg("Failed to run OnNewPool hooks")
+            }
+        }
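Reviewer note: the name-keyed lookups above mean each pool picks up clients.<name> when it exists and falls back to clients.default otherwise. A hypothetical sketch of both cases — the analytics name is illustrative only:

clients:
  default:
    network: tcp
    address: localhost:5432

pools:
  default:
    size: 10
  analytics:
    size: 5  # no clients.analytics entry, so clients.default is reused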
 
-        // Create a prefork proxy with the pool of clients.
-        elastic := conf.Global.Proxy[config.Default].Elastic
-        reuseElasticClients := conf.Global.Proxy[config.Default].ReuseElasticClients
-        healthCheckPeriod := conf.Global.Proxy[config.Default].HealthCheckPeriod
-        proxy := network.NewProxy(
-            pool,
-            pluginRegistry,
-            elastic,
-            reuseElasticClients,
-            healthCheckPeriod,
-            &clientConfig,
-            logger,
-        )
-
-        proxyCfg := map[string]interface{}{
-            "elastic":             elastic,
-            "reuseElasticClients": reuseElasticClients,
-            "healthCheckPeriod":   healthCheckPeriod.String(),
-            "clientConfig": map[string]interface{}{
-                "network":            clientConfig.Network,
-                "address":            clientConfig.Address,
-                "receiveBufferSize":  clientConfig.ReceiveBufferSize,
-                "receiveChunkSize":   clientConfig.ReceiveChunkSize,
-                "receiveDeadline":    clientConfig.ReceiveDeadline.String(),
-                "sendDeadline":       clientConfig.SendDeadline.String(),
-                "tcpKeepAlive":       clientConfig.TCPKeepAlive,
-                "tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.String(),
-            },
-        }
-        _, err = pluginRegistry.Run(context.Background(), proxyCfg, sdkPlugin.OnNewProxy)
-        if err != nil {
-            logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks")
-        }
+        // Create and initialize prefork proxies with each pool of clients.
+        for name, cfg := range conf.Global.Proxies {
+            logger := loggers[name]
+            clientConfig := clients[name]
+            proxies[name] = network.NewProxy(
+                pools[name],
+                pluginRegistry,
+                cfg.Elastic,
+                cfg.ReuseElasticClients,
+                cfg.HealthCheckPeriod,
+                &clientConfig,
+                logger,
+            )
+
+            if data, ok := conf.GlobalKoanf.Get("proxies").(map[string]interface{}); ok {
+                _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewProxy)
+                if err != nil {
+                    logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks")
+                }
+            } else {
+                logger.Error().Msg("Failed to get proxies from config")
+            }
+        }
 
-        // Create a server
-        server := network.NewServer(
-            conf.Global.Server.Network,
-            conf.Global.Server.Address,
-            conf.Global.Server.SoftLimit,
-            conf.Global.Server.HardLimit,
-            conf.Global.Server.TickInterval,
-            []gnet.Option{
-                // Scheduling options
-                gnet.WithMulticore(conf.Global.Server.MultiCore),
-                gnet.WithLockOSThread(conf.Global.Server.LockOSThread),
-                // NumEventLoop overrides Multicore option.
-                // gnet.WithNumEventLoop(1),
-
-                // Can be used to send keepalive messages to the client.
-                gnet.WithTicker(conf.Global.Server.EnableTicker),
-
-                // Internal event-loop load balancing options
-                gnet.WithLoadBalancing(conf.Global.Server.GetLoadBalancer()),
-
-                // Buffer options
-                gnet.WithReadBufferCap(conf.Global.Server.ReadBufferCap),
-                gnet.WithWriteBufferCap(conf.Global.Server.WriteBufferCap),
-                gnet.WithSocketRecvBuffer(conf.Global.Server.SocketRecvBuffer),
-                gnet.WithSocketSendBuffer(conf.Global.Server.SocketSendBuffer),
-
-                // TCP options
-                gnet.WithReuseAddr(conf.Global.Server.ReuseAddress),
-                gnet.WithReusePort(conf.Global.Server.ReusePort),
-                gnet.WithTCPKeepAlive(conf.Global.Server.TCPKeepAlive),
-                gnet.WithTCPNoDelay(conf.Global.Server.GetTCPNoDelay()),
-            },
-            proxy,
-            logger,
-            pluginRegistry,
-        )
+        // Create and initialize servers.
+        for name, cfg := range conf.Global.Servers {
+            logger := loggers[name]
+            servers[name] = network.NewServer(
+                cfg.Network,
+                cfg.Address,
+                cfg.SoftLimit,
+                cfg.HardLimit,
+                cfg.TickInterval,
+                []gnet.Option{
+                    // Scheduling options
+                    gnet.WithMulticore(cfg.MultiCore),
+                    gnet.WithLockOSThread(cfg.LockOSThread),
+                    // NumEventLoop overrides Multicore option.
+                    // gnet.WithNumEventLoop(1),
+
+                    // Can be used to send keepalive messages to the client.
+                    gnet.WithTicker(cfg.EnableTicker),
+
+                    // Internal event-loop load balancing options
+                    gnet.WithLoadBalancing(cfg.GetLoadBalancer()),
+
+                    // Buffer options
+                    gnet.WithReadBufferCap(cfg.ReadBufferCap),
+                    gnet.WithWriteBufferCap(cfg.WriteBufferCap),
+                    gnet.WithSocketRecvBuffer(cfg.SocketRecvBuffer),
+                    gnet.WithSocketSendBuffer(cfg.SocketSendBuffer),
+
+                    // TCP options
+                    gnet.WithReuseAddr(cfg.ReuseAddress),
+                    gnet.WithReusePort(cfg.ReusePort),
+                    gnet.WithTCPKeepAlive(cfg.TCPKeepAlive),
+                    gnet.WithTCPNoDelay(cfg.GetTCPNoDelay()),
+                },
+                proxies[name],
+                logger,
+                pluginRegistry,
+            )
 
-        serverCfg := map[string]interface{}{
-            "network":          conf.Global.Server.Network,
-            "address":          conf.Global.Server.Address,
-            "softLimit":        conf.Global.Server.SoftLimit,
-            "hardLimit":        conf.Global.Server.HardLimit,
-            "tickInterval":     conf.Global.Server.TickInterval.String(),
-            "multiCore":        conf.Global.Server.MultiCore,
-            "lockOSThread":     conf.Global.Server.LockOSThread,
-            "enableTicker":     conf.Global.Server.EnableTicker,
-            "loadBalancer":     conf.Global.Server.LoadBalancer,
-            "readBufferCap":    conf.Global.Server.ReadBufferCap,
-            "writeBufferCap":   conf.Global.Server.WriteBufferCap,
-            "socketRecvBuffer": conf.Global.Server.SocketRecvBuffer,
-            "socketSendBuffer": conf.Global.Server.SocketSendBuffer,
-            "reuseAddress":     conf.Global.Server.ReuseAddress,
-            "reusePort":        conf.Global.Server.ReusePort,
-            "tcpKeepAlive":     conf.Global.Server.TCPKeepAlive.String(),
-            "tcpNoDelay":       conf.Global.Server.TCPNoDelay,
-        }
-        _, err = pluginRegistry.Run(context.Background(), serverCfg, sdkPlugin.OnNewServer)
-        if err != nil {
-            logger.Error().Err(err).Msg("Failed to run OnNewServer hooks")
+            if data, ok := conf.GlobalKoanf.Get("servers").(map[string]interface{}); ok {
+                _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewServer)
+                if err != nil {
+                    logger.Error().Err(err).Msg("Failed to run OnNewServer hooks")
+                }
+            } else {
+                logger.Error().Msg("Failed to get the servers configuration")
+            }
         }
 
         // Shutdown the server gracefully.
@@ -391,7 +381,10 @@ var runCmd = &cobra.Command{
         )
         signalsCh := make(chan os.Signal, 1)
         signal.Notify(signalsCh, signals...)
-        go func(pluginRegistry *plugin.Registry, logger zerolog.Logger, server *network.Server) {
+        go func(pluginRegistry *plugin.Registry,
+            logger zerolog.Logger,
+            servers map[string]*network.Server,
+        ) {
             for sig := range signalsCh {
                 for _, s := range signals {
                     if sig != s {
@@ -410,24 +403,36 @@ var runCmd = &cobra.Command{
                     logger.Info().Msg("Stopped health check scheduler")
                     metricsMerger.Stop()
                     logger.Info().Msg("Stopped metrics merger")
-                    server.Shutdown()
-                    logger.Info().Msg("Stopped server")
+                    for name, server := range servers {
+                        logger.Info().Str("name", name).Msg("Stopping server")
+                        server.Shutdown()
+                    }
+                    logger.Info().Msg("Stopped servers")
                     pluginRegistry.Shutdown()
                     logger.Info().Msg("Stopped plugin registry")
                     os.Exit(0)
                 }
            }
        }
-        }(pluginRegistry, logger, server)
-
-        // Run the server.
-        if err := server.Run(); err != nil {
-            logger.Error().Err(err).Msg("Failed to start server")
-            metricsMerger.Stop()
-            server.Shutdown()
-            pluginRegistry.Shutdown()
-            os.Exit(gerr.FailedToStartServer)
-        }
+        }(pluginRegistry, logger, servers)
+
+        // Start the servers.
+        for name, server := range servers {
+            logger := loggers[name]
+            go func(server *network.Server, logger zerolog.Logger) {
+                if err := server.Run(); err != nil {
+                    logger.Error().Err(err).Msg("Failed to start server")
+                    healthCheckScheduler.Clear()
+                    metricsMerger.Stop()
+                    server.Shutdown()
+                    pluginRegistry.Shutdown()
+                    os.Exit(gerr.FailedToStartServer)
+                }
+            }(server, logger)
+        }
+
+        // Wait for the servers to shut down.
+        <-make(chan struct{})
     },
 }
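Reviewer note: proxies[name] is wired to pools[name] and servers[name] to proxies[name], and only the client config falls back to default; loggers[name] and the other maps are indexed directly, so every named server needs matching logger, pool, and proxy entries. A hypothetical two-listener sketch — the analytics entries are illustrative, not part of this diff:

loggers:
  default:
    output: ["console"]
  analytics:
    output: ["console"]

pools:
  default:
    size: 10
  analytics:
    size: 5

proxies:
  default:
    elastic: False
    reuseElasticClients: False
    healthCheckPeriod: 60s
  analytics:
    elastic: False
    reuseElasticClients: False
    healthCheckPeriod: 60s

servers:
  default:
    network: tcp
    address: 0.0.0.0:15432
  analytics:
    network: tcp
    address: 0.0.0.0:15433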
diff --git a/config/config.go b/config/config.go
index 59975999..36aae909 100644
--- a/config/config.go
+++ b/config/config.go
@@ -97,21 +97,23 @@ func (c *Config) LoadDefaults() {
                 "healthCheckPeriod": DefaultHealthCheckPeriod.String(),
             },
         },
-        "server": map[string]interface{}{
-            "network":          DefaultListenNetwork,
-            "address":          DefaultListenAddress,
-            "softLimit":        0,
-            "hardLimit":        0,
-            "enableTicker":     false,
-            "multiCore":        true,
-            "lockOSThread":     false,
-            "reuseAddress":     true,
-            "reusePort":        true,
-            "loadBalancer":     DefaultLoadBalancer,
-            "readBufferCap":    DefaultBufferSize,
-            "writeBufferCap":   DefaultBufferSize,
-            "socketRecvBuffer": DefaultBufferSize,
-            "socketSendBuffer": DefaultBufferSize,
+        "servers": map[string]interface{}{
+            "default": map[string]interface{}{
+                "network":          DefaultListenNetwork,
+                "address":          DefaultListenAddress,
+                "softLimit":        0,
+                "hardLimit":        0,
+                "enableTicker":     false,
+                "multiCore":        true,
+                "lockOSThread":     false,
+                "reuseAddress":     true,
+                "reusePort":        true,
+                "loadBalancer":     DefaultLoadBalancer,
+                "readBufferCap":    DefaultBufferSize,
+                "writeBufferCap":   DefaultBufferSize,
+                "socketRecvBuffer": DefaultBufferSize,
+                "socketSendBuffer": DefaultBufferSize,
+            },
         },
         "metrics": map[string]interface{}{
             "default": map[string]interface{}{
diff --git a/config/constants.go b/config/constants.go
index 8b62144e..328a4aac 100644
--- a/config/constants.go
+++ b/config/constants.go
@@ -1,7 +1,6 @@
 package config
 
 import (
-    "log/syslog"
     "time"
 
     "github.com/rs/zerolog"
@@ -53,17 +52,13 @@ const (
     RSyslog
 )
 
-var (
-    DefaultLogOutput      = [...]string{"console"}
-    DefaultSyslogPriority = syslog.LOG_LOCAL0 | syslog.LOG_DEBUG //nolint:nosnakecase
-)
-
 const (
     // Config constants.
     Default   = "default"
     EnvPrefix = "GATEWAYD_"
 
     // Logger constants.
+    DefaultLogOutput   = "console"
     DefaultLogFileName = "gatewayd.log"
     DefaultLogLevel    = "info"
     DefaultTimeFormat  = zerolog.TimeFormatUnix
@@ -76,6 +71,7 @@ const (
     DefaultSyslogTag      = "gatewayd"
     DefaultRSyslogNetwork = "tcp"
    DefaultRSyslogAddress = "localhost:514"
+    DefaultSyslogPriority = "info"
 
     // Plugin constants.
     DefaultMinPort = 50000
diff --git a/config/getters.go b/config/getters.go
index 828bbec3..76189a84 100644
--- a/config/getters.go
+++ b/config/getters.go
@@ -182,5 +182,5 @@ func (l Logger) GetSyslogPriority() syslog.Priority {
     if priority, ok := rSyslogPriorities[l.SyslogPriority]; ok {
         return priority | syslog.LOG_DAEMON
     }
-    return DefaultSyslogPriority
+    return syslog.LOG_DAEMON | syslog.LOG_INFO
 }
diff --git a/config/types.go b/config/types.go
index 4137a0bd..e503df85 100644
--- a/config/types.go
+++ b/config/types.go
@@ -102,7 +102,7 @@ type GlobalConfig struct {
     Loggers map[string]Logger  `koanf:"loggers"`
     Clients map[string]Client  `koanf:"clients"`
     Pools   map[string]Pool    `koanf:"pools"`
-    Proxy   map[string]Proxy   `koanf:"proxy"`
-    Server  Server             `koanf:"server"`
+    Proxies map[string]Proxy   `koanf:"proxies"`
+    Servers map[string]Server  `koanf:"servers"`
     Metrics map[string]Metrics `koanf:"metrics"`
 }
diff --git a/gatewayd.yaml b/gatewayd.yaml
index 6ba51923..50c8bce2 100644
--- a/gatewayd.yaml
+++ b/gatewayd.yaml
@@ -34,30 +34,31 @@ pools:
   default:
     size: 10
 
-proxy:
+proxies:
   default:
     elastic: False
     reuseElasticClients: False
     healthCheckPeriod: 60s # duration
 
-server:
-  network: tcp
-  address: 0.0.0.0:15432
-  softLimit: 0
-  hardLimit: 0
-  enableTicker: False
-  tickInterval: 5s # duration
-  multiCore: True
-  lockOSThread: False
-  loadBalancer: roundrobin
-  readBufferCap: 16777216
-  writeBufferCap: 16777216
-  socketRecvBuffer: 16777216
-  socketSendBuffer: 16777216
-  reuseAddress: True
-  reusePort: True
-  tcpKeepAlive: 3s # duration
-  tcpNoDelay: True
+servers:
+  default:
+    network: tcp
+    address: 0.0.0.0:15432
+    softLimit: 0
+    hardLimit: 0
+    enableTicker: False
+    tickInterval: 5s # duration
+    multiCore: True
+    lockOSThread: False
+    loadBalancer: roundrobin
+    readBufferCap: 16777216
+    writeBufferCap: 16777216
+    socketRecvBuffer: 16777216
+    socketSendBuffer: 16777216
+    reuseAddress: True
+    reusePort: True
+    tcpKeepAlive: 3s # duration
+    tcpNoDelay: True
 
 metrics:
   default:
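Reviewer note: with DefaultSyslogPriority now the string "info", a logger's syslogPriority field is looked up in rSyslogPriorities (see config/getters.go), and unknown values fall back to syslog.LOG_DAEMON | syslog.LOG_INFO. Assuming that map accepts the usual syslog level names — this diff does not show its contents — a hypothetical rsyslog-backed logger entry could read:

loggers:
  default:
    output: ["rsyslog"]
    rsyslogNetwork: tcp
    rsyslogAddress: localhost:514
    syslogPriority: warning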