Skip to content

Commit

Permalink
Imp: close connection asynchronously in session.gc
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexStocks authored and watermelo committed Jul 18, 2020
1 parent 4709210 commit c22d4d2
Showing 1 changed file with 35 additions and 15 deletions.
50 changes: 35 additions & 15 deletions session.go
Expand Up @@ -91,7 +91,7 @@ type session struct {

// done
wait time.Duration
once sync.Once
once *sync.Once
done chan struct{}

// attribute
Expand All @@ -115,6 +115,7 @@ func newSession(endPoint EndPoint, conn Connection) *session {

period: period,

once: &sync.Once{},
done: make(chan struct{}),
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil),
Expand Down Expand Up @@ -153,14 +154,15 @@ func newWSSession(conn *websocket.Conn, endPoint EndPoint) Session {
}

func (s *session) Reset() {
s.name = defaultSessionName
s.once = sync.Once{}
s.done = make(chan struct{})
s.period = period
s.wait = pendingDuration
s.attrs = gxcontext.NewValuesContext(nil)
s.rDone = make(chan struct{})
s.grNum = 0
*s = session{
name: defaultSessionName,
once: &sync.Once{},
done: make(chan struct{}),
period: period,
wait: pendingDuration,
attrs: gxcontext.NewValuesContext(nil),
rDone: make(chan struct{}),
}

s.SetWriteTimeout(netIOTimeout)
s.SetReadTimeout(netIOTimeout)
Expand Down Expand Up @@ -360,6 +362,10 @@ func (s *session) RemoveAttribute(key interface{}) {
}

func (s *session) sessionToken() string {
if s.IsClosed() || s.Connection == nil {
return "session-closed"
}

return fmt.Sprintf("{%s:%s:%d:%s<->%s}",
s.name, s.EndPoint().EndPointType(), s.ID(), s.LocalAddr(), s.RemoteAddr())
}
Expand Down Expand Up @@ -663,7 +669,9 @@ func (s *session) handlePackage() {
s.stop()
if err != nil {
log.Errorf("%s, [session.handlePackage] error:%+v", s.sessionToken(), err)
s.listener.OnError(s, err)
if s != nil || s.listener != nil {
s.listener.OnError(s, err)
}
}
}()

Expand Down Expand Up @@ -730,12 +738,12 @@ func (s *session) handleTCPPackage() error {
if netError, ok = perrors.Cause(err).(net.Error); ok && netError.Timeout() {
break
}
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
if perrors.Cause(err) == io.EOF {
err = nil
exit = true
break
}
log.Errorf("%s, [session.conn.read] = error:%+v", s.sessionToken(), err)
exit = true
}
break
Expand All @@ -759,7 +767,7 @@ func (s *session) handleTCPPackage() error {
// handle case 1
if err != nil {
log.Warnf("%s, [session.handleTCPPackage] = len{%d}, error:%+v",
s.sessionToken(), pkgLen, err)
s.sessionToken(), pkgLen, perrors.WithStack(err))
exit = true
break
}
Expand Down Expand Up @@ -877,7 +885,7 @@ func (s *session) handleWSPackage() error {
}
if err != nil {
log.Warnf("%s, [session.handleWSPackage] = error{%+s}",
s.sessionToken(), err)
s.sessionToken(), perrors.WithStack(err))
return perrors.WithStack(err)
}
s.UpdateActive()
Expand Down Expand Up @@ -924,16 +932,28 @@ func (s *session) stop() {
}

func (s *session) gc() {
var (
wQ chan interface{}
conn Connection
)

s.lock.Lock()
if s.attrs != nil {
s.attrs = nil
if s.wQ != nil {
close(s.wQ)
wQ = s.wQ
s.wQ = nil
}
s.Connection.close((int)((int64)(s.wait)))
conn = s.Connection
}
s.lock.Unlock()

go func() {
if wQ != nil {
conn.close((int)((int64)(s.wait)))
close(wQ)
}
}()
}

// Close will be invoked by NewSessionCallback(if return error is not nil)
Expand Down

0 comments on commit c22d4d2

Please sign in to comment.