Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 28 additions & 33 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/gatewayd-io/gatewayd/logging"
"github.com/gatewayd-io/gatewayd/network"
"github.com/gatewayd-io/gatewayd/plugin"
"github.com/gatewayd-io/gatewayd/plugin/hook"
"github.com/gatewayd-io/gatewayd/pool"
"github.com/knadh/koanf"
"github.com/knadh/koanf/parsers/yaml"
Expand All @@ -24,14 +23,14 @@ import (
)

var (
hooksConfig = hook.NewHookConfig()
DefaultLogger = logging.NewLogger(
logging.LoggerConfig{
Level: zerolog.InfoLevel, // Default log level
NoColor: true,
},
)
pluginRegistry = plugin.NewRegistry(hooksConfig)
// The plugins are loaded and hooks registered before the configuration is loaded.
pluginRegistry = plugin.NewRegistry(config.Loose, config.PassDown, DefaultLogger)
// Global koanf instance. Using "." as the key path delimiter.
globalConfig = koanf.New(".")
// Plugin koanf instance. Using "." as the key path delimiter.
Expand All @@ -43,10 +42,6 @@ var runCmd = &cobra.Command{
Use: "run",
Short: "Run a gatewayd instance",
Run: func(cmd *cobra.Command, args []string) {
// The plugins are loaded and hooks registered
// before the configuration is loaded.
hooksConfig.Logger = DefaultLogger

// Load default plugin configuration.
config.LoadPluginConfigDefaults(pluginConfig)

Expand Down Expand Up @@ -90,7 +85,7 @@ var runCmd = &cobra.Command{
config.LoadEnvVars(globalConfig)

// Get hooks signature verification policy.
hooksConfig.Verification = pConfig.GetVerificationPolicy()
pluginRegistry.Verification = pConfig.GetVerificationPolicy()

// Unmarshal the global configuration for easier access.
var gConfig config.GlobalConfig
Expand All @@ -100,13 +95,13 @@ var runCmd = &cobra.Command{
os.Exit(gerr.FailedToLoadGlobalConfig)
}

// The config will be passed to the plugins that register to the "OnConfigLoaded" hook.
// The config will be passed to the plugins that register to the "OnConfigLoaded" plugin.
// The plugins can modify the config and return it.
updatedGlobalConfig, err := hooksConfig.Run(
updatedGlobalConfig, err := pluginRegistry.Run(
context.Background(),
globalConfig.All(),
hook.OnConfigLoaded,
hooksConfig.Verification)
plugin.OnConfigLoaded,
pluginRegistry.Verification)
if err != nil {
DefaultLogger.Error().Err(err).Msg("Failed to run OnConfigLoaded hooks")
}
Expand Down Expand Up @@ -139,7 +134,7 @@ var runCmd = &cobra.Command{
})

// Replace the default logger with the new one from the config.
hooksConfig.Logger = logger
pluginRegistry.Logger = logger

// This is a notification hook, so we don't care about the result.
data := map[string]interface{}{
Expand All @@ -150,8 +145,8 @@ var runCmd = &cobra.Command{
"fileName": loggerCfg.FileName,
}
// TODO: Use a context with a timeout
_, err = hooksConfig.Run(
context.Background(), data, hook.OnNewLogger, hooksConfig.Verification)
_, err = pluginRegistry.Run(
context.Background(), data, plugin.OnNewLogger, pluginRegistry.Verification)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewLogger hooks")
}
Expand Down Expand Up @@ -179,11 +174,11 @@ var runCmd = &cobra.Command{
"tcpKeepAlive": client.TCPKeepAlive,
"tcpKeepAlivePeriod": client.TCPKeepAlivePeriod.String(),
}
_, err := hooksConfig.Run(
_, err := pluginRegistry.Run(
context.Background(),
clientCfg,
hook.OnNewClient,
hooksConfig.Verification)
plugin.OnNewClient,
pluginRegistry.Verification)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewClient hooks")
}
Expand All @@ -204,14 +199,14 @@ var runCmd = &cobra.Command{
"the clients cannot connect due to no network connectivity " +
"or the server is not running. exiting...")
pluginRegistry.Shutdown()
os.Exit(1)
os.Exit(gerr.FailedToInitializePool)
}

_, err = hooksConfig.Run(
_, err = pluginRegistry.Run(
context.Background(),
map[string]interface{}{"size": poolSize},
hook.OnNewPool,
hooksConfig.Verification)
plugin.OnNewPool,
pluginRegistry.Verification)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewPool hooks")
}
Expand All @@ -222,7 +217,7 @@ var runCmd = &cobra.Command{
healthCheckPeriod := gConfig.Proxy[config.Default].HealthCheckPeriod
proxy := network.NewProxy(
pool,
hooksConfig,
pluginRegistry,
elastic,
reuseElasticClients,
healthCheckPeriod,
Expand All @@ -245,8 +240,8 @@ var runCmd = &cobra.Command{
"tcpKeepAlivePeriod": clientConfig.TCPKeepAlivePeriod.String(),
},
}
_, err = hooksConfig.Run(
context.Background(), proxyCfg, hook.OnNewProxy, hooksConfig.Verification)
_, err = pluginRegistry.Run(
context.Background(), proxyCfg, plugin.OnNewProxy, pluginRegistry.Verification)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewProxy hooks")
}
Expand Down Expand Up @@ -285,7 +280,7 @@ var runCmd = &cobra.Command{
},
proxy,
logger,
hooksConfig,
pluginRegistry,
)

