Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 13 additions & 22 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var (
},
)
// The plugins are loaded and hooks registered before the configuration is loaded.
pluginRegistry = plugin.NewRegistry(config.Loose, config.PassDown, DefaultLogger)
pluginRegistry = plugin.NewRegistry(config.Loose, config.PassDown, config.Accept, DefaultLogger)
// Global koanf instance. Using "." as the key path delimiter.
globalConfig = koanf.New(".")
// Plugin koanf instance. Using "." as the key path delimiter.
Expand Down Expand Up @@ -63,8 +63,12 @@ var runCmd = &cobra.Command{
os.Exit(gerr.FailedToLoadPluginConfig)
}

// Set the plugin compatibility policy.
pluginRegistry.CompatPolicy = pConfig.GetPluginCompatPolicy()
// Set the plugin requirement's compatibility policy.
pluginRegistry.Compatibility = pConfig.GetPluginCompatibilityPolicy()
// Set hooks' signature verification policy.
pluginRegistry.Verification = pConfig.GetVerificationPolicy()
// Set custom hook acceptance policy.
pluginRegistry.Acceptance = pConfig.GetAcceptancePolicy()

// Load plugins and register their hooks.
pluginRegistry.LoadPlugins(pConfig.Plugins)
Expand All @@ -84,9 +88,6 @@ var runCmd = &cobra.Command{
// Load environment variables for the global configuration.
config.LoadEnvVars(globalConfig)

// Get hooks signature verification policy.
pluginRegistry.Verification = pConfig.GetVerificationPolicy()

// Unmarshal the global configuration for easier access.
var gConfig config.GlobalConfig
if err := globalConfig.Unmarshal("", &gConfig); err != nil {
Expand All @@ -100,8 +101,7 @@ var runCmd = &cobra.Command{
updatedGlobalConfig, err := pluginRegistry.Run(
context.Background(),
globalConfig.All(),
plugin.OnConfigLoaded,
pluginRegistry.Verification)
plugin.OnConfigLoaded)
if err != nil {
DefaultLogger.Error().Err(err).Msg("Failed to run OnConfigLoaded hooks")
}
Expand Down Expand Up @@ -145,8 +145,7 @@ var runCmd = &cobra.Command{
"fileName": loggerCfg.FileName,
}
// TODO: Use a context with a timeout
_, err = pluginRegistry.Run(
context.Background(), data, plugin.OnNewLogger, pluginRegistry.Verification)
_, err = pluginRegistry.Run(context.Background(), data, plugin.OnNewLogger)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewLogger hooks")
}
Expand Down Expand Up @@ -174,11 +173,7 @@ var runCmd = &cobra.Command{
"tcpKeepAlive": client.TCPKeepAlive,
"tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(),
}
_, err := pluginRegistry.Run(
context.Background(),
clientCfg,
plugin.OnNewClient,
pluginRegistry.Verification)
_, err := pluginRegistry.Run(context.Background(), clientCfg, plugin.OnNewClient)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewClient hooks")
}
Expand All @@ -205,8 +200,7 @@ var runCmd = &cobra.Command{
_, err = pluginRegistry.Run(
context.Background(),
map[string]interface{}{"size": poolSize},
plugin.OnNewPool,
pluginRegistry.Verification)
plugin.OnNewPool)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewPool hooks")
}
Expand Down Expand Up @@ -240,8 +234,7 @@ var runCmd = &cobra.Command{
"tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.String(),
},
}
_, err = pluginRegistry.Run(
context.Background(), proxyCfg, plugin.OnNewProxy, pluginRegistry.Verification)
_, err = pluginRegistry.Run(context.Background(), proxyCfg, plugin.OnNewProxy)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks")
}
Expand Down Expand Up @@ -302,8 +295,7 @@ var runCmd = &cobra.Command{
"tcpKeepAlive": gConfig.Server.TCPKeepAlive.String(),
"tcpNoDelay": gConfig.Server.TCPNoDelay,
}
_, err = pluginRegistry.Run(
context.Background(), serverCfg, plugin.OnNewServer, pluginRegistry.Verification)
_, err = pluginRegistry.Run(context.Background(), serverCfg, plugin.OnNewServer)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewServer hooks")
}
Expand All @@ -330,7 +322,6 @@ var runCmd = &cobra.Command{
context.Background(),
map[string]interface{}{"signal": sig.String()},
plugin.OnSignal,
pluginRegistry.Verification,
)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnSignal hooks")
Expand Down
23 changes: 15 additions & 8 deletions config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package config
import "time"

