TRD-1749: Escape send queued when blocked on connection side #18

Merged
merged 3 commits into from
Jan 31, 2024
42 changes: 32 additions & 10 deletions session.go
Expand Up @@ -235,12 +235,16 @@ func (s *session) queueForSend(msg *Message) error {

s.toSend = append(s.toSend, msgBytes)

s.notifyMessageOut()

return nil
}

func (s *session) notifyMessageOut() {
select {
case s.messageEvent <- true:
default:
}

return nil
}

// send will validate, persist, queue the message. If the session is logged on, send all messages in the queue.
Expand Down Expand Up @@ -347,10 +351,23 @@ func (s *session) persist(seqNum int, msgBytes []byte) error {
}

func (s *session) sendQueued() {
for _, msgBytes := range s.toSend {
s.sendBytes(msgBytes)
var (
blocked bool
indexBlocked int
)

for i, msgBytes := range s.toSend {
blocked = s.sendBytes(msgBytes)
if blocked {
indexBlocked = i
break
}
}
if blocked {
s.toSend = s.toSend[indexBlocked:]
s.notifyMessageOut()
return
}

s.dropQueued()
}
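The new sendQueued stops at the first message that cannot be sent and keeps that message and everything after it for the next attempt. A minimal standalone sketch of that pattern follows; `drainUntilBlocked` and the `send` callback are hypothetical names used for illustration and are not part of this repository.

```go
package main

import "fmt"

// drainUntilBlocked sends queued messages in order. It stops at the first
// message for which send reports "blocked" and returns the unsent remainder,
// starting with the message that could not be sent. It returns nil when the
// whole queue was sent.
func drainUntilBlocked(queue [][]byte, send func(msg []byte) (blocked bool)) [][]byte {
	for i, msg := range queue {
		if send(msg) {
			// Keep the blocked message itself so it is retried first.
			return queue[i:]
		}
	}
	return nil
}

func main() {
	sent := 0
	// Hypothetical sender that accepts two messages and then reports blocked.
	send := func(msg []byte) bool {
		if sent == 2 {
			return true
		}
		sent++
		return false
	}

	queue := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	rest := drainUntilBlocked(queue, send)
	fmt.Printf("sent=%d remaining=%d first=%q\n", sent, len(rest), rest[0])
}
```

Slicing from the blocked index, rather than from the index after it, is what preserves message order across retries.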

Expand All @@ -366,15 +383,20 @@ func (s *session) EnqueueBytesAndSend(msg []byte) {
s.sendQueued()
}

func (s *session) sendBytes(msg []byte) {
func (s *session) sendBytes(msg []byte) bool {
Neal marked this conversation as resolved.
if s.messageOut == nil {
s.log.OnEventf("Failed to send: disconnected")
return
return false
}

s.log.OnOutgoing(msg)
s.messageOut <- msg
s.stateTimer.Reset(s.HeartBtInt)
select {
case <-time.After(5 * time.Millisecond):

With 1ms

2024/01/30 18:55:18 >>>>>>>>>>> INBOUND STATS <<<<<<<<<<<
2024/01/30 18:55:18 NumCPU: 10
2024/01/30 18:55:18 GOMAXPROCS: 10
2024/01/30 18:55:18 Sample mean is 3744.244753299986 us
2024/01/30 18:55:18 Sample max is 57088.334 us (4482)
2024/01/30 18:55:18 Standard Dev is 8178.456956331603 us
2024/01/30 18:55:18 Processed 10000 msg in 5.38205325s [effective rate: 1858.0270 msg/s]
2024/01/30 18:55:18 ----------- INBOUND STATS -----------

2024/01/30 18:55:18 >>>>>>>>>>> OUTBOUND STATS <<<<<<<<<<<
2024/01/30 18:55:18 NumCPU: 10
2024/01/30 18:55:18 GOMAXPROCS: 10
2024/01/30 18:55:18 Sample mean is 3753.0946988000005 us
2024/01/30 18:55:18 Sample max is 69729.459 us (1917)
2024/01/30 18:55:18 Standard Dev is 9290.683106959043 us
2024/01/30 18:55:18 Processed 10000 msg in 5.387390125s [effective rate: 1856.1863 msg/s]
2024/01/30 18:55:18 ----------- OUTBOUND STATS -----------

With 5ms

2024/01/30 18:57:57 >>>>>>>>>>> INBOUND STATS <<<<<<<<<<<
2024/01/30 18:57:57 NumCPU: 10
2024/01/30 18:57:57 GOMAXPROCS: 10
2024/01/30 18:57:57 Sample mean is 13627.323359899992 us
2024/01/30 18:57:57 Sample max is 179786.5 us (3812)
2024/01/30 18:57:57 Standard Dev is 28109.224029722398 us
2024/01/30 18:57:57 Processed 10000 msg in 20.271821667s [effective rate: 493.2956 msg/s]
2024/01/30 18:57:57 ----------- INBOUND STATS -----------

2024/01/30 18:57:57 >>>>>>>>>>> OUTBOUND STATS <<<<<<<<<<<
2024/01/30 18:57:57 NumCPU: 10
2024/01/30 18:57:57 GOMAXPROCS: 10
2024/01/30 18:57:57 Sample mean is 14487.111722499989 us
2024/01/30 18:57:57 Sample max is 254143.25 us (1879)
2024/01/30 18:57:57 Standard Dev is 35900.6777926075 us
2024/01/30 18:57:57 Processed 10000 msg in 20.272896125s [effective rate: 493.2694 msg/s]
2024/01/30 18:57:57 ----------- OUTBOUND STATS -----------
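As a sanity check on the figures above, the reported effective rate is simply the message count divided by the elapsed time. The sketch below recomputes it from the two inbound runs; `effectiveRate` is a hypothetical helper, not a function from the benchmark tool.

```go
package main

import "fmt"

// effectiveRate returns messages per second for n messages processed in
// elapsedSeconds.
func effectiveRate(n int, elapsedSeconds float64) float64 {
	return float64(n) / elapsedSeconds
}

func main() {
	// Elapsed times copied from the inbound stats above.
	rate1ms := effectiveRate(10000, 5.38205325)
	rate5ms := effectiveRate(10000, 20.271821667)

	fmt.Printf("1ms timeout: %.1f msg/s\n", rate1ms)
	fmt.Printf("5ms timeout: %.1f msg/s\n", rate5ms)
	fmt.Printf("ratio: %.2fx\n", rate1ms/rate5ms)
}
```

On these numbers the 1ms run is a little under four times faster than the 5ms run, which is consistent with the timeout dominating the per-message cost whenever the send blocks.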

@sai-g, Jan 30, 2024

I tested this locally and found that the lower the timeout, the better the throughput. How did you choose 5ms?

Member

Why wait even 1ms? What is the throughput if you just use a default case?

Member Author

Oh, OK, I get your point. I avoided default because, as long as the connection is blocked, this loop will keep the CPU very busy: every failed send triggers messageEvent again, which immediately retries the send.
Still, it makes sense, so I will use default instead of a timer.

Addressed: 21384e6
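For readers following the thread, the default-based approach discussed here can be sketched as a non-blocking channel send. This is an illustration of the general Go idiom under assumed names (`trySend`, `out`), not the contents of commit 21384e6.

```go
package main

import "fmt"

// trySend attempts to send msg on out without waiting. It reports
// blocked=true when the channel cannot accept the message right now.
func trySend(out chan<- []byte, msg []byte) (blocked bool) {
	select {
	case out <- msg:
		return false
	default:
		// No receiver is ready and the buffer (if any) is full.
		return true
	}
}

func main() {
	// Assumed capacity of 1, chosen only to make the second send block.
	out := make(chan []byte, 1)

	fmt.Println("first blocked:", trySend(out, []byte("first")))
	fmt.Println("second blocked:", trySend(out, []byte("second")))
}
```

The trade-off raised in the comment above still applies: with no wait at all, a caller that retries immediately after every blocked send will spin until the receiver catches up.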

Member

While time.After is a nice convenience function, it shouldn't be used in hot paths where it is called many times: internally it creates a timer but never stops it. It is better to create a timer yourself so that you can stop it if you return before it fires.

This is also documented: https://pkg.go.dev/time#After

(Although, as I commented above, I think there maybe shouldn't be a timer at all, just a default.)
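If a timeout were kept, the suggestion above amounts to creating the timer explicitly and stopping it when the send wins. A minimal sketch, assuming a hypothetical helper `sendWithTimeout` that is not part of this PR:

```go
package main

import (
	"fmt"
	"time"
)

// sendWithTimeout tries to send msg on out and gives up after d.
// It reports timedOut=true when the send did not complete in time.
func sendWithTimeout(out chan<- []byte, msg []byte, d time.Duration) (timedOut bool) {
	timer := time.NewTimer(d)
	// Stop the timer on every return path so it does not linger until it
	// fires when the send completes first.
	defer timer.Stop()

	select {
	case out <- msg:
		return false
	case <-timer.C:
		return true
	}
}

func main() {
	// Assumed capacity of 1, chosen only to make the second send time out.
	out := make(chan []byte, 1)

	fmt.Println("first timed out:", sendWithTimeout(out, []byte("first"), 5*time.Millisecond))
	fmt.Println("second timed out:", sendWithTimeout(out, []byte("second"), 5*time.Millisecond))
}
```

In a real hot path one might go further and reuse a single timer with Reset instead of allocating one per call; that is left out here to keep the sketch short.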

Member Author

Addressed: 21384e6

return true
case s.messageOut <- msg:
s.log.OnOutgoing(msg)
s.stateTimer.Reset(s.HeartBtInt)
return false

Just an observation: with this change, the benchmark sometimes stops responding on the outbound side. I'll capture the stuck goroutine and share it later. When it did work, this was the benchmark result with the change:

2024/01/31 09:54:40 >>>>>>>>>>> OUTBOUND STATS <<<<<<<<<<<
2024/01/31 09:54:40 NumCPU: 10
2024/01/31 09:54:40 GOMAXPROCS: 10
2024/01/31 09:54:40 Sample mean is 127.8728962000003 us
2024/01/31 09:54:40 Sample max is 2632.417 us (204)
2024/01/31 09:54:40 Standard Dev is 181.40784398506725 us
2024/01/31 09:54:40 Processed 10000 msg in 628.1435ms [effective rate: 15919.9291 msg/s]
2024/01/31 09:54:40 ----------- OUTBOUND STATS -----------

2024/01/31 09:54:40 >>>>>>>>>>> INBOUND STATS <<<<<<<<<<<
2024/01/31 09:54:40 NumCPU: 10
2024/01/31 09:54:40 GOMAXPROCS: 10
2024/01/31 09:54:40 Sample mean is 19.812591999999984 us
2024/01/31 09:54:40 Sample max is 941.417 us (1231)
2024/01/31 09:54:40 Standard Dev is 35.00415617401569 us
2024/01/31 09:54:40 Processed 10000 msg in 622.807375ms [effective rate: 16056.3288 msg/s]
2024/01/31 09:54:40 ----------- INBOUND STATS -----------


I'm also curious about adding GC runtime settings to the benchmark. Will try.

}
}

func (s *session) doTargetTooHigh(reject targetTooHigh) (nextState resendState, err error) {
Expand Down