serverCfg := map[string]interface{}{
Expand All @@ -307,8 +302,8 @@ var runCmd = &cobra.Command{
"tcpKeepAlive": gConfig.Server.TCPKeepAlive.String(),
"tcpNoDelay": gConfig.Server.TCPNoDelay,
}
_, err = hooksConfig.Run(
context.Background(), serverCfg, hook.OnNewServer, hooksConfig.Verification)
_, err = pluginRegistry.Run(
context.Background(), serverCfg, plugin.OnNewServer, pluginRegistry.Verification)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnNewServer hooks")
}
Expand All @@ -326,16 +321,16 @@ var runCmd = &cobra.Command{
)
signalsCh := make(chan os.Signal, 1)
signal.Notify(signalsCh, signals...)
go func(hooksConfig *hook.Config) {
go func(pluginRegistry *plugin.Registry) {
for sig := range signalsCh {
for _, s := range signals {
if sig != s {
// Notify the hooks that the server is shutting down.
_, err := hooksConfig.Run(
_, err := pluginRegistry.Run(
context.Background(),
map[string]interface{}{"signal": sig.String()},
hook.OnSignal,
hooksConfig.Verification,
plugin.OnSignal,
pluginRegistry.Verification,
)
if err != nil {
logger.Error().Err(err).Msg("Failed to run OnSignal hooks")
Expand All @@ -347,7 +342,7 @@ var runCmd = &cobra.Command{
}
}
}
}(hooksConfig)
}(pluginRegistry)

// Run the server.
if err := server.Run(); err != nil {
Expand Down
1 change: 1 addition & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,5 @@ var (
const (
FailedToLoadPluginConfig = 1
FailedToLoadGlobalConfig = 2
FailedToInitializePool = 3
)
32 changes: 16 additions & 16 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/gatewayd-io/gatewayd/config"
gerr "github.com/gatewayd-io/gatewayd/errors"
"github.com/gatewayd-io/gatewayd/plugin/hook"
"github.com/gatewayd-io/gatewayd/plugin"
"github.com/gatewayd-io/gatewayd/pool"
"github.com/go-co-op/gocron"
"github.com/panjf2000/gnet/v2"
Expand All @@ -26,7 +26,7 @@ type Proxy struct {
availableConnections pool.IPool
busyConnections pool.IPool
logger zerolog.Logger
hookConfig *hook.Config
pluginRegistry *plugin.Registry
scheduler *gocron.Scheduler

Elastic bool
Expand All @@ -41,7 +41,7 @@ var _ IProxy = &Proxy{}

// NewProxy creates a new proxy.
func NewProxy(
connPool pool.IPool, hookConfig *hook.Config,
connPool pool.IPool, pluginRegistry *plugin.Registry,
elastic, reuseElasticClients bool,
healthCheckPeriod time.Duration,
clientConfig *config.Client, logger zerolog.Logger,
Expand All @@ -50,7 +50,7 @@ func NewProxy(
availableConnections: connPool,
busyConnections: pool.NewPool(config.EmptyPoolCapacity),
logger: logger,
hookConfig: hookConfig,
pluginRegistry: pluginRegistry,
scheduler: gocron.NewScheduler(time.UTC),
Elastic: elastic,
ReuseElasticClients: reuseElasticClients,
Expand Down Expand Up @@ -363,7 +363,7 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
request, origErr := receiveTrafficFromClient()

// Run the OnTrafficFromClient hooks.
result, err := pr.hookConfig.Run(
result, err := pr.pluginRegistry.Run(
context.Background(),
trafficData(
gconn,
Expand All @@ -375,8 +375,8 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
},
origErr),
hook.OnTrafficFromClient,
pr.hookConfig.Verification)
plugin.OnTrafficFromClient,
pr.pluginRegistry.Verification)
if err != nil {
pr.logger.Error().Err(err).Msg("Error running hook")
}
Expand All @@ -396,7 +396,7 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
_, err = sendTrafficToServer(request)

// Run the OnTrafficToServer hooks.
_, err = pr.hookConfig.Run(
_, err = pr.pluginRegistry.Run(
context.Background(),
trafficData(
gconn,
Expand All @@ -408,8 +408,8 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
},
err),
hook.OnTrafficToServer,
pr.hookConfig.Verification)
plugin.OnTrafficToServer,
pr.pluginRegistry.Verification)
if err != nil {
pr.logger.Error().Err(err).Msg("Error running hook")
}
Expand Down Expand Up @@ -448,7 +448,7 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
}

// Run the OnTrafficFromServer hooks.
result, err = pr.hookConfig.Run(
result, err = pr.pluginRegistry.Run(
context.Background(),
trafficData(
gconn,
Expand All @@ -464,8 +464,8 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
},
err),
hook.OnTrafficFromServer,
pr.hookConfig.Verification)
plugin.OnTrafficFromServer,
pr.pluginRegistry.Verification)
if err != nil {
pr.logger.Error().Err(err).Msg("Error running hook")
}
Expand All @@ -481,7 +481,7 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
errVerdict := sendTrafficToClient(response, received)

// Run the OnTrafficToClient hooks.
_, err = pr.hookConfig.Run(
_, err = pr.pluginRegistry.Run(
context.Background(),
trafficData(
gconn,
Expand All @@ -498,8 +498,8 @@ func (pr *Proxy) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
},
err,
),
hook.OnTrafficToClient,
pr.hookConfig.Verification)
plugin.OnTrafficToClient,
pr.pluginRegistry.Verification)
if err != nil {
pr.logger.Error().Err(err).Msg("Error running hook")
}
Expand Down
20 changes: 15 additions & 5 deletions network/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
embeddedpostgres "github.com/fergusstrange/embedded-postgres"
"github.com/gatewayd-io/gatewayd/config"
"github.com/gatewayd-io/gatewayd/logging"
"github.com/gatewayd-io/gatewayd/plugin/hook"
"github.com/gatewayd-io/gatewayd/plugin"
"github.com/gatewayd-io/gatewayd/pool"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -53,8 +53,13 @@ func TestNewProxy(t *testing.T) {
assert.Nil(t, err)

// Create a proxy with a fixed buffer pool
proxy := NewProxy(
pool, hook.NewHookConfig(), false, false, config.DefaultHealthCheckPeriod, nil, logger)
proxy := NewProxy(pool,
plugin.NewRegistry(config.Loose, config.PassDown, logger),
false,
false,
config.DefaultHealthCheckPeriod,
nil,
logger)

assert.NotNil(t, proxy)
assert.Equal(t, 0, proxy.busyConnections.Size(), "Proxy should have no connected clients")
Expand Down Expand Up @@ -83,7 +88,11 @@ func TestNewProxyElastic(t *testing.T) {
pool := pool.NewPool(config.EmptyPoolCapacity)

// Create a proxy with an elastic buffer pool
proxy := NewProxy(pool, hook.NewHookConfig(), true, false, config.DefaultHealthCheckPeriod,
proxy := NewProxy(pool,
plugin.NewRegistry(config.Loose, config.PassDown, logger),
true,
false,
config.DefaultHealthCheckPeriod,
&config.Client{
Network: "tcp",
Address: "localhost:5432",
Expand All @@ -93,7 +102,8 @@ func TestNewProxyElastic(t *testing.T) {
SendDeadline: config.DefaultSendDeadline,
TCPKeepAlive: false,
TCPKeepAlivePeriod: config.DefaultTCPKeepAlivePeriod,
}, logger)
},
logger)

assert.NotNil(t, proxy)
assert.Equal(t, 0, proxy.busyConnections.Size())
Expand Down
Loading