Skip to content

Commit

Permalink
Fixing a bug where we were bottlenecking on frames that had bodies sm…
Browse files Browse the repository at this point in the history
…aller than 8 bytes (#320)

* The check for the header size was preventing us from moving on if the body length was smaller than the standard frame header (8 bytes).

Moving it within the portion of the reader that deals with parsing headers.

* update changelog and some comments

---------

Co-authored-by: Richard Park <ripark@microsoft.com>
Co-authored-by: Joel Hendrix <jhendrix@microsoft.com>
  • Loading branch information
3 people committed Mar 1, 2024
1 parent b0717e4 commit a819335
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 9 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 1.0.5 (2024-03-05)

### Bugs Fixed

* Fixed an issue that could cause delays when parsing small frames.

## 1.0.4 (2024-01-16)

### Other Changes
Expand Down
16 changes: 9 additions & 7 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ type ConnOptions struct {
//
// Must be 512 or greater.
//
// Default: 512.
// Default: 65536.
MaxFrameSize uint32

// MaxSessions sets the maximum number of channels.
// The value must be greater than zero.
//
// Default: 65535.
// Default: 65536.
MaxSessions uint16

// Properties sets an entry in the connection properties map sent to the server.
Expand Down Expand Up @@ -663,13 +663,15 @@ func (c *Conn) readFrame() (frames.Frame, error) {
}
}

// read more if buf doesn't contain enough to parse the header
if c.rxBuf.Len() < frames.HeaderSize {
continue
}

// parse the header if a frame isn't in progress
if !frameInProgress {
// read more if buf doesn't contain enough to parse the header
// NOTE: we MUST do this ONLY if a frame isn't in progress else we can
// end up stalling when reading frames with bodies smaller than HeaderSize
if c.rxBuf.Len() < frames.HeaderSize {
continue
}

var err error
currentHeader, err = frames.ParseHeader(&c.rxBuf)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {

if body.Settled && body.Role == encoding.RoleReceiver {
// check if settlement confirmation was requested, if so
// confirm by closing channel
// confirm by closing channel (RSM == ModeFirst)
if done, ok := settlementFromDeliveryID[deliveryID]; ok {
delete(settlementFromDeliveryID, deliveryID)
select {
Expand Down Expand Up @@ -707,7 +707,7 @@ func (s *Session) mux(remoteBegin *frames.PerformBegin) {
}
for deliveryID := start; deliveryID <= end; deliveryID++ {
// send delivery state to the channel and close it to signal
// that the delivery has completed.
// that the delivery has completed (RSM == ModeSecond)
if done, ok := settlementFromDeliveryID[deliveryID]; ok {
delete(settlementFromDeliveryID, deliveryID)
select {
Expand Down

0 comments on commit a819335

Please sign in to comment.