Skip to content
Permalink
Browse files
change grNum to chan
  • Loading branch information
pantianying authored and watermelo committed Jul 18, 2020
1 parent 293a60d commit 5f47c4d9b41640a806beb4b94eedef7f2a74e2e9
Showing 1 changed file with 24 additions and 23 deletions.
@@ -86,9 +86,10 @@ type session struct {
// attribute
attrs *ValuesContext

// goroutines sync
grNum int32
lock sync.RWMutex
// goroutines done signal
handleLoopDone chan struct{}
handlePackageDone chan struct{}
lock sync.RWMutex
}

func newSession(endPoint EndPoint, conn Connection) *session {
@@ -102,9 +103,11 @@ func newSession(endPoint EndPoint, conn Connection) *session {

period: period,

done: make(chan struct{}),
wait: pendingDuration,
attrs: NewValuesContext(nil),
done: make(chan struct{}),
wait: pendingDuration,
attrs: NewValuesContext(nil),
handleLoopDone: make(chan struct{}),
handlePackageDone: make(chan struct{}),
}

ss.Connection.setSession(ss)
@@ -145,7 +148,8 @@ func (s *session) Reset() {
s.period = period
s.wait = pendingDuration
s.attrs = NewValuesContext(nil)
s.grNum = 0
s.handleLoopDone = make(chan struct{})
s.handlePackageDone = make(chan struct{})

s.SetWriteTimeout(netIOTimeout)
s.SetReadTimeout(netIOTimeout)
@@ -470,7 +474,6 @@ func (s *session) run() {
}

// start read/write gr
atomic.AddInt32(&(s.grNum), 2)
go s.handleLoop()
go s.handlePackage()
}
@@ -496,7 +499,7 @@ func (s *session) handleLoop() {
log.Errorf("[session.handleLoop] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}

grNum = atomic.AddInt32(&(s.grNum), -1)
close(s.handleLoopDone)
s.listener.OnClose(s)
log.Info("%s, [session.handleLoop] goroutine exit now, left gr num %d", s.Stat(), grNum)
s.gc()
@@ -511,19 +514,17 @@ LOOP:
select {
case <-s.done:
// this case branch assure the (session)handleLoop gr will exit before (session)handlePackage gr.
if atomic.LoadInt32(&(s.grNum)) == 1 { // make sure @(session)handlePackage goroutine has been closed.
if len(s.wQ) == 0 {
log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat())
break LOOP
}
counter.Start()
// if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() {
if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP
}
<-s.handlePackageDone
if len(s.wQ) == 0 {
log.Infof("%s, [session.handleLoop] got done signal. wQ is nil.", s.Stat())
break LOOP
}
counter.Start()
// if time.Since(start).Nanoseconds() >= s.wait.Nanoseconds() {
if counter.Count() > s.wait.Nanoseconds() {
log.Infof("%s, [session.handleLoop] got done signal ", s.Stat())
break LOOP
}

case outPkg = <-s.wQ:
if flag {
if err = s.writer.Write(s, outPkg); err != nil {
@@ -577,7 +578,7 @@ func (s *session) handlePackage() {
log.Errorf("[session.handlePackage] panic session %s: err=%s\n%s", s.sessionToken(), r, rBuf)
}

grNum = atomic.AddInt32(&(s.grNum), -1)
close(s.handlePackageDone)
log.Infof("%s, [session.handlePackage] gr will exit now, left gr num %d", s.sessionToken(), grNum)
s.stop()
if err != nil {
@@ -856,5 +857,5 @@ func (s *session) gc() {
func (s *session) Close() {
s.stop()
log.Info("%s closed now. its current gr num is %d",
s.sessionToken(), atomic.LoadInt32(&(s.grNum)))
s.sessionToken())
}

0 comments on commit 5f47c4d

Please sign in to comment.