From bb33c9289e99adb4358a57902f6d4ad910ac0e97 Mon Sep 17 00:00:00 2001 From: AlexandrosKyriakakis Date: Tue, 30 Jan 2024 19:08:45 +0200 Subject: [PATCH 1/3] Escape send queued when blocked on connection side --- session.go | 42 ++++++++++++++++++++++++++++++++---------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/session.go b/session.go index b359245e2..5d4ab4cb0 100644 --- a/session.go +++ b/session.go @@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error { s.toSend = append(s.toSend, msgBytes) + s.notifyMessageOut() + + return nil +} + +func (s *session) notifyMessageOut() { select { case s.messageEvent <- true: default: } - - return nil } // send will validate, persist, queue the message. If the session is logged on, send all messages in the queue. @@ -347,10 +351,23 @@ func (s *session) persist(seqNum int, msgBytes []byte) error { } func (s *session) sendQueued() { - for _, msgBytes := range s.toSend { - s.sendBytes(msgBytes) + var ( + blocked bool + indexBlocked int + ) + + for i, msgBytes := range s.toSend { + blocked = s.sendBytes(msgBytes) + if blocked { + indexBlocked = i + break + } + } + if blocked { + s.toSend = s.toSend[indexBlocked:] + s.notifyMessageOut() + return } - s.dropQueued() } @@ -366,15 +383,20 @@ func (s *session) EnqueueBytesAndSend(msg []byte) { s.sendQueued() } -func (s *session) sendBytes(msg []byte) { +func (s *session) sendBytes(msg []byte) bool { if s.messageOut == nil { s.log.OnEventf("Failed to send: disconnected") - return + return false } - s.log.OnOutgoing(msg) - s.messageOut <- msg - s.stateTimer.Reset(s.HeartBtInt) + select { + case <-time.After(5 * time.Millisecond): + return true + case s.messageOut <- msg: + s.log.OnOutgoing(msg) + s.stateTimer.Reset(s.HeartBtInt) + return false + } } func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) { From 21384e6dc24b832a5b3613f534b1256d6d4ebf85 Mon Sep 17 00:00:00 2001 From: AlexandrosKyriakakis Date: Wed, 31 Jan 2024 13:09:56 +0200 Subject: [PATCH 2/3] use default instead of a timer --- session.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/session.go b/session.go index 5d4ab4cb0..e76e596f8 100644 --- a/session.go +++ b/session.go @@ -352,18 +352,18 @@ func (s *session) persist(seqNum int, msgBytes []byte) error { func (s *session) sendQueued() { var ( - blocked bool + sent bool indexBlocked int ) for i, msgBytes := range s.toSend { - blocked = s.sendBytes(msgBytes) - if blocked { + sent = s.sendBytes(msgBytes) + if !sent { indexBlocked = i break } } - if blocked { + if !sent { s.toSend = s.toSend[indexBlocked:] s.notifyMessageOut() return @@ -390,11 +390,11 @@ func (s *session) sendBytes(msg []byte) bool { } select { - case <-time.After(5 * time.Millisecond): - return true case s.messageOut <- msg: s.log.OnOutgoing(msg) s.stateTimer.Reset(s.HeartBtInt) + return true + default: return false } } From 9693665cce97725f993ad72c53205846d1022653 Mon Sep 17 00:00:00 2001 From: AlexandrosKyriakakis Date: Wed, 31 Jan 2024 18:13:57 +0200 Subject: [PATCH 3/3] refactor send queued --- session.go | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/session.go b/session.go index e76e596f8..db5198e04 100644 --- a/session.go +++ b/session.go @@ -351,23 +351,14 @@ func (s *session) persist(seqNum int, msgBytes []byte) error { } func (s *session) sendQueued() { - var ( - sent bool - indexBlocked int - ) - for i, msgBytes := range s.toSend { - sent = s.sendBytes(msgBytes) - if !sent { - indexBlocked = i - break + if !s.sendBytes(msgBytes) { + s.toSend = s.toSend[i:] + s.notifyMessageOut() + return } } - if !sent { - s.toSend = s.toSend[indexBlocked:] - s.notifyMessageOut() - return - } + s.dropQueued() }