diff --git a/pkg/cmd/listen.go b/pkg/cmd/listen.go index 8e8b095..ef45274 100644 --- a/pkg/cmd/listen.go +++ b/pkg/cmd/listen.go @@ -28,9 +28,10 @@ import ( ) type listenCmd struct { - cmd *cobra.Command - noWSS bool - path string + cmd *cobra.Command + noWSS bool + path string + maxConnections int } // Map --cli-path to --path @@ -95,6 +96,7 @@ Destination CLI path will be "/". To set the CLI path, use the "--path" flag.`, lc.cmd.Flags().MarkHidden("no-wss") lc.cmd.Flags().StringVar(&lc.path, "path", "", "Sets the path to which events are forwarded e.g., /webhooks or /api/stripe") + lc.cmd.Flags().IntVar(&lc.maxConnections, "max-connections", 50, "Maximum concurrent connections to local endpoint (default: 50, increase for high-volume testing)") // --cli-path is an alias for lc.cmd.Flags().SetNormalizeFunc(normalizeCliPathFlag) @@ -162,7 +164,8 @@ func (lc *listenCmd) runListenCmd(cmd *cobra.Command, args []string) error { } return listen.Listen(url, sourceQuery, connectionQuery, listen.Flags{ - NoWSS: lc.noWSS, - Path: lc.path, + NoWSS: lc.noWSS, + Path: lc.path, + MaxConnections: lc.maxConnections, }, &Config) } diff --git a/pkg/listen/listen.go b/pkg/listen/listen.go index 2480a88..3eabddd 100644 --- a/pkg/listen/listen.go +++ b/pkg/listen/listen.go @@ -31,8 +31,9 @@ import ( ) type Flags struct { - NoWSS bool - Path string + NoWSS bool + Path string + MaxConnections int } // listenCmd represents the listen command @@ -139,6 +140,7 @@ Specify a single destination to update the path. For example, pass a connection URL: URL, Log: log.StandardLogger(), Insecure: config.Insecure, + MaxConnections: flags.MaxConnections, }, connections) err = p.Run(context.Background()) diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index afff791..ade232d 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -14,6 +14,7 @@ import ( "os/signal" "strconv" "strings" + "sync/atomic" "syscall" "time" @@ -50,6 +51,13 @@ type Config struct { // Force use of unencrypted ws:// protocol instead of wss:// NoWSS bool Insecure bool + // MaxConnections allows tuning the maximum concurrent connections per host. + // Default: 50 concurrent connections + // This can be increased for high-volume testing scenarios where the local + // endpoint can handle more concurrent requests. + // Example: Set to 100+ when load testing with many parallel webhooks. + // Warning: Setting this too high may cause resource exhaustion. + MaxConnections int } // A Proxy opens a websocket connection with Hookdeck, listens for incoming @@ -60,6 +68,10 @@ type Proxy struct { connections []*hookdecksdk.Connection webSocketClient *websocket.Client connectionTimer *time.Timer + httpClient *http.Client + transport *http.Transport + activeRequests int32 + maxConnWarned bool // Track if we've warned about connection limit } func withSIGTERMCancel(ctx context.Context, onCancel func()) context.Context { @@ -252,21 +264,42 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { fmt.Println(webhookEvent.Body.Request.DataString) } else { url := p.cfg.URL.Scheme + "://" + p.cfg.URL.Host + p.cfg.URL.Path + webhookEvent.Body.Path - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: p.cfg.Insecure}, - } + // Create request with context for timeout control timeout := webhookEvent.Body.Request.Timeout if timeout == 0 { timeout = 1000 * 30 } - client := &http.Client{ - Timeout: time.Duration(timeout) * time.Millisecond, - Transport: tr, + // Track active requests + atomic.AddInt32(&p.activeRequests, 1) + defer atomic.AddInt32(&p.activeRequests, -1) + + activeCount := atomic.LoadInt32(&p.activeRequests) + + // Calculate warning thresholds proportionally to max connections + maxConns := int32(p.transport.MaxConnsPerHost) + warningThreshold := int32(float64(maxConns) * 0.8) // Warn at 80% capacity + resetThreshold := int32(float64(maxConns) * 0.6) // Reset warning at 60% capacity + + // Warn when approaching connection limit + if activeCount > warningThreshold && !p.maxConnWarned { + p.maxConnWarned = true + color := ansi.Color(os.Stdout) + fmt.Printf("\n%s High connection load detected (%d active requests)\n", + color.Yellow("⚠ WARNING:"), activeCount) + fmt.Printf(" The CLI is limited to %d concurrent connections per host.\n", p.transport.MaxConnsPerHost) + fmt.Printf(" Consider reducing request rate or increasing connection limit.\n") + fmt.Printf(" Run with --max-connections=%d to increase the limit.\n\n", maxConns*2) + } else if activeCount < resetThreshold && p.maxConnWarned { + // Reset warning flag when load decreases + p.maxConnWarned = false } - req, err := http.NewRequest(webhookEvent.Body.Request.Method, url, nil) + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, webhookEvent.Body.Request.Method, url, nil) if err != nil { fmt.Printf("Error: %s\n", err) return @@ -286,13 +319,13 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { req.Body = ioutil.NopCloser(strings.NewReader(webhookEvent.Body.Request.DataString)) req.ContentLength = int64(len(webhookEvent.Body.Request.DataString)) - res, err := client.Do(req) - + res, err := p.httpClient.Do(req) if err != nil { color := ansi.Color(os.Stdout) localTime := time.Now().Format(timeLayout) - errStr := fmt.Sprintf("%s [%s] Failed to %s: %v", + // Use the original error message + errStr := fmt.Sprintf("%s [%s] Failed to %s: %s", color.Faint(localTime), color.Red("ERROR"), webhookEvent.Body.Request.Method, @@ -309,7 +342,11 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { }, }}) } else { + // Process the response (this reads the entire body) p.processEndpointResponse(webhookEvent, res) + + // Close the body - connection can be reused since body was fully read + res.Body.Close() } } } @@ -366,10 +403,34 @@ func New(cfg *Config, connections []*hookdecksdk.Connection) *Proxy { cfg.Log = &log.Logger{Out: ioutil.Discard} } + // Default to 50 connections if not specified + maxConns := cfg.MaxConnections + if maxConns <= 0 { + maxConns = 50 + } + + // Create a shared HTTP transport with connection pooling + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: cfg.Insecure}, + // Connection pool settings - sensible defaults for typical usage + MaxIdleConns: 20, // Total idle connections across all hosts + MaxIdleConnsPerHost: 10, // Keep some idle connections for reuse + IdleConnTimeout: 30 * time.Second, // Clean up idle connections + DisableKeepAlives: false, + // Limit concurrent connections to prevent resource exhaustion + MaxConnsPerHost: maxConns, // User-configurable (default: 50) + ResponseHeaderTimeout: 60 * time.Second, + } + p := &Proxy{ cfg: cfg, connections: connections, connectionTimer: time.NewTimer(0), // Defaults to no delay + transport: tr, + httpClient: &http.Client{ + Transport: tr, + // Timeout is controlled per-request via context in processAttempt + }, } return p