Skip to content

grpc: introduce new DialOption and ServerOption to configure initial window size without disabling BDP estimation. #8283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 42 commits into from
Jun 2, 2025
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
0c8d8c3
Add new Dial/ServerOptions to set the size
vinothkumarr227 May 2, 2025
3f776f2
Added test and fix vet errors
vinothkumarr227 May 2, 2025
a55c070
small tweaks
vinothkumarr227 May 2, 2025
21f3aae
Added test
vinothkumarr227 May 5, 2025
68f1322
Fixed the review changes
vinothkumarr227 May 13, 2025
699cb70
credentials, transport, grpc : add a call option to override the :aut…
eshitachandwani Apr 30, 2025
ab9e865
xds: generic lrs client for load reporting (#8250)
purnesh42H May 5, 2025
e72ed5b
otel: Test streaming rpc sequence numbers (#8272)
vinothkumarr227 May 5, 2025
541c8ce
[alts] Add plumbing for the bound access token field in the ALTS Star…
matthewstevenson88 May 5, 2025
5f9eeb4
stats/opentelemetry: separate out interceptors for tracing and metric…
janardhanvissa May 5, 2025
2cb07fd
xds_test: Wait for server to enter serving mode in RBAC test (#8287)
arjan-bal May 5, 2025
d077278
internal/delegatingresolver: avoid proxy if networktype of target add…
eshitachandwani May 7, 2025
b2c9036
[alts] add keepalive params to the alts handshaker client dial option…
rockspore May 7, 2025
22cf9e1
weightedroundrobin: Remove nil embedded SubConn from endpointWeight (…
arjan-bal May 7, 2025
2d3f20d
transport: skip Status.Proto() without details in writeStatus (#8282)
evanj May 7, 2025
b94efb5
grpc: Update ClientStream.CloseSend docs (#8292)
arjan-bal May 7, 2025
d05b590
xds: modify generic clients grpctransport to accept optional custom g…
purnesh42H May 8, 2025
c127d5f
resolver/delegatingresolver: wait for proxy resolver to be built in t…
eshitachandwani May 8, 2025
21da04b
xds: add MetricsReporter for generic xds client (#8274)
purnesh42H May 8, 2025
909bf84
credentials/local: implement ValidateAuthority (#8291)
eshitachandwani May 8, 2025
e54e81b
health: Add List method to gRPC Health service (#8155)
marcoshuck May 9, 2025
d21bc12
Update CONTRIBUTING.md (#8300)
dfawley May 9, 2025
cfaf50d
cleanup: replace dial with newclient (#8196)
janardhanvissa May 12, 2025
b422fce
Fixed the review changes
vinothkumarr227 May 26, 2025
605c49d
Merge branch 'master' into fix-window-size-bdp
vinothkumarr227 May 26, 2025
7a77cde
small tweaks
vinothkumarr227 May 26, 2025
127227a
small tweaks
vinothkumarr227 May 26, 2025
ac71fae
Fixed the test cases issues
vinothkumarr227 May 27, 2025
e5b7e2b
Fixed the review changes
vinothkumarr227 May 27, 2025
d3a6773
small tweaks
vinothkumarr227 May 27, 2025
0b2d748
Added test cases
vinothkumarr227 May 27, 2025
a3c4701
Fixed the review changes
vinothkumarr227 May 28, 2025
9abcf34
Fixed the review changes
vinothkumarr227 May 28, 2025
838385e
Remove static
vinothkumarr227 May 29, 2025
7ba8859
Fixed the dynamic window size isssues
vinothkumarr227 May 29, 2025
382ef30
Fixed the test cases
vinothkumarr227 May 29, 2025
48c91da
Fixed the window size
vinothkumarr227 May 29, 2025
cf685db
Fixed the review changes
vinothkumarr227 May 30, 2025
35a2253
Fixed the test cases
vinothkumarr227 Jun 2, 2025
6ec9e7d
Fixed the review changes
vinothkumarr227 Jun 2, 2025
efe83f4
small tweaks
vinothkumarr227 Jun 2, 2025
6d355fb
Fixed the test cases
vinothkumarr227 Jun 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions dialoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@
func WithInitialWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.UseDynamicWindowSizing = false
})
}

Expand All @@ -222,6 +223,26 @@
func WithInitialConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.UseDynamicWindowSizing = false
})
}

// WithStaticStreamWindowSize returns a DialOption which sets the initial
// stream window size to the value provided and disables dynamic flow control.
func WithStaticStreamWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialWindowSize = s
o.copts.UseDynamicWindowSizing = false
})

Check warning on line 236 in dialoptions.go

View check run for this annotation

Codecov / codecov/patch

dialoptions.go#L232-L236

Added lines #L232 - L236 were not covered by tests
}

// WithStaticConnWindowSize returns a DialOption which sets the initial
// connection window size to the value provided and disables dynamic flow
// control.
func WithStaticConnWindowSize(s int32) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.copts.InitialConnWindowSize = s
o.copts.UseDynamicWindowSizing = false

Check warning on line 245 in dialoptions.go

View check run for this annotation

Codecov / codecov/patch

dialoptions.go#L242-L245

Added lines #L242 - L245 were not covered by tests
})
}

