Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -147,7 +148,8 @@ var runCmd = &cobra.Command{
}

// Verify that the pool is properly populated
logger.Info().Msgf("There are %d clients in the pool", pool.Size())
logger.Info().Str("count", fmt.Sprint(pool.Size())).Msg(
"There are clients available in the pool")
if pool.Size() != poolSize {
logger.Error().Msg(
"The pool size is incorrect, either because " +
Expand Down
42 changes: 29 additions & 13 deletions network/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package network

import (
"fmt"
"net"
"time"

Expand Down Expand Up @@ -91,7 +92,8 @@ func NewClient(
if err := client.Conn.SetReadDeadline(time.Now().Add(client.ReceiveDeadline)); err != nil {
logger.Error().Err(err).Msg("Failed to set receive deadline")
} else {
logger.Debug().Msgf("Receive deadline set to %s", client.ReceiveDeadline)
logger.Debug().Str("duration", fmt.Sprint(client.ReceiveDeadline.Seconds())).Msg(
"Set receive deadline")
}
}

Expand All @@ -103,7 +105,8 @@ func NewClient(
if err := client.Conn.SetWriteDeadline(time.Now().Add(client.SendDeadline)); err != nil {
logger.Error().Err(err).Msg("Failed to set send deadline")
} else {
logger.Debug().Msgf("Send deadline set to %s", client.SendDeadline)
logger.Debug().Str("duration", fmt.Sprint(client.SendDeadline)).Msg(
"Set send deadline")
}
}

Expand All @@ -122,7 +125,7 @@ func NewClient(
client.ReceiveChunkSize = receiveChunkSize
}

logger.Debug().Msgf("New client created: %s", client.Address)
logger.Debug().Str("address", client.Address).Msg("New client created")
client.ID = GetID(conn.LocalAddr().Network(), conn.LocalAddr().String(), DefaultSeed, logger)

return &client
Expand All @@ -132,10 +135,15 @@ func NewClient(
func (c *Client) Send(data []byte) (int, *gerr.GatewayDError) {
sent, err := c.Conn.Write(data)
if err != nil {
c.logger.Error().Err(err).Msgf("Couldn't send data to the server: %s", err)
c.logger.Error().Err(err).Msg("Couldn't send data to the server")
return 0, gerr.ErrClientSendFailed.Wrap(err)
}
c.logger.Debug().Msgf("Sent %d bytes to %s", len(data), c.Address)
c.logger.Debug().Fields(
map[string]interface{}{
"length": sent,
"address": c.Address,
},
).Msg("Sent data to server")
return sent, nil
}

Expand Down Expand Up @@ -170,7 +178,7 @@ func (c *Client) Receive() (int, []byte, *gerr.GatewayDError) {

// Close closes the connection to the server.
func (c *Client) Close() {
c.logger.Debug().Msgf("Closing connection to %s", c.Address)
c.logger.Debug().Str("address", c.Address).Msg("Closing connection to server")
if c.Conn != nil {
c.Conn.Close()
}
Expand All @@ -183,21 +191,29 @@ func (c *Client) Close() {
// IsConnected checks if the client is still connected to the server.
func (c *Client) IsConnected() bool {
if c == nil {
c.logger.Debug().Str(
"reason", "client is nil").Msgf("Connection to %s is closed", c.Address)
c.logger.Debug().Fields(
map[string]interface{}{
"address": c.Address,
"reason": "client is nil",
}).Msg("Connection to server is closed")
return false
}

if c != nil && c.Conn == nil || c.ID == "" {
c.logger.Debug().Str(
"reason", "connection is nil or invalid",
).Msgf("Connection to %s is closed", c.Address)
c.logger.Debug().Fields(
map[string]interface{}{
"address": c.Address,
"reason": "connection is nil or invalid",
}).Msg("Connection to server is closed")
return false
}

if n, err := c.Read([]byte{}); n == 0 && err != nil {
c.logger.Debug().Str(
"reason", "read 0 bytes").Msgf("Connection to %s is closed, ", c.Address)
c.logger.Debug().Fields(
map[string]interface{}{
"address": c.Address,
"reason": "read 0 bytes",
}).Msg("Connection to server is closed")
return false
}

Expand Down
71 changes: 52 additions & 19 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func NewProxy(
// Connect maps a server connection from the available connection pool to a incoming connection.
// It returns an error if the pool is exhausted. If the pool is elastic, it creates a new client
// and maps it to the incoming connection.
//
//nolint:funlen
func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError {
var clientID string
// Get the first available client from the pool.
Expand All @@ -86,7 +88,7 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError {
pr.ClientConfig.SendDeadline,
pr.logger,
)
pr.logger.Debug().Msgf("Reused the client %s by putting it back in the pool", client.ID)
pr.logger.Debug().Str("id", client.ID[:7]).Msg("Reused the client connection")
} else {
return gerr.ErrPoolExhausted
}
Expand All @@ -107,13 +109,26 @@ func (pr *ProxyImpl) Connect(gconn gnet.Conn) *gerr.GatewayDError {
// This should never happen.
return err
}
pr.logger.Debug().Msgf(
"Client %s has been assigned to %s", client.ID, gconn.RemoteAddr().String())
pr.logger.Debug().Fields(
map[string]interface{}{
"function": "proxy.connect",
"client": client.ID[:7],
"server": gconn.RemoteAddr().String(),
},
).Msg("Client has been assigned")

pr.logger.Debug().Str("function", "proxy.connect").Msgf(
"There are %d available clients", pr.availableConnections.Size())
pr.logger.Debug().Str("function", "proxy.connect").Msgf(
"There are %d busy clients", pr.busyConnections.Size())
pr.logger.Debug().Fields(
map[string]interface{}{
"function": "proxy.connect",
"count": pr.availableConnections.Size(),
},
).Msg("Available client connections")
pr.logger.Debug().Fields(
map[string]interface{}{
"function": "proxy.connect",
"count": pr.availableConnections.Size(),
},
).Msg("Busy client connections")

return nil
}
Expand Down Expand Up @@ -147,10 +162,18 @@ func (pr *ProxyImpl) Disconnect(gconn gnet.Conn) *gerr.GatewayDError {
return gerr.ErrClientNotFound
}

pr.logger.Debug().Str("function", "proxy.disconnect").Msgf(
"There are %d available clients", pr.availableConnections.Size())
pr.logger.Debug().Str("function", "proxy.disconnect").Msgf(
"There are %d busy clients", pr.busyConnections.Size())
pr.logger.Debug().Fields(
map[string]interface{}{
"function": "proxy.disconnect",
"count": pr.availableConnections.Size(),
},
).Msg("Available client connections")
pr.logger.Debug().Fields(
map[string]interface{}{
"function": "proxy.disconnect",
"count": pr.availableConnections.Size(),
},
).Msg("Busy client connections")

return nil
}
Expand Down Expand Up @@ -224,12 +247,17 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
// If the hook modified the request, use the modified request.
if result != nil {
if req, ok := result["request"].([]byte); ok {
pr.logger.Debug().Msgf(
"Hook modified request from %d to %d bytes", len(request), len(req))
pr.logger.Debug().Fields(
map[string]interface{}{
"function": "proxy.passthrough",
"from": len(request),
"to": len(req),
},
).Msg("Hook modified request")
request = req
}
if errMsg, ok := result["error"].(string); ok && errMsg != "" {
pr.logger.Error().Msgf("Error in hook: %s", errMsg)
pr.logger.Error().Str("error", errMsg).Msg("Error in hook")
}
}

Expand Down Expand Up @@ -266,7 +294,7 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
"function": "proxy.passthrough",
"local": client.Conn.LocalAddr().String(),
"remote": client.Conn.RemoteAddr().String(),
}).Msgf("Client disconnected")
}).Msg("Client disconnected")

client.Close()
client = NewClient(
Expand Down Expand Up @@ -302,17 +330,22 @@ func (pr *ProxyImpl) PassThrough(gconn gnet.Conn) *gerr.GatewayDError {
plugin.OnEgressTraffic,
pr.hookConfig.Verification)
if err != nil {
pr.logger.Error().Err(err).Msgf("Error running hook: %v", err)
pr.logger.Error().Err(err).Msg("Error running hook")
}
// If the hook returns a response, use it instead of the original response.
if result != nil {
if resp, ok := result["response"].([]byte); ok {
pr.logger.Debug().Msgf(
"Hook modified response from %d to %d bytes", len(response), len(resp))
pr.logger.Debug().Fields(
map[string]interface{}{
"function": "proxy.passthrough",
"from": len(response),
"to": len(resp),
},
).Msg("Hook modified response")
response = resp
}
if errMsg, ok := result["error"].(string); ok && errMsg != "" {
pr.logger.Error().Msgf("Error in hook: %s", errMsg)
pr.logger.Error().Str("error", errMsg).Msg("Error in hook")
}
}

Expand Down
25 changes: 15 additions & 10 deletions network/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package network
import (
"context"
"errors"
"fmt"
"io"
"os"
"time"
Expand Down Expand Up @@ -80,7 +81,8 @@ func (s *Server) OnBoot(engine gnet.Engine) gnet.Action {
// OnOpen is called when a new connection is opened. It calls the OnOpening and OnOpened hooks.
// It also checks if the server is at the soft or hard limit and closes the connection if it is.
func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) {
s.logger.Debug().Msgf("GatewayD is opening a connection from %s", gconn.RemoteAddr().String())
s.logger.Debug().Str("from", gconn.RemoteAddr().String()).Msg(
"GatewayD is opening a connection")

// Run the OnOpening hooks.
onOpeningData := map[string]interface{}{
Expand Down Expand Up @@ -143,7 +145,8 @@ func (s *Server) OnOpen(gconn gnet.Conn) ([]byte, gnet.Action) {
// It also recycles the connection back to the available connection pool, unless the pool
// is elastic and reuse is disabled.
func (s *Server) OnClose(gconn gnet.Conn, err error) gnet.Action {
s.logger.Debug().Msgf("GatewayD is closing a connection from %s", gconn.RemoteAddr().String())
s.logger.Debug().Str("from", gconn.RemoteAddr().String()).Msg(
"GatewayD is closing a connection")

// Run the OnClosing hooks.
data := map[string]interface{}{
Expand Down Expand Up @@ -257,7 +260,8 @@ func (s *Server) OnShutdown(engine gnet.Engine) {
// OnTick is called every TickInterval. It calls the OnTick hooks.
func (s *Server) OnTick() (time.Duration, gnet.Action) {
s.logger.Debug().Msg("GatewayD is ticking...")
s.logger.Info().Msgf("Active connections: %d", s.engine.CountConnections())
s.logger.Info().Str("count", fmt.Sprint(s.engine.CountConnections())).Msg(
"Active client connections")

// Run the OnTick hooks.
_, err := s.hooksConfig.Run(
Expand All @@ -276,7 +280,7 @@ func (s *Server) OnTick() (time.Duration, gnet.Action) {

// Run starts the server and blocks until the server is stopped. It calls the OnRun hooks.
func (s *Server) Run() error {
s.logger.Info().Msgf("GatewayD is running with PID %d", os.Getpid())
s.logger.Info().Str("pid", fmt.Sprint(os.Getpid())).Msg("GatewayD is running")

// Try to resolve the address and log an error if it can't be resolved
addr, err := Resolve(s.Network, s.Address, s.logger)
Expand All @@ -298,7 +302,7 @@ func (s *Server) Run() error {

if result != nil {
if errMsg, ok := result["error"].(string); ok && errMsg != "" {
s.logger.Error().Msgf("Error in hook: %s", errMsg)
s.logger.Error().Str("error", errMsg).Msg("Error in hook")
}

if address, ok := result["address"].(string); ok {
Expand Down Expand Up @@ -359,11 +363,12 @@ func NewServer(

if addr != "" {
server.Address = addr
logger.Debug().Msgf("Resolved address to %s", addr)
logger.Info().Msgf("GatewayD is listening on %s", addr)
logger.Debug().Str("address", addr).Msg("Resolved address")
logger.Info().Str("address", addr).Msg("GatewayD is listening")
} else {
logger.Error().Msg("Failed to resolve address")
logger.Warn().Msgf("GatewayD is listening on %s (unresolved address)", server.Address)
logger.Warn().Str("address", server.Address).Msg(
"GatewayD is listening on an unresolved address")
}

// Get the current limits.
Expand All @@ -375,15 +380,15 @@ func NewServer(
logger.Debug().Msg("Soft limit is not set, using the current system soft limit")
} else {
server.SoftLimit = softLimit
logger.Debug().Msgf("Soft limit is set to %d", softLimit)
logger.Debug().Str("value", fmt.Sprint(softLimit)).Msg("Set soft limit")
}

if hardLimit == 0 {
server.HardLimit = limits.Max
logger.Debug().Msg("Hard limit is not set, using the current system hard limit")
} else {
server.HardLimit = hardLimit
logger.Debug().Msgf("Hard limit is set to %d", hardLimit)
logger.Debug().Str("value", fmt.Sprint(hardLimit)).Msg("Set hard limit")
}

if tickInterval == 0 {
Expand Down
6 changes: 3 additions & 3 deletions network/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func GetRLimit(logger zerolog.Logger) syscall.Rlimit {
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &limits); err != nil { //nolint:nosnakecase
logger.Error().Err(err).Msg("Failed to get rlimit")
}
logger.Debug().Msgf("Current system soft limit: %d", limits.Cur)
logger.Debug().Msgf("Current system hard limit: %d", limits.Max)
logger.Debug().Str("value", fmt.Sprint(limits.Cur)).Msg("Current system soft limit")
logger.Debug().Str("value", fmt.Sprint(limits.Max)).Msg("Current system hard limit")
return limits
}

Expand Down Expand Up @@ -54,7 +54,7 @@ func Resolve(network, address string, logger zerolog.Logger) (string, *gerr.Gate
}
return "", gerr.ErrResolveFailed.Wrap(err)
default:
logger.Error().Msgf("Network %s is not supported", network)
logger.Error().Str("network", network).Msg("Network is not supported")
return "", gerr.ErrNetworkNotSupported
}
}
7 changes: 6 additions & 1 deletion plugin/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ func (h *HookConfig) Add(hookType HookType, prio Priority, hook HookDef) {
h.hooks[hookType] = map[Priority]HookDef{prio: hook}
} else {
if _, ok := h.hooks[hookType][prio]; ok {
h.Logger.Warn().Msgf("Hook %s replaced with priority %d.", hookType, prio)
h.Logger.Warn().Fields(
map[string]interface{}{
"hookType": hookType,
"priority": prio,
},
).Msg("Hook is replaced")
}
h.hooks[hookType][prio] = hook
}
Expand Down
Loading