From 1223e8aa9331527ce8da6d9b11b78d6f4438fb9f Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 14:14:52 +0100 Subject: [PATCH 01/12] Enable multiple configs for server object --- cmd/run.go | 73 ++++++++++++++++++++++++------------------------ config/config.go | 30 ++++++++++---------- config/types.go | 2 +- gatewayd.yaml | 35 ++++++++++++----------- 4 files changed, 72 insertions(+), 68 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 5d62f435..728ff06c 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -318,62 +318,63 @@ var runCmd = &cobra.Command{ } // Create a server + serverCfg := conf.Global.Server[config.Default] server := network.NewServer( - conf.Global.Server.Network, - conf.Global.Server.Address, - conf.Global.Server.SoftLimit, - conf.Global.Server.HardLimit, - conf.Global.Server.TickInterval, + serverCfg.Network, + serverCfg.Address, + serverCfg.SoftLimit, + serverCfg.HardLimit, + serverCfg.TickInterval, []gnet.Option{ // Scheduling options - gnet.WithMulticore(conf.Global.Server.MultiCore), - gnet.WithLockOSThread(conf.Global.Server.LockOSThread), + gnet.WithMulticore(serverCfg.MultiCore), + gnet.WithLockOSThread(serverCfg.LockOSThread), // NumEventLoop overrides Multicore option. // gnet.WithNumEventLoop(1), // Can be used to send keepalive messages to the client. - gnet.WithTicker(conf.Global.Server.EnableTicker), + gnet.WithTicker(serverCfg.EnableTicker), // Internal event-loop load balancing options - gnet.WithLoadBalancing(conf.Global.Server.GetLoadBalancer()), + gnet.WithLoadBalancing(serverCfg.GetLoadBalancer()), // Buffer options - gnet.WithReadBufferCap(conf.Global.Server.ReadBufferCap), - gnet.WithWriteBufferCap(conf.Global.Server.WriteBufferCap), - gnet.WithSocketRecvBuffer(conf.Global.Server.SocketRecvBuffer), - gnet.WithSocketSendBuffer(conf.Global.Server.SocketSendBuffer), + gnet.WithReadBufferCap(serverCfg.ReadBufferCap), + gnet.WithWriteBufferCap(serverCfg.WriteBufferCap), + gnet.WithSocketRecvBuffer(serverCfg.SocketRecvBuffer), + gnet.WithSocketSendBuffer(serverCfg.SocketSendBuffer), // TCP options - gnet.WithReuseAddr(conf.Global.Server.ReuseAddress), - gnet.WithReusePort(conf.Global.Server.ReusePort), - gnet.WithTCPKeepAlive(conf.Global.Server.TCPKeepAlive), - gnet.WithTCPNoDelay(conf.Global.Server.GetTCPNoDelay()), + gnet.WithReuseAddr(serverCfg.ReuseAddress), + gnet.WithReusePort(serverCfg.ReusePort), + gnet.WithTCPKeepAlive(serverCfg.TCPKeepAlive), + gnet.WithTCPNoDelay(serverCfg.GetTCPNoDelay()), }, proxy, logger, pluginRegistry, ) - serverCfg := map[string]interface{}{ - "network": conf.Global.Server.Network, - "address": conf.Global.Server.Address, - "softLimit": conf.Global.Server.SoftLimit, - "hardLimit": conf.Global.Server.HardLimit, - "tickInterval": conf.Global.Server.TickInterval.String(), - "multiCore": conf.Global.Server.MultiCore, - "lockOSThread": conf.Global.Server.LockOSThread, - "enableTicker": conf.Global.Server.EnableTicker, - "loadBalancer": conf.Global.Server.LoadBalancer, - "readBufferCap": conf.Global.Server.ReadBufferCap, - "writeBufferCap": conf.Global.Server.WriteBufferCap, - "socketRecvBuffer": conf.Global.Server.SocketRecvBuffer, - "socketSendBuffer": conf.Global.Server.SocketSendBuffer, - "reuseAddress": conf.Global.Server.ReuseAddress, - "reusePort": conf.Global.Server.ReusePort, - "tcpKeepAlive": conf.Global.Server.TCPKeepAlive.String(), - "tcpNoDelay": conf.Global.Server.TCPNoDelay, + data = map[string]interface{}{ + "network": serverCfg.Network, + "address": serverCfg.Address, + "softLimit": serverCfg.SoftLimit, + "hardLimit": serverCfg.HardLimit, + "tickInterval": serverCfg.TickInterval.String(), + "multiCore": serverCfg.MultiCore, + "lockOSThread": serverCfg.LockOSThread, + "enableTicker": serverCfg.EnableTicker, + "loadBalancer": serverCfg.LoadBalancer, + "readBufferCap": serverCfg.ReadBufferCap, + "writeBufferCap": serverCfg.WriteBufferCap, + "socketRecvBuffer": serverCfg.SocketRecvBuffer, + "socketSendBuffer": serverCfg.SocketSendBuffer, + "reuseAddress": serverCfg.ReuseAddress, + "reusePort": serverCfg.ReusePort, + "tcpKeepAlive": serverCfg.TCPKeepAlive.String(), + "tcpNoDelay": serverCfg.TCPNoDelay, } - _, err = pluginRegistry.Run(context.Background(), serverCfg, sdkPlugin.OnNewServer) + _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewServer) if err != nil { logger.Error().Err(err).Msg("Failed to run OnNewServer hooks") } diff --git a/config/config.go b/config/config.go index 59975999..3c1ce094 100644 --- a/config/config.go +++ b/config/config.go @@ -98,20 +98,22 @@ func (c *Config) LoadDefaults() { }, }, "server": map[string]interface{}{ - "network": DefaultListenNetwork, - "address": DefaultListenAddress, - "softLimit": 0, - "hardLimit": 0, - "enableTicker": false, - "multiCore": true, - "lockOSThread": false, - "reuseAddress": true, - "reusePort": true, - "loadBalancer": DefaultLoadBalancer, - "readBufferCap": DefaultBufferSize, - "writeBufferCap": DefaultBufferSize, - "socketRecvBuffer": DefaultBufferSize, - "socketSendBuffer": DefaultBufferSize, + "default": map[string]interface{}{ + "network": DefaultListenNetwork, + "address": DefaultListenAddress, + "softLimit": 0, + "hardLimit": 0, + "enableTicker": false, + "multiCore": true, + "lockOSThread": false, + "reuseAddress": true, + "reusePort": true, + "loadBalancer": DefaultLoadBalancer, + "readBufferCap": DefaultBufferSize, + "writeBufferCap": DefaultBufferSize, + "socketRecvBuffer": DefaultBufferSize, + "socketSendBuffer": DefaultBufferSize, + }, }, "metrics": map[string]interface{}{ "default": map[string]interface{}{ diff --git a/config/types.go b/config/types.go index 4137a0bd..7f49a63e 100644 --- a/config/types.go +++ b/config/types.go @@ -103,6 +103,6 @@ type GlobalConfig struct { Clients map[string]Client `koanf:"clients"` Pools map[string]Pool `koanf:"pools"` Proxy map[string]Proxy `koanf:"proxy"` - Server Server `koanf:"server"` + Server map[string]Server `koanf:"server"` Metrics map[string]Metrics `koanf:"metrics"` } diff --git a/gatewayd.yaml b/gatewayd.yaml index 6ba51923..e398d6c3 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -41,23 +41,24 @@ proxy: healthCheckPeriod: 60s # duration server: - network: tcp - address: 0.0.0.0:15432 - softLimit: 0 - hardLimit: 0 - enableTicker: False - tickInterval: 5s # duration - multiCore: True - lockOSThread: False - loadBalancer: roundrobin - readBufferCap: 16777216 - writeBufferCap: 16777216 - socketRecvBuffer: 16777216 - socketSendBuffer: 16777216 - reuseAddress: True - reusePort: True - tcpKeepAlive: 3s # duration - tcpNoDelay: True + default: + network: tcp + address: 0.0.0.0:15432 + softLimit: 0 + hardLimit: 0 + enableTicker: False + tickInterval: 5s # duration + multiCore: True + lockOSThread: False + loadBalancer: roundrobin + readBufferCap: 16777216 + writeBufferCap: 16777216 + socketRecvBuffer: 16777216 + socketSendBuffer: 16777216 + reuseAddress: True + reusePort: True + tcpKeepAlive: 3s # duration + tcpNoDelay: True metrics: default: From fa5b406b007fd4bb97ef166ba9e78c63f8ac5c1d Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 14:58:53 +0100 Subject: [PATCH 02/12] Load multiple loggers --- cmd/run.go | 52 +++++++++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 728ff06c..df5b7520 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -32,11 +32,13 @@ import ( ) var ( - enableSentry bool - pluginConfigFile string - globalConfigFile string - conf *config.Config - pluginRegistry *plugin.Registry + enableSentry bool + pluginConfigFile string + globalConfigFile string + conf *config.Config + pluginRegistry *plugin.Registry + + loggers = make(map[string]zerolog.Logger) healthCheckScheduler = gocron.NewScheduler(time.UTC) ) @@ -66,24 +68,28 @@ var runCmd = &cobra.Command{ // Load global and plugin configuration. conf = config.NewConfig(globalConfigFile, pluginConfigFile) - // Create a new logger from the config. - loggerCfg := conf.Global.Loggers[config.Default] - logger := logging.NewLogger(logging.LoggerConfig{ - Output: loggerCfg.GetOutput(), - Level: loggerCfg.GetLevel(), - TimeFormat: loggerCfg.GetTimeFormat(), - ConsoleTimeFormat: loggerCfg.GetConsoleTimeFormat(), - NoColor: loggerCfg.NoColor, - FileName: loggerCfg.FileName, - MaxSize: loggerCfg.MaxSize, - MaxBackups: loggerCfg.MaxBackups, - MaxAge: loggerCfg.MaxAge, - Compress: loggerCfg.Compress, - LocalTime: loggerCfg.LocalTime, - SyslogPriority: loggerCfg.GetSyslogPriority(), - RSyslogNetwork: loggerCfg.RSyslogNetwork, - RSyslogAddress: loggerCfg.RSyslogAddress, - }) + // Create and initialize loggers from the config. + for name, cfg := range conf.Global.Loggers { + loggers[name] = logging.NewLogger(logging.LoggerConfig{ + Output: cfg.GetOutput(), + Level: cfg.GetLevel(), + TimeFormat: cfg.GetTimeFormat(), + ConsoleTimeFormat: cfg.GetConsoleTimeFormat(), + NoColor: cfg.NoColor, + FileName: cfg.FileName, + MaxSize: cfg.MaxSize, + MaxBackups: cfg.MaxBackups, + MaxAge: cfg.MaxAge, + Compress: cfg.Compress, + LocalTime: cfg.LocalTime, + SyslogPriority: cfg.GetSyslogPriority(), + RSyslogNetwork: cfg.RSyslogNetwork, + RSyslogAddress: cfg.RSyslogAddress, + }) + } + + // Set the default logger. + logger := loggers[config.Default] // Create a new plugin registry. // The plugins are loaded and hooks registered before the configuration is loaded. From 40566dfbfaa538dc5ffcd30ed37667f11273f7be Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 15:00:02 +0100 Subject: [PATCH 03/12] Pluralize server directive --- cmd/run.go | 2 +- config/config.go | 2 +- config/types.go | 2 +- gatewayd.yaml | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index df5b7520..85cc66fc 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -324,7 +324,7 @@ var runCmd = &cobra.Command{ } // Create a server - serverCfg := conf.Global.Server[config.Default] + serverCfg := conf.Global.Servers[config.Default] server := network.NewServer( serverCfg.Network, serverCfg.Address, diff --git a/config/config.go b/config/config.go index 3c1ce094..36aae909 100644 --- a/config/config.go +++ b/config/config.go @@ -97,7 +97,7 @@ func (c *Config) LoadDefaults() { "healthCheckPeriod": DefaultHealthCheckPeriod.String(), }, }, - "server": map[string]interface{}{ + "servers": map[string]interface{}{ "default": map[string]interface{}{ "network": DefaultListenNetwork, "address": DefaultListenAddress, diff --git a/config/types.go b/config/types.go index 7f49a63e..a66d2ba5 100644 --- a/config/types.go +++ b/config/types.go @@ -103,6 +103,6 @@ type GlobalConfig struct { Clients map[string]Client `koanf:"clients"` Pools map[string]Pool `koanf:"pools"` Proxy map[string]Proxy `koanf:"proxy"` - Server map[string]Server `koanf:"server"` + Servers map[string]Server `koanf:"servers"` Metrics map[string]Metrics `koanf:"metrics"` } diff --git a/gatewayd.yaml b/gatewayd.yaml index e398d6c3..65cdca63 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -40,7 +40,7 @@ proxy: reuseElasticClients: False healthCheckPeriod: 60s # duration -server: +servers: default: network: tcp address: 0.0.0.0:15432 From b033cc12db6c67c31013146531b4216a12455268 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 15:00:26 +0100 Subject: [PATCH 04/12] Simplify passing configs to hooks --- cmd/run.go | 81 ++++++++++++++---------------------------------------- 1 file changed, 21 insertions(+), 60 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 85cc66fc..22dfbc8f 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -9,7 +9,6 @@ import ( "os" "os/signal" "strconv" - "strings" "syscall" "time" @@ -212,26 +211,14 @@ var runCmd = &cobra.Command{ }(conf.Global.Metrics[config.Default], logger) // This is a notification hook, so we don't care about the result. - data := map[string]interface{}{ - "output": strings.Join(loggerCfg.Output, ","), - "level": loggerCfg.Level, - "timeFormat": loggerCfg.TimeFormat, - "consoleTimeFormat": loggerCfg.ConsoleTimeFormat, - "noColor": loggerCfg.NoColor, - "fileName": loggerCfg.FileName, - "maxSize": loggerCfg.MaxSize, - "maxBackups": loggerCfg.MaxBackups, - "maxAge": loggerCfg.MaxAge, - "compress": loggerCfg.Compress, - "localTime": loggerCfg.LocalTime, - "rsyslogNetwork": loggerCfg.RSyslogNetwork, - "rsyslogAddress": loggerCfg.RSyslogAddress, - "syslogPriority": loggerCfg.SyslogPriority, - } // TODO: Use a context with a timeout - _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewLogger) - if err != nil { - logger.Error().Err(err).Msg("Failed to run OnNewLogger hooks") + if data, ok := conf.GlobalKoanf.Get("loggers").(map[string]interface{}); ok { + _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewLogger) + if err != nil { + logger.Error().Err(err).Msg("Failed to run OnNewLogger hooks") + } + } else { + logger.Error().Msg("Failed to get loggers from config") } // Create and initialize a pool of connections. @@ -303,24 +290,13 @@ var runCmd = &cobra.Command{ logger, ) - proxyCfg := map[string]interface{}{ - "elastic": elastic, - "reuseElasticClients": reuseElasticClients, - "healthCheckPeriod": healthCheckPeriod.String(), - "clientConfig": map[string]interface{}{ - "network": clientConfig.Network, - "address": clientConfig.Address, - "receiveBufferSize": clientConfig.ReceiveBufferSize, - "receiveChunkSize": clientConfig.ReceiveChunkSize, - "receiveDeadline": clientConfig.ReceiveDeadline.String(), - "sendDeadline": clientConfig.SendDeadline.String(), - "tcpKeepAlive": clientConfig.TCPKeepAlive, - "tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.String(), - }, - } - _, err = pluginRegistry.Run(context.Background(), proxyCfg, sdkPlugin.OnNewProxy) - if err != nil { - logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks") + if data, ok := conf.GlobalKoanf.Get("proxy").(map[string]interface{}); ok { + _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewProxy) + if err != nil { + logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks") + } + } else { + logger.Error().Msg("Failed to get proxy from config") } // Create a server @@ -361,28 +337,13 @@ var runCmd = &cobra.Command{ pluginRegistry, ) - data = map[string]interface{}{ - "network": serverCfg.Network, - "address": serverCfg.Address, - "softLimit": serverCfg.SoftLimit, - "hardLimit": serverCfg.HardLimit, - "tickInterval": serverCfg.TickInterval.String(), - "multiCore": serverCfg.MultiCore, - "lockOSThread": serverCfg.LockOSThread, - "enableTicker": serverCfg.EnableTicker, - "loadBalancer": serverCfg.LoadBalancer, - "readBufferCap": serverCfg.ReadBufferCap, - "writeBufferCap": serverCfg.WriteBufferCap, - "socketRecvBuffer": serverCfg.SocketRecvBuffer, - "socketSendBuffer": serverCfg.SocketSendBuffer, - "reuseAddress": serverCfg.ReuseAddress, - "reusePort": serverCfg.ReusePort, - "tcpKeepAlive": serverCfg.TCPKeepAlive.String(), - "tcpNoDelay": serverCfg.TCPNoDelay, - } - _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewServer) - if err != nil { - logger.Error().Err(err).Msg("Failed to run OnNewServer hooks") + if data, ok := conf.GlobalKoanf.Get("servers").(map[string]interface{}); ok { + _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewServer) + if err != nil { + logger.Error().Err(err).Msg("Failed to run OnNewServer hooks") + } + } else { + logger.Error().Msg("Failed to get the servers configuration") } // Shutdown the server gracefully. From c63bf516067e199e0343b729bca4b6f5bca1f871 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 18:09:05 +0100 Subject: [PATCH 05/12] Fix default values --- config/constants.go | 8 ++------ config/getters.go | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/config/constants.go b/config/constants.go index 8b62144e..328a4aac 100644 --- a/config/constants.go +++ b/config/constants.go @@ -1,7 +1,6 @@ package config import ( - "log/syslog" "time" "github.com/rs/zerolog" @@ -53,17 +52,13 @@ const ( RSyslog ) -var ( - DefaultLogOutput = [...]string{"console"} - DefaultSyslogPriority = syslog.LOG_LOCAL0 | syslog.LOG_DEBUG //nolint:nosnakecase -) - const ( // Config constants. Default = "default" EnvPrefix = "GATEWAYD_" // Logger constants. + DefaultLogOutput = "console" DefaultLogFileName = "gatewayd.log" DefaultLogLevel = "info" DefaultTimeFormat = zerolog.TimeFormatUnix @@ -76,6 +71,7 @@ const ( DefaultSyslogTag = "gatewayd" DefaultRSyslogNetwork = "tcp" DefaultRSyslogAddress = "localhost:514" + DefaultSyslogPriority = "info" // Plugin constants. DefaultMinPort = 50000 diff --git a/config/getters.go b/config/getters.go index 828bbec3..76189a84 100644 --- a/config/getters.go +++ b/config/getters.go @@ -182,5 +182,5 @@ func (l Logger) GetSyslogPriority() syslog.Priority { if priority, ok := rSyslogPriorities[l.SyslogPriority]; ok { return priority | syslog.LOG_DAEMON } - return DefaultSyslogPriority + return syslog.LOG_DAEMON | syslog.LOG_INFO } From ff28e2dbe1cdb83e21995bf2852c00addbac9aa9 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 20:56:17 +0100 Subject: [PATCH 06/12] Enable multiple configs for pool, clients, proxy and server --- cmd/run.go | 255 +++++++++++++++++++++++++++++------------------------ 1 file changed, 138 insertions(+), 117 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 22dfbc8f..f43da2a6 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -38,6 +38,10 @@ var ( pluginRegistry *plugin.Registry loggers = make(map[string]zerolog.Logger) + pools = make(map[string]*pool.Pool) + clients = make(map[string]config.Client) + proxies = make(map[string]*network.Proxy) + servers = make(map[string]*network.Server) healthCheckScheduler = gocron.NewScheduler(time.UTC) ) @@ -221,129 +225,139 @@ var runCmd = &cobra.Command{ logger.Error().Msg("Failed to get loggers from config") } - // Create and initialize a pool of connections. - poolSize := conf.Global.Pools[config.Default].GetSize() - pool := pool.NewPool(poolSize) - - // Get client config from the config file. - clientConfig := conf.Global.Clients[config.Default] - - // Add clients to the pool. - for i := 0; i < poolSize; i++ { - client := network.NewClient(&clientConfig, logger) - - if client != nil { - clientCfg := map[string]interface{}{ - "id": client.ID, - "network": client.Network, - "address": client.Address, - "receiveBufferSize": client.ReceiveBufferSize, - "receiveChunkSize": client.ReceiveChunkSize, - "receiveDeadline": client.ReceiveDeadline.String(), - "sendDeadline": client.SendDeadline.String(), - "tcpKeepAlive": client.TCPKeepAlive, - "tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(), - } - _, err := pluginRegistry.Run(context.Background(), clientCfg, sdkPlugin.OnNewClient) - if err != nil { - logger.Error().Err(err).Msg("Failed to run OnNewClient hooks") - } + // Create and initialize pools of connections. + for name, cfg := range conf.Global.Pools { + pools[name] = pool.NewPool(cfg.GetSize()) + + // Get client config from the config file. + if clientConfig, ok := conf.Global.Clients[name]; !ok { + // This ensures that the default client config is used if the pool name is not + // found in the clients section. + clients[name] = conf.Global.Clients[config.Default] + } else { + // Merge the default client config with the one from the pool. + clients[name] = clientConfig + } - err = pool.Put(client.ID, client) - if err != nil { - logger.Error().Err(err).Msg("Failed to add client to the pool") + // Add clients to the pool. + for i := 0; i < cfg.GetSize(); i++ { + clientConfig := clients[name] + client := network.NewClient(&clientConfig, logger) + + if client != nil { + clientCfg := map[string]interface{}{ + "id": client.ID, + "network": client.Network, + "address": client.Address, + "receiveBufferSize": client.ReceiveBufferSize, + "receiveChunkSize": client.ReceiveChunkSize, + "receiveDeadline": client.ReceiveDeadline.String(), + "sendDeadline": client.SendDeadline.String(), + "tcpKeepAlive": client.TCPKeepAlive, + "tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(), + } + _, err := pluginRegistry.Run(context.Background(), clientCfg, sdkPlugin.OnNewClient) + if err != nil { + logger.Error().Err(err).Msg("Failed to run OnNewClient hooks") + } + + err = pools[name].Put(client.ID, client) + if err != nil { + logger.Error().Err(err).Msg("Failed to add client to the pool") + } } } - } - // Verify that the pool is properly populated. - logger.Info().Str("count", fmt.Sprint(pool.Size())).Msg( - "There are clients available in the pool") - if pool.Size() != poolSize { - logger.Error().Msg( - "The pool size is incorrect, either because " + - "the clients cannot connect due to no network connectivity " + - "or the server is not running. exiting...") - pluginRegistry.Shutdown() - os.Exit(gerr.FailedToInitializePool) - } + // Verify that the pool is properly populated. + logger.Info().Str("count", fmt.Sprint(pools[name].Size())).Msg( + "There are clients available in the pool") + if pools[name].Size() != cfg.GetSize() { + logger.Error().Msg( + "The pool size is incorrect, either because " + + "the clients cannot connect due to no network connectivity " + + "or the server is not running. exiting...") + pluginRegistry.Shutdown() + os.Exit(gerr.FailedToInitializePool) + } - _, err = pluginRegistry.Run( - context.Background(), - map[string]interface{}{"size": poolSize}, - sdkPlugin.OnNewPool) - if err != nil { - logger.Error().Err(err).Msg("Failed to run OnNewPool hooks") + _, err = pluginRegistry.Run( + context.Background(), + map[string]interface{}{"name": name, "size": cfg.GetSize()}, + sdkPlugin.OnNewPool) + if err != nil { + logger.Error().Err(err).Msg("Failed to run OnNewPool hooks") + } } - // Create a prefork proxy with the pool of clients. - elastic := conf.Global.Proxy[config.Default].Elastic - reuseElasticClients := conf.Global.Proxy[config.Default].ReuseElasticClients - healthCheckPeriod := conf.Global.Proxy[config.Default].HealthCheckPeriod - proxy := network.NewProxy( - pool, - pluginRegistry, - elastic, - reuseElasticClients, - healthCheckPeriod, - &clientConfig, - logger, - ) + // Create and initialize prefork proxies with each pool of clients. + for name, cfg := range conf.Global.Proxy { + clientConfig := clients[name] + proxies[name] = network.NewProxy( + pools[name], + pluginRegistry, + cfg.Elastic, + cfg.ReuseElasticClients, + cfg.HealthCheckPeriod, + &clientConfig, + logger, + ) - if data, ok := conf.GlobalKoanf.Get("proxy").(map[string]interface{}); ok { - _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewProxy) - if err != nil { - logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks") + if data, ok := conf.GlobalKoanf.Get("proxy").(map[string]interface{}); ok { + _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewProxy) + if err != nil { + logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks") + } + } else { + logger.Error().Msg("Failed to get proxy from config") } - } else { - logger.Error().Msg("Failed to get proxy from config") } - // Create a server - serverCfg := conf.Global.Servers[config.Default] - server := network.NewServer( - serverCfg.Network, - serverCfg.Address, - serverCfg.SoftLimit, - serverCfg.HardLimit, - serverCfg.TickInterval, - []gnet.Option{ - // Scheduling options - gnet.WithMulticore(serverCfg.MultiCore), - gnet.WithLockOSThread(serverCfg.LockOSThread), - // NumEventLoop overrides Multicore option. - // gnet.WithNumEventLoop(1), - - // Can be used to send keepalive messages to the client. - gnet.WithTicker(serverCfg.EnableTicker), - - // Internal event-loop load balancing options - gnet.WithLoadBalancing(serverCfg.GetLoadBalancer()), - - // Buffer options - gnet.WithReadBufferCap(serverCfg.ReadBufferCap), - gnet.WithWriteBufferCap(serverCfg.WriteBufferCap), - gnet.WithSocketRecvBuffer(serverCfg.SocketRecvBuffer), - gnet.WithSocketSendBuffer(serverCfg.SocketSendBuffer), - - // TCP options - gnet.WithReuseAddr(serverCfg.ReuseAddress), - gnet.WithReusePort(serverCfg.ReusePort), - gnet.WithTCPKeepAlive(serverCfg.TCPKeepAlive), - gnet.WithTCPNoDelay(serverCfg.GetTCPNoDelay()), - }, - proxy, - logger, - pluginRegistry, - ) + // Create and initialize servers. + for name, cfg := range conf.Global.Servers { + servers[name] = network.NewServer( + cfg.Network, + cfg.Address, + cfg.SoftLimit, + cfg.HardLimit, + cfg.TickInterval, + []gnet.Option{ + // Scheduling options + gnet.WithMulticore(cfg.MultiCore), + gnet.WithLockOSThread(cfg.LockOSThread), + // NumEventLoop overrides Multicore option. + // gnet.WithNumEventLoop(1), + + // Can be used to send keepalive messages to the client. + gnet.WithTicker(cfg.EnableTicker), + + // Internal event-loop load balancing options + gnet.WithLoadBalancing(cfg.GetLoadBalancer()), + + // Buffer options + gnet.WithReadBufferCap(cfg.ReadBufferCap), + gnet.WithWriteBufferCap(cfg.WriteBufferCap), + gnet.WithSocketRecvBuffer(cfg.SocketRecvBuffer), + gnet.WithSocketSendBuffer(cfg.SocketSendBuffer), + + // TCP options + gnet.WithReuseAddr(cfg.ReuseAddress), + gnet.WithReusePort(cfg.ReusePort), + gnet.WithTCPKeepAlive(cfg.TCPKeepAlive), + gnet.WithTCPNoDelay(cfg.GetTCPNoDelay()), + }, + proxies[name], + logger, + pluginRegistry, + ) - if data, ok := conf.GlobalKoanf.Get("servers").(map[string]interface{}); ok { - _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewServer) - if err != nil { - logger.Error().Err(err).Msg("Failed to run OnNewServer hooks") + if data, ok := conf.GlobalKoanf.Get("servers").(map[string]interface{}); ok { + _, err = pluginRegistry.Run(context.Background(), data, sdkPlugin.OnNewServer) + if err != nil { + logger.Error().Err(err).Msg("Failed to run OnNewServer hooks") + } + } else { + logger.Error().Msg("Failed to get the servers configuration") } - } else { - logger.Error().Msg("Failed to get the servers configuration") } // Shutdown the server gracefully. @@ -359,7 +373,10 @@ var runCmd = &cobra.Command{ ) signalsCh := make(chan os.Signal, 1) signal.Notify(signalsCh, signals...) - go func(pluginRegistry *plugin.Registry, logger zerolog.Logger, server *network.Server) { + go func(pluginRegistry *plugin.Registry, + logger zerolog.Logger, + servers map[string]*network.Server, + ) { for sig := range signalsCh { for _, s := range signals { if sig != s { @@ -378,21 +395,25 @@ var runCmd = &cobra.Command{ logger.Info().Msg("Stopped health check scheduler") metricsMerger.Stop() logger.Info().Msg("Stopped metrics merger") - server.Shutdown() - logger.Info().Msg("Stopped server") + for name, server := range servers { + logger.Info().Str("name", name).Msg("Stopping server") + server.Shutdown() + } + logger.Info().Msg("Stopped servers") pluginRegistry.Shutdown() logger.Info().Msg("Stopped plugin registry") os.Exit(0) } } } - }(pluginRegistry, logger, server) + }(pluginRegistry, logger, servers) - // Run the server. - if err := server.Run(); err != nil { + // Start the server. + if err := servers[config.Default].Run(); err != nil { logger.Error().Err(err).Msg("Failed to start server") + healthCheckScheduler.Clear() metricsMerger.Stop() - server.Shutdown() + servers[config.Default].Shutdown() pluginRegistry.Shutdown() os.Exit(gerr.FailedToStartServer) } From 961b2789b4e52bdf262323898de6189ed5861e34 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 21:16:19 +0100 Subject: [PATCH 07/12] Add pool name to log output --- cmd/run.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cmd/run.go b/cmd/run.go index f43da2a6..0fa44062 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -269,7 +269,10 @@ var runCmd = &cobra.Command{ } // Verify that the pool is properly populated. - logger.Info().Str("count", fmt.Sprint(pools[name].Size())).Msg( + logger.Info().Fields(map[string]interface{}{ + "name": name, + "count": fmt.Sprint(pools[name].Size()), + }).Msg( "There are clients available in the pool") if pools[name].Size() != cfg.GetSize() { logger.Error().Msg( From 9329d43fc32cd524a50db6a02962e03a0eabe945 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 21:49:01 +0100 Subject: [PATCH 08/12] Start multiple servers at once --- cmd/run.go | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 0fa44062..32fd7ee3 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -302,7 +302,7 @@ var runCmd = &cobra.Command{ cfg.ReuseElasticClients, cfg.HealthCheckPeriod, &clientConfig, - logger, + loggers[name], ) if data, ok := conf.GlobalKoanf.Get("proxy").(map[string]interface{}); ok { @@ -412,14 +412,21 @@ var runCmd = &cobra.Command{ }(pluginRegistry, logger, servers) // Start the server. - if err := servers[config.Default].Run(); err != nil { - logger.Error().Err(err).Msg("Failed to start server") - healthCheckScheduler.Clear() - metricsMerger.Stop() - servers[config.Default].Shutdown() - pluginRegistry.Shutdown() - os.Exit(gerr.FailedToStartServer) + for _, server := range servers { + go func(server *network.Server) { + if err := server.Run(); err != nil { + logger.Error().Err(err).Msg("Failed to start server") + healthCheckScheduler.Clear() + metricsMerger.Stop() + server.Shutdown() + pluginRegistry.Shutdown() + os.Exit(gerr.FailedToStartServer) + } + }(server) } + + // Wait for the server to shutdown. + <-make(chan struct{}) }, } From b22a1e4703f8d4cfbd214bd7c2c6587c3052797d Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 21:57:49 +0100 Subject: [PATCH 09/12] Pluralize proxy directive --- cmd/run.go | 2 +- config/types.go | 2 +- gatewayd.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 32fd7ee3..d9e5f46b 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -293,7 +293,7 @@ var runCmd = &cobra.Command{ } // Create and initialize prefork proxies with each pool of clients. - for name, cfg := range conf.Global.Proxy { + for name, cfg := range conf.Global.Proxies { clientConfig := clients[name] proxies[name] = network.NewProxy( pools[name], diff --git a/config/types.go b/config/types.go index a66d2ba5..e503df85 100644 --- a/config/types.go +++ b/config/types.go @@ -102,7 +102,7 @@ type GlobalConfig struct { Loggers map[string]Logger `koanf:"loggers"` Clients map[string]Client `koanf:"clients"` Pools map[string]Pool `koanf:"pools"` - Proxy map[string]Proxy `koanf:"proxy"` + Proxies map[string]Proxy `koanf:"proxies"` Servers map[string]Server `koanf:"servers"` Metrics map[string]Metrics `koanf:"metrics"` } diff --git a/gatewayd.yaml b/gatewayd.yaml index 65cdca63..50c8bce2 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -34,7 +34,7 @@ pools: default: size: 10 -proxy: +proxies: default: elastic: False reuseElasticClients: False From 107018613ff7451db6a06feeb5e62d9d69585e4d Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 22:02:04 +0100 Subject: [PATCH 10/12] Add TODO --- cmd/run.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/run.go b/cmd/run.go index d9e5f46b..9f86c5ec 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -157,6 +157,8 @@ var runCmd = &cobra.Command{ } // Start the metrics server if enabled. + // TODO: Start multiple metrics servers. For now, only one default is supported. + // I should first find a use case for those multiple metrics servers. go func(metricsConfig config.Metrics, logger zerolog.Logger) { // TODO: refactor this to a separate function. if !metricsConfig.Enabled { From 488fe59aa5ddc12971347d64373bcf57b37eefce Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 22:12:17 +0100 Subject: [PATCH 11/12] Use custom logger --- cmd/run.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index 9f86c5ec..8c14c8bd 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -244,6 +244,7 @@ var runCmd = &cobra.Command{ // Add clients to the pool. for i := 0; i < cfg.GetSize(); i++ { clientConfig := clients[name] + logger := loggers[name] client := network.NewClient(&clientConfig, logger) if client != nil { @@ -296,6 +297,7 @@ var runCmd = &cobra.Command{ // Create and initialize prefork proxies with each pool of clients. for name, cfg := range conf.Global.Proxies { + logger := loggers[name] clientConfig := clients[name] proxies[name] = network.NewProxy( pools[name], @@ -304,7 +306,7 @@ var runCmd = &cobra.Command{ cfg.ReuseElasticClients, cfg.HealthCheckPeriod, &clientConfig, - loggers[name], + logger, ) if data, ok := conf.GlobalKoanf.Get("proxy").(map[string]interface{}); ok { @@ -319,6 +321,7 @@ var runCmd = &cobra.Command{ // Create and initialize servers. for name, cfg := range conf.Global.Servers { + logger := loggers[name] servers[name] = network.NewServer( cfg.Network, cfg.Address, @@ -414,8 +417,9 @@ var runCmd = &cobra.Command{ }(pluginRegistry, logger, servers) // Start the server. - for _, server := range servers { - go func(server *network.Server) { + for name, server := range servers { + logger := loggers[name] + go func(server *network.Server, logger zerolog.Logger) { if err := server.Run(); err != nil { logger.Error().Err(err).Msg("Failed to start server") healthCheckScheduler.Clear() @@ -424,7 +428,7 @@ var runCmd = &cobra.Command{ pluginRegistry.Shutdown() os.Exit(gerr.FailedToStartServer) } - }(server) + }(server, logger) } // Wait for the server to shutdown. From 7c6f785c143003ec3ac9e336389ef3e0b1f0eeb9 Mon Sep 17 00:00:00 2001 From: Mostafa Moradian Date: Sun, 5 Feb 2023 22:19:20 +0100 Subject: [PATCH 12/12] Use custom logger for each pool, its clients and others --- cmd/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/run.go b/cmd/run.go index 8c14c8bd..abef4099 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -229,6 +229,7 @@ var runCmd = &cobra.Command{ // Create and initialize pools of connections. for name, cfg := range conf.Global.Pools { + logger := loggers[name] pools[name] = pool.NewPool(cfg.GetSize()) // Get client config from the config file. @@ -244,7 +245,6 @@ var runCmd = &cobra.Command{ // Add clients to the pool. for i := 0; i < cfg.GetSize(); i++ { clientConfig := clients[name] - logger := loggers[name] client := network.NewClient(&clientConfig, logger) if client != nil {