New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Expand stream's flow control in case of an active read. #1248
Changes from 12 commits
9012be0
c2d5ec9
4d09482
722d058
c9d1fc8
78f4eb0
da22a03
9d8f07c
04b882f
d4d90a8
1df7a2a
e85309f
a96ed0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -58,6 +58,8 @@ const ( | |
defaultServerKeepaliveTime = time.Duration(2 * time.Hour) | ||
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second) | ||
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute) | ||
// max window limit set by HTTP2 Specs. | ||
maxWindowSize = math.MaxInt32 | ||
) | ||
|
||
// The following defines various control items which could flow through | ||
|
@@ -167,14 +169,37 @@ type inFlow struct { | |
// The amount of data the application has consumed but grpc has not sent | ||
// window update for them. Used to reduce window update frequency. | ||
pendingUpdate uint32 | ||
// delta is the extra window update given by receiver when an application | ||
// is reading data bigger in size than the inFlow limit. | ||
delta uint32 | ||
} | ||
|
||
func (f *inFlow) maybeAdjust(n uint32) uint32 { | ||
if n > uint32(math.MaxInt32) { | ||
n = uint32(math.MaxInt32) | ||
} | ||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. optional nit, sorry one more naming suggestion: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about I add comments and leave the names as is? I fear long names take us away from Go-styling. |
||
estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. | ||
if estUntransmittedData > estSenderQuota { | ||
// Sender's window shouldn't go more than 2^31 - 1. | ||
if f.limit+n > maxWindowSize { | ||
f.delta = maxWindowSize - f.limit | ||
} else { | ||
f.delta = n | ||
} | ||
return f.delta | ||
} | ||
return 0 | ||
} | ||
|
||
// onData is invoked when some data frame is received. It updates pendingData. | ||
func (f *inFlow) onData(n uint32) error { | ||
f.mu.Lock() | ||
defer f.mu.Unlock() | ||
f.pendingData += n | ||
if f.pendingData+f.pendingUpdate > f.limit { | ||
if f.pendingData+f.pendingUpdate > f.limit+f.delta { | ||
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit) | ||
} | ||
return nil | ||
|
@@ -189,6 +214,13 @@ func (f *inFlow) onRead(n uint32) uint32 { | |
return 0 | ||
} | ||
f.pendingData -= n | ||
if n > f.delta { | ||
n -= f.delta | ||
f.delta = 0 | ||
} else { | ||
f.delta -= n | ||
n = 0 | ||
} | ||
f.pendingUpdate += n | ||
if f.pendingUpdate >= f.limit/4 { | ||
wu := f.pendingUpdate | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I think the use of
fullReader
and other changes to this file can be reverted, since the transport is still anio.Reader
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These test here rely on recvMsg's behavior of reading the full message, (which is true when it interacts with the transport stream). However, here the parser is given a "fake" buffer instead of the stream. Given that we changed recvMsg to go from io.ReadFull to p.r.Read, the "fake" buffer here needs to read the full message just like the transport stream does.