Expand Down
4 changes: 2 additions & 2 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
icwz := int32(initialWindowSize)
if opts.InitialConnWindowSize >= defaultWindowSize {
icwz = opts.InitialConnWindowSize
dynamicWindow = false
dynamicWindow = opts.UseDynamicWindowSizing
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is wrong now.

dynamicWindow can be removed, and opts.Use... can be used in its place. Same on the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I've updated the code accordingly

writeBufSize := opts.WriteBufferSize
readBufSize := opts.ReadBufferSize
Expand Down Expand Up @@ -381,7 +381,7 @@ func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
t.controlBuf = newControlBuffer(t.ctxDone)
if opts.InitialWindowSize >= defaultWindowSize {
t.initialWindowSize = opts.InitialWindowSize
dynamicWindow = false
dynamicWindow = opts.UseDynamicWindowSizing
}
if dynamicWindow {
t.bdpEst = &bdpEstimator{
Expand Down
4 changes: 2 additions & 2 deletions internal/transport/http2_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,12 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
iwz := int32(initialWindowSize)
if config.InitialWindowSize >= defaultWindowSize {
iwz = config.InitialWindowSize
dynamicWindow = false
dynamicWindow = config.UseDynamicWindowSizing
}
icwz := int32(initialWindowSize)
if config.InitialConnWindowSize >= defaultWindowSize {
icwz = config.InitialConnWindowSize
dynamicWindow = false
dynamicWindow = config.UseDynamicWindowSizing
}
if iwz != defaultWindowSize {
isettings = append(isettings, http2.Setting{
Expand Down
35 changes: 19 additions & 16 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,22 +450,23 @@ const (

// ServerConfig consists of all the configurations to establish a server transport.
type ServerConfig struct {
MaxStreams uint32
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandlers []stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
SharedWriteBuffer bool
ChannelzParent *channelz.Server
MaxHeaderListSize *uint32
HeaderTableSize *uint32
BufferPool mem.BufferPool
MaxStreams uint32
ConnectionTimeout time.Duration
Credentials credentials.TransportCredentials
InTapHandle tap.ServerInHandle
StatsHandlers []stats.Handler
KeepaliveParams keepalive.ServerParameters
KeepalivePolicy keepalive.EnforcementPolicy
InitialWindowSize int32
InitialConnWindowSize int32
WriteBufferSize int
ReadBufferSize int
SharedWriteBuffer bool
ChannelzParent *channelz.Server
MaxHeaderListSize *uint32
HeaderTableSize *uint32
BufferPool mem.BufferPool
UseDynamicWindowSizing bool
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this name is the default behavior (disabled) is not our current default (enabled).

So let's rename these to StaticWindowSize please.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback!, I've renamed it to StaticWindowSize for clarity and to better reflect the actual default behavior. The code has been updated accordingly.

}

// ConnectOptions covers all relevant options for communicating with the server.
Expand Down Expand Up @@ -504,6 +505,8 @@ type ConnectOptions struct {
MaxHeaderListSize *uint32
// The mem.BufferPool to use when reading/writing to the wire.
BufferPool mem.BufferPool
// UseDynamicWindowSizing controls whether dynamic window sizing is enabled.
UseDynamicWindowSizing bool
}

// WriteOptions provides additional hints and information for message
Expand Down
114 changes: 70 additions & 44 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,34 +151,35 @@
}

type serverOptions struct {
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
dc Decompressor
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
sharedWriteBuffer bool
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
bufferPool mem.BufferPool
waitForHandlers bool
creds credentials.TransportCredentials
codec baseCodec
cp Compressor
dc Decompressor
unaryInt UnaryServerInterceptor
streamInt StreamServerInterceptor
chainUnaryInts []UnaryServerInterceptor
chainStreamInts []StreamServerInterceptor
binaryLogger binarylog.Logger
inTapHandle tap.ServerInHandle
statsHandlers []stats.Handler
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
initialWindowSize int32
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
sharedWriteBuffer bool
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
bufferPool mem.BufferPool
waitForHandlers bool
useDynamicWindowSizing bool
}

var defaultServerOptions = serverOptions{
Expand Down Expand Up @@ -279,6 +280,7 @@
func InitialWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.useDynamicWindowSizing = false
})
}

Expand All @@ -287,6 +289,29 @@
func InitialConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.useDynamicWindowSizing = false
})
}

// StaticStreamWindowSize returns a ServerOption to set the initial stream
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.
func StaticStreamWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialWindowSize = s
o.useDynamicWindowSizing = false
})

Check warning on line 304 in server.go

View check run for this annotation

Codecov / codecov/patch

server.go#L300-L304

Added lines #L300 - L304 were not covered by tests
}

// StaticConnWindowSize returns a ServerOption to set the initial connection
// window size to the value provided and disables dynamic flow control.
// The lower bound for window size is 64K and any value smaller than that
// will be ignored.
func StaticConnWindowSize(s int32) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.initialConnWindowSize = s
o.useDynamicWindowSizing = false

Check warning on line 314 in server.go

View check run for this annotation

Codecov / codecov/patch

server.go#L311-L314

Added lines #L311 - L314 were not covered by tests
})
}

Expand Down Expand Up @@ -970,22 +995,23 @@
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
ConnectionTimeout: s.opts.connectionTimeout,
Credentials: s.opts.creds,
InTapHandle: s.opts.inTapHandle,
StatsHandlers: s.opts.statsHandlers,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize,
InitialConnWindowSize: s.opts.initialConnWindowSize,
WriteBufferSize: s.opts.writeBufferSize,
ReadBufferSize: s.opts.readBufferSize,
SharedWriteBuffer: s.opts.sharedWriteBuffer,
ChannelzParent: s.channelz,
MaxHeaderListSize: s.opts.maxHeaderListSize,
HeaderTableSize: s.opts.headerTableSize,
BufferPool: s.opts.bufferPool,
MaxStreams: s.opts.maxConcurrentStreams,
ConnectionTimeout: s.opts.connectionTimeout,
Credentials: s.opts.creds,
InTapHandle: s.opts.inTapHandle,
StatsHandlers: s.opts.statsHandlers,
KeepaliveParams: s.opts.keepaliveParams,
KeepalivePolicy: s.opts.keepalivePolicy,
InitialWindowSize: s.opts.initialWindowSize,
InitialConnWindowSize: s.opts.initialConnWindowSize,
WriteBufferSize: s.opts.writeBufferSize,
ReadBufferSize: s.opts.readBufferSize,
SharedWriteBuffer: s.opts.sharedWriteBuffer,
ChannelzParent: s.channelz,
MaxHeaderListSize: s.opts.maxHeaderListSize,
HeaderTableSize: s.opts.headerTableSize,
BufferPool: s.opts.bufferPool,
UseDynamicWindowSizing: s.opts.useDynamicWindowSizing,
}
st, err := transport.NewServerTransport(c, config)
if err != nil {
Expand Down
Loading