type (
Status uint
Policy uint
CompatPolicy uint
LogOutput uint
Status uint
VerificationPolicy uint
CompatibilityPolicy uint
AcceptancePolicy uint
LogOutput uint
)

// Status is the status of the server.
Expand All @@ -18,17 +19,23 @@ const (
// Policy is the policy for hook verification.
const (
// Non-strict (permissive) mode.
PassDown Policy = iota // Pass down the extra keys/values in result to the next plugins
PassDown VerificationPolicy = iota // Pass down the extra keys/values in result to the next plugins
// Strict mode.
Ignore // Ignore errors and continue
Abort // Abort on first error and return results
Remove // Remove the hook from the list on error and continue
)

// CompatPolicy is the compatibility policy for plugins.
// CompatibilityPolicy is the compatibility policy for plugins.
const (
Strict CompatPolicy = iota
Loose
Strict CompatibilityPolicy = iota // Expect all required plugins to be loaded and present
Loose // Load the plugin, even if the requirements are not met
)

// AcceptancePolicy is the acceptance policy for custom hooks.
const (
Accept AcceptancePolicy = iota // Accept all custom hooks
Reject // Reject all custom hooks
)

// LogOutput is the output type for the logger.
Expand Down
18 changes: 15 additions & 3 deletions config/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

// GetVerificationPolicy returns the hook verification policy from plugin config file.
func (p PluginConfig) GetVerificationPolicy() Policy {
func (p PluginConfig) GetVerificationPolicy() VerificationPolicy {
switch p.VerificationPolicy {
case "ignore":
return Ignore
Expand All @@ -19,8 +19,8 @@ func (p PluginConfig) GetVerificationPolicy() Policy {
}
}

// GetPluginCompatPolicy returns the plugin compatibility policy from plugin config file.
func (p PluginConfig) GetPluginCompatPolicy() CompatPolicy {
// GetPluginCompatibilityPolicy returns the plugin compatibility policy from plugin config file.
func (p PluginConfig) GetPluginCompatibilityPolicy() CompatibilityPolicy {
switch p.CompatibilityPolicy {
case "strict":
return Strict
Expand All @@ -31,6 +31,18 @@ func (p PluginConfig) GetPluginCompatPolicy() CompatPolicy {
}
}

// GetAcceptancePolicy returns the acceptance policy from plugin config file.
func (p PluginConfig) GetAcceptancePolicy() AcceptancePolicy {
switch p.AcceptancePolicy {
case "accept":
return Accept
case "reject":
return Reject
default:
return Accept
}
}

// GetLoadBalancer returns the load balancing algorithm to use.
func (s Server) GetLoadBalancer() gnet.LoadBalancing {
switch s.LoadBalancer {
Expand Down
1 change: 1 addition & 0 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Plugin struct {
type PluginConfig struct {
VerificationPolicy string `koanf:"verificationPolicy"`
CompatibilityPolicy string `koanf:"compatibilityPolicy"`
AcceptancePolicy string `koanf:"acceptancePolicy"`
Plugins []Plugin `koanf:"plugins"`
}

Expand Down
6 changes: 4 additions & 2 deletions gatewayd_plugins.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
verificationPolicy: "passdown"
# Possible values: "strict" (default) and "loose"
compatibilityPolicy: "strict"
# Possible values: "accept" (default) and "reject"
acceptancePolicy: "accept"

plugins:
# Plugin name
Expand All @@ -23,7 +25,7 @@ plugins:
- MAGIC_COOKIE_KEY=GATEWAYD_PLUGIN
- MAGIC_COOKIE_VALUE=5712b87aa5d7e9f9e9ab643e6603181c5b796015cb1c09d6f5ada882bf2a1872
# Checksum hash to verify the binary before loading
checksum: 8c68bc8ca7a759637579330460a60ca785d659da567b95187f154723b1aa519c
checksum: b1cfe3c6b268d904abe405b106ef737da5373912586ff8a16695939c38caf30d
- name: gatewayd-plugin-cache
enabled: True
localPath: ../gatewayd-plugin-cache/gatewayd-plugin-cache
Expand All @@ -32,4 +34,4 @@ plugins:
- MAGIC_COOKIE_KEY=GATEWAYD_PLUGIN
- MAGIC_COOKIE_VALUE=5712b87aa5d7e9f9e9ab643e6603181c5b796015cb1c09d6f5ada882bf2a1872
- REDIS_ADDR=localhost:6379
checksum: a0b62927f8330638a2b9a36d843580b784b927a12addb76eabb25d0cec7c6b57
checksum: 0c55ed414432afd6db2fbe284112046b6ce4d70b7beb5b99d3703f4bbbcb5a84
12 changes: 4 additions & 8 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,7 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
},
origErr),
plugin.OnTrafficFromClient,
pr.pluginRegistry.Verification)
plugin.OnTrafficFromClient)
if err != nil {
pr.logger.Error().Err(err).Msg("Error running hook")
}
Expand Down Expand Up @@ -408,8 +407,7 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
},
err),
plugin.OnTrafficToServer,
pr.pluginRegistry.Verification)
plugin.OnTrafficToServer)
if err != nil {
pr.logger.Error().Err(err).Msg("Error running hook")
}
Expand Down Expand Up @@ -464,8 +462,7 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
},
err),
plugin.OnTrafficFromServer,
pr.pluginRegistry.Verification)
plugin.OnTrafficFromServer)
if err != nil {
pr.logger.Error().Err(err).Msg("Error running hook")
}
Expand Down Expand Up @@ -498,8 +495,7 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
err,
),
plugin.OnTrafficToClient,
pr.pluginRegistry.Verification)
plugin.OnTrafficToClient)
if err != nil {
pr.logger.Error().Err(err).Msg("Error running hook")
}
Expand Down
4 changes: 2 additions & 2 deletions network/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestNewProxy(t *testing.T) {

// Create a proxy with a fixed buffer pool
proxy := NewProxy(pool,
plugin.NewRegistry(config.Loose, config.PassDown, logger),
plugin.NewRegistry(config.Loose, config.PassDown, config.Accept, logger),
false,
false,
config.DefaultHealthCheckPeriod,
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestNewProxyElastic(t *testing.T) {

// Create a proxy with an elastic buffer pool
proxy := NewProxy(pool,
plugin.NewRegistry(config.Loose, config.PassDown, logger),
plugin.NewRegistry(config.Loose, config.PassDown, config.Accept, logger),
true,
false,
config.DefaultHealthCheckPeriod,
Expand Down
30 changes: 10 additions & 20 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ func (s *Server) OnBoot(engine gnet.Engine) gnet.Action {
_, err := s.pluginRegistry.Run(
context.Background(),
map[string]interface{}{"status": fmt.Sprint(s.Status)},
plugin.OnBooting,
s.pluginRegistry.Verification)
plugin.OnBooting)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to run OnBooting hook")
}
Expand All @@ -56,8 +55,7 @@ func (s *Server) OnBoot(engine gnet.Engine) gnet.Action {
_, err = s.pluginRegistry.Run(
context.Background(),
map[string]interface{}{"status": fmt.Sprint(s.Status)},
plugin.OnBooted,
s.pluginRegistry.Verification)
plugin.OnBooted)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to run OnBooted hook")
}
Expand All @@ -80,8 +78,7 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) {
"remote": gconn.RemoteAddr().String(),
},
}
_, err := s.pluginRegistry.Run(
context.Background(), onOpeningData, plugin.OnOpening, s.pluginRegistry.Verification)
_, err := s.pluginRegistry.Run(context.Background(), onOpeningData, plugin.OnOpening)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to run OnOpening hook")
}
Expand Down Expand Up @@ -121,8 +118,7 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) {
"remote": gconn.RemoteAddr().String(),
},
}
_, err = s.pluginRegistry.Run(
context.Background(), onOpenedData, plugin.OnOpened, s.pluginRegistry.Verification)
_, err = s.pluginRegistry.Run(context.Background(), onOpenedData, plugin.OnOpened)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to run OnOpened hook")
}
Expand All @@ -148,8 +144,7 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action {
if err != nil {
data["error"] = err.Error()
}
_, gatewaydErr := s.pluginRegistry.Run(
context.Background(), data, plugin.OnClosing, s.pluginRegistry.Verification)
_, gatewaydErr := s.pluginRegistry.Run(context.Background(), data, plugin.OnClosing)
if gatewaydErr != nil {
s.logger.Error().Err(gatewaydErr).Msg("Failed to run OnClosing hook")
}
Expand Down Expand Up @@ -179,8 +174,7 @@ func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action {
if err != nil {
data["error"] = err.Error()
}
_, gatewaydErr = s.pluginRegistry.Run(
context.Background(), data, plugin.OnClosed, s.pluginRegistry.Verification)
_, gatewaydErr = s.pluginRegistry.Run(context.Background(), data, plugin.OnClosed)
if gatewaydErr != nil {
s.logger.Error().Err(gatewaydErr).Msg("Failed to run OnClosed hook")
}
Expand All @@ -198,8 +192,7 @@ func (s *Server) OnTraffic(gconn gnet.Conn) gnet.Action {
"remote": gconn.RemoteAddr().String(),
},
}
_, err := s.pluginRegistry.Run(
context.Background(), onTrafficData, plugin.OnTraffic, s.pluginRegistry.Verification)
_, err := s.pluginRegistry.Run(context.Background(), onTrafficData, plugin.OnTraffic)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to run OnTraffic hook")
}
Expand Down Expand Up @@ -234,8 +227,7 @@ func (s *Server) OnShutdown(engine gnet.Engine) {
_, err := s.pluginRegistry.Run(
context.Background(),
map[string]interface{}{"connections": s.engine.CountConnections()},
plugin.OnShutdown,
s.pluginRegistry.Verification)
plugin.OnShutdown)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to run OnShutdown hook")
}
Expand All @@ -257,8 +249,7 @@ func (s *Server) OnTick() (time.Duration, gnet.Action) {
_, err := s.pluginRegistry.Run(
context.Background(),
map[string]interface{}{"connections": s.engine.CountConnections()},
plugin.OnTick,
s.pluginRegistry.Verification)
plugin.OnTick)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to run OnTick hook")
}
Expand All @@ -284,8 +275,7 @@ func (s *Server) Run() error {
if err != nil && err.Unwrap() != nil {
onRunData["error"] = err.OriginalError.Error()
}
result, err := s.pluginRegistry.Run(
context.Background(), onRunData, plugin.OnRun, s.pluginRegistry.Verification)
result, err := s.pluginRegistry.Run(context.Background(), onRunData, plugin.OnRun)
if err != nil {
s.logger.Error().Err(err).Msg("Failed to run the hook")
}
Expand Down
2 changes: 1 addition & 1 deletion network/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestRunServer(t *testing.T) {

logger := logging.NewLogger(cfg)

pluginRegistry := plugin.NewRegistry(config.Loose, config.PassDown, logger)
pluginRegistry := plugin.NewRegistry(config.Loose, config.PassDown, config.Accept, logger)

onTrafficFromClient := func(
ctx context.Context,
Expand Down
Loading