From c36975bc24bb1262120e46c9c90f6790966dc064 Mon Sep 17 00:00:00 2001 From: Maurice Kherlakian Date: Mon, 6 Oct 2025 12:31:50 -0400 Subject: [PATCH 1/3] too many oprn files fix --- pkg/proxy/proxy.go | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index afff791..47cd03b 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -60,6 +60,7 @@ type Proxy struct { connections []*hookdecksdk.Connection webSocketClient *websocket.Client connectionTimer *time.Timer + httpClient *http.Client } func withSIGTERMCancel(ctx context.Context, onCancel func()) context.Context { @@ -252,21 +253,17 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { fmt.Println(webhookEvent.Body.Request.DataString) } else { url := p.cfg.URL.Scheme + "://" + p.cfg.URL.Host + p.cfg.URL.Path + webhookEvent.Body.Path - tr := &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: p.cfg.Insecure}, - } + // Create request with context for timeout control timeout := webhookEvent.Body.Request.Timeout if timeout == 0 { timeout = 1000 * 30 } - client := &http.Client{ - Timeout: time.Duration(timeout) * time.Millisecond, - Transport: tr, - } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond) + defer cancel() - req, err := http.NewRequest(webhookEvent.Body.Request.Method, url, nil) + req, err := http.NewRequestWithContext(ctx, webhookEvent.Body.Request.Method, url, nil) if err != nil { fmt.Printf("Error: %s\n", err) return @@ -286,8 +283,7 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { req.Body = ioutil.NopCloser(strings.NewReader(webhookEvent.Body.Request.DataString)) req.ContentLength = int64(len(webhookEvent.Body.Request.DataString)) - res, err := client.Do(req) - + res, err := p.httpClient.Do(req) if err != nil { color := ansi.Color(os.Stdout) localTime := time.Now().Format(timeLayout) @@ -309,6 +305,7 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { }, }}) } else { + defer res.Body.Close() p.processEndpointResponse(webhookEvent, res) } } @@ -366,10 +363,25 @@ func New(cfg *Config, connections []*hookdecksdk.Connection) *Proxy { cfg.Log = &log.Logger{Out: ioutil.Discard} } + // Create a shared HTTP transport with connection pooling + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: cfg.Insecure}, + // Connection pool settings for better performance + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + DisableKeepAlives: false, + } + p := &Proxy{ cfg: cfg, connections: connections, connectionTimer: time.NewTimer(0), // Defaults to no delay + httpClient: &http.Client{ + Transport: tr, + // Default timeout can be overridden per request + Timeout: 30 * time.Second, + }, } return p From d9c2c7a07da0cfdc7856c842f75e114502559048 Mon Sep 17 00:00:00 2001 From: Maurice Kherlakian Date: Tue, 7 Oct 2025 11:16:34 -0400 Subject: [PATCH 2/3] Add connection pooling. Track active connections. Warn users if connections close to limit. Allow max-connections override. --- pkg/cmd/listen.go | 13 ++++++---- pkg/listen/listen.go | 6 +++-- pkg/proxy/proxy.go | 57 +++++++++++++++++++++++++++++++++++++++----- 3 files changed, 63 insertions(+), 13 deletions(-) diff --git a/pkg/cmd/listen.go b/pkg/cmd/listen.go index 8e8b095..ef45274 100644 --- a/pkg/cmd/listen.go +++ b/pkg/cmd/listen.go @@ -28,9 +28,10 @@ import ( ) type listenCmd struct { - cmd *cobra.Command - noWSS bool - path string + cmd *cobra.Command + noWSS bool + path string + maxConnections int } // Map --cli-path to --path @@ -95,6 +96,7 @@ Destination CLI path will be "/". To set the CLI path, use the "--path" flag.`, lc.cmd.Flags().MarkHidden("no-wss") lc.cmd.Flags().StringVar(&lc.path, "path", "", "Sets the path to which events are forwarded e.g., /webhooks or /api/stripe") + lc.cmd.Flags().IntVar(&lc.maxConnections, "max-connections", 50, "Maximum concurrent connections to local endpoint (default: 50, increase for high-volume testing)") // --cli-path is an alias for lc.cmd.Flags().SetNormalizeFunc(normalizeCliPathFlag) @@ -162,7 +164,8 @@ func (lc *listenCmd) runListenCmd(cmd *cobra.Command, args []string) error { } return listen.Listen(url, sourceQuery, connectionQuery, listen.Flags{ - NoWSS: lc.noWSS, - Path: lc.path, + NoWSS: lc.noWSS, + Path: lc.path, + MaxConnections: lc.maxConnections, }, &Config) } diff --git a/pkg/listen/listen.go b/pkg/listen/listen.go index 2480a88..3eabddd 100644 --- a/pkg/listen/listen.go +++ b/pkg/listen/listen.go @@ -31,8 +31,9 @@ import ( ) type Flags struct { - NoWSS bool - Path string + NoWSS bool + Path string + MaxConnections int } // listenCmd represents the listen command @@ -139,6 +140,7 @@ Specify a single destination to update the path. For example, pass a connection URL: URL, Log: log.StandardLogger(), Insecure: config.Insecure, + MaxConnections: flags.MaxConnections, }, connections) err = p.Run(context.Background()) diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 47cd03b..97ca56a 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -14,6 +14,7 @@ import ( "os/signal" "strconv" "strings" + "sync/atomic" "syscall" "time" @@ -50,6 +51,13 @@ type Config struct { // Force use of unencrypted ws:// protocol instead of wss:// NoWSS bool Insecure bool + // MaxConnections allows tuning the maximum concurrent connections per host. + // Default: 50 concurrent connections + // This can be increased for high-volume testing scenarios where the local + // endpoint can handle more concurrent requests. + // Example: Set to 100+ when load testing with many parallel webhooks. + // Warning: Setting this too high may cause resource exhaustion. + MaxConnections int } // A Proxy opens a websocket connection with Hookdeck, listens for incoming @@ -61,6 +69,9 @@ type Proxy struct { webSocketClient *websocket.Client connectionTimer *time.Timer httpClient *http.Client + transport *http.Transport + activeRequests int32 + maxConnWarned bool // Track if we've warned about connection limit } func withSIGTERMCancel(ctx context.Context, onCancel func()) context.Context { @@ -260,6 +271,26 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { timeout = 1000 * 30 } + // Track active requests + atomic.AddInt32(&p.activeRequests, 1) + defer atomic.AddInt32(&p.activeRequests, -1) + + activeCount := atomic.LoadInt32(&p.activeRequests) + + // Warn when approaching connection limit + if activeCount > 40 && !p.maxConnWarned { + p.maxConnWarned = true + color := ansi.Color(os.Stdout) + fmt.Printf("\n%s High connection load detected (%d active requests)\n", + color.Yellow("⚠ WARNING:"), activeCount) + fmt.Printf(" The CLI is limited to %d concurrent connections per host.\n", p.transport.MaxConnsPerHost) + fmt.Printf(" Consider reducing request rate or increasing connection limit.\n") + fmt.Printf(" Run with --max-connections=100 to increase the limit.\n\n") + } else if activeCount < 30 && p.maxConnWarned { + // Reset warning flag when load decreases + p.maxConnWarned = false + } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Millisecond) defer cancel() @@ -288,7 +319,8 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { color := ansi.Color(os.Stdout) localTime := time.Now().Format(timeLayout) - errStr := fmt.Sprintf("%s [%s] Failed to %s: %v", + // Use the original error message + errStr := fmt.Sprintf("%s [%s] Failed to %s: %s", color.Faint(localTime), color.Red("ERROR"), webhookEvent.Body.Request.Method, @@ -305,8 +337,11 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { }, }}) } else { - defer res.Body.Close() + // Process the response (this reads the entire body) p.processEndpointResponse(webhookEvent, res) + + // Close the body - connection can be reused since body was fully read + res.Body.Close() } } } @@ -363,20 +398,30 @@ func New(cfg *Config, connections []*hookdecksdk.Connection) *Proxy { cfg.Log = &log.Logger{Out: ioutil.Discard} } + // Default to 50 connections if not specified + maxConns := cfg.MaxConnections + if maxConns <= 0 { + maxConns = 50 + } + // Create a shared HTTP transport with connection pooling tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: cfg.Insecure}, - // Connection pool settings for better performance - MaxIdleConns: 100, - MaxIdleConnsPerHost: 10, - IdleConnTimeout: 90 * time.Second, + // Connection pool settings - sensible defaults for typical usage + MaxIdleConns: 20, // Total idle connections across all hosts + MaxIdleConnsPerHost: 10, // Keep some idle connections for reuse + IdleConnTimeout: 30 * time.Second, // Clean up idle connections DisableKeepAlives: false, + // Limit concurrent connections to prevent resource exhaustion + MaxConnsPerHost: maxConns, // User-configurable (default: 50) + ResponseHeaderTimeout: 60 * time.Second, } p := &Proxy{ cfg: cfg, connections: connections, connectionTimer: time.NewTimer(0), // Defaults to no delay + transport: tr, httpClient: &http.Client{ Transport: tr, // Default timeout can be overridden per request From 6805cc542df52794688fa969beaaf4708895afc4 Mon Sep 17 00:00:00 2001 From: Maurice Kherlakian Date: Tue, 7 Oct 2025 13:04:49 -0400 Subject: [PATCH 3/3] Removed client timeout. Made warning about connection exhaustion proportional to max --- pkg/proxy/proxy.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go index 97ca56a..ade232d 100644 --- a/pkg/proxy/proxy.go +++ b/pkg/proxy/proxy.go @@ -277,16 +277,21 @@ func (p *Proxy) processAttempt(msg websocket.IncomingMessage) { activeCount := atomic.LoadInt32(&p.activeRequests) + // Calculate warning thresholds proportionally to max connections + maxConns := int32(p.transport.MaxConnsPerHost) + warningThreshold := int32(float64(maxConns) * 0.8) // Warn at 80% capacity + resetThreshold := int32(float64(maxConns) * 0.6) // Reset warning at 60% capacity + // Warn when approaching connection limit - if activeCount > 40 && !p.maxConnWarned { + if activeCount > warningThreshold && !p.maxConnWarned { p.maxConnWarned = true color := ansi.Color(os.Stdout) fmt.Printf("\n%s High connection load detected (%d active requests)\n", color.Yellow("⚠ WARNING:"), activeCount) fmt.Printf(" The CLI is limited to %d concurrent connections per host.\n", p.transport.MaxConnsPerHost) fmt.Printf(" Consider reducing request rate or increasing connection limit.\n") - fmt.Printf(" Run with --max-connections=100 to increase the limit.\n\n") - } else if activeCount < 30 && p.maxConnWarned { + fmt.Printf(" Run with --max-connections=%d to increase the limit.\n\n", maxConns*2) + } else if activeCount < resetThreshold && p.maxConnWarned { // Reset warning flag when load decreases p.maxConnWarned = false } @@ -424,8 +429,7 @@ func New(cfg *Config, connections []*hookdecksdk.Connection) *Proxy { transport: tr, httpClient: &http.Client{ Transport: tr, - // Default timeout can be overridden per request - Timeout: 30 * time.Second, + // Timeout is controlled per-request via context in processAttempt }, }