Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TRD-1749: Escape send queued when blocked on connection side #18

Merged
merged 3 commits into from
Jan 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error {

s.toSend = append(s.toSend, msgBytes)

s.notifyMessageOut()

return nil
}

func (s *session) notifyMessageOut() {
select {
case s.messageEvent <- true:
default:
}

return nil
}

// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue.
Expand Down Expand Up @@ -347,8 +351,12 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
}

func (s *session) sendQueued() {
for _, msgBytes := range s.toSend {
s.sendBytes(msgBytes)
for i, msgBytes := range s.toSend {
if !s.sendBytes(msgBytes) {
s.toSend = s.toSend[i:]
s.notifyMessageOut()
return
}
}

s.dropQueued()
Expand All @@ -366,15 +374,20 @@ func (s *session) EnqueueBytesAndSend(msg []byte) {
s.sendQueued()
}

func (s *session) sendBytes(msg []byte) {
func (s *session) sendBytes(msg []byte) bool {
Neal marked this conversation as resolved.
Show resolved Hide resolved
if s.messageOut == nil {
s.log.OnEventf("Failed to send: disconnected")
return
return false
}

s.log.OnOutgoing(msg)
s.messageOut <- msg
s.stateTimer.Reset(s.HeartBtInt)
select {
case s.messageOut <- msg:
s.log.OnOutgoing(msg)
s.stateTimer.Reset(s.HeartBtInt)
return true
default:
return false
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an observation: with this change, sometimes the benchmark on the outbound side stops responding. I'll capture the stuck goroutine and share later. But when it worked this is the benchmark result with the change

2024/01/31 09:54:40 >>>>>>>>>>> OUTBOUND STATS <<<<<<<<<<<
2024/01/31 09:54:40 NumCPU: 10
2024/01/31 09:54:40 GOMAXPROCS: 10
2024/01/31 09:54:40 Sample mean is 127.8728962000003 us
2024/01/31 09:54:40 Sample max is 2632.417 us (204)
2024/01/31 09:54:40 Standard Dev is 181.40784398506725 us
2024/01/31 09:54:40 Processed 10000 msg in 628.1435ms [effective rate: 15919.9291 msg/s]
2024/01/31 09:54:40 ----------- OUTBOUND STATS -----------

2024/01/31 09:54:40 >>>>>>>>>>> INBOUND STATS <<<<<<<<<<<
2024/01/31 09:54:40 NumCPU: 10
2024/01/31 09:54:40 GOMAXPROCS: 10
2024/01/31 09:54:40 Sample mean is 19.812591999999984 us
2024/01/31 09:54:40 Sample max is 941.417 us (1231)
2024/01/31 09:54:40 Standard Dev is 35.00415617401569 us
2024/01/31 09:54:40 Processed 10000 msg in 622.807375ms [effective rate: 16056.3288 msg/s]
2024/01/31 09:54:40 ----------- INBOUND STATS -----------

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also curious about adding in GC runtime settings as part of the benchmark..will try

}
}

func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) {
Expand Down