diff --git a/.golangci.yaml b/.golangci.yaml index ea238f46..6c85d54e 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -14,4 +14,5 @@ linters: - gocognit - gochecknoinits - gocyclo - + - maligned + \ No newline at end of file diff --git a/cmd/config_parser.go b/cmd/config_parser.go deleted file mode 100644 index f1768830..00000000 --- a/cmd/config_parser.go +++ /dev/null @@ -1,277 +0,0 @@ -package cmd - -import ( - "os" - "time" - - "github.com/gatewayd-io/gatewayd/logging" - "github.com/gatewayd-io/gatewayd/network" - "github.com/gatewayd-io/gatewayd/plugin" - "github.com/knadh/koanf" - "github.com/panjf2000/gnet/v2" - "github.com/rs/zerolog" -) - -// Global koanf instance. Using "." as the key path delimiter. -var globalConfig = koanf.New(".") - -// Plugin koanf instance. Using "." as the key path delimiter. -var pluginConfig = koanf.New(".") - -// getPath returns the path to the referenced config value. -func getPath(path string) string { - ref := globalConfig.String(path) - if globalConfig.Exists(path) && globalConfig.StringMap(ref) != nil { - return ref - } - - return path -} - -// func resolvePath(path string) map[string]string { -// ref := getPath(path) -// if ref != path { -// return konfig.StringMap(ref) -// } -// return nil -// } - -// verificationPolicy returns the hook verification policy from plugin config file. -func verificationPolicy() plugin.Policy { - vPolicy := pluginConfig.String("plugins.verificationPolicy") - verificationPolicy := plugin.PassDown // default - switch vPolicy { - case "ignore": - verificationPolicy = plugin.Ignore - case "abort": - verificationPolicy = plugin.Abort - case "remove": - verificationPolicy = plugin.Remove - } - - return verificationPolicy -} - -// pluginCompatPolicy returns the plugin compatibility policy from plugin config file. -func pluginCompatPolicy() plugin.CompatPolicy { - vPolicy := pluginConfig.String("plugins.compatibilityPolicy") - compatPolicy := plugin.Strict // default - switch vPolicy { - case "strict": - compatPolicy = plugin.Strict - case "loose": - compatPolicy = plugin.Loose - } - - return compatPolicy -} - -// loggerConfig returns the logger config from config file. -func loggerConfig() logging.LoggerConfig { - cfg := logging.LoggerConfig{StartupMsg: true} - switch globalConfig.String("loggers.logger.output") { - case "stdout": - cfg.Output = os.Stdout - case "console": - default: - cfg.Output = nil - } - - switch globalConfig.String("loggers.logger.timeFormat") { - case "unixms": - cfg.TimeFormat = zerolog.TimeFormatUnixMs - case "unixmicro": - cfg.TimeFormat = zerolog.TimeFormatUnixMicro - case "unixnano": - cfg.TimeFormat = zerolog.TimeFormatUnixNano - case "unix": - cfg.TimeFormat = zerolog.TimeFormatUnix - default: - cfg.TimeFormat = zerolog.TimeFormatUnix - } - - switch globalConfig.String("loggers.logger.level") { - case "debug": - cfg.Level = zerolog.DebugLevel - case "info": - cfg.Level = zerolog.InfoLevel - case "warn": - cfg.Level = zerolog.WarnLevel - case "error": - cfg.Level = zerolog.ErrorLevel - case "fatal": - cfg.Level = zerolog.FatalLevel - case "panic": - cfg.Level = zerolog.PanicLevel - case "disabled": - cfg.Level = zerolog.Disabled - case "trace": - cfg.Level = zerolog.TraceLevel - default: - cfg.Level = zerolog.InfoLevel - } - - cfg.NoColor = globalConfig.Bool("loggers.logger.noColor") - - return cfg -} - -// serverConfig returns the pool config from config file. -func poolConfig() (int, *network.Client) { - poolSize := globalConfig.Int("pool.size") - if poolSize == 0 { - poolSize = network.DefaultPoolSize - } - - // Minimum pool size is 2. - if poolSize < network.MinimumPoolSize { - poolSize = network.MinimumPoolSize - } - - ref := getPath("pool.client") - net := globalConfig.String(ref + ".network") - address := globalConfig.String(ref + ".address") - receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize") - receiveChunkSize := globalConfig.Int(ref + ".receiveChunkSize") - receiveDeadline := globalConfig.Duration(ref + ".receiveDeadline") - sendDeadline := globalConfig.Duration(ref + ".sendDeadline") - tcpKeepAlive := globalConfig.Bool(ref + ".tcpKeepAlive") - tcpKeepAlivePeriod := globalConfig.Duration(ref + ".tcpKeepAlivePeriod") - - return poolSize, &network.Client{ - Network: net, - Address: address, - TCPKeepAlive: tcpKeepAlive, - TCPKeepAlivePeriod: tcpKeepAlivePeriod, - ReceiveBufferSize: receiveBufferSize, - ReceiveChunkSize: receiveChunkSize, - ReceiveDeadline: receiveDeadline, - SendDeadline: sendDeadline, - } -} - -// proxyConfig returns the proxy config from config file. -func proxyConfig() (bool, bool, *network.Client) { - elastic := globalConfig.Bool("proxy.elastic") - reuseElasticClients := globalConfig.Bool("proxy.reuseElasticClients") - - ref := getPath("pool.client") - net := globalConfig.String(ref + ".network") - address := globalConfig.String(ref + ".address") - receiveBufferSize := globalConfig.Int(ref + ".receiveBufferSize") - receiveChunkSize := globalConfig.Int(ref + ".receiveChunkSize") - receiveDeadline := globalConfig.Duration(ref + ".receiveDeadline") - sendDeadline := globalConfig.Duration(ref + ".sendDeadline") - tcpKeepAlive := globalConfig.Bool(ref + ".tcpKeepAlive") - tcpKeepAlivePeriod := globalConfig.Duration(ref + ".tcpKeepAlivePeriod") - - if receiveBufferSize <= 0 { - receiveBufferSize = network.DefaultBufferSize - } - - if receiveChunkSize <= 0 { - receiveChunkSize = network.DefaultChunkSize - } - - if tcpKeepAlive && tcpKeepAlivePeriod <= 0 { - tcpKeepAlivePeriod = network.DefaultTCPKeepAlivePeriod - } - - return elastic, reuseElasticClients, &network.Client{ - Network: net, - Address: address, - TCPKeepAlive: tcpKeepAlive, - TCPKeepAlivePeriod: tcpKeepAlivePeriod, - ReceiveBufferSize: receiveBufferSize, - ReceiveChunkSize: receiveChunkSize, - ReceiveDeadline: receiveDeadline, - SendDeadline: sendDeadline, - } -} - -type ServerConfig struct { - Network string - Address string - SoftLimit uint64 - HardLimit uint64 - EnableTicker bool - MultiCore bool - LockOSThread bool - ReuseAddress bool - ReusePort bool - LoadBalancer gnet.LoadBalancing - TickInterval time.Duration - ReadBufferCap int - WriteBufferCap int - SocketRecvBuffer int - SocketSendBuffer int - TCPKeepAlive time.Duration - TCPNoDelay gnet.TCPSocketOpt -} - -var loadBalancer = map[string]gnet.LoadBalancing{ - "roundrobin": gnet.RoundRobin, - "leastconnections": gnet.LeastConnections, - "sourceaddrhash": gnet.SourceAddrHash, -} - -// getLoadBalancer returns the load balancer from config file. -func getLoadBalancer(name string) gnet.LoadBalancing { - if lb, ok := loadBalancer[name]; ok { - return lb - } - - return gnet.RoundRobin -} - -// getTCPNoDelay returns the TCP no delay option from config file. -func getTCPNoDelay() gnet.TCPSocketOpt { - if globalConfig.Bool("server.tcpNoDelay") { - return gnet.TCPNoDelay - } - - return gnet.TCPDelay -} - -// serverConfig returns the server config from config file. -func serverConfig() *ServerConfig { - readBufferCap := globalConfig.Int("server.readBufferCap") - if readBufferCap <= 0 { - readBufferCap = network.DefaultBufferSize - } - - writeBufferCap := globalConfig.Int("server.writeBufferCap") - if writeBufferCap <= 0 { - writeBufferCap = network.DefaultBufferSize - } - - socketRecvBuffer := globalConfig.Int("server.socketRecvBuffer") - if socketRecvBuffer <= 0 { - socketRecvBuffer = network.DefaultBufferSize - } - - socketSendBuffer := globalConfig.Int("server.socketSendBuffer") - if socketSendBuffer <= 0 { - socketSendBuffer = network.DefaultBufferSize - } - - return &ServerConfig{ - Network: globalConfig.String("server.network"), - Address: globalConfig.String("server.address"), - SoftLimit: uint64(globalConfig.Int64("server.softLimit")), - HardLimit: uint64(globalConfig.Int64("server.hardLimit")), - EnableTicker: globalConfig.Bool("server.enableTicker"), - TickInterval: globalConfig.Duration("server.tickInterval"), - MultiCore: globalConfig.Bool("server.multiCore"), - LockOSThread: globalConfig.Bool("server.lockOSThread"), - LoadBalancer: getLoadBalancer(globalConfig.String("server.loadBalancer")), - ReadBufferCap: readBufferCap, - WriteBufferCap: writeBufferCap, - SocketRecvBuffer: socketRecvBuffer, - SocketSendBuffer: socketSendBuffer, - ReuseAddress: globalConfig.Bool("server.reuseAddress"), - ReusePort: globalConfig.Bool("server.reusePort"), - TCPKeepAlive: globalConfig.Duration("server.tcpKeepAlive"), - TCPNoDelay: getTCPNoDelay(), - } -} diff --git a/cmd/run.go b/cmd/run.go index 01563af4..0b91ebd2 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -6,13 +6,14 @@ import ( "os" "os/signal" "syscall" - "time" + "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/logging" "github.com/gatewayd-io/gatewayd/network" "github.com/gatewayd-io/gatewayd/plugin" "github.com/gatewayd-io/gatewayd/pool" + "github.com/knadh/koanf" "github.com/knadh/koanf/parsers/yaml" "github.com/knadh/koanf/providers/confmap" "github.com/knadh/koanf/providers/file" @@ -21,10 +22,6 @@ import ( "github.com/spf13/cobra" ) -const ( - DefaultTCPKeepAlive = 3 * time.Second -) - var ( globalConfigFile string pluginConfigFile string @@ -38,6 +35,10 @@ var ( NoColor: true, }) pluginRegistry = plugin.NewRegistry(hooksConfig) + // Global koanf instance. Using "." as the key path delimiter. + globalConfig = koanf.New(".") + // Plugin koanf instance. Using "." as the key path delimiter. + pluginConfig = koanf.New(".") ) // runCmd represents the run command. @@ -49,20 +50,32 @@ var runCmd = &cobra.Command{ // before the configuration is loaded. hooksConfig.Logger = DefaultLogger - // Load the plugin configuration file + // Load default plugin configuration. + config.LoadPluginConfigDefaults(pluginConfig) + + // Load the plugin configuration file. if f, err := cmd.Flags().GetString("plugin-config"); err == nil { if err := pluginConfig.Load(file.Provider(f), yaml.Parser()); err != nil { DefaultLogger.Fatal().Err(err).Msg("Failed to load plugin configuration") os.Exit(gerr.FailedToLoadPluginConfig) } } + var pConfig config.PluginConfig + if err := pluginConfig.Unmarshal("", &pConfig); err != nil { + DefaultLogger.Fatal().Err(err).Msg("Failed to unmarshal plugin configuration") + os.Exit(gerr.FailedToLoadPluginConfig) + } // Set the plugin compatibility policy. - pluginRegistry.CompatPolicy = pluginCompatPolicy() + pluginRegistry.CompatPolicy = pConfig.GetPluginCompatPolicy() // Load plugins and register their hooks. - pluginRegistry.LoadPlugins(pluginConfig) + pluginRegistry.LoadPlugins(pConfig.Plugins) + + // Load default global configuration. + config.LoadGlobalConfigDefaults(globalConfig) + // Load the global configuration file. if f, err := cmd.Flags().GetString("config"); err == nil { if err := globalConfig.Load(file.Provider(f), yaml.Parser()); err != nil { DefaultLogger.Fatal().Err(err).Msg("Failed to load configuration") @@ -72,7 +85,14 @@ var runCmd = &cobra.Command{ } // Get hooks signature verification policy. - hooksConfig.Verification = verificationPolicy() + hooksConfig.Verification = pConfig.GetVerificationPolicy() + + var gConfig config.GlobalConfig + if err := globalConfig.Unmarshal("", &gConfig); err != nil { + DefaultLogger.Fatal().Err(err).Msg("Failed to unmarshal global configuration") + pluginRegistry.Shutdown() + os.Exit(gerr.FailedToLoadGlobalConfig) + } // The config will be passed to the hooks, and in turn to the plugins that // register to this hook. @@ -93,19 +113,32 @@ var runCmd = &cobra.Command{ DefaultLogger.Fatal().Err(err).Msg("Failed to merge configuration") } } + if err := globalConfig.Unmarshal("", &gConfig); err != nil { + DefaultLogger.Fatal().Err(err).Msg("Failed to unmarshal updated global configuration") + pluginRegistry.Shutdown() + os.Exit(gerr.FailedToLoadGlobalConfig) + } // Create a new logger from the config. - loggerCfg := loggerConfig() - logger := logging.NewLogger(loggerCfg) + loggerCfg := gConfig.Loggers[config.Default] + logger := logging.NewLogger(logging.LoggerConfig{ + Output: loggerCfg.GetOutput(), + Level: loggerCfg.GetLevel(), + TimeFormat: loggerCfg.GetTimeFormat(), + NoColor: loggerCfg.NoColor, + FileName: loggerCfg.FileName, + }) // Replace the default logger with the new one from the config. hooksConfig.Logger = logger // This is a notification hook, so we don't care about the result. data := map[string]interface{}{ + "output": loggerCfg.Output, + "level": loggerCfg.Level, "timeFormat": loggerCfg.TimeFormat, - "level": loggerCfg.Level.String(), "noColor": loggerCfg.NoColor, + "fileName": loggerCfg.FileName, } // TODO: Use a context with a timeout _, err = hooksConfig.Run( @@ -115,34 +148,27 @@ var runCmd = &cobra.Command{ } // Create and initialize a pool of connections. - poolSize, clientConfig := poolConfig() + poolSize := gConfig.Pools[config.Default].GetSize() pool := pool.NewPool(poolSize) + // Get client config from the config file. + clientConfig := gConfig.Clients[config.Default] + // Add clients to the pool for i := 0; i < poolSize; i++ { - client := network.NewClient( - clientConfig.Network, - clientConfig.Address, - clientConfig.ReceiveBufferSize, - clientConfig.ReceiveChunkSize, - clientConfig.ReceiveDeadline, - clientConfig.SendDeadline, - clientConfig.TCPKeepAlive, - clientConfig.TCPKeepAlivePeriod, - logger, - ) + client := network.NewClient(&clientConfig, logger) if client != nil { clientCfg := map[string]interface{}{ "id": client.ID, - "network": clientConfig.Network, - "address": clientConfig.Address, - "receiveBufferSize": clientConfig.ReceiveBufferSize, - "receiveChunkSize": clientConfig.ReceiveChunkSize, - "receiveDeadline": clientConfig.ReceiveDeadline.Seconds(), - "sendDeadline": clientConfig.SendDeadline.Seconds(), - "tcpKeepAlive": clientConfig.TCPKeepAlive, - "tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.Seconds(), + "network": client.Network, + "address": client.Address, + "receiveBufferSize": client.ReceiveBufferSize, + "receiveChunkSize": client.ReceiveChunkSize, + "receiveDeadline": client.ReceiveDeadline.Seconds(), + "sendDeadline": client.SendDeadline.Seconds(), + "tcpKeepAlive": client.TCPKeepAlive, + "tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.Seconds(), } _, err := hooksConfig.Run( context.Background(), @@ -182,22 +208,23 @@ var runCmd = &cobra.Command{ } // Create a prefork proxy with the pool of clients. - elastic, reuseElasticClients, elasticClientConfig := proxyConfig() + elastic := gConfig.Proxy[config.Default].Elastic + reuseElasticClients := gConfig.Proxy[config.Default].ReuseElasticClients proxy := network.NewProxy( - pool, hooksConfig, elastic, reuseElasticClients, elasticClientConfig, logger) + pool, hooksConfig, elastic, reuseElasticClients, &clientConfig, logger) proxyCfg := map[string]interface{}{ "elastic": elastic, "reuseElasticClients": reuseElasticClients, "clientConfig": map[string]interface{}{ - "network": elasticClientConfig.Network, - "address": elasticClientConfig.Address, - "receiveBufferSize": elasticClientConfig.ReceiveBufferSize, - "receiveChunkSize": elasticClientConfig.ReceiveChunkSize, - "receiveDeadline": elasticClientConfig.ReceiveDeadline.Seconds(), - "sendDeadline": elasticClientConfig.SendDeadline.Seconds(), - "tcpKeepAlive": elasticClientConfig.TCPKeepAlive, - "tcpKeepAlivePeriod": elasticClientConfig.TCPKeepAlivePeriod.Seconds(), + "network": clientConfig.Network, + "address": clientConfig.Address, + "receiveBufferSize": clientConfig.ReceiveBufferSize, + "receiveChunkSize": clientConfig.ReceiveChunkSize, + "receiveDeadline": clientConfig.ReceiveDeadline.Seconds(), + "sendDeadline": clientConfig.SendDeadline.Seconds(), + "tcpKeepAlive": clientConfig.TCPKeepAlive, + "tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.Seconds(), }, } _, err = hooksConfig.Run( @@ -207,37 +234,36 @@ var runCmd = &cobra.Command{ } // Create a server - serverConfig := serverConfig() server := network.NewServer( - serverConfig.Network, - serverConfig.Address, - serverConfig.SoftLimit, - serverConfig.HardLimit, - serverConfig.TickInterval, + gConfig.Server.Network, + gConfig.Server.Address, + gConfig.Server.SoftLimit, + gConfig.Server.HardLimit, + gConfig.Server.TickInterval, []gnet.Option{ // Scheduling options - gnet.WithMulticore(serverConfig.MultiCore), - gnet.WithLockOSThread(serverConfig.LockOSThread), + gnet.WithMulticore(gConfig.Server.MultiCore), + gnet.WithLockOSThread(gConfig.Server.LockOSThread), // NumEventLoop overrides Multicore option. // gnet.WithNumEventLoop(1), // Can be used to send keepalive messages to the client. - gnet.WithTicker(serverConfig.EnableTicker), + gnet.WithTicker(gConfig.Server.EnableTicker), // Internal event-loop load balancing options - gnet.WithLoadBalancing(serverConfig.LoadBalancer), + gnet.WithLoadBalancing(gConfig.Server.GetLoadBalancer()), // Buffer options - gnet.WithReadBufferCap(serverConfig.ReadBufferCap), - gnet.WithWriteBufferCap(serverConfig.WriteBufferCap), - gnet.WithSocketRecvBuffer(serverConfig.SocketRecvBuffer), - gnet.WithSocketSendBuffer(serverConfig.SocketSendBuffer), + gnet.WithReadBufferCap(gConfig.Server.ReadBufferCap), + gnet.WithWriteBufferCap(gConfig.Server.WriteBufferCap), + gnet.WithSocketRecvBuffer(gConfig.Server.SocketRecvBuffer), + gnet.WithSocketSendBuffer(gConfig.Server.SocketSendBuffer), // TCP options - gnet.WithReuseAddr(serverConfig.ReuseAddress), - gnet.WithReusePort(serverConfig.ReusePort), - gnet.WithTCPKeepAlive(serverConfig.TCPKeepAlive), - gnet.WithTCPNoDelay(serverConfig.TCPNoDelay), + gnet.WithReuseAddr(gConfig.Server.ReuseAddress), + gnet.WithReusePort(gConfig.Server.ReusePort), + gnet.WithTCPKeepAlive(gConfig.Server.TCPKeepAlive), + gnet.WithTCPNoDelay(gConfig.Server.GetTCPNoDelay()), }, proxy, logger, @@ -245,23 +271,23 @@ var runCmd = &cobra.Command{ ) serverCfg := map[string]interface{}{ - "network": serverConfig.Network, - "address": serverConfig.Address, - "softLimit": serverConfig.SoftLimit, - "hardLimit": serverConfig.HardLimit, - "tickInterval": serverConfig.TickInterval.Seconds(), - "multiCore": serverConfig.MultiCore, - "lockOSThread": serverConfig.LockOSThread, - "enableTicker": serverConfig.EnableTicker, - "loadBalancer": int(serverConfig.LoadBalancer), - "readBufferCap": serverConfig.ReadBufferCap, - "writeBufferCap": serverConfig.WriteBufferCap, - "socketRecvBuffer": serverConfig.SocketRecvBuffer, - "socketSendBuffer": serverConfig.SocketSendBuffer, - "reuseAddress": serverConfig.ReuseAddress, - "reusePort": serverConfig.ReusePort, - "tcpKeepAlive": serverConfig.TCPKeepAlive.Seconds(), - "tcpNoDelay": int(serverConfig.TCPNoDelay), + "network": gConfig.Server.Network, + "address": gConfig.Server.Address, + "softLimit": gConfig.Server.SoftLimit, + "hardLimit": gConfig.Server.HardLimit, + "tickInterval": gConfig.Server.TickInterval.Seconds(), + "multiCore": gConfig.Server.MultiCore, + "lockOSThread": gConfig.Server.LockOSThread, + "enableTicker": gConfig.Server.EnableTicker, + "loadBalancer": gConfig.Server.LoadBalancer, + "readBufferCap": gConfig.Server.ReadBufferCap, + "writeBufferCap": gConfig.Server.WriteBufferCap, + "socketRecvBuffer": gConfig.Server.SocketRecvBuffer, + "socketSendBuffer": gConfig.Server.SocketSendBuffer, + "reuseAddress": gConfig.Server.ReuseAddress, + "reusePort": gConfig.Server.ReusePort, + "tcpKeepAlive": gConfig.Server.TCPKeepAlive.Seconds(), + "tcpNoDelay": gConfig.Server.TCPNoDelay, } _, err = hooksConfig.Run( context.Background(), serverCfg, plugin.OnNewServer, hooksConfig.Verification) diff --git a/config/constants.go b/config/constants.go new file mode 100644 index 00000000..20232a99 --- /dev/null +++ b/config/constants.go @@ -0,0 +1,79 @@ +package config + +import ( + "time" +) + +type ( + Status uint + Policy uint + CompatPolicy uint + LogOutput uint +) + +const ( + Running Status = iota + Stopped +) + +const ( + // Non-strict (permissive) mode. + PassDown Policy = iota // Pass down the extra keys/values in result to the next plugins + // Strict mode. + Ignore // Ignore errors and continue + Abort // Abort on first error and return results + Remove // Remove the hook from the list on error and continue +) + +const ( + Strict CompatPolicy = iota + Loose +) + +const ( + Console LogOutput = iota + Stdout + Stderr + Buffer // Buffer the output and return it as a string (for testing). + File +) + +const ( + // Config constants. + Default = "default" + + // Logger constants. + DefaultLogFileName = "gatewayd.log" + DefaultLogFilePermission = 0o660 + DefaultLogOutput = "console" + DefaultLogLevel = "info" + + // Plugin constants. + DefaultMinPort = 50000 + DefaultMaxPort = 60000 + PluginPriorityStart = 1000 + LoggerName = "plugin" + + // Client constants. + DefaultChunkSize = 4096 + DefaultReceiveDeadline = 0 // 0 means no deadline (timeout) + DefaultSendDeadline = 0 + DefaultTCPKeepAlivePeriod = 30 * time.Second + + // Pool constants. + EmptyPoolCapacity = 0 + DefaultPoolSize = 10 + MinimumPoolSize = 2 + + // Server constants. + DefaultListenNetwork = "tcp" + DefaultListenAddress = "0.0.0.0:15432" + DefaultTickInterval = 5 * time.Second + DefaultBufferSize = 1 << 24 // 16777216 bytes + DefaultTCPKeepAlive = 3 * time.Second + DefaultLoadBalancer = "roundrobin" + + // Utility constants. + DefaultSeed = 1000 + ChecksumBufferSize = 65536 +) diff --git a/config/getters.go b/config/getters.go new file mode 100644 index 00000000..59c4fc3b --- /dev/null +++ b/config/getters.go @@ -0,0 +1,128 @@ +package config + +import ( + "github.com/panjf2000/gnet/v2" + "github.com/rs/zerolog" +) + +// verificationPolicy returns the hook verification policy from plugin config file. +func (p PluginConfig) GetVerificationPolicy() Policy { + // vPolicy := pluginConfig.String("plugins.verificationPolicy") + verificationPolicy := PassDown // default + switch p.VerificationPolicy { + case "ignore": + verificationPolicy = Ignore + case "abort": + verificationPolicy = Abort + case "remove": + verificationPolicy = Remove + } + + return verificationPolicy +} + +// pluginCompatPolicy returns the plugin compatibility policy from plugin config file. +func (p PluginConfig) GetPluginCompatPolicy() CompatPolicy { + // vPolicy := pluginConfig.String("plugins.compatibilityPolicy") + compatPolicy := Strict // default + switch p.CompatibilityPolicy { + case "strict": + compatPolicy = Strict + case "loose": + compatPolicy = Loose + } + + return compatPolicy +} + +// loadBalancer returns the load balancing algorithm to use. +func (s Server) GetLoadBalancer() gnet.LoadBalancing { + loadBalancer := map[string]gnet.LoadBalancing{ + "roundrobin": gnet.RoundRobin, + "leastconnections": gnet.LeastConnections, + "sourceaddrhash": gnet.SourceAddrHash, + } + + if lb, ok := loadBalancer[s.LoadBalancer]; ok { + return lb + } + + return gnet.RoundRobin +} + +// tcpNoDelay returns the TCP no delay option from config file. +func (s Server) GetTCPNoDelay() gnet.TCPSocketOpt { + if s.TCPNoDelay { + return gnet.TCPNoDelay + } + + return gnet.TCPDelay +} + +// GetSize returns the pool size from config file. +func (p Pool) GetSize() int { + if p.Size == 0 { + return DefaultPoolSize + } + + // Minimum pool size is 2. + if p.Size < MinimumPoolSize { + p.Size = MinimumPoolSize + } + + return p.Size +} + +// output returns the logger output from config file. +func (l Logger) GetOutput() LogOutput { + switch l.Output { + case "file": + return File + case "stdout": + return Stdout + case "stderr": + return Stderr + default: + return Console + } +} + +// timeFormat returns the logger time format from config file. +func (l Logger) GetTimeFormat() string { + switch l.TimeFormat { + case "unixms": + return zerolog.TimeFormatUnixMs + case "unixmicro": + return zerolog.TimeFormatUnixMicro + case "unixnano": + return zerolog.TimeFormatUnixNano + case "unix": + return zerolog.TimeFormatUnix + default: + return zerolog.TimeFormatUnix + } +} + +// level returns the logger level from config file. +func (l Logger) GetLevel() zerolog.Level { + switch l.Level { + case "debug": + return zerolog.DebugLevel + case "info": + return zerolog.InfoLevel + case "warn": + return zerolog.WarnLevel + case "error": + return zerolog.ErrorLevel + case "fatal": + return zerolog.FatalLevel + case "panic": + return zerolog.PanicLevel + case "disabled": + return zerolog.Disabled + case "trace": + return zerolog.TraceLevel + default: + return zerolog.InfoLevel + } +} diff --git a/config/types.go b/config/types.go new file mode 100644 index 00000000..976fbc63 --- /dev/null +++ b/config/types.go @@ -0,0 +1,157 @@ +package config + +import ( + "fmt" + "time" + + "github.com/knadh/koanf" + "github.com/knadh/koanf/providers/confmap" +) + +// // getPath returns the path to the referenced config value. +// func getPath(cfg *koanf.Koanf, path string) string { +// ref := cfg.String(path) +// if cfg.Exists(path) && cfg.StringMap(ref) != nil { +// return ref +// } + +// return path +// } + +type Plugin struct { + Name string `koanf:"name"` + Enabled bool `koanf:"enabled"` + LocalPath string `koanf:"localPath"` + Args []string `koanf:"args"` + Env []string `koanf:"env"` + Checksum string `koanf:"checksum"` +} + +type PluginConfig struct { + VerificationPolicy string `koanf:"verificationPolicy"` + CompatibilityPolicy string `koanf:"compatibilityPolicy"` + Plugins []Plugin `koanf:"plugins"` +} + +type Client struct { + Network string `koanf:"network"` + Address string `koanf:"address"` + TCPKeepAlive bool `koanf:"tcpKeepAlive"` + TCPKeepAlivePeriod time.Duration `koanf:"tcpKeepAlivePeriod"` + ReceiveBufferSize int `koanf:"receiveBufferSize"` + ReceiveChunkSize int `koanf:"receiveChunkSize"` + ReceiveDeadline time.Duration `koanf:"receiveDeadline"` + SendDeadline time.Duration `koanf:"sendDeadline"` +} + +type Logger struct { + Output string `koanf:"output"` + FileName string `koanf:"fileName"` + TimeFormat string `koanf:"timeFormat"` + Level string `koanf:"level"` + Permission uint32 `koanf:"permission"` + NoColor bool `koanf:"noColor"` + StartupMsg bool `koanf:"startupMsg"` +} + +type Pool struct { + Size int `koanf:"size"` +} + +type Proxy struct { + Elastic bool `koanf:"elastic"` + ReuseElasticClients bool `koanf:"reuseElasticClients"` +} + +type Server struct { + EnableTicker bool `koanf:"enableTicker"` + MultiCore bool `koanf:"multiCore"` + LockOSThread bool `koanf:"lockOSThread"` + ReuseAddress bool `koanf:"reuseAddress"` + ReusePort bool `koanf:"reusePort"` + TCPNoDelay bool `koanf:"tcpNoDelay"` + ReadBufferCap int `koanf:"readBufferCap"` + WriteBufferCap int `koanf:"writeBufferCap"` + SocketRecvBuffer int `koanf:"socketRecvBuffer"` + SocketSendBuffer int `koanf:"socketSendBuffer"` + SoftLimit uint64 `koanf:"softLimit"` + HardLimit uint64 `koanf:"hardLimit"` + TCPKeepAlive time.Duration `koanf:"tcpKeepAlive"` + TickInterval time.Duration `koanf:"tickInterval"` + Network string `koanf:"network"` + Address string `koanf:"address"` + LoadBalancer string `koanf:"loadBalancer"` +} + +type GlobalConfig struct { + Loggers map[string]Logger `koanf:"loggers"` + Clients map[string]Client `koanf:"clients"` + Pools map[string]Pool `koanf:"pools"` + Proxy map[string]Proxy `koanf:"proxy"` + Server Server `koanf:"server"` +} + +// LoadDefaultConfig loads the default configuration before loading the config file. +func LoadGlobalConfigDefaults(cfg *koanf.Koanf) { + defaultValues := confmap.Provider(map[string]interface{}{ + "loggers": map[string]interface{}{ + "default": map[string]interface{}{ + "output": DefaultLogOutput, + "level": DefaultLogLevel, + "fileName": DefaultLogFileName, + "permission": DefaultLogFilePermission, + }, + }, + "clients": map[string]interface{}{ + "default": map[string]interface{}{ + "receiveBufferSize": DefaultBufferSize, + "receiveChunkSize": DefaultChunkSize, + "tcpKeepAlivePeriod": DefaultTCPKeepAlivePeriod, + }, + }, + "pools": map[string]interface{}{ + "default": map[string]interface{}{ + "size": DefaultPoolSize, + }, + }, + "proxy": map[string]interface{}{ + "default": map[string]interface{}{ + "elastic": false, + "reuseElasticClients": false, + }, + }, + "server": map[string]interface{}{ + "network": DefaultListenNetwork, + "address": DefaultListenAddress, + "softLimit": 0, + "hardLimit": 0, + "enableTicker": false, + "multiCore": true, + "lockOSThread": false, + "reuseAddress": true, + "reusePort": true, + "loadBalancer": DefaultLoadBalancer, + "readBufferCap": DefaultBufferSize, + "writeBufferCap": DefaultBufferSize, + "socketRecvBuffer": DefaultBufferSize, + "socketSendBuffer": DefaultBufferSize, + }, + }, "") + + if err := cfg.Load(defaultValues, nil); err != nil { + panic(fmt.Errorf("failed to load default global configuration: %w", err)) + } +} + +func LoadPluginConfigDefaults(cfg *koanf.Koanf) { + defaultValues := confmap.Provider(map[string]interface{}{ + "plugins": map[string]interface{}{ + "verificationPolicy": "passdown", + "compatibilityPolicy": "strict", + }, + }, "") + + if err := cfg.Load(defaultValues, nil); err != nil { + panic(fmt.Errorf("failed to load default plugin configuration: %w", err)) + } +} diff --git a/gatewayd.yaml b/gatewayd.yaml index 295089dc..bfd23b31 100644 --- a/gatewayd.yaml +++ b/gatewayd.yaml @@ -3,8 +3,8 @@ # Loggers config loggers: - logger: - output: "null" # stdout or file or console + default: + output: "console" # stdout, stderr, file or console # Implementing file output # file : ./logs/gatewayd.log level: "debug" # panic, fatal, error, warn, info, debug, trace @@ -12,7 +12,7 @@ loggers: timeFormat: "unix" clients: - client1: + default: network: tcp address: localhost:5432 # tcpKeepAlive: True @@ -24,24 +24,23 @@ clients: # Pool config pool: - # Use the logger config passed here - # i.e. don't assume it's the same as the logger config above - logger: loggers.logger - size: 10 - # Database configs for the connection pool - client: clients.client1 + default: + size: 10 + # Database configs for the connection pool + client: clients.default # Proxy config proxy: - # Use the logger config passed here - # i.e. don't assume it's the same as the logger config above - logger: loggers.logger - # Use the pool config passed here - # i.e. don't assume it's the same as the pool config above - pool: pool - elastic: False - reuseElasticClients: False - elasticClient: clients.client1 + default: + # Use the logger config passed here + # i.e. don't assume it's the same as the logger config above + logger: loggers.default + # Use the pool config passed here + # i.e. don't assume it's the same as the pool config above + pool: pool + elastic: False + reuseElasticClients: False + elasticClient: clients.default server: network: tcp @@ -49,8 +48,8 @@ server: # softLimit: 0 # hardLimit: 0 - logger: loggers.logger - proxy: proxy + logger: loggers.default + proxy: proxy.default enableTicker: False tickInterval: 5s # duration diff --git a/gatewayd_plugins.yaml b/gatewayd_plugins.yaml index 1ecb469f..79da93c9 100644 --- a/gatewayd_plugins.yaml +++ b/gatewayd_plugins.yaml @@ -1,26 +1,26 @@ # Plugin configuration file for GatewayD -plugins: - # Possible values: "passdown" (default), "ignore", "abort" and "remove" - verificationPolicy: "passdown" - # Possible values: "strict" (default) and "loose" - compatibilityPolicy: "strict" +# Possible values: "passdown" (default), "ignore", "abort" and "remove" +verificationPolicy: "passdown" +# Possible values: "strict" (default) and "loose" +compatibilityPolicy: "strict" -# Plugin name -gatewayd-plugin-test: - # whether to enable or disable the plugin on the next run - enabled: True - # path to the plugin's binary file - localPath: ../gatewayd-plugin-test/gatewayd-plugin-test - # Pass cmdline args to the plugin - args: ["--log-level", "info"] - # Pass environment variables to the plugin - # System-wide environment variables are passed to the plugin normally - # and they can be accessed via os.Environ(). - # Defining any environment variables below will override system-wide environment variables. - env: - # The below environment variables are used by the plugin loader to verify the plugin's identity. - - MAGIC_COOKIE_KEY=GATEWAYD_PLUGIN - - MAGIC_COOKIE_VALUE=5712b87aa5d7e9f9e9ab643e6603181c5b796015cb1c09d6f5ada882bf2a1872 - # Checksum hash to verify the binary before loading - checksum: 9eba62b8d50610493a376a512dd0bb42430c415b3ad966484d8f4c0bc6ed91b4 +plugins: + # Plugin name + - name: gatewayd-plugin-test + # whether to enable or disable the plugin on the next run + enabled: True + # path to the plugin's binary file + localPath: ../gatewayd-plugin-test/gatewayd-plugin-test + # Pass cmdline args to the plugin + args: ["--log-level", "info"] + # Pass environment variables to the plugin + # System-wide environment variables are passed to the plugin normally + # and they can be accessed via os.Environ(). + # Defining any environment variables below will override system-wide environment variables. + env: + # The below environment variables are used by the plugin loader to verify the plugin's identity. + - MAGIC_COOKIE_KEY=GATEWAYD_PLUGIN + - MAGIC_COOKIE_VALUE=5712b87aa5d7e9f9e9ab643e6603181c5b796015cb1c09d6f5ada882bf2a1872 + # Checksum hash to verify the binary before loading + checksum: 911cbab556bd3b14b60c088d786ae7c3ecf0a2aa2958406c3214ea64073cde36 diff --git a/go.mod b/go.mod index 8aa44d8a..65a1f02b 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/google/go-cmp v0.5.9 github.com/hashicorp/go-hclog v1.4.0 github.com/hashicorp/go-plugin v1.4.8 - github.com/knadh/koanf v1.4.4 + github.com/knadh/koanf v1.4.5 github.com/mitchellh/mapstructure v1.5.0 github.com/panjf2000/gnet/v2 v2.2.2 github.com/rs/zerolog v1.28.0 @@ -27,7 +27,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/lib/pq v1.10.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect - github.com/mattn/go-isatty v0.0.16 // indirect + github.com/mattn/go-isatty v0.0.17 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/go-testing-interface v1.14.1 // indirect github.com/mitchellh/reflectwalk v1.0.2 // indirect @@ -38,10 +38,10 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect go.uber.org/zap v1.24.0 // indirect - golang.org/x/net v0.4.0 // indirect - golang.org/x/sys v0.3.0 // indirect - golang.org/x/text v0.5.0 // indirect - google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37 // indirect + golang.org/x/net v0.5.0 // indirect + golang.org/x/sys v0.4.0 // indirect + golang.org/x/text v0.6.0 // indirect + google.golang.org/genproto v0.0.0-20230104163317-caabf589fcbf // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index d39b3823..0f7cf922 100644 --- a/go.sum +++ b/go.sum @@ -162,8 +162,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/knadh/koanf v1.4.4 h1:d2jY5nCCeoaiqvEKSBW9rEc93EfNy/XWgWsSB3j7JEA= -github.com/knadh/koanf v1.4.4/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= +github.com/knadh/koanf v1.4.5 h1:yKWFswTrqFc0u7jBAoERUz30+N1b1yPXU01gAPr8IrY= +github.com/knadh/koanf v1.4.5/go.mod h1:Hgyjp4y8v44hpZtPzs7JZfRAW5AhN7KfZcwv1RYggDs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= @@ -189,8 +189,9 @@ github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcME github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= -github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.17 h1:BTarxUcIeDqL27Mc+vyvdWYSL28zpIhv3RoTdsLMPng= +github.com/mattn/go-isatty v0.0.17/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.1.26/go.mod h1:bPDLeHnStXmXAq1m/Ch/hvfNHr14JKNPMBo3VZKjuso= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= @@ -336,8 +337,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= -golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= -golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -385,8 +386,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -394,8 +395,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.6.0 h1:3XmdazWV+ubf7QgHSTWeykHOci5oeekaGJBLkrkaw4k= +golang.org/x/text v0.6.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -421,8 +422,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0= -google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37 h1:jmIfw8+gSvXcZSgaFAGyInDXeWzUhvYH57G/5GKMn70= -google.golang.org/genproto v0.0.0-20221207170731-23e4bf6bdc37/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= +google.golang.org/genproto v0.0.0-20230104163317-caabf589fcbf h1:/JqRexUvugu6JURQ0O7RfV1EnvgrOxUV4tSjuAv0Sr0= +google.golang.org/genproto v0.0.0-20230104163317-caabf589fcbf/go.mod h1:RGgjbofJ8xD9Sq1VVhDM1Vok1vRONV+rg+CjzG4SZKM= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= diff --git a/logging/logger.go b/logging/logger.go index 0bebcfe5..2d1953a9 100644 --- a/logging/logger.go +++ b/logging/logger.go @@ -1,9 +1,11 @@ package logging import ( + "bytes" "io" "os" + "github.com/gatewayd-io/gatewayd/config" "github.com/rs/zerolog" ) @@ -15,7 +17,9 @@ type ( ) type LoggerConfig struct { - Output io.Writer + Output config.LogOutput + FileName string + Permission os.FileMode // Log file permission TimeFormat string Level zerolog.Level NoColor bool @@ -25,6 +29,13 @@ type LoggerConfig struct { // NewLogger creates a new logger with the given configuration. func NewLogger(cfg LoggerConfig) zerolog.Logger { + return NewLoggerWithBuffer(cfg) +} + +// NewLoggerWithBuffer creates a new logger with the given configuration. +// +//nolint:funlen +func NewLoggerWithBuffer(cfg LoggerConfig, buffer ...*bytes.Buffer) zerolog.Logger { // Create a new logger. consoleWriter := zerolog.ConsoleWriter{ Out: os.Stdout, @@ -32,9 +43,37 @@ func NewLogger(cfg LoggerConfig) zerolog.Logger { NoColor: cfg.NoColor, } - if cfg.Output == nil { - // Default to stdout. - cfg.Output = consoleWriter + var output io.Writer + + if cfg.FileName == "" { + cfg.FileName = config.DefaultLogFileName + } + + switch cfg.Output { + case config.Console: + output = consoleWriter + case config.Stdout: + output = os.Stdout + case config.Stderr: + output = os.Stderr + case config.File: + if logFile, err := os.OpenFile( + cfg.FileName, + os.O_CREATE|os.O_WRONLY|os.O_APPEND, //nolint:nosnakecase + cfg.Permission); err == nil { + output = logFile + } else { + // If we can't open the file, we'll just log to stdout. + output = os.Stdout + } + case config.Buffer: + if len(buffer) == 0 { + output = os.Stdout + } else { + output = buffer[0] + } + default: + output = os.Stdout } if cfg.TimeFormat == "" { @@ -45,7 +84,7 @@ func NewLogger(cfg LoggerConfig) zerolog.Logger { zerolog.TimeFieldFormat = cfg.TimeFormat // Create a new logger. - logger := zerolog.New(cfg.Output) + logger := zerolog.New(output) if cfg.TimeFormat != "" { logger = logger.With().Timestamp().Logger() } diff --git a/logging/logging_test.go b/logging/logger_test.go similarity index 88% rename from logging/logging_test.go rename to logging/logger_test.go index 4b07f134..e94873c2 100644 --- a/logging/logging_test.go +++ b/logging/logger_test.go @@ -5,6 +5,7 @@ import ( "encoding/json" "testing" + "github.com/gatewayd-io/gatewayd/config" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" ) @@ -12,14 +13,15 @@ import ( // TestNewLogger tests the creation of a new logger. func TestNewLogger(t *testing.T) { var buffer bytes.Buffer - logger := NewLogger( + logger := NewLoggerWithBuffer( LoggerConfig{ - Output: &buffer, + Output: config.Buffer, // This is only used for testing. Level: zerolog.DebugLevel, TimeFormat: zerolog.TimeFormatUnix, StartupMsg: true, NoColor: true, }, + &buffer, ) assert.NotNil(t, logger) diff --git a/network/client.go b/network/client.go index da8f7d7b..3282eb06 100644 --- a/network/client.go +++ b/network/client.go @@ -5,18 +5,11 @@ import ( "net" "time" + "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/rs/zerolog" ) -const ( - DefaultSeed = 1000 - DefaultChunkSize = 4096 - DefaultReceiveDeadline = 0 // 0 means no deadline (timeout) - DefaultSendDeadline = 0 - DefaultTCPKeepAlivePeriod = 30 * time.Second -) - type ClientInterface interface { Send(data []byte) (int, *gerr.GatewayDError) Receive() (int, []byte, *gerr.GatewayDError) @@ -47,34 +40,32 @@ var _ ClientInterface = &Client{} // NewClient creates a new client. // //nolint:funlen -func NewClient( - network, address string, - receiveBufferSize, receiveChunkSize int, - receiveDeadline, sendDeadline time.Duration, - tcpKeepAlive bool, tcpKeepAlivePeriod time.Duration, - logger zerolog.Logger, -) *Client { +func NewClient(clientConfig *config.Client, logger zerolog.Logger) *Client { var client Client + if clientConfig == nil { + return nil + } + client.logger = logger // Try to resolve the address and log an error if it can't be resolved. - addr, err := Resolve(network, address, logger) + addr, err := Resolve(clientConfig.Network, clientConfig.Address, logger) if err != nil { logger.Error().Err(err).Msg("Failed to resolve address") } // Create a resolved client. client = Client{ - Network: network, + Network: clientConfig.Network, Address: addr, } // Fall back to the original network and address if the address can't be resolved. if client.Address == "" || client.Network == "" { client = Client{ - Network: network, - Address: address, + Network: clientConfig.Network, + Address: clientConfig.Address, } } @@ -89,11 +80,11 @@ func NewClient( client.Conn = conn // Set the TCP keep alive. - client.TCPKeepAlive = tcpKeepAlive - if tcpKeepAlivePeriod <= 0 { - client.TCPKeepAlivePeriod = DefaultTCPKeepAlivePeriod + client.TCPKeepAlive = clientConfig.TCPKeepAlive + if clientConfig.TCPKeepAlivePeriod <= 0 { + client.TCPKeepAlivePeriod = config.DefaultTCPKeepAlivePeriod } else { - client.TCPKeepAlivePeriod = tcpKeepAlivePeriod + client.TCPKeepAlivePeriod = clientConfig.TCPKeepAlivePeriod } if c, ok := client.Conn.(*net.TCPConn); ok { @@ -107,10 +98,10 @@ func NewClient( } // Set the receive deadline (timeout). - if receiveDeadline <= 0 { - client.ReceiveDeadline = DefaultReceiveDeadline + if clientConfig.ReceiveDeadline <= 0 { + client.ReceiveDeadline = config.DefaultReceiveDeadline } else { - client.ReceiveDeadline = receiveDeadline + client.ReceiveDeadline = clientConfig.ReceiveDeadline if err := client.Conn.SetReadDeadline(time.Now().Add(client.ReceiveDeadline)); err != nil { logger.Error().Err(err).Msg("Failed to set receive deadline") } else { @@ -120,10 +111,10 @@ func NewClient( } // Set the send deadline (timeout). - if sendDeadline <= 0 { - client.SendDeadline = DefaultSendDeadline + if clientConfig.SendDeadline <= 0 { + client.SendDeadline = config.DefaultSendDeadline } else { - client.SendDeadline = sendDeadline + client.SendDeadline = clientConfig.SendDeadline if err := client.Conn.SetWriteDeadline(time.Now().Add(client.SendDeadline)); err != nil { logger.Error().Err(err).Msg("Failed to set send deadline") } else { @@ -133,22 +124,23 @@ func NewClient( } // Set the receive buffer size. This is the maximum size of the buffer. - if receiveBufferSize <= 0 { - client.ReceiveBufferSize = DefaultBufferSize + if clientConfig.ReceiveBufferSize <= 0 { + client.ReceiveBufferSize = config.DefaultBufferSize } else { - client.ReceiveBufferSize = receiveBufferSize + client.ReceiveBufferSize = clientConfig.ReceiveBufferSize } // Set the receive chunk size. This is the size of the buffer that is read from the connection // in chunks. - if receiveChunkSize <= 0 { - client.ReceiveChunkSize = DefaultChunkSize + if clientConfig.ReceiveChunkSize <= 0 { + client.ReceiveChunkSize = config.DefaultChunkSize } else { - client.ReceiveChunkSize = receiveChunkSize + client.ReceiveChunkSize = clientConfig.ReceiveChunkSize } logger.Debug().Str("address", client.Address).Msg("New client created") - client.ID = GetID(conn.LocalAddr().Network(), conn.LocalAddr().String(), DefaultSeed, logger) + client.ID = GetID( + conn.LocalAddr().Network(), conn.LocalAddr().String(), config.DefaultSeed, logger) return &client } diff --git a/network/client_test.go b/network/client_test.go index 62d5c638..facfb0ba 100644 --- a/network/client_test.go +++ b/network/client_test.go @@ -4,6 +4,7 @@ import ( "testing" embeddedpostgres "github.com/fergusstrange/embedded-postgres" + "github.com/gatewayd-io/gatewayd/config" "github.com/gatewayd-io/gatewayd/logging" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -23,7 +24,7 @@ func TestNewClient(t *testing.T) { }() cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -32,20 +33,22 @@ func TestNewClient(t *testing.T) { logger := logging.NewLogger(cfg) client := NewClient( - "tcp", - "localhost:5432", - DefaultBufferSize, - DefaultChunkSize, - DefaultReceiveDeadline, - DefaultSendDeadline, - false, - DefaultTCPKeepAlivePeriod, + &config.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + }, logger) defer client.Close() assert.Equal(t, "tcp", client.Network) assert.Equal(t, "127.0.0.1:5432", client.Address) - assert.Equal(t, DefaultBufferSize, client.ReceiveBufferSize) + assert.Equal(t, config.DefaultBufferSize, client.ReceiveBufferSize) assert.NotEmpty(t, client.ID) assert.NotNil(t, client.Conn) } @@ -64,7 +67,7 @@ func TestSend(t *testing.T) { }() cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -73,14 +76,16 @@ func TestSend(t *testing.T) { logger := logging.NewLogger(cfg) client := NewClient( - "tcp", - "localhost:5432", - DefaultBufferSize, - DefaultChunkSize, - DefaultReceiveDeadline, - DefaultSendDeadline, - false, - DefaultTCPKeepAlivePeriod, + &config.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + }, logger) defer client.Close() @@ -105,7 +110,7 @@ func TestReceive(t *testing.T) { }() cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -114,14 +119,16 @@ func TestReceive(t *testing.T) { logger := logging.NewLogger(cfg) client := NewClient( - "tcp", - "localhost:5432", - DefaultBufferSize, - DefaultChunkSize, - DefaultReceiveDeadline, - DefaultSendDeadline, - false, - DefaultTCPKeepAlivePeriod, + &config.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + }, logger) defer client.Close() @@ -156,7 +163,7 @@ func TestClose(t *testing.T) { }() cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -165,14 +172,16 @@ func TestClose(t *testing.T) { logger := logging.NewLogger(cfg) client := NewClient( - "tcp", - "localhost:5432", - DefaultBufferSize, - DefaultChunkSize, - DefaultReceiveDeadline, - DefaultSendDeadline, - false, - DefaultTCPKeepAlivePeriod, + &config.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + }, logger) assert.NotNil(t, client) client.Close() @@ -180,5 +189,5 @@ func TestClose(t *testing.T) { assert.Equal(t, "", client.Network) assert.Equal(t, "", client.Address) assert.Nil(t, client.Conn) - assert.Equal(t, DefaultBufferSize, client.ReceiveBufferSize) + assert.Equal(t, config.DefaultBufferSize, client.ReceiveBufferSize) } diff --git a/network/proxy.go b/network/proxy.go index 04a378a7..6da5822c 100644 --- a/network/proxy.go +++ b/network/proxy.go @@ -3,6 +3,7 @@ package network import ( "context" + "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/plugin" "github.com/gatewayd-io/gatewayd/pool" @@ -10,10 +11,6 @@ import ( "github.com/rs/zerolog" ) -const ( - EmptyPoolCapacity int = 0 -) - type Proxy interface { Connect(gconn gnet.Conn) *gerr.GatewayDError Disconnect(gconn gnet.Conn) *gerr.GatewayDError @@ -33,7 +30,7 @@ type ProxyImpl struct { ReuseElasticClients bool // ClientConfig is used for elastic proxy and reconnection - ClientConfig *Client + ClientConfig *config.Client } var _ Proxy = &ProxyImpl{} @@ -42,11 +39,11 @@ var _ Proxy = &ProxyImpl{} func NewProxy( p pool.Pool, hookConfig *plugin.HookConfig, elastic, reuseElasticClients bool, - clientConfig *Client, logger zerolog.Logger, + clientConfig *config.Client, logger zerolog.Logger, ) *ProxyImpl { return &ProxyImpl{ availableConnections: p, - busyConnections: pool.NewPool(EmptyPoolCapacity), + busyConnections: pool.NewPool(config.EmptyPoolCapacity), logger: logger, hookConfig: hookConfig, Elastic: elastic, @@ -58,8 +55,6 @@ func NewProxy( // Connect maps a server connection from the available connection pool to a incoming connection. // It returns an error if the pool is exhausted. If the pool is elastic, it creates a new client // and maps it to the incoming connection. -// -//nolint:funlen func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError { var clientID string // Get the first available client from the pool. @@ -76,17 +71,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError { // Pool is exhausted or is elastic. if pr.Elastic { // Create a new client. - client = NewClient( - pr.ClientConfig.Network, - pr.ClientConfig.Address, - pr.ClientConfig.ReceiveBufferSize, - pr.ClientConfig.ReceiveChunkSize, - pr.ClientConfig.ReceiveDeadline, - pr.ClientConfig.SendDeadline, - pr.ClientConfig.TCPKeepAlive, - pr.ClientConfig.TCPKeepAlivePeriod, - pr.logger, - ) + client = NewClient(pr.ClientConfig, pr.logger) pr.logger.Debug().Str("id", client.ID[:7]).Msg("Reused the client connection") } else { return gerr.ErrPoolExhausted @@ -269,17 +254,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError { }).Msg("Client disconnected") client.Close() - client = NewClient( - pr.ClientConfig.Network, - pr.ClientConfig.Address, - pr.ClientConfig.ReceiveBufferSize, - pr.ClientConfig.ReceiveChunkSize, - pr.ClientConfig.ReceiveDeadline, - pr.ClientConfig.SendDeadline, - pr.ClientConfig.TCPKeepAlive, - pr.ClientConfig.TCPKeepAlivePeriod, - pr.logger, - ) + client = NewClient(pr.ClientConfig, pr.logger) pr.busyConnections.Remove(gconn) if err := pr.busyConnections.Put(gconn, client); err != nil { // This should never happen diff --git a/network/proxy_test.go b/network/proxy_test.go index 93a587e6..043fe7c0 100644 --- a/network/proxy_test.go +++ b/network/proxy_test.go @@ -4,6 +4,7 @@ import ( "testing" embeddedpostgres "github.com/fergusstrange/embedded-postgres" + "github.com/gatewayd-io/gatewayd/config" "github.com/gatewayd-io/gatewayd/logging" "github.com/gatewayd-io/gatewayd/plugin" "github.com/gatewayd-io/gatewayd/pool" @@ -25,7 +26,7 @@ func TestNewProxy(t *testing.T) { }() cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -34,16 +35,18 @@ func TestNewProxy(t *testing.T) { logger := logging.NewLogger(cfg) // Create a connection pool - pool := pool.NewPool(EmptyPoolCapacity) + pool := pool.NewPool(config.EmptyPoolCapacity) client := NewClient( - "tcp", - "localhost:5432", - DefaultBufferSize, - DefaultChunkSize, - DefaultReceiveDeadline, - DefaultSendDeadline, - false, - DefaultTCPKeepAlivePeriod, + &config.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + }, logger) err := pool.Put(client.ID, client) assert.Nil(t, err) @@ -66,7 +69,7 @@ func TestNewProxy(t *testing.T) { // TestNewProxyElastic tests the creation of a new proxy with an elastic connection pool. func TestNewProxyElastic(t *testing.T) { cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -75,13 +78,18 @@ func TestNewProxyElastic(t *testing.T) { logger := logging.NewLogger(cfg) // Create a connection pool - pool := pool.NewPool(EmptyPoolCapacity) + pool := pool.NewPool(config.EmptyPoolCapacity) // Create a proxy with an elastic buffer pool - proxy := NewProxy(pool, plugin.NewHookConfig(), true, false, &Client{ - Network: "tcp", - Address: "localhost:5432", - ReceiveBufferSize: DefaultBufferSize, + proxy := NewProxy(pool, plugin.NewHookConfig(), true, false, &config.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, }, logger) assert.NotNil(t, proxy) @@ -91,7 +99,7 @@ func TestNewProxyElastic(t *testing.T) { assert.Equal(t, false, proxy.ReuseElasticClients) assert.Equal(t, "tcp", proxy.ClientConfig.Network) assert.Equal(t, "localhost:5432", proxy.ClientConfig.Address) - assert.Equal(t, DefaultBufferSize, proxy.ClientConfig.ReceiveBufferSize) + assert.Equal(t, config.DefaultBufferSize, proxy.ClientConfig.ReceiveBufferSize) proxy.availableConnections.Clear() } diff --git a/network/server.go b/network/server.go index b4c4a677..37843de3 100644 --- a/network/server.go +++ b/network/server.go @@ -8,6 +8,7 @@ import ( "os" "time" + "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/plugin" "github.com/panjf2000/gnet/v2" @@ -19,11 +20,6 @@ type Status string const ( Running Status = "running" Stopped Status = "stopped" - - DefaultTickInterval = 5 * time.Second - DefaultPoolSize = 10 - MinimumPoolSize = 2 - DefaultBufferSize = 1 << 24 // 16777216 bytes ) type Server struct { @@ -392,7 +388,7 @@ func NewServer( } if tickInterval == 0 { - server.TickInterval = DefaultTickInterval + server.TickInterval = config.DefaultTickInterval logger.Debug().Msg("Tick interval is not set, using the default value") } else { server.TickInterval = tickInterval diff --git a/network/server_test.go b/network/server_test.go index 54207283..34adfb49 100644 --- a/network/server_test.go +++ b/network/server_test.go @@ -7,6 +7,7 @@ import ( "testing" embeddedpostgres "github.com/fergusstrange/embedded-postgres" + "github.com/gatewayd-io/gatewayd/config" "github.com/gatewayd-io/gatewayd/logging" "github.com/gatewayd-io/gatewayd/plugin" "github.com/gatewayd-io/gatewayd/pool" @@ -30,7 +31,7 @@ func TestRunServer(t *testing.T) { // Create a logger. cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -93,39 +94,28 @@ func TestRunServer(t *testing.T) { } hooksConfig.Add(plugin.OnEgressTraffic, 1, onEgressTraffic) + clientConfig := config.Client{ + Network: "tcp", + Address: "localhost:5432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + } + // Create a connection pool. pool := pool.NewPool(2) - client1 := NewClient( - "tcp", - "localhost:5432", - DefaultBufferSize, - DefaultChunkSize, - DefaultReceiveDeadline, - DefaultSendDeadline, - false, - DefaultTCPKeepAlivePeriod, - logger) + client1 := NewClient(&clientConfig, logger) err := pool.Put(client1.ID, client1) assert.Nil(t, err) - client2 := NewClient( - "tcp", - "localhost:5432", - DefaultBufferSize, - DefaultChunkSize, - DefaultReceiveDeadline, - DefaultSendDeadline, - false, - DefaultTCPKeepAlivePeriod, - logger) + client2 := NewClient(&clientConfig, logger) err = pool.Put(client2.ID, client2) assert.Nil(t, err) // Create a proxy with a fixed buffer pool. - proxy := NewProxy(pool, hooksConfig, false, false, &Client{ - Network: "tcp", - Address: "localhost:5432", - ReceiveBufferSize: DefaultBufferSize, - }, logger) + proxy := NewProxy(pool, hooksConfig, false, false, &clientConfig, logger) // Create a server. server := NewServer( @@ -133,7 +123,7 @@ func TestRunServer(t *testing.T) { "127.0.0.1:15432", 0, 0, - DefaultTickInterval, + config.DefaultTickInterval, []gnet.Option{ gnet.WithMulticore(false), gnet.WithReuseAddr(true), @@ -157,14 +147,16 @@ func TestRunServer(t *testing.T) { for { if server.IsRunning() { client := NewClient( - "tcp", - "127.0.0.1:15432", - DefaultBufferSize, - DefaultChunkSize, - DefaultReceiveDeadline, - DefaultSendDeadline, - false, - DefaultTCPKeepAlivePeriod, + &config.Client{ + Network: "tcp", + Address: "127.0.0.1:15432", + ReceiveBufferSize: config.DefaultBufferSize, + ReceiveChunkSize: config.DefaultChunkSize, + ReceiveDeadline: config.DefaultReceiveDeadline, + SendDeadline: config.DefaultSendDeadline, + TCPKeepAlive: false, + TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod, + }, logger) assert.NotNil(t, client) @@ -172,7 +164,7 @@ func TestRunServer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, len(CreatePgStartupPacket()), sent) - // The server should respond with a 'R' packet. + // The server should respond with an 'R' packet. size, data, err := client.Receive() msg := []byte{0x0, 0x0, 0x0, 0x3} // This includes the message type, length and the message itself. diff --git a/network/utils_test.go b/network/utils_test.go index b8e15510..8f003791 100644 --- a/network/utils_test.go +++ b/network/utils_test.go @@ -3,6 +3,7 @@ package network import ( "testing" + "github.com/gatewayd-io/gatewayd/config" "github.com/gatewayd-io/gatewayd/logging" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -11,7 +12,7 @@ import ( // TestGetRlimit tests the GetRLimit function. func TestGetRlimit(t *testing.T) { cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -26,7 +27,7 @@ func TestGetRlimit(t *testing.T) { // TestGetID tests the GetID function. func TestGetID(t *testing.T) { cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, @@ -40,7 +41,7 @@ func TestGetID(t *testing.T) { // TestResolve tests the Resolve function. func TestResolve(t *testing.T) { cfg := logging.LoggerConfig{ - Output: nil, + Output: config.Console, TimeFormat: zerolog.TimeFormatUnix, Level: zerolog.DebugLevel, NoColor: true, diff --git a/plugin/hooks.go b/plugin/hooks.go index 58947b23..e10da341 100644 --- a/plugin/hooks.go +++ b/plugin/hooks.go @@ -3,7 +3,9 @@ package plugin import ( "context" "sort" + "time" + "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/rs/zerolog" "google.golang.org/grpc" @@ -17,16 +19,6 @@ type ( HookType string HookDef func( context.Context, *structpb.Struct, ...grpc.CallOption) (*structpb.Struct, error) - Policy int -) - -const ( - // Non-strict (permissive) mode. - PassDown Policy = iota // Pass down the extra keys/values in result to the next plugins - // Strict mode. - Ignore // Ignore errors and continue - Abort // Abort on first error and return results - Remove // Remove the hook from the list on error and continue ) const ( @@ -57,7 +49,7 @@ const ( type HookConfig struct { hooks map[HookType]map[Priority]HookDef Logger zerolog.Logger - Verification Policy + Verification config.Policy } // NewHookConfig returns a new HookConfig. @@ -112,7 +104,7 @@ func (h *HookConfig) Run( ctx context.Context, args map[string]interface{}, hookType HookType, - verification Policy, + verification config.Policy, opts ...grpc.CallOption, ) (map[string]interface{}, *gerr.GatewayDError) { if ctx == nil { @@ -125,9 +117,19 @@ func (h *HookConfig) Run( // Create structpb.Struct from args. var params *structpb.Struct - if args == nil { + arguments := map[string]interface{}{} + for k, v := range args { + switch v := v.(type) { + case time.Duration: + arguments[k] = v.Seconds() + default: + arguments[k] = v + } + } + + if len(arguments) == 0 { params = &structpb.Struct{} - } else if casted, err := structpb.NewStruct(args); err == nil { + } else if casted, err := structpb.NewStruct(arguments); err == nil { params = casted } else { return nil, gerr.ErrCastFailed.Wrap(err) @@ -161,7 +163,7 @@ func (h *HookConfig) Run( // and that the hook does not return any unexpected values. // If the verification mode is non-strict (permissive), let the plugin pass // extra keys/values to the next plugin in chain. - if Verify(params, result) || verification == PassDown { + if Verify(params, result) || verification == config.PassDown { // Update the last return value with the current result returnVal = result continue @@ -171,7 +173,7 @@ func (h *HookConfig) Run( // The result of the current hook will be ignored, regardless of the policy. switch verification { // Ignore the result of this plugin, log an error and execute the next hook. - case Ignore: + case config.Ignore: h.Logger.Error().Err(err).Fields( map[string]interface{}{ "hookType": hookType, @@ -182,7 +184,7 @@ func (h *HookConfig) Run( returnVal = params } // Abort execution of the plugins, log the error and return the result of the last hook. - case Abort: + case config.Abort: h.Logger.Error().Err(err).Fields( map[string]interface{}{ "hookType": hookType, @@ -194,7 +196,7 @@ func (h *HookConfig) Run( } return returnVal.AsMap(), nil // Remove the hook from the registry, log the error and execute the next hook. - case Remove: + case config.Remove: h.Logger.Error().Err(err).Fields( map[string]interface{}{ "hookType": hookType, @@ -205,7 +207,7 @@ func (h *HookConfig) Run( if idx == 0 { returnVal = params } - case PassDown: + case config.PassDown: default: returnVal = result } diff --git a/plugin/hooks_test.go b/plugin/hooks_test.go index 4f768638..65149c67 100644 --- a/plugin/hooks_test.go +++ b/plugin/hooks_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + "github.com/gatewayd-io/gatewayd/config" "github.com/stretchr/testify/assert" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/structpb" @@ -77,7 +78,8 @@ func Test_HookConfig_Run(t *testing.T) { ) (*structpb.Struct, error) { return args, nil }) - result, err := hooks.Run(context.Background(), map[string]interface{}{}, OnNewLogger, Ignore) + result, err := hooks.Run( + context.Background(), map[string]interface{}{}, OnNewLogger, config.Ignore) assert.NotNil(t, result) assert.Nil(t, err) } @@ -113,7 +115,7 @@ func Test_HookConfig_Run_PassDown(t *testing.T) { context.Background(), map[string]interface{}{"test": "test"}, OnNewLogger, - PassDown) + config.PassDown) assert.Nil(t, err) assert.NotNil(t, result) } @@ -154,7 +156,7 @@ func Test_HookConfig_Run_PassDown_2(t *testing.T) { context.Background(), map[string]interface{}{"test": "test"}, OnNewLogger, - PassDown) + config.PassDown) assert.Nil(t, err) assert.NotNil(t, result) } @@ -190,7 +192,7 @@ func Test_HookConfig_Run_Ignore(t *testing.T) { context.Background(), map[string]interface{}{"test": "test"}, OnNewLogger, - Ignore) + config.Ignore) assert.Nil(t, err) assert.NotNil(t, result) } @@ -219,7 +221,8 @@ func Test_HookConfig_Run_Abort(t *testing.T) { return output, nil }) // The first hook returns nil, and it aborts the execution of the rest of the hook. - result, err := hooks.Run(context.Background(), map[string]interface{}{}, OnNewLogger, Abort) + result, err := hooks.Run( + context.Background(), map[string]interface{}{}, OnNewLogger, config.Abort) assert.Nil(t, err) assert.Equal(t, map[string]interface{}{}, result) } @@ -250,7 +253,8 @@ func Test_HookConfig_Run_Remove(t *testing.T) { // The first hook returns nil, and its signature doesn't match the params, // so its result is ignored. The failing hook is removed from the list and // the execution continues with the next hook in the list. - result, err := hooks.Run(context.Background(), map[string]interface{}{}, OnNewLogger, Remove) + result, err := hooks.Run( + context.Background(), map[string]interface{}{}, OnNewLogger, config.Remove) assert.Nil(t, err) assert.Equal(t, map[string]interface{}{}, result) assert.Equal(t, 1, len(hooks.Hooks()[OnNewLogger])) diff --git a/plugin/registry.go b/plugin/registry.go index 159ba0b9..61959af5 100644 --- a/plugin/registry.go +++ b/plugin/registry.go @@ -4,31 +4,16 @@ import ( "context" semver "github.com/Masterminds/semver/v3" + "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/gatewayd-io/gatewayd/logging" pluginV1 "github.com/gatewayd-io/gatewayd/plugin/v1" "github.com/gatewayd-io/gatewayd/pool" goplugin "github.com/hashicorp/go-plugin" - "github.com/knadh/koanf" "github.com/mitchellh/mapstructure" "google.golang.org/protobuf/types/known/structpb" ) -type CompatPolicy uint - -const ( - DefaultMinPort uint = 50000 - DefaultMaxPort uint = 60000 - PluginPriorityStart uint = 1000 - EmptyPoolCapacity int = 0 - LoggerName string = "plugin" -) - -const ( - Strict CompatPolicy = iota - Loose -) - type Registry interface { Add(plugin *Impl) bool Get(id Identifier) *Impl @@ -36,21 +21,21 @@ type Registry interface { Exists(name, version, remoteURL string) bool Remove(id Identifier) Shutdown() - LoadPlugins(pluginConfig *koanf.Koanf) + LoadPlugins(plugins []config.Plugin) RegisterHooks(id Identifier) } type RegistryImpl struct { plugins pool.Pool hooksConfig *HookConfig - CompatPolicy CompatPolicy + CompatPolicy config.CompatPolicy } var _ Registry = &RegistryImpl{} // NewRegistry creates a new plugin registry. func NewRegistry(hooksConfig *HookConfig) *RegistryImpl { - return &RegistryImpl{plugins: pool.NewPool(EmptyPoolCapacity), hooksConfig: hooksConfig} + return &RegistryImpl{plugins: pool.NewPool(config.EmptyPoolCapacity), hooksConfig: hooksConfig} } // Add adds a plugin to the registry. @@ -140,46 +125,46 @@ func (reg *RegistryImpl) Shutdown() { // LoadPlugins loads plugins from the config file. // //nolint:funlen -func (reg *RegistryImpl) LoadPlugins(pluginConfig *koanf.Koanf) { - // Get top-level list of plugins. - plugins := pluginConfig.MapKeys("") - +func (reg *RegistryImpl) LoadPlugins(plugins []config.Plugin) { // TODO: Append built-in plugins to the list of plugins // Built-in plugins are plugins that are compiled and shipped with the gatewayd binary. // Add each plugin to the registry. - for priority, name := range plugins { + for priority, pCfg := range plugins { // Skip the top-level "plugins" key. - if name == "plugins" { + if pCfg.Name == "plugins" { continue } - reg.hooksConfig.Logger.Debug().Str("name", name).Msg("Loading plugin") + reg.hooksConfig.Logger.Debug().Str("name", pCfg.Name).Msg("Loading plugin") plugin := &Impl{ ID: Identifier{ - Name: name, + Name: pCfg.Name, + Checksum: pCfg.Checksum, }, + Enabled: pCfg.Enabled, + LocalPath: pCfg.LocalPath, + Args: pCfg.Args, + Env: pCfg.Env, } // Is the plugin enabled? - plugin.Enabled = pluginConfig.Bool(name + ".enabled") + plugin.Enabled = pCfg.Enabled if !plugin.Enabled { - reg.hooksConfig.Logger.Debug().Str("name", name).Msg("Plugin is disabled") + reg.hooksConfig.Logger.Debug().Str("name", plugin.ID.Name).Msg("Plugin is disabled") continue } // File path of the plugin on disk. - plugin.LocalPath = pluginConfig.String(name + ".localPath") if plugin.LocalPath == "" { - reg.hooksConfig.Logger.Debug().Str("name", name).Msg( + reg.hooksConfig.Logger.Debug().Str("name", plugin.ID.Name).Msg( "Local file of the plugin doesn't exist or is not set") continue } // Checksum of the plugin. - plugin.ID.Checksum = pluginConfig.String(name + ".checksum") if plugin.ID.Checksum == "" { - reg.hooksConfig.Logger.Debug().Str("name", name).Msg( + reg.hooksConfig.Logger.Debug().Str("name", plugin.ID.Name).Msg( "Checksum of plugin doesn't exist or is not set") continue } @@ -199,23 +184,13 @@ func (reg *RegistryImpl) LoadPlugins(pluginConfig *koanf.Koanf) { continue } - // Commandline arguments to pass to the plugin. - if args := pluginConfig.Strings(name + ".args"); len(args) > 0 { - plugin.Args = args - } - - // Custom environment variables to pass to the plugin. - if env := pluginConfig.Strings(name + ".env"); len(env) > 0 { - plugin.Env = append(plugin.Env, env...) - } - // Plugin priority is determined by the order in which the plugin is listed // in the config file. Built-in plugins are loaded first, followed by user-defined // plugins. Built-in plugins have a priority of 0 to 999, and user-defined plugins // have a priority of 1000 or greater. - plugin.Priority = Priority(PluginPriorityStart + uint(priority)) + plugin.Priority = Priority(config.PluginPriorityStart + uint(priority)) - logAdapter := logging.NewHcLogAdapter(®.hooksConfig.Logger, LoggerName) + logAdapter := logging.NewHcLogAdapter(®.hooksConfig.Logger, config.LoggerName) plugin.client = goplugin.NewClient( &goplugin.ClientConfig{ @@ -228,8 +203,8 @@ func (reg *RegistryImpl) LoadPlugins(pluginConfig *koanf.Koanf) { // SecureConfig: nil, Logger: logAdapter, Managed: true, - MinPort: DefaultMinPort, - MaxPort: DefaultMaxPort, + MinPort: config.DefaultMinPort, + MaxPort: config.DefaultMaxPort, // TODO: Enable GRPC DialOptions // GRPCDialOptions: []grpc.DialOption{ // grpc.WithInsecure(), @@ -280,7 +255,7 @@ func (reg *RegistryImpl) LoadPlugins(pluginConfig *koanf.Koanf) { "requirement": req.Name, }, ).Msg("The plugin requirement is not met, so it won't work properly") - if reg.CompatPolicy == Strict { + if reg.CompatPolicy == config.Strict { reg.hooksConfig.Logger.Debug().Str("name", plugin.ID.Name).Msg( "Registry is in strict compatibility mode, so the plugin won't be loaded") plugin.Stop() // Stop the plugin. diff --git a/plugin/utils.go b/plugin/utils.go index c500d73f..e5af464a 100644 --- a/plugin/utils.go +++ b/plugin/utils.go @@ -9,14 +9,13 @@ import ( "os" "os/exec" + "github.com/gatewayd-io/gatewayd/config" gerr "github.com/gatewayd-io/gatewayd/errors" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/protobuf/types/known/structpb" ) -const bufferSize = 65536 - // sha256sum returns the sha256 checksum of a file. // Ref: https://github.com/codingsince1985/checksum // A little copying is better than a little dependency. @@ -33,7 +32,7 @@ func sha256sum(filename string) (string, *gerr.GatewayDError) { hashAlgorithm := sha256.New() - buf := make([]byte, bufferSize) + buf := make([]byte, config.ChecksumBufferSize) for { n, err := bufio.NewReader(file).Read(buf) //nolint:gocritic diff --git a/pool/pool.go b/pool/pool.go index 341be35a..b27cc0b2 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -6,10 +6,6 @@ import ( gerr "github.com/gatewayd-io/gatewayd/errors" ) -const ( - EmptyPoolCapacity = 0 -) - type Callback func(key, value interface{}) bool type Pool interface { diff --git a/pool/pool_test.go b/pool/pool_test.go index e8d4163f..b194acef 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -3,12 +3,13 @@ package pool import ( "testing" + "github.com/gatewayd-io/gatewayd/config" "github.com/stretchr/testify/assert" ) // TestNewPool tests the NewPool function. func TestNewPool(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -17,7 +18,7 @@ func TestNewPool(t *testing.T) { // TestPool_Put tests the Put function. func TestPool_Put(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -34,7 +35,7 @@ func TestPool_Put(t *testing.T) { // //nolint:dupl func TestPool_Pop(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -61,7 +62,7 @@ func TestPool_Pop(t *testing.T) { // TestPool_Clear tests the Clear function. func TestPool_Clear(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -78,7 +79,7 @@ func TestPool_Clear(t *testing.T) { // TestPool_ForEach tests the ForEach function. func TestPool_ForEach(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -101,7 +102,7 @@ func TestPool_ForEach(t *testing.T) { // //nolint:dupl func TestPool_Get(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -128,7 +129,7 @@ func TestPool_Get(t *testing.T) { // TestPool_GetOrPut tests the GetOrPut function. func TestPool_GetOrPut(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -161,7 +162,7 @@ func TestPool_GetOrPut(t *testing.T) { // TestPool_Remove tests the Remove function. func TestPool_Remove(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool()) @@ -180,7 +181,7 @@ func TestPool_Remove(t *testing.T) { // TestPool_GetClientIDs tests the GetClientIDs function. func TestPool_GetClientIDs(t *testing.T) { - pool := NewPool(EmptyPoolCapacity) + pool := NewPool(config.EmptyPoolCapacity) defer pool.Clear() assert.NotNil(t, pool) assert.NotNil(t, pool.Pool())