THRIFT-5495: close client when shutdown server in go lib #2497
Conversation
Thanks for the PR! This doesn't seem like a good use of sync.Map. In your case, the map is write-heavy, and reads only happen once during Close().
Good suggestion, I will change the code later this week.
LGTM. @fishy any thoughts?
lib/go/thrift/simple_server.go
Outdated
defer func() {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.clients, client)
Go has a long-standing runtime bug where a builtin map never actually shrinks when you delete from it: golang/go#20135
Which means that although the behavior logic is correct (you won't try to close clients that have already disconnected during Close), for a long-running server this map will keep growing indefinitely and eventually cause memory issues.
For that reason, a sync.Map will probably be better; as I understand it, sync.Map occasionally replaces the whole map under the hood, so it actually can be garbage collected.
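For reference, a minimal sketch of the map-based tracking under discussion (reconstructed from the diff excerpt above; the helper name and exact fields are assumptions, not the actual PR code):
type TSimpleServer struct {
	mu      sync.Mutex
	clients map[TTransport]struct{} // hypothetical client-tracking map
	// other fields omitted
}

// serveClient is a hypothetical wrapper around handling one connection.
func (p *TSimpleServer) serveClient(client TTransport) {
	p.mu.Lock()
	p.clients[client] = struct{}{}
	p.mu.Unlock()
	defer func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		// delete removes the entry, but the builtin map never releases its
		// bucket memory (golang/go#20135), so a long-running server keeps
		// accumulating memory here.
		delete(p.clients, client)
	}()
	// ... process requests from this client ...
}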
Also, I'm not sure that you can actually use a TTransport as the map key (for either a builtin map or sync.Map); when you try to use a type that cannot be used as a map key, it causes a runtime panic instead of a compile-time error. We might need to make a breaking change to expose the TSocket/TSSLSocket that's wrapped inside this TTransport and use its .Addr().String() as the map key (and ignore any TTransport that does not have a TSocket/TSSLSocket under the hood).
sync.Map will probably be better; as I understand it, sync.Map occasionally replaces the whole map under the hood, so it actually can be garbage collected.
Looking at the implementation, I don't think sync.Map would help either. The underlying "dirty" map is only replaced after a number of Load() misses, which would never happen here, as we only read once during Close().
Also I'm not sure that you can actually use a TTransport as the map key (for either builtin map or sync.Map), when you try to use a type that cannot be used as the map key it will cause a runtime panic instead of some compile time error.
I'm not seeing any Go test failures on Travis (though I didn't check thoroughly), so I assume all implementations of TTransport are hashable.
Looks like at least a *TSocket is comparable:
$ cat main.go
package main

import (
	"fmt"

	"github.com/apache/thrift/lib/go/thrift"
)

func main() {
	sock := thrift.NewTSocketConf("localhost:6060", nil)
	m := map[thrift.TTransport]struct{}{
		sock: {},
	}
	fmt.Println(m)
}
$ go run main.go
map[0xc000068120:{}]
But still, there's no guarantee that the TTransport returned by TServerSocket.Accept() will always be comparable, and when it's not, trying to use it as a key will cause a runtime panic that will be hard to detect in tests. (For example, we have a pending PR to wrap TServerSocket to return a TTransport that counts the number of active clients: reddit/baseplate.go#464)
So if this is really important to you, make it opt-in instead: this map is only used when people call an exported function on TSimpleServer to enable it (for example, TSimpleServer.EnableClientCleanup(), name TBD as I'm bad at names). And we should note the implications in the function's doc (a rough sketch follows the list below):
- The memory used by this map could be unbounded
- If you enable this map, the TTransport returned by your TServerTransport must be comparable or panics will happen
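A rough sketch of that opt-in shape, using the placeholder name from above (illustrative only, not a final API):
// EnableClientCleanup opts in to tracking accepted clients so that Stop can
// close them. Implications for callers:
//   - the memory used by the tracking map can grow without bound on a
//     long-running server;
//   - the TTransport returned by your TServerTransport must be comparable,
//     otherwise using it as a map key panics at runtime.
func (p *TSimpleServer) EnableClientCleanup() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.cleanupClients = true // hypothetical flag checked before touching p.clients
}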
Even with the opt-in approach, considering how big a footgun using TTransport as the map key could be, I think it's still better to:
- Make a breaking API change: add Addr() net.Addr to the TTransport interface (TMemoryBuffer.Addr() should return nil; delegate transports like TFramedTransport, TBufferedTransport, THeaderTransport, etc. should just call their wrapped TTransport.Addr())
- Use TTransport.Addr().String() as the map key, and don't put the TTransport into the map if Addr() returns nil (this should only happen when we use TMemoryBuffer under the hood, I think, which should be fine)
@dcelasun what do you think?
Another alternative is to add something like TTransport.String() to the interface, but the behavior of that would be more undefined.
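Purely illustrative, assuming the proposed Addr() method were added to TTransport (it does not exist today):
// registerClient is a hypothetical helper for the Addr()-keyed variant,
// where p.clients would be a map[string]TTransport.
func (p *TSimpleServer) registerClient(client TTransport) {
	addr := client.Addr() // the proposed breaking addition to the interface
	if addr == nil {
		return // e.g. a TMemoryBuffer-backed transport: nothing to key on, skip it
	}
	p.mu.Lock()
	p.clients[addr.String()] = client
	p.mu.Unlock()
}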
Another idea I had considered before, for a different issue, is to add an Unwrap() TTransport API to TTransport, similar to how errors.Unwrap() works. This way we can keep unwrapping until we find the *TSocket or *TSSLSocket wrapped under the hood and use its Addr().String() as the map key (and if there's no *TSocket/*TSSLSocket, then we don't put this client into the map).
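A minimal sketch of that idea; TTransport has no Unwrap method today, so the interface assertion below is the hypothetical part:
// underlyingSocket keeps unwrapping a transport, in the spirit of
// errors.Unwrap, until it finds the *TSocket or *TSSLSocket underneath.
// It returns nil if there is no socket under the hood, in which case the
// client would simply not be put into the map.
func underlyingSocket(t TTransport) TTransport {
	for t != nil {
		switch t.(type) {
		case *TSocket, *TSSLSocket:
			return t
		}
		u, ok := t.(interface{ Unwrap() TTransport }) // hypothetical method
		if !ok {
			return nil
		}
		t = u.Unwrap()
	}
	return nil
}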
we trade a (kinda unbounded) map for one additional goroutine per client connection. the big advantage is that we no longer need to make a breaking change to TTransport to make it usable as a map key.
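A rough sketch of the goroutine-per-connection idea (simplified; stopChan and processRequests come from the PR discussion, the rest is illustrative):
ctx, cancel := context.WithCancel(context.Background())
p.wg.Add(2)
go func() {
	defer p.wg.Done()
	defer cancel() // lets the watcher below exit once this handler returns
	p.processRequests(client)
}()
go func() {
	defer p.wg.Done()
	select {
	case <-ctx.Done():
		// the handler finished on its own, nothing left to do
	case <-p.stopChan:
		// the server is stopping: close the client to unblock any pending read
		client.Close()
	}
}()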
Sounds much more reasonable. @buptubuntu what do you think?
I have also considered adding one additional goroutine per client connection, but I was not sure whether that would cause performance problems. But if we want to fix the map key issue we will have to make a big change to TTransport, which is another issue that we can handle in the future. Besides, the cost of the additional goroutine should be quite small compared to the cost of the user-defined processor or the data read/write operations in the transport.
So I think this is the better solution, theoretically.
I will try to get the cost of the additional goroutine for some extreme cases:
1> 1000/10000 clients connect to the server but do not send data for a long time
2> 1000/10000 clients connect to the server and send 1k of data per second
I have tested the memory cost of the additional goroutine using the code below:
package main

import (
	"context"
	"fmt"
	"runtime"
	"sync"
)

func main() {
	memConsumed := func() uint64 {
		runtime.GC()
		var s runtime.MemStats
		runtime.ReadMemStats(&s)
		return s.Sys
	}
	// Receiving from a nil channel blocks forever, which keeps every goroutine
	// alive while the memory measurement is taken.
	var channel <-chan interface{}
	var wg sync.WaitGroup
	const numGoroutines = 1000000 // 1M
	wg.Add(numGoroutines)
	before := memConsumed()
	for i := numGoroutines; i > 0; i-- {
		// Each goroutine gets its own context, mirroring the per-connection
		// context in the server change.
		ctx, cancel := context.WithCancel(context.Background())
		go func() {
			wg.Done()
			cancel()
			<-ctx.Done()
			<-channel
		}()
	}
	wg.Wait()
	after := memConsumed()
	fmt.Printf("%.3fkb", float64(after-before)/numGoroutines/1024)
}
The cost is as below:
goroutines | total mem | mem per goroutine
---|---|---
10000 | 35M | 3.5K
1000 | 5.3M | 5.3K
I have also tested the cost per client using the code below:
package main
import (
"context"
"fmt"
"net"
_ "net/http/pprof"
"runtime"
"strings"
"time"
"github.com/apache/thrift/lib/go/thrift"
)
func main() {
memConsumed := func() uint64 {
runtime.GC()
var s runtime.MemStats
runtime.ReadMemStats(&s)
return s.Sys
}
ln, _ := net.Listen("tcp", "localhost:0")
proc := &mockProcessor{
ProcessFunc: func(in, out thrift.TProtocol) (bool, thrift.TException) {
in.ReadMessageBegin(context.Background())
time.Sleep(time.Hour)
return false, nil
},
}
trans := &mockServerTransport{
ListenFunc: func() error {
return nil
},
AcceptFunc: func() (thrift.TTransport, error) {
conn, err := ln.Accept()
if err != nil {
// t.Errorf("error accept connection")
return nil, err
}
return thrift.NewTSocketFromConnTimeout(conn, 0), nil
},
CloseFunc: func() error {
return nil
},
InterruptFunc: func() error {
return ln.Close()
},
}
serv := thrift.NewTSimpleServer2(proc, trans)
go serv.Serve()
time.Sleep(time.Second)
const numGoroutines = 1000
conns := make([]net.Conn, 0, numGoroutines)
before := memConsumed()
for i := 0; i < numGoroutines; i++ {
port := strings.Split(ln.Addr().String(), ":")[1]
netConn, _ := net.Dial("tcp", "localhost:"+port)
time.Sleep(time.Millisecond)
conns = append(conns, netConn)
}
after := memConsumed()
time.Sleep(time.Second)
fmt.Printf("%.3fkb", float64(after-before)/numGoroutines/1024)
}
type mockProcessor struct {
ProcessFunc func(in, out thrift.TProtocol) (bool, thrift.TException)
}
func (m *mockProcessor) Process(ctx context.Context, in, out thrift.TProtocol) (bool, thrift.TException) {
return m.ProcessFunc(in, out)
}
func (m *mockProcessor) ProcessorMap() map[string]thrift.TProcessorFunction {
return map[string]thrift.TProcessorFunction{
"mock": thrift.WrappedTProcessorFunction{
Wrapped: func(ctx context.Context, seqId int32, in, out thrift.TProtocol) (bool, thrift.TException) {
return m.ProcessFunc(in, out)
},
},
}
}
func (m *mockProcessor) AddToProcessorMap(name string, processorFunc thrift.TProcessorFunction) {}
type mockWrappedProcessorContextKey int
const (
processorName mockWrappedProcessorContextKey = iota
)
// setMockWrappableProcessorName sets the "name" of the TProcessorFunction to
// call on a mockWrappableProcessor when calling Process.
//
// In a normal TProcessor, the request name is read from the request itself
// which happens in TProcessor.Process, so it is not passed into the call to
// Process itself. To get around this in testing, mockWrappableProcessor calls
// getMockWrappableProcessorName to get the name to use from the context
// object.
func setMockWrappableProcessorName(ctx context.Context, name string) context.Context {
return context.WithValue(ctx, processorName, name)
}
// getMockWrappableProcessorName gets the "name" of the TProcessorFunction to
// call on a mockWrappableProcessor when calling Process.
func getMockWrappableProcessorName(ctx context.Context) (string, bool) {
val, ok := ctx.Value(processorName).(string)
return val, ok
}
// mockWrappableProcessor can be used to create a mock object that fulfills the
// TProcessor interface in testing.
type mockWrappableProcessor struct {
ProcessorFuncs map[string]thrift.TProcessorFunction
}
// Process calls the TProcessorFunction assigned to the "name" set on the
// context object by setMockWrappableProcessorName.
//
// If no name is set on the context or there is no TProcessorFunction mapped to
// that name, the call will panic.
func (p *mockWrappableProcessor) Process(ctx context.Context, in, out thrift.TProtocol) (bool, thrift.TException) {
name, ok := getMockWrappableProcessorName(ctx)
if !ok {
panic("MockWrappableProcessorName not set on context")
}
processor, ok := p.ProcessorMap()[name]
if !ok {
panic(fmt.Sprintf("No processor set for name %q", name))
}
return processor.Process(ctx, 0, in, out)
}
func (p *mockWrappableProcessor) ProcessorMap() map[string]thrift.TProcessorFunction {
return p.ProcessorFuncs
}
func (p *mockWrappableProcessor) AddToProcessorMap(name string, processorFunc thrift.TProcessorFunction) {
p.ProcessorFuncs[name] = processorFunc
}
var (
_ thrift.TProcessor = (*mockProcessor)(nil)
_ thrift.TProcessor = (*mockWrappableProcessor)(nil)
)
type mockServerTransport struct {
ListenFunc func() error
AcceptFunc func() (thrift.TTransport, error)
CloseFunc func() error
InterruptFunc func() error
}
func (m *mockServerTransport) Listen() error {
return m.ListenFunc()
}
func (m *mockServerTransport) Accept() (thrift.TTransport, error) {
return m.AcceptFunc()
}
func (m *mockServerTransport) Close() error {
return m.CloseFunc()
}
func (m *mockServerTransport) Interrupt() error {
return m.InterruptFunc()
}
type mockTTransport struct {
thrift.TTransport
}
func (m *mockTTransport) Close() error {
return nil
}
The cost is as below:
clients | total mem | mem per client
---|---|---
10000 | 157M | 15.7K
1000 | 24M | 24K
For the CPU cost: since the operation of the additional goroutine is quite simple (close the client), and we just moved it out of the stop function, I think we can ignore it.
lib/go/thrift/simple_server_test.go
Outdated
func TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop(t *testing.T) {
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Errorf("error when listen")
this should be t.Fatal instead of t.Errorf. If we cannot get a local port to listen on, none of the rest of this test has any meaning.
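i.e. roughly (a sketch of the suggested change):
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
	t.Fatal("error when listening: ", err) // abort the test: nothing below works without a listener
}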
also I don't see any format verbs used in the other t.Errorf calls inside this function, so they can be changed to t.Error instead.
done
lib/go/thrift/simple_server_test.go
Outdated
AcceptFunc: func() (TTransport, error) {
conn, err := ln.Accept()
if err != nil {
// t.Errorf("error accept connection")
just remove this line instead of commenting it out.
done
lib/go/thrift/simple_server_test.go
Outdated
if err != nil || netConn == nil {
t.Errorf("error when dial server")
}
time.Sleep(time.Second)
this means this test takes at least 1 second to finish. currently all the tests under this directory together take only slightly longer than 1s:
$ go test
PASS
ok github.com/apache/thrift/lib/go/thrift 1.379s
so a test that takes at least 1s will double the time of running the tests here. please use a shorter sleep time.
also I'm not sure why this sleep is necessary?
done
lib/go/thrift/simple_server_test.go
Outdated
port := strings.Split(ln.Addr().String(), ":")[1]
netConn, err := net.Dial("tcp", "localhost:"+port)
isn't net.Dial("tcp", ln.Addr().String()) sufficient? why do we need to do a string split to get the port out and then combine that back into the full address again?
done
lib/go/thrift/simple_server_test.go
Outdated
// t.Errorf("error accept connection")
return nil, err
}
return NewTSocketFromConnTimeout(conn, 0), nil
NewTSocketFromConnTimeout is already deprecated. please use NewTSocketFromConnConf instead (and since you don't need any timeout here, you can use a nil conf).
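i.e. something like:
return NewTSocketFromConnConf(conn, nil), nil // nil conf, since no timeout is needed here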
done
p.wg.Wait()
p.stopChan = make(chan struct{})
Is the purpose of this line to make sure that calling TSimpleServer.Close twice won't cause any problems? if that's the case I'd rather just store the return values of context.WithCancel(context.Background()), because that handles this kind of corner case much more robustly.
Because a stopped server can serve again later, and a closed channel cannot be closed again, we have to make a new channel.
in that case this should be done in Serve instead.
i don't think so.
Firstly, the stopChan should be created when the server is created.
Secondly, it is Stop that closes the stopChan, so Stop should also be responsible for recreating it.
After all, if we call Serve several times before Stop, it would create the stopChan several times without closing it, which is quite strange.
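For illustration, the shape being argued about is roughly this (simplified, not the exact PR code):
func (p *TSimpleServer) Stop() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if atomic.LoadInt32(&p.closed) != 0 {
		return nil // already stopped
	}
	atomic.StoreInt32(&p.closed, 1)
	p.serverTransport.Interrupt()
	close(p.stopChan) // wake the per-connection goroutines so they close their clients
	p.wg.Wait()       // wait for all handlers to return
	p.stopChan = make(chan struct{}) // recreate it, since a stopped server may Serve again later
	return nil
}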
lib/go/thrift/socket.go
Outdated
@@ -195,8 +195,9 @@ func (p *TSocket) IsOpen() bool {
// Closes the socket.
func (p *TSocket) Close() error {
// Close the socket
if p.conn != nil {
err := p.conn.Close()
conn := p.conn
I assume the intention of this change is to make it thread-safe, but it does not achieve that (if you run go test -race with the change in this PR it still complains about a race condition, because we have 2 goroutines calling TSocket.Close concurrently).
#2500 will address this issue.
OK, I have undone the change.
The race detector detected an issue in this change, and the issue went away after I cherry-picked #2500 in:
~/work/thrift/lib/go/thrift$ go test -race
==================
WARNING: DATA RACE
Write at 0x00c0002c2260 by goroutine 74:
github.com/apache/thrift/lib/go/thrift.(*TSocket).Close()
/home/fishy/work/thrift/lib/go/thrift/socket.go:203 +0x79
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).innerAccept.func2()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:215 +0x1a1
Previous read at 0x00c0002c2260 by goroutine 107:
github.com/apache/thrift/lib/go/thrift.(*TSocket).Close()
/home/fishy/work/thrift/lib/go/thrift/socket.go:198 +0x32
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).processRequests·dwrap·14()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:307 +0x48
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).processRequests()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:349 +0x8ce
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).innerAccept.func1()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:203 +0x108
Goroutine 74 (running) created at:
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).innerAccept()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:208 +0x396
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).AcceptLoop()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:224 +0xee
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).Serve()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:241 +0xbd
github.com/apache/thrift/lib/go/thrift.TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop·dwrap·30()
/home/fishy/work/thrift/lib/go/thrift/simple_server_test.go:191 +0x39
Goroutine 107 (finished) created at:
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).innerAccept()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:200 +0x296
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).AcceptLoop()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:224 +0xee
github.com/apache/thrift/lib/go/thrift.(*TSimpleServer).Serve()
/home/fishy/work/thrift/lib/go/thrift/simple_server.go:241 +0xbd
github.com/apache/thrift/lib/go/thrift.TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop·dwrap·30()
/home/fishy/work/thrift/lib/go/thrift/simple_server_test.go:191 +0x39
==================
--- FAIL: TestNoHangDuringStopFromClientNoDataSendDuringAcceptLoop (0.02s)
testing.go:1152: race detected during execution of test
FAIL
exit status 1
FAIL github.com/apache/thrift/lib/go/thrift 2.597s
~/work/thrift/lib/go/thrift$ git cherry-pick 1387ff557
[pr/2497 c356299ba] go: Make socketConn.Close thread-safe
Date: Sat Jan 8 01:03:57 2022 -0800
3 files changed, 14 insertions(+), 19 deletions(-)
~/work/thrift/lib/go/thrift$ go test -race
PASS
ok github.com/apache/thrift/lib/go/thrift 2.468s
Force-pushed from ed858c0 to c5117e4.
this one should only be merged after #2500. we probably should enable race detector in travis.
lib/go/thrift/simple_server.go
Outdated
closed int32
wg sync.WaitGroup
mu sync.Mutex
stopChan chan struct{}
nit: I would still prefer to have a blank line after this line so we have the logical "groups" of the fields of a TSimpleServer.
actually we already have the race detector enabled on travis and it's failing (https://app.travis-ci.com/github/apache/thrift/jobs/554765808):
also cc @Jens-G: we should also wait for this one for 0.16.0, but this one is already pretty close :)
@buptubuntu now that #2500 is merged, please rebase this branch on top of the latest master so that it can pass the test with the race detector on travis.
OK
Force-pushed from 81cfea4 to d7126f7.
hmm, this test failed on travis:
but I cannot reproduce it locally and it doesn't make sense to me
Oh I think I know why (@buptubuntu please apply this diff to the PR):
diff --git a/test/go/src/common/clientserver_test.go b/test/go/src/common/clientserver_test.go
index 609086bad..64b326a81 100644
--- a/test/go/src/common/clientserver_test.go
+++ b/test/go/src/common/clientserver_test.go
@@ -75,7 +75,7 @@ func doUnit(t *testing.T, unit *test_unit) {
t.Errorf("Unable to start server: %v", err)
return
}
- go server.AcceptLoop()
+ go server.Serve()
defer server.Stop()
client, trans, err := StartClient(unit.host, unit.port, unit.domain_socket, unit.transport, unit.protocol, unit.ssl)
if err != nil {
ok, later today
done
@@ -192,13 +195,27 @@ func (p *TSimpleServer) innerAccept() (int32, error) {
return 0, err
}
if client != nil {
p.wg.Add(1)
ctx, cancel := context.WithCancel(context.Background())
p.wg.Add(2)
style nit: it's probably less error-prone to use wg.Add(1) right before every go, e.g. write it this way:
ctx, cancel := context.WithCancel(context.Background())

p.wg.Add(1)
go func() {
	defer p.wg.Done()
	// actual work
}()

p.wg.Add(1)
go func() {
	defer p.wg.Done()
	// actual work
}()
this avoids the potential bug that someone adds another goroutine in the future but forgets to change the 2 to 3.
also style-wise, it's really better to always immediately do defer cancel() right after ctx, cancel := ... (which is a style guide to avoid resource leaks); that's not possible in the current 2-separate-goroutines approach, but it is possible if you wrap the second goroutine as a goroutine inside the first one (which was what I suggested in #2497 (comment) :) )
I don't feel too strongly for either of the nits here, though.
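The nested form described above would look roughly like this (illustrative):
p.wg.Add(1)
go func() {
	defer p.wg.Done()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // cancel sits right next to its ctx, per the style guide
	p.wg.Add(1)
	go func() {
		defer p.wg.Done()
		select {
		case <-ctx.Done():
			// the handler finished on its own
		case <-p.stopChan:
			client.Close() // server is stopping, unblock the pending read
		}
	}()
	p.processRequests(client)
}()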
ctx cancel is done by closing a chan:
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
	if err == nil {
		panic("context: internal error: missing cancel error")
	}
	c.mu.Lock()
	if c.err != nil {
		c.mu.Unlock()
		return // already canceled
	}
	c.err = err
	d, _ := c.done.Load().(chan struct{})
	if d == nil {
		c.done.Store(closedchan)
	} else {
		close(d)
	}
	for child := range c.children {
		// NOTE: acquiring the child's lock while holding parent's lock.
		child.cancel(false, err)
	}
	c.children = nil
	c.mu.Unlock()
	if removeFromParent {
		removeChild(c.Context, c)
	}
}
And yes, if the ctx is already canceled, canceling again will not close the inner channel; it will do nothing and return. So we still have to create a new ctx after stopping the server, because the ctx.Done() channel is already closed.
Dividing p.wg.Add(2) into two p.wg.Add(1) calls seems OK for now.
@dcelasun what do you think? I'm OK to merge this as-is in its current state.
Yeah, LGTM as well.
Wait. Sorry, I just realized that simply closing all clients during TSimpleServer.Stop would be a behavior change and would make things worse.
In the current code, when TSimpleServer.Stop is called, it's a graceful shutdown: it makes TSimpleServer stop accepting new connections (via the atomic int and the Interrupt call), but it still waits for any request that's currently being handled to finish. Letting in-flight requests finish is an important part of graceful shutdown.
Now we just close all the connections (this is true with either the goroutine approach or the original map approach), which will cause any connection that's currently being handled to be closed abruptly. From the client's point of view, they will not receive the full response for that request (either they haven't received any response at all, or they received half of the response and then an unexpected EOF happens), and this will cause those requests to fail (because we didn't let them finish).
So when any server upgrades to this version, this will increase their error rate whenever autoscaling scales down. That is worse than a graceful shutdown that just times out (Kubernetes has a timeout to wait for graceful shutdown, so in the current version, as long as the server can finish all pending requests in that time, no requests fail just because a shutdown happens).
I don't know the best way to fix that. Maybe adding a setting to TSimpleServer to sleep for a period of time before abruptly closing all connections is an OK short-term solution; long term we might need to make some breaking changes to TProtocol/TTransport to add graceful-shutdown semantics to them.
@dcelasun your thoughts?
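A sketch of that short-term idea (the method name and grace parameter are made up for illustration):
// stopWithGrace waits up to grace for in-flight requests to finish before
// abruptly closing the remaining clients. Hypothetical, not an actual API.
func (p *TSimpleServer) stopWithGrace(grace time.Duration) {
	p.serverTransport.Interrupt() // stop accepting new connections
	done := make(chan struct{})
	go func() {
		p.wg.Wait() // wait for in-flight requests to finish
		close(done)
	}()
	select {
	case <-done:
		// graceful: all handlers finished within the grace period
	case <-time.After(grace):
		close(p.stopChan) // force-close the remaining clients...
		<-done            // ...and wait for their handlers to return
	}
}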
Whether the hang problem is worse than abruptly closing is not absolute; it depends on the user. Maybe a user wants a quick restart more than zero errors, even in k8s. As for our situation, hanging is absolutely worse than abruptly closing.
@buptubuntu could you please clarify whether you are working on the new approach(-es)?
As I mentioned above: Yeah, I think there are generally two solutions:
between the 2 I would certainly prefer the 2nd one.
Likewise, the second one.
There are some unstable tests. As long as the Go tests pass, it's fine.
Force-pushed from ef18c39 to 003dd7a.
Force-pushed from 070827b to ab18bec.
looks good besides some documentation/naming nitpicks.
@dcelasun what do you think?
@dcelasun any more suggestions?
LGTM
So, is it OK to merge? What do you think? @fishy
@fishy any other suggestions?
Client: [go]
If there is a client connection and no data is sent, we will encounter a hang during server stop:
1> If the transport factory is configured with a socket timeout, we will hang until the deadline of the socket
2> If the transport factory is configured without a socket timeout, we will hang forever
Stack as below:
goroutine 140800 [IO wait, 2706 minutes]:
internal/poll.runtime_pollWait(0x7fbf804fb100, 0x72)
runtime/netpoll.go:234 +0x89
internal/poll.(*pollDesc).wait(0xc009087700, 0xc008196000, 0x0)
internal/poll/fd_poll_runtime.go:84 +0x32
internal/poll.(*pollDesc).waitRead(...)
internal/poll/fd_poll_runtime.go:89
internal/poll.(*FD).Read(0xc009087700, {0xc008196000, 0x10000, 0x10000})
internal/poll/fd_unix.go:167 +0x25a
net.(*netFD).Read(0xc009087700, {0xc008196000, 0x0, 0xc0061089b8})
net/fd_posix.go:56 +0x29
net.(*conn).Read(0xc007c98038, {0xc008196000, 0x0, 0xc006108978})
net/net.go:183 +0x45
github.com/apache/thrift/lib/go/thrift.(*socketConn).Read(0x246aae0, {0xc008196000, 0xc0058b4ed0, 0x246aae0})
github.com/apache/thrift@v0.15.0/lib/go/thrift/socket_conn.go:101 +0x44
github.com/apache/thrift/lib/go/thrift.(*TSocket).Read(0xc003555460, {0xc008196000, 0x10000, 0x10000})
github.com/apache/thrift@v0.15.0/lib/go/thrift/socket.go:221 +0x67
bufio.(*Reader).Read(0xc005657320, {0xc001da1000, 0x1000, 0x203000})
bufio/bufio.go:227 +0x1b4
github.com/apache/thrift/lib/go/thrift.(*TBufferedTransport).Read(0xc0035554a0, {0xc001da1000, 0x431e10, 0x64})
github.com/apache/thrift@v0.15.0/lib/go/thrift/buffered_transport.go:67 +0x45
bufio.(*Reader).Read(0xc005657380, {0xc0090877f0, 0x4, 0x4b5bac0})
bufio/bufio.go:227 +0x1b4
io.ReadAtLeast({0x30c0520, 0xc005657380}, {0xc0090877f0, 0x4, 0x4}, 0x4)
io/io.go:328 +0x9a
io.ReadFull(...)
io/io.go:347
github.com/apache/thrift/lib/go/thrift.(*TFramedTransport).readFrame(0xc009087780)
github.com/apache/thrift@v0.15.0/lib/go/thrift/framed_transport.go:199 +0x3c
github.com/apache/thrift/lib/go/thrift.(*TFramedTransport).Read(0xc009087780, {0xc0090877f0, 0x1, 0x4})
github.com/apache/thrift@v0.15.0/lib/go/thrift/framed_transport.go:148 +0x130
github.com/apache/thrift/lib/go/thrift.(*TFramedTransport).ReadByte(0xc009087780)
github.com/apache/thrift@v0.15.0/lib/go/thrift/framed_transport.go:157 +0x2e
github.com/apache/thrift/lib/go/thrift.(*TCompactProtocol).readByteDirect(...)
github.com/apache/thrift@v0.15.0/lib/go/thrift/compact_protocol.go:766
github.com/apache/thrift/lib/go/thrift.(*TCompactProtocol).ReadMessageBegin(0xc008765040, {0x311a118, 0xc00319ede0})
github.com/apache/thrift@v0.15.0/lib/go/thrift/compact_protocol.go:367 +0x62