Skip to content

Commit

Permalink
TRD-1749: Escape send queued when blocked on connection side (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandrosKyriakakis committed Jan 31, 2024
1 parent 3bf2b87 commit 3594015
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error {

s.toSend = append(s.toSend, msgBytes)

s.notifyMessageOut()

return nil
}

func (s *session) notifyMessageOut() {
select {
case s.messageEvent <- true:
default:
}

return nil
}

// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue.
Expand Down Expand Up @@ -347,8 +351,12 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
}

func (s *session) sendQueued() {
for _, msgBytes := range s.toSend {
s.sendBytes(msgBytes)
for i, msgBytes := range s.toSend {
if !s.sendBytes(msgBytes) {
s.toSend = s.toSend[i:]
s.notifyMessageOut()
return
}
}

s.dropQueued()
Expand All @@ -366,15 +374,20 @@ func (s *session) EnqueueBytesAndSend(msg []byte) {
s.sendQueued()
}

func (s *session) sendBytes(msg []byte) {
func (s *session) sendBytes(msg []byte) bool {
if s.messageOut == nil {
s.log.OnEventf("Failed to send: disconnected")
return
return false
}

s.log.OnOutgoing(msg)
s.messageOut <- msg
s.stateTimer.Reset(s.HeartBtInt)
select {
case s.messageOut <- msg:
s.log.OnOutgoing(msg)
s.stateTimer.Reset(s.HeartBtInt)
return true
default:
return false
}
}

func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) {
Expand Down

0 comments on commit 3594015

Please sign in to comment.