Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions pkg/cmd/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ import (
)

type listenCmd struct {
cmd *cobra.Command
noWSS bool
path string
cmd *cobra.Command
noWSS bool
path string
maxConnections int
}

// Map --cli-path to --path
Expand Down Expand Up @@ -95,6 +96,7 @@ Destination CLI path will be "/". To set the CLI path, use the "--path" flag.`,
lc.cmd.Flags().MarkHidden("no-wss")

lc.cmd.Flags().StringVar(&lc.path, "path", "", "Sets the path to which events are forwarded e.g., /webhooks or /api/stripe")
lc.cmd.Flags().IntVar(&lc.maxConnections, "max-connections", 50, "Maximum concurrent connections to local endpoint (default: 50, increase for high-volume testing)")

// --cli-path is an alias for
lc.cmd.Flags().SetNormalizeFunc(normalizeCliPathFlag)
Expand Down Expand Up @@ -162,7 +164,8 @@ func (lc *listenCmd) runListenCmd(cmd *cobra.Command, args []string) error {
}

return listen.Listen(url, sourceQuery, connectionQuery, listen.Flags{
NoWSS: lc.noWSS,
Path: lc.path,
NoWSS: lc.noWSS,
Path: lc.path,
MaxConnections: lc.maxConnections,
}, &Config)
}
6 changes: 4 additions & 2 deletions pkg/listen/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ import (
)

type Flags struct {
NoWSS bool
Path string
NoWSS bool
Path string
MaxConnections int
}

// listenCmd represents the listen command
Expand Down Expand Up @@ -139,6 +140,7 @@ Specify a single destination to update the path. For example, pass a connection
URL: URL,
Log: log.StandardLogger(),
Insecure: config.Insecure,
MaxConnections: flags.MaxConnections,
}, connections)

err = p.Run(context.Background())
Expand Down
81 changes: 71 additions & 10 deletions pkg/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"

Expand Down Expand Up @@ -50,6 +51,13 @@ type Config struct {
// Force use of unencrypted ws:// protocol instead of wss://
NoWSS bool
Insecure bool
// MaxConnections allows tuning the maximum concurrent connections per host.
// Default: 50 concurrent connections
// This can be increased for high-volume testing scenarios where the local
// endpoint can handle more concurrent requests.
// Example: Set to 100+ when load testing with many parallel webhooks.
// Warning: Setting this too high may cause resource exhaustion.
MaxConnections int
}

// A Proxy opens a websocket connection with Hookdeck, listens for incoming
Expand All @@ -60,6 +68,10 @@ type Proxy struct {
connections []*hookdecksdk.Connection
webSocketClient *websocket.Client
connectionTimer *time.Timer
httpClient *http.Client
transport *http.Transport
activeRequests int32
maxConnWarned bool // Track if we've warned about connection limit
}

func withSIGTERMCancel(ctx context.Context, onCancel func()) context.Context {
Expand Down Expand Up @@ -252,21 +264,42 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
fmt.Println(webhookEvent.Body.Request.DataString)
} else {
url := p.cfg.URL.Scheme + "://" + p.cfg.URL.Host + p.cfg.URL.Path + webhookEvent.Body.Path
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: p.cfg.Insecure},
}

// Create request with context for timeout control
timeout := webhookEvent.Body.Request.Timeout
if timeout == 0 {
timeout = 1000 * 30
}

client := &http.Client{
Timeout: time.Duration(timeout) * time.Millisecond,
Transport: tr,
// Track active requests
atomic.AddInt32(&p.activeRequests, 1)
defer atomic.AddInt32(&p.activeRequests, -1)

activeCount := atomic.LoadInt32(&p.activeRequests)

// Calculate warning thresholds proportionally to max connections
maxConns := int32(p.transport.MaxConnsPerHost)
warningThreshold := int32(float64(maxConns) * 0.8) // Warn at 80% capacity
resetThreshold := int32(float64(maxConns) * 0.6) // Reset warning at 60% capacity

// Warn when approaching connection limit
if activeCount > warningThreshold && !p.maxConnWarned {
p.maxConnWarned = true
color := ansi.Color(os.Stdout)
fmt.Printf("\n%s High connection load detected (%d active requests)\n",
color.Yellow("⚠ WARNING:"), activeCount)
fmt.Printf(" The CLI is limited to %d concurrent connections per host.\n", p.transport.MaxConnsPerHost)
fmt.Printf(" Consider reducing request rate or increasing connection limit.\n")
fmt.Printf(" Run with --max-connections=%d to increase the limit.\n\n", maxConns*2)
} else if activeCount < resetThreshold && p.maxConnWarned {
// Reset warning flag when load decreases
p.maxConnWarned = false
}

req, err := http.NewRequest(webhookEvent.Body.Request.Method, url, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond)
defer cancel()

req, err := http.NewRequestWithContext(ctx, webhookEvent.Body.Request.Method, url, nil)
if err != nil {
fmt.Printf("Error: %s\n", err)
return
Expand All @@ -286,13 +319,13 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
req.Body = ioutil.NopCloser(strings.NewReader(webhookEvent.Body.Request.DataString))
req.ContentLength = int64(len(webhookEvent.Body.Request.DataString))

res, err := client.Do(req)

res, err := p.httpClient.Do(req)
if err != nil {
color := ansi.Color(os.Stdout)
localTime := time.Now().Format(timeLayout)

errStr := fmt.Sprintf("%s [%s] Failed to %s: %v",
// Use the original error message
errStr := fmt.Sprintf("%s [%s] Failed to %s: %s",
color.Faint(localTime),
color.Red("ERROR"),
webhookEvent.Body.Request.Method,
Expand All @@ -309,7 +342,11 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) {
},
}})
} else {
// Process the response (this reads the entire body)
p.processEndpointResponse(webhookEvent, res)

// Close the body - connection can be reused since body was fully read
res.Body.Close()
}
}
}
Expand Down Expand Up @@ -366,10 +403,34 @@ func New(cfg *Config, connections []*hookdecksdk.Connection) *Proxy {
cfg.Log = &log.Logger{Out: ioutil.Discard}
}

// Default to 50 connections if not specified
maxConns := cfg.MaxConnections
if maxConns <= 0 {
maxConns = 50
}

// Create a shared HTTP transport with connection pooling
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: cfg.Insecure},
// Connection pool settings - sensible defaults for typical usage
MaxIdleConns: 20, // Total idle connections across all hosts
MaxIdleConnsPerHost: 10, // Keep some idle connections for reuse
IdleConnTimeout: 30 * time.Second, // Clean up idle connections
DisableKeepAlives: false,
// Limit concurrent connections to prevent resource exhaustion
MaxConnsPerHost: maxConns, // User-configurable (default: 50)
ResponseHeaderTimeout: 60 * time.Second,
}

p := &Proxy{
cfg: cfg,
connections: connections,
connectionTimer: time.NewTimer(0), // Defaults to no delay
transport: tr,
httpClient: &http.Client{
Transport: tr,
// Timeout is controlled per-request via context in processAttempt
},
}

return p
Expand Down