From 5606c050ef38163e859e88647de0e9d534bb745e Mon Sep 17 00:00:00 2001 From: "Jun.S.Shen" Date: Fri, 17 Nov 2023 14:28:25 -0800 Subject: [PATCH 1/3] enforce to close accepted connection after processing --- lib/go/thrift/simple_server.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go index d4f555ccd51..fcf6de32fc3 100644 --- a/lib/go/thrift/simple_server.go +++ b/lib/go/thrift/simple_server.go @@ -211,6 +211,10 @@ func (p *TSimpleServer) innerAccept() (int32, error) { select { case <-ctx.Done(): // client exited, do nothing + if client != nil { + client.Close() + client = nil + } case <-p.stopChan: // TSimpleServer.Close called, close the client connection client.Close() @@ -270,6 +274,7 @@ func (p *TSimpleServer) Stop() error { } <-ctx.Done() + p.stopChan = nil p.stopChan = make(chan struct{}) return nil } @@ -356,6 +361,7 @@ func (p *TSimpleServer) processRequests(client TTransport) (err error) { ok, err := processor.Process(ctx, inputProtocol, outputProtocol) if errors.Is(err, ErrAbandonRequest) { err := client.Close() + client = nil if errors.Is(err, net.ErrClosed) { // In this case, it's kinda expected to get // net.ErrClosed, treat that as no-error From 356da66c6f49936a6bfa483c8b2c196129ac4603 Mon Sep 17 00:00:00 2001 From: "Jun.S.Shen" Date: Mon, 20 Nov 2023 22:20:18 -0800 Subject: [PATCH 2/3] enforce client.Close in innerAccept --- lib/go/thrift/simple_server.go | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go index fcf6de32fc3..4e1b38a2c9a 100644 --- a/lib/go/thrift/simple_server.go +++ b/lib/go/thrift/simple_server.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "io" - "net" "sync" "sync/atomic" "time" @@ -195,6 +194,7 @@ func (p *TSimpleServer) innerAccept() (int32, error) { return 0, err } if client != nil { + defer client.Close() ctx, cancel := context.WithCancel(context.Background()) p.wg.Add(2) @@ -211,10 +211,6 @@ func (p *TSimpleServer) innerAccept() (int32, error) { select { case <-ctx.Done(): // client exited, do nothing - if client != nil { - client.Close() - client = nil - } case <-p.stopChan: // TSimpleServer.Close called, close the client connection client.Close() @@ -274,7 +270,6 @@ func (p *TSimpleServer) Stop() error { } <-ctx.Done() - p.stopChan = nil p.stopChan = make(chan struct{}) return nil } @@ -360,14 +355,7 @@ func (p *TSimpleServer) processRequests(client TTransport) (err error) { ok, err := processor.Process(ctx, inputProtocol, outputProtocol) if errors.Is(err, ErrAbandonRequest) { - err := client.Close() - client = nil - if errors.Is(err, net.ErrClosed) { - // In this case, it's kinda expected to get - // net.ErrClosed, treat that as no-error - return nil - } - return err + return nil } if errors.As(err, new(TTransportException)) && err != nil { return err From 5c5b8eb675e3b6993cdcbd7ebae25d06a828b0db Mon Sep 17 00:00:00 2001 From: "Jun.S.Shen" Date: Thu, 23 Nov 2023 21:55:24 -0800 Subject: [PATCH 3/3] fix unit test --- lib/go/thrift/simple_server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/go/thrift/simple_server.go b/lib/go/thrift/simple_server.go index 4e1b38a2c9a..3f46cd68bda 100644 --- a/lib/go/thrift/simple_server.go +++ b/lib/go/thrift/simple_server.go @@ -194,13 +194,14 @@ func (p *TSimpleServer) innerAccept() (int32, error) { return 0, err } if client != nil { - defer client.Close() + ctx, cancel := context.WithCancel(context.Background()) p.wg.Add(2) go func() { defer p.wg.Done() defer cancel() + defer client.Close() if err := p.processRequests(client); err != nil { p.logger(fmt.Sprintf("error processing request: %v", err)) }