Skip to content

grpc: introduce new DialOption and ServerOption to configure initial window size without disabling BDP estimation. #8283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0c8d8c3
Add new Dial/ServerOptions to set the size
vinothkumarr227 May 2, 2025
3f776f2
Added test and fix vet errors
vinothkumarr227 May 2, 2025
a55c070
small tweaks
vinothkumarr227 May 2, 2025
21f3aae
Added test
vinothkumarr227 May 5, 2025
68f1322
Fixed the review changes
vinothkumarr227 May 13, 2025
699cb70
credentials, transport, grpc : add a call option to override the :aut…
eshitachandwani Apr 30, 2025
ab9e865
xds: generic lrs client for load reporting (#8250)
purnesh42H May 5, 2025
e72ed5b
otel: Test streaming rpc sequence numbers (#8272)
vinothkumarr227 May 5, 2025
541c8ce
[alts] Add plumbing for the bound access token field in the ALTS Star…
matthewstevenson88 May 5, 2025
5f9eeb4
stats/opentelemetry: separate out interceptors for tracing and metric…
janardhanvissa May 5, 2025
2cb07fd
xds_test: Wait for server to enter serving mode in RBAC test (#8287)
arjan-bal May 5, 2025
d077278
internal/delegatingresolver: avoid proxy if networktype of target add…
eshitachandwani May 7, 2025
b2c9036
[alts] add keepalive params to the alts handshaker client dial option…
rockspore May 7, 2025
22cf9e1
weightedroundrobin: Remove nil embedded SubConn from endpointWeight (…
arjan-bal May 7, 2025
2d3f20d
transport: skip Status.Proto() without details in writeStatus (#8282)
evanj May 7, 2025
b94efb5
grpc: Update ClientStream.CloseSend docs (#8292)
arjan-bal May 7, 2025
d05b590
xds: modify generic clients grpctransport to accept optional custom g…
purnesh42H May 8, 2025
c127d5f
resolver/delegatingresolver: wait for proxy resolver to be built in t…
eshitachandwani May 8, 2025
21da04b
xds: add MetricsReporter for generic xds client (#8274)
purnesh42H May 8, 2025
909bf84
credentials/local: implement ValidateAuthority (#8291)
eshitachandwani May 8, 2025
e54e81b
health: Add List method to gRPC Health service (#8155)
marcoshuck May 9, 2025
d21bc12
Update CONTRIBUTING.md (#8300)
dfawley May 9, 2025
cfaf50d
cleanup: replace dial with newclient (#8196)
janardhanvissa May 12, 2025
b422fce
Fixed the review changes
vinothkumarr227 May 26, 2025
605c49d
Merge branch 'master' into fix-window-size-bdp
vinothkumarr227 May 26, 2025
7a77cde
small tweaks
vinothkumarr227 May 26, 2025
127227a
small tweaks
vinothkumarr227 May 26, 2025
ac71fae
Fixed the test cases issues
vinothkumarr227 May 27, 2025
e5b7e2b
Fixed the review changes
vinothkumarr227 May 27, 2025
d3a6773
small tweaks
vinothkumarr227 May 27, 2025
0b2d748
Added test cases
vinothkumarr227 May 27, 2025
a3c4701
Fixed the review changes
vinothkumarr227 May 28, 2025
9abcf34
Fixed the review changes
vinothkumarr227 May 28, 2025
838385e
Remove static
vinothkumarr227 May 29, 2025
7ba8859
Fixed the dynamic window size isssues
vinothkumarr227 May 29, 2025
382ef30
Fixed the test cases
vinothkumarr227 May 29, 2025
48c91da
Fixed the window size
vinothkumarr227 May 29, 2025
cf685db
Fixed the review changes
vinothkumarr227 May 30, 2025
35a2253
Fixed the test cases
vinothkumarr227 Jun 2, 2025
6ec9e7d
Fixed the review changes
vinothkumarr227 Jun 2, 2025
efe83f4
small tweaks
vinothkumarr227 Jun 2, 2025
6d355fb
Fixed the test cases
vinothkumarr227 Jun 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
func WithInitialWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.StaticWindowSize = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These (4) need to be true or you will be changing behavior, which is not what we want yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the clarification!, I've updated the code so that all four values are set to true

})
}

Expand All @@ -222,6 +223,26 @@
func WithInitialConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.StaticWindowSize = false
})
}

// WithStaticStreamWindowSize returns a DialOption which sets the initial
// stream window size to the value provided and disables dynamic flow control.
func WithStaticStreamWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.StaticWindowSize = true
})

Check warning on line 236 in dialoptions.go

View check run for this annotation

Codecov / codecov/patch

dialoptions.go#L232-L236

Added lines #L232 - L236 were not covered by tests
}

// WithStaticConnWindowSize returns a DialOption which sets the initial
// connection window size to the value provided and disables dynamic flow
// control.
func WithStaticConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.StaticWindowSize = true

Check warning on line 245 in dialoptions.go

View check run for this annotation

Codecov / codecov/patch

dialoptions.go#L242-L245

Added lines #L242 - L245 were not covered by tests
})
}

Expand Down
7 changes: 3 additions & 4 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,10 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
scheme = "https"
}
}
dynamicWindow := true
icwz := int32(initialWindowSize)
if opts.InitialConnWindowSize >= defaultWindowSize {
icwz = opts.InitialConnWindowSize
dynamicWindow = false
opts.StaticWindowSize = false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't alter the setting.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}
writeBufSize := opts.WriteBufferSize
readBufSize := opts.ReadBufferSize
Expand Down Expand Up @@ -381,9 +380,9 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
dynamicWindow = false
opts.StaticWindowSize = false
}
if dynamicWindow {
if opts.StaticWindowSize {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if opts.StaticWindowSize {
if !opts.StaticWindowSize {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: t.updateFlowControl,
Expand Down
7 changes: 3 additions & 4 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,15 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
Val: config.MaxStreams,
})
}
dynamicWindow := true
iwz := int32(initialWindowSize)
if config.InitialWindowSize >= defaultWindowSize {
iwz = config.InitialWindowSize
dynamicWindow = false
config.StaticWindowSize = false
}
icwz := int32(initialWindowSize)
if config.InitialConnWindowSize >= defaultWindowSize {
icwz = config.InitialConnWindowSize
dynamicWindow = false
config.StaticWindowSize = false
}
if iwz != defaultWindowSize {
isettings = append(isettings, http2.Setting{
Expand Down Expand Up @@ -285,7 +284,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
t.logger = prefixLoggerForServerTransport(t)

t.controlBuf = newControlBuffer(t.done)
if dynamicWindow {
if config.StaticWindowSize {
t.bdpEst = &bdpEstimator{
bdp: initialWindowSize,
updateFlowControl: t.updateFlowControl,
Expand Down
3 changes: 3 additions & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ type ServerConfig struct {
MaxHeaderListSize *uint32
HeaderTableSize *uint32
BufferPool mem.BufferPool
StaticWindowSize bool
}

// ConnectOptions covers all relevant options for communicating with the server.
Expand Down Expand Up @@ -504,6 +505,8 @@ type ConnectOptions struct {
MaxHeaderListSize *uint32
// The mem.BufferPool to use when reading/writing to the wire.
BufferPool mem.BufferPool
// StaticWindowSize controls whether dynamic window sizing is enabled.
StaticWindowSize bool
}

// WriteOptions provides additional hints and information for message
Expand Down
26 changes: 26 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type serverOptions struct {
numServerWorkers uint32
bufferPool mem.BufferPool
waitForHandlers bool
staticWindowSize bool
}

var defaultServerOptions = serverOptions{
Expand Down Expand Up @@ -279,6 +280,7 @@ func ReadBufferSize(s int) ServerOption {
func InitialWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.staticWindowSize = false
})
}

Expand All @@ -287,6 +289,29 @@ func InitialWindowSize(s int32) ServerOption {
func InitialConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.staticWindowSize = false
})
}

// StaticStreamWindowSize returns a ServerOption to set the initial stream
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.
func StaticStreamWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.staticWindowSize = true
})
}

// StaticConnWindowSize returns a ServerOption to set the initial connection
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.
func StaticConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.staticWindowSize = true
})
}

Expand Down Expand Up @@ -986,6 +1011,7 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
MaxHeaderListSize: s.opts.maxHeaderListSize,
HeaderTableSize: s.opts.headerTableSize,
BufferPool: s.opts.bufferPool,
StaticWindowSize: s.opts.staticWindowSize,
}
st, err := transport.NewServerTransport(c, config)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4225,6 +4225,10 @@ func testClientInitialHeaderEndStream(t *testing.T, e env) {
// checking.
handlerDone := make(chan struct{})
te := newTest(t, e)
te.customServerOptions = append(te.customServerOptions,
grpc.StaticStreamWindowSize(64000),
grpc.StaticConnWindowSize(10000),
)
ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
defer close(handlerDone)
// Block on serverTester receiving RST_STREAM. This ensures server has closed
Expand Down Expand Up @@ -4269,6 +4273,10 @@ func testClientSendDataAfterCloseSend(t *testing.T, e env) {
// checking.
handlerDone := make(chan struct{})
te := newTest(t, e)
te.customServerOptions = append(te.customServerOptions,
grpc.StaticStreamWindowSize(64000),
grpc.StaticConnWindowSize(10000),
)
ts := &funcServer{streamingInputCall: func(stream testgrpc.TestService_StreamingInputCallServer) error {
defer close(handlerDone)
// Block on serverTester receiving RST_STREAM. This ensures server has closed
Expand Down