From 8ca335c22af872566b0b939d4e0156ca30a07734 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 28 Apr 2017 11:24:30 -0700 Subject: [PATCH 1/6] Make window size configurable. --- clientconn.go | 14 ++++ server.go | 58 ++++++++++------ test/end2end_test.go | 131 ++++++++++++++++++++++++++++++++---- transport/http2_client.go | 62 ++++++++++------- transport/http2_server.go | 65 +++++++++++------- transport/transport.go | 18 +++-- transport/transport_test.go | 104 ++++++++++++++++++++++++++++ 7 files changed, 361 insertions(+), 91 deletions(-) diff --git a/clientconn.go b/clientconn.go index f542d8bd0417..36727f4541b2 100644 --- a/clientconn.go +++ b/clientconn.go @@ -106,6 +106,20 @@ const defaultClientMaxMsgSize = math.MaxInt32 // DialOption configures how we set up the connection. type DialOption func(*dialOptions) +// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. +func WithInitialWindowSize(s int32) DialOption { + return func(o *dialOptions) { + o.copts.InitialWindowSize = s + } +} + +// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection. +func WithInitialConnWindowSize(s int32) DialOption { + return func(o *dialOptions) { + o.copts.InitialConnWindowSize = s + } +} + // WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. func WithMaxMsgSize(s int) DialOption { return func(o *dialOptions) { diff --git a/server.go b/server.go index b15f71c6c182..fa4796f7251b 100644 --- a/server.go +++ b/server.go @@ -107,20 +107,22 @@ type Server struct { } type options struct { - creds credentials.TransportCredentials - codec Codec - cp Compressor - dc Decompressor - maxMsgSize int - unaryInt UnaryServerInterceptor - streamInt StreamServerInterceptor - inTapHandle tap.ServerInHandle - statsHandler stats.Handler - maxConcurrentStreams uint32 - useHandlerImpl bool // use http.Handler-based server - unknownStreamDesc *StreamDesc - keepaliveParams keepalive.ServerParameters - keepalivePolicy keepalive.EnforcementPolicy + creds credentials.TransportCredentials + codec Codec + cp Compressor + dc Decompressor + maxMsgSize int + unaryInt UnaryServerInterceptor + streamInt StreamServerInterceptor + inTapHandle tap.ServerInHandle + statsHandler stats.Handler + maxConcurrentStreams uint32 + useHandlerImpl bool // use http.Handler-based server + unknownStreamDesc *StreamDesc + keepaliveParams keepalive.ServerParameters + keepalivePolicy keepalive.EnforcementPolicy + initialWindowSize int32 + initialConnWindowSize int32 } var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit @@ -128,6 +130,20 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l // A ServerOption sets options. type ServerOption func(*options) +// InitialWindowSize returns a ServerOption that sets window size for stream. +func InitialWindowSize(s int32) ServerOption { + return func(o *options) { + o.initialWindowSize = s + } +} + +// InitialConnWindowSize returns a ServerOption that sets window size for a connection. +func InitialConnWindowSize(s int32) ServerOption { + return func(o *options) { + o.initialConnWindowSize = s + } +} + // KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { return func(o *options) { @@ -483,12 +499,14 @@ func (s *Server) handleRawConn(rawConn net.Conn) { // transport.NewServerTransport). func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) { config := &transport.ServerConfig{ - MaxStreams: s.opts.maxConcurrentStreams, - AuthInfo: authInfo, - InTapHandle: s.opts.inTapHandle, - StatsHandler: s.opts.statsHandler, - KeepaliveParams: s.opts.keepaliveParams, - KeepalivePolicy: s.opts.keepalivePolicy, + MaxStreams: s.opts.maxConcurrentStreams, + AuthInfo: authInfo, + InTapHandle: s.opts.inTapHandle, + StatsHandler: s.opts.statsHandler, + KeepaliveParams: s.opts.keepaliveParams, + KeepalivePolicy: s.opts.keepalivePolicy, + InitialWindowSize: s.opts.initialWindowSize, + InitialConnWindowSize: s.opts.initialConnWindowSize, } st, err := transport.NewServerTransport("http2", c, config) if err != nil { diff --git a/test/end2end_test.go b/test/end2end_test.go index fd77cd7c57ba..d02d04f7f6e9 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -429,20 +429,24 @@ type test struct { cancel context.CancelFunc // Configurable knobs, after newTest returns: - testServer testpb.TestServiceServer // nil means none - healthServer *health.Server // nil means disabled - maxStream uint32 - tapHandle tap.ServerInHandle - maxMsgSize int - userAgent string - clientCompression bool - serverCompression bool - unaryClientInt grpc.UnaryClientInterceptor - streamClientInt grpc.StreamClientInterceptor - unaryServerInt grpc.UnaryServerInterceptor - streamServerInt grpc.StreamServerInterceptor - unknownHandler grpc.StreamHandler - sc <-chan grpc.ServiceConfig + testServer testpb.TestServiceServer // nil means none + healthServer *health.Server // nil means disabled + maxStream uint32 + tapHandle tap.ServerInHandle + maxMsgSize int + userAgent string + clientCompression bool + serverCompression bool + unaryClientInt grpc.UnaryClientInterceptor + streamClientInt grpc.StreamClientInterceptor + unaryServerInt grpc.UnaryServerInterceptor + streamServerInt grpc.StreamServerInterceptor + unknownHandler grpc.StreamHandler + sc <-chan grpc.ServiceConfig + serverInitialWindowSize int32 + serverInitialConnWindowSize int32 + clientInitialWindowSize int32 + clientInitialConnWindowSize int32 // srv and srvAddr are set once startServer is called. srv *grpc.Server @@ -510,6 +514,12 @@ func (te *test) startServer(ts testpb.TestServiceServer) { if te.unknownHandler != nil { sopts = append(sopts, grpc.UnknownServiceHandler(te.unknownHandler)) } + if te.serverInitialWindowSize > 0 { + sopts = append(sopts, grpc.InitialWindowSize(te.serverInitialWindowSize)) + } + if te.serverInitialConnWindowSize > 0 { + sopts = append(sopts, grpc.InitialConnWindowSize(te.serverInitialConnWindowSize)) + } la := "localhost:0" switch te.e.network { case "unix": @@ -603,6 +613,12 @@ func (te *test) clientConn() *grpc.ClientConn { if te.e.balancer { opts = append(opts, grpc.WithBalancer(grpc.RoundRobin(nil))) } + if te.clientInitialWindowSize > 0 { + opts = append(opts, grpc.WithInitialWindowSize(te.clientInitialWindowSize)) + } + if te.clientInitialConnWindowSize > 0 { + opts = append(opts, grpc.WithInitialConnWindowSize(te.clientInitialConnWindowSize)) + } var err error te.cc, err = grpc.Dial(te.srvAddr, opts...) if err != nil { @@ -3879,3 +3895,90 @@ func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { t.Fatalf("doFDC(_, proxy.client) = %v; want nil", err) } } + +type windowSizeConfig struct { + serverStream int32 + serverConn int32 + clientStream int32 + clientConn int32 +} + +func max(a, b int32) int32 { + if a > b { + return a + } + return b +} + +func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { + defer leakCheck(t)() + wc := windowSizeConfig{ + serverStream: 16 * 1024 * 1024, + serverConn: 32 * 1024 * 1024, + clientStream: 4 * 1024, + clientConn: 8 * 1024 * 1024, + } + for _, e := range listTestEnv() { + testConfigurableWindowSize(t, e, wc) + } +} + +func TestConfigurableWindowSizeWithSmallWindow(t *testing.T) { + defer leakCheck(t)() + wc := windowSizeConfig{ + serverStream: 1, + serverConn: 1, + clientStream: 1, + clientConn: 1, + } + for _, e := range listTestEnv() { + testConfigurableWindowSize(t, e, wc) + } +} + +func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) { + te := newTest(t, e) + te.serverInitialWindowSize = wc.serverStream + te.serverInitialConnWindowSize = wc.serverConn + te.clientInitialWindowSize = wc.clientStream + te.clientInitialConnWindowSize = wc.clientConn + + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + stream, err := tc.FullDuplexCall(context.Background()) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + numOfIter := 11 + // Set message size to exhaust largest of window sizes. + messageSize := max(max(wc.serverStream, wc.serverConn), max(wc.clientStream, wc.clientConn)) / int32(numOfIter-1) + messageSize = max(messageSize, 64*1024) + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, messageSize) + if err != nil { + t.Fatal(err) + } + respParams := []*testpb.ResponseParameters{ + { + Size: proto.Int32(messageSize), + }, + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + ResponseParameters: respParams, + Payload: payload, + } + for i := 0; i < numOfIter; i++ { + if err := stream.Send(req); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, req, err) + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) + } + } + if err := stream.CloseSend(); err != nil { + t.Fatalf("%v.CloseSend() = %v, want ", stream, err) + } +} diff --git a/transport/http2_client.go b/transport/http2_client.go index 67a79459c0a0..d72880da9dec 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -110,6 +110,9 @@ type http2Client struct { statsHandler stats.Handler + initialWindowSize int32 + initialConnWindowSize int32 + mu sync.Mutex // guard the following variables state transportState // the state of underlying connection activeStreams map[uint32]*Stream @@ -209,27 +212,36 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( localAddr: conn.LocalAddr(), authInfo: authInfo, // The client initiated stream id is odd starting from 1. - nextID: 1, - writableChan: make(chan int, 1), - shutdownChan: make(chan struct{}), - errorChan: make(chan struct{}), - goAway: make(chan struct{}), - awakenKeepalive: make(chan struct{}, 1), - framer: newFramer(conn), - hBuf: &buf, - hEnc: hpack.NewEncoder(&buf), - controlBuf: newRecvBuffer(), - fc: &inFlow{limit: initialConnWindowSize}, - sendQuotaPool: newQuotaPool(defaultWindowSize), - scheme: scheme, - state: reachable, - activeStreams: make(map[uint32]*Stream), - creds: opts.PerRPCCredentials, - maxStreams: defaultMaxStreamsClient, - streamsQuota: newQuotaPool(defaultMaxStreamsClient), - streamSendQuota: defaultWindowSize, - kp: kp, - statsHandler: opts.StatsHandler, + nextID: 1, + writableChan: make(chan int, 1), + shutdownChan: make(chan struct{}), + errorChan: make(chan struct{}), + goAway: make(chan struct{}), + awakenKeepalive: make(chan struct{}, 1), + framer: newFramer(conn), + hBuf: &buf, + hEnc: hpack.NewEncoder(&buf), + controlBuf: newRecvBuffer(), + fc: &inFlow{limit: initialConnWindowSize}, + sendQuotaPool: newQuotaPool(defaultWindowSize), + scheme: scheme, + state: reachable, + activeStreams: make(map[uint32]*Stream), + creds: opts.PerRPCCredentials, + maxStreams: defaultMaxStreamsClient, + streamsQuota: newQuotaPool(defaultMaxStreamsClient), + streamSendQuota: defaultWindowSize, + kp: kp, + statsHandler: opts.StatsHandler, + initialWindowSize: initialWindowSize, + initialConnWindowSize: initialConnWindowSize, + } + if opts.InitialWindowSize >= defaultWindowSize { + t.initialWindowSize = opts.InitialWindowSize + } + if opts.InitialConnWindowSize >= defaultWindowSize { + t.initialConnWindowSize = opts.InitialConnWindowSize + t.fc.limit = uint32(t.initialConnWindowSize) } // Make sure awakenKeepalive can't be written upon. // keepalive routine will make it writable, if need be. @@ -258,10 +270,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( t.Close() return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) } - if initialWindowSize != defaultWindowSize { + if t.initialWindowSize != defaultWindowSize { err = t.framer.writeSettings(true, http2.Setting{ ID: http2.SettingInitialWindowSize, - Val: uint32(initialWindowSize), + Val: uint32(t.initialWindowSize), }) } else { err = t.framer.writeSettings(true) @@ -271,7 +283,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( return nil, connectionErrorf(true, err, "transport: %v", err) } // Adjust the connection flow control window if needed. - if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { + if delta := uint32(t.initialConnWindowSize - defaultWindowSize); delta > 0 { if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { t.Close() return nil, connectionErrorf(true, err, "transport: %v", err) @@ -294,7 +306,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream { method: callHdr.Method, sendCompress: callHdr.SendCompress, buf: newRecvBuffer(), - fc: &inFlow{limit: initialWindowSize}, + fc: &inFlow{limit: uint32(t.initialWindowSize)}, sendQuotaPool: newQuotaPool(int(t.streamSendQuota)), headerChan: make(chan struct{}), } diff --git a/transport/http2_server.go b/transport/http2_server.go index 31fefc7bb7cc..436d2bbc55ce 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -113,6 +113,9 @@ type http2Server struct { // 1 means yes. resetPingStrikes uint32 // Accessed atomically. + initialWindowSize int32 + initialConnWindowSize int32 + mu sync.Mutex // guard the following state transportState activeStreams map[uint32]*Stream @@ -142,16 +145,24 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err Val: maxStreams, }) } - if initialWindowSize != defaultWindowSize { + iwz := int32(initialWindowSize) + if config.InitialWindowSize >= defaultWindowSize { + iwz = config.InitialWindowSize + } + icwz := int32(initialConnWindowSize) + if config.InitialConnWindowSize >= defaultWindowSize { + icwz = config.InitialConnWindowSize + } + if iwz != defaultWindowSize { settings = append(settings, http2.Setting{ ID: http2.SettingInitialWindowSize, - Val: uint32(initialWindowSize)}) + Val: uint32(iwz)}) } if err := framer.writeSettings(true, settings...); err != nil { return nil, connectionErrorf(true, err, "transport: %v", err) } // Adjust the connection flow control window if needed. - if delta := uint32(initialConnWindowSize - defaultWindowSize); delta > 0 { + if delta := uint32(icwz - defaultWindowSize); delta > 0 { if err := framer.writeWindowUpdate(true, 0, delta); err != nil { return nil, connectionErrorf(true, err, "transport: %v", err) } @@ -180,28 +191,30 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err } var buf bytes.Buffer t := &http2Server{ - ctx: context.Background(), - conn: conn, - remoteAddr: conn.RemoteAddr(), - localAddr: conn.LocalAddr(), - authInfo: config.AuthInfo, - framer: framer, - hBuf: &buf, - hEnc: hpack.NewEncoder(&buf), - maxStreams: maxStreams, - inTapHandle: config.InTapHandle, - controlBuf: newRecvBuffer(), - fc: &inFlow{limit: initialConnWindowSize}, - sendQuotaPool: newQuotaPool(defaultWindowSize), - state: reachable, - writableChan: make(chan int, 1), - shutdownChan: make(chan struct{}), - activeStreams: make(map[uint32]*Stream), - streamSendQuota: defaultWindowSize, - stats: config.StatsHandler, - kp: kp, - idle: time.Now(), - kep: kep, + ctx: context.Background(), + conn: conn, + remoteAddr: conn.RemoteAddr(), + localAddr: conn.LocalAddr(), + authInfo: config.AuthInfo, + framer: framer, + hBuf: &buf, + hEnc: hpack.NewEncoder(&buf), + maxStreams: maxStreams, + inTapHandle: config.InTapHandle, + controlBuf: newRecvBuffer(), + fc: &inFlow{limit: uint32(icwz)}, + sendQuotaPool: newQuotaPool(defaultWindowSize), + state: reachable, + writableChan: make(chan int, 1), + shutdownChan: make(chan struct{}), + activeStreams: make(map[uint32]*Stream), + streamSendQuota: defaultWindowSize, + stats: config.StatsHandler, + kp: kp, + idle: time.Now(), + kep: kep, + initialWindowSize: iwz, + initialConnWindowSize: icwz, } if t.stats != nil { t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ @@ -224,7 +237,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( id: frame.Header().StreamID, st: t, buf: buf, - fc: &inFlow{limit: initialWindowSize}, + fc: &inFlow{limit: uint32(t.initialWindowSize)}, } var state decodeState diff --git a/transport/transport.go b/transport/transport.go index 88c2c9872129..c22333cf09c9 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -392,12 +392,14 @@ const ( // ServerConfig consists of all the configurations to establish a server transport. type ServerConfig struct { - MaxStreams uint32 - AuthInfo credentials.AuthInfo - InTapHandle tap.ServerInHandle - StatsHandler stats.Handler - KeepaliveParams keepalive.ServerParameters - KeepalivePolicy keepalive.EnforcementPolicy + MaxStreams uint32 + AuthInfo credentials.AuthInfo + InTapHandle tap.ServerInHandle + StatsHandler stats.Handler + KeepaliveParams keepalive.ServerParameters + KeepalivePolicy keepalive.EnforcementPolicy + InitialWindowSize int32 + InitialConnWindowSize int32 } // NewServerTransport creates a ServerTransport with conn or non-nil error @@ -425,6 +427,10 @@ type ConnectOptions struct { KeepaliveParams keepalive.ClientParameters // StatsHandler stores the handler for stats. StatsHandler stats.Handler + // InitialWindowSize sets the intial window size for a stream. + InitialWindowSize int32 + // InitialConnWindowSize sets the intial window size for a connection. + InitialConnWindowSize int32 } // TargetInfo contains the information of the target such as network address and metadata. diff --git a/transport/transport_test.go b/transport/transport_test.go index 4e986e56a210..bae8e68baa28 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -1266,3 +1266,107 @@ func TestContextErr(t *testing.T) { } } } + +type windowSizeConfig struct { + serverStream int32 + serverConn int32 + clientStream int32 + clientConn int32 +} + +func TestAccountCheckWindowSizeWithLargeWindow(t *testing.T) { + wc := windowSizeConfig{ + serverStream: 10 * 1024 * 1024, + serverConn: 12 * 1024 * 1024, + clientStream: 6 * 1024 * 1024, + clientConn: 8 * 1024 * 1024, + } + testAccountCheckWindowSize(t, wc) +} + +func TestAccountCheckWindowSizeWithSmallWindow(t *testing.T) { + wc := windowSizeConfig{ + serverStream: defaultWindowSize, + serverConn: defaultWindowSize, + clientStream: defaultWindowSize, + clientConn: defaultWindowSize, + } + testAccountCheckWindowSize(t, wc) +} + +func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { + serverConfig := &ServerConfig{ + InitialWindowSize: wc.serverStream, + InitialConnWindowSize: wc.serverConn, + } + connectOptions := ConnectOptions{ + InitialWindowSize: wc.clientStream, + InitialConnWindowSize: wc.clientConn, + } + server, client := setUpWithOptions(t, 0, serverConfig, suspended, connectOptions) + defer server.stop() + defer client.Close() + // Sleep for a second to make sure connection is established. + time.Sleep(time.Second) + var st *http2Server + server.mu.Lock() + for k := range server.conns { + st = k.(*http2Server) + } + server.mu.Unlock() + ct := client.(*http2Client) + if st.fc.limit != uint32(serverConfig.InitialConnWindowSize) { + t.Fatalf("Server transport flow control window size is %v, want %v", st.fc.limit, serverConfig.InitialConnWindowSize) + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + serverSendQuota, err := wait(ctx, nil, nil, nil, st.sendQuotaPool.acquire()) + if err != nil { + t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err) + } + cancel() + st.sendQuotaPool.add(serverSendQuota) + if serverSendQuota != int(connectOptions.InitialConnWindowSize) { + t.Fatalf("Server send quota(%v) not equal to client's window size(%v) on conn.", serverSendQuota, connectOptions.InitialConnWindowSize) + } + st.mu.Lock() + if st.streamSendQuota != uint32(connectOptions.InitialWindowSize) { + t.Fatalf("Server stream send quota(%v) not equal to client's window size(%v) on stream.", ct.streamSendQuota, connectOptions.InitialWindowSize) + } + st.mu.Unlock() + if ct.fc.limit != uint32(connectOptions.InitialConnWindowSize) { + t.Fatalf("Client transport flow control window size is %v, want %v", ct.fc.limit, connectOptions.InitialConnWindowSize) + } + ctx, cancel = context.WithTimeout(context.Background(), time.Second) + clientSendQuota, err := wait(ctx, nil, nil, nil, ct.sendQuotaPool.acquire()) + if err != nil { + t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err) + } + cancel() + ct.sendQuotaPool.add(clientSendQuota) + if clientSendQuota != int(serverConfig.InitialConnWindowSize) { + t.Fatalf("Client send quota(%v) not equal to server's window size(%v) on conn.", clientSendQuota, serverConfig.InitialConnWindowSize) + } + ct.mu.Lock() + if ct.streamSendQuota != uint32(serverConfig.InitialWindowSize) { + t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ct.streamSendQuota, serverConfig.InitialWindowSize) + } + ct.mu.Unlock() + cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + if err != nil { + t.Fatalf("Failed to create stream. Err: %v", err) + } + if cstream.fc.limit != uint32(connectOptions.InitialWindowSize) { + t.Fatalf("Client stream flow control window size is %v, want %v", cstream.fc.limit, connectOptions.InitialWindowSize) + } + // Sleep for a bit to make sure server received headers for the stream. + time.Sleep(time.Millisecond * 500) + var sstream *Stream + st.mu.Lock() + for _, v := range st.activeStreams { + sstream = v + } + st.mu.Unlock() + if sstream.fc.limit != uint32(serverConfig.InitialWindowSize) { + t.Fatalf("Server stream flow control window size is %v, want %v", sstream.fc.limit, serverConfig.InitialWindowSize) + } +} From af8faf3d6ffcfb5b9fbabc411a0e5b1eecf5c64d Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 28 Apr 2017 15:52:16 -0700 Subject: [PATCH 2/6] Post-review commit and bug fix in test. --- clientconn.go | 2 ++ server.go | 2 ++ test/end2end_test.go | 2 +- transport/http2_client.go | 58 +++++++++++++++++++-------------------- transport/http2_server.go | 50 ++++++++++++++++----------------- 5 files changed, 57 insertions(+), 57 deletions(-) diff --git a/clientconn.go b/clientconn.go index 36727f4541b2..315f9e8b43f5 100644 --- a/clientconn.go +++ b/clientconn.go @@ -107,6 +107,7 @@ const defaultClientMaxMsgSize = math.MaxInt32 type DialOption func(*dialOptions) // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. +// The lower bound for window size is 64K and any value smaller than that will be ignored. func WithInitialWindowSize(s int32) DialOption { return func(o *dialOptions) { o.copts.InitialWindowSize = s @@ -114,6 +115,7 @@ func WithInitialWindowSize(s int32) DialOption { } // WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection. +// The lower bound for window size is 64K and any value smaller than that will be ignored. func WithInitialConnWindowSize(s int32) DialOption { return func(o *dialOptions) { o.copts.InitialConnWindowSize = s diff --git a/server.go b/server.go index fa4796f7251b..31aa195dbe20 100644 --- a/server.go +++ b/server.go @@ -131,6 +131,7 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l type ServerOption func(*options) // InitialWindowSize returns a ServerOption that sets window size for stream. +// The lower bound for window size is 64K and any value smaller than that will be ignored. func InitialWindowSize(s int32) ServerOption { return func(o *options) { o.initialWindowSize = s @@ -138,6 +139,7 @@ func InitialWindowSize(s int32) ServerOption { } // InitialConnWindowSize returns a ServerOption that sets window size for a connection. +// The lower bound for window size is 64K and any value smaller than that will be ignored. func InitialConnWindowSize(s int32) ServerOption { return func(o *options) { o.initialConnWindowSize = s diff --git a/test/end2end_test.go b/test/end2end_test.go index d02d04f7f6e9..4b48b538fef9 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3915,7 +3915,7 @@ func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { wc := windowSizeConfig{ serverStream: 16 * 1024 * 1024, serverConn: 32 * 1024 * 1024, - clientStream: 4 * 1024, + clientStream: 4 * 1024 * 1024, clientConn: 8 * 1024 * 1024, } for _, e := range listTestEnv() { diff --git a/transport/http2_client.go b/transport/http2_client.go index d72880da9dec..5756929ba348 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -110,8 +110,7 @@ type http2Client struct { statsHandler stats.Handler - initialWindowSize int32 - initialConnWindowSize int32 + initialWindowSize int32 mu sync.Mutex // guard the following variables state transportState // the state of underlying connection @@ -201,6 +200,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if kp.Timeout == 0 { kp.Timeout = defaultClientKeepaliveTimeout } + icwz := int32(initialConnWindowSize) + if opts.InitialConnWindowSize >= defaultWindowSize { + icwz = opts.InitialConnWindowSize + } var buf bytes.Buffer t := &http2Client{ ctx: ctx, @@ -212,37 +215,32 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( localAddr: conn.LocalAddr(), authInfo: authInfo, // The client initiated stream id is odd starting from 1. - nextID: 1, - writableChan: make(chan int, 1), - shutdownChan: make(chan struct{}), - errorChan: make(chan struct{}), - goAway: make(chan struct{}), - awakenKeepalive: make(chan struct{}, 1), - framer: newFramer(conn), - hBuf: &buf, - hEnc: hpack.NewEncoder(&buf), - controlBuf: newRecvBuffer(), - fc: &inFlow{limit: initialConnWindowSize}, - sendQuotaPool: newQuotaPool(defaultWindowSize), - scheme: scheme, - state: reachable, - activeStreams: make(map[uint32]*Stream), - creds: opts.PerRPCCredentials, - maxStreams: defaultMaxStreamsClient, - streamsQuota: newQuotaPool(defaultMaxStreamsClient), - streamSendQuota: defaultWindowSize, - kp: kp, - statsHandler: opts.StatsHandler, - initialWindowSize: initialWindowSize, - initialConnWindowSize: initialConnWindowSize, + nextID: 1, + writableChan: make(chan int, 1), + shutdownChan: make(chan struct{}), + errorChan: make(chan struct{}), + goAway: make(chan struct{}), + awakenKeepalive: make(chan struct{}, 1), + framer: newFramer(conn), + hBuf: &buf, + hEnc: hpack.NewEncoder(&buf), + controlBuf: newRecvBuffer(), + fc: &inFlow{limit: icwz}, + sendQuotaPool: newQuotaPool(defaultWindowSize), + scheme: scheme, + state: reachable, + activeStreams: make(map[uint32]*Stream), + creds: opts.PerRPCCredentials, + maxStreams: defaultMaxStreamsClient, + streamsQuota: newQuotaPool(defaultMaxStreamsClient), + streamSendQuota: defaultWindowSize, + kp: kp, + statsHandler: opts.StatsHandler, + initialWindowSize: initialWindowSize, } if opts.InitialWindowSize >= defaultWindowSize { t.initialWindowSize = opts.InitialWindowSize } - if opts.InitialConnWindowSize >= defaultWindowSize { - t.initialConnWindowSize = opts.InitialConnWindowSize - t.fc.limit = uint32(t.initialConnWindowSize) - } // Make sure awakenKeepalive can't be written upon. // keepalive routine will make it writable, if need be. t.awakenKeepalive <- struct{}{} @@ -283,7 +281,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( return nil, connectionErrorf(true, err, "transport: %v", err) } // Adjust the connection flow control window if needed. - if delta := uint32(t.initialConnWindowSize - defaultWindowSize); delta > 0 { + if delta := uint32(icwz - defaultWindowSize); delta > 0 { if err := t.framer.writeWindowUpdate(true, 0, delta); err != nil { t.Close() return nil, connectionErrorf(true, err, "transport: %v", err) diff --git a/transport/http2_server.go b/transport/http2_server.go index 436d2bbc55ce..586a8b3363ca 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -113,8 +113,7 @@ type http2Server struct { // 1 means yes. resetPingStrikes uint32 // Accessed atomically. - initialWindowSize int32 - initialConnWindowSize int32 + initialWindowSize int32 mu sync.Mutex // guard the following state transportState @@ -191,30 +190,29 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err } var buf bytes.Buffer t := &http2Server{ - ctx: context.Background(), - conn: conn, - remoteAddr: conn.RemoteAddr(), - localAddr: conn.LocalAddr(), - authInfo: config.AuthInfo, - framer: framer, - hBuf: &buf, - hEnc: hpack.NewEncoder(&buf), - maxStreams: maxStreams, - inTapHandle: config.InTapHandle, - controlBuf: newRecvBuffer(), - fc: &inFlow{limit: uint32(icwz)}, - sendQuotaPool: newQuotaPool(defaultWindowSize), - state: reachable, - writableChan: make(chan int, 1), - shutdownChan: make(chan struct{}), - activeStreams: make(map[uint32]*Stream), - streamSendQuota: defaultWindowSize, - stats: config.StatsHandler, - kp: kp, - idle: time.Now(), - kep: kep, - initialWindowSize: iwz, - initialConnWindowSize: icwz, + ctx: context.Background(), + conn: conn, + remoteAddr: conn.RemoteAddr(), + localAddr: conn.LocalAddr(), + authInfo: config.AuthInfo, + framer: framer, + hBuf: &buf, + hEnc: hpack.NewEncoder(&buf), + maxStreams: maxStreams, + inTapHandle: config.InTapHandle, + controlBuf: newRecvBuffer(), + fc: &inFlow{limit: uint32(icwz)}, + sendQuotaPool: newQuotaPool(defaultWindowSize), + state: reachable, + writableChan: make(chan int, 1), + shutdownChan: make(chan struct{}), + activeStreams: make(map[uint32]*Stream), + streamSendQuota: defaultWindowSize, + stats: config.StatsHandler, + kp: kp, + idle: time.Now(), + kep: kep, + initialWindowSize: iwz, } if t.stats != nil { t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ From 547d7587eb19965a01f59b99ee9c743353982e6f Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 28 Apr 2017 15:55:54 -0700 Subject: [PATCH 3/6] Forgot to add a file in last commit. --- transport/http2_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 5756929ba348..c16b8bca7959 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -225,7 +225,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( hBuf: &buf, hEnc: hpack.NewEncoder(&buf), controlBuf: newRecvBuffer(), - fc: &inFlow{limit: icwz}, + fc: &inFlow{limit: uint32(icwz)}, sendQuotaPool: newQuotaPool(defaultWindowSize), scheme: scheme, state: reachable, From 5dbf642d27593a11e6de4f89ba792e59baf1050a Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 28 Apr 2017 18:29:36 -0700 Subject: [PATCH 4/6] test fix --- test/end2end_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 4b48b538fef9..2234a5ad7852 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -3913,9 +3913,9 @@ func max(a, b int32) int32 { func TestConfigurableWindowSizeWithLargeWindow(t *testing.T) { defer leakCheck(t)() wc := windowSizeConfig{ - serverStream: 16 * 1024 * 1024, - serverConn: 32 * 1024 * 1024, - clientStream: 4 * 1024 * 1024, + serverStream: 8 * 1024 * 1024, + serverConn: 12 * 1024 * 1024, + clientStream: 6 * 1024 * 1024, clientConn: 8 * 1024 * 1024, } for _, e := range listTestEnv() { From 184fc1f11108c2b23e9ea33707929545213b2d8c Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 2 May 2017 18:30:24 -0700 Subject: [PATCH 5/6] Got rid of arbitrary sleep from tests --- transport/transport_test.go | 47 ++++++++++++++++++++++++++++++------- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/transport/transport_test.go b/transport/transport_test.go index bae8e68baa28..bf7a80968526 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -1306,8 +1306,13 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { server, client := setUpWithOptions(t, 0, serverConfig, suspended, connectOptions) defer server.stop() defer client.Close() - // Sleep for a second to make sure connection is established. - time.Sleep(time.Second) + + // Wait for server conns to be populated with new server transport. + waitWhileTrue(t, func() bool { + server.mu.Lock() + defer server.mu.Unlock() + return len(server.conns) == 0 + }) var st *http2Server server.mu.Lock() for k := range server.conns { @@ -1315,6 +1320,17 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { } server.mu.Unlock() ct := client.(*http2Client) + cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + if err != nil { + t.Fatalf("Failed to create stream. Err: %v", err) + } + // Wait for server to receive headers. + waitWhileTrue(t, func() bool { + st.mu.Lock() + defer st.mu.Unlock() + return len(st.activeStreams) == 0 + }) + if st.fc.limit != uint32(serverConfig.InitialConnWindowSize) { t.Fatalf("Server transport flow control window size is %v, want %v", st.fc.limit, serverConfig.InitialConnWindowSize) } @@ -1351,15 +1367,9 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { t.Fatalf("Client stream send quota(%v) not equal to server's window size(%v) on stream.", ct.streamSendQuota, serverConfig.InitialWindowSize) } ct.mu.Unlock() - cstream, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) - if err != nil { - t.Fatalf("Failed to create stream. Err: %v", err) - } if cstream.fc.limit != uint32(connectOptions.InitialWindowSize) { t.Fatalf("Client stream flow control window size is %v, want %v", cstream.fc.limit, connectOptions.InitialWindowSize) } - // Sleep for a bit to make sure server received headers for the stream. - time.Sleep(time.Millisecond * 500) var sstream *Stream st.mu.Lock() for _, v := range st.activeStreams { @@ -1370,3 +1380,24 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { t.Fatalf("Server stream flow control window size is %v, want %v", sstream.fc.limit, serverConfig.InitialWindowSize) } } + +func waitWhileTrue(t *testing.T, condition func() bool) { + wait := false + timer := time.NewTimer(time.Second * 5) + for { + wait = condition() + if wait { + select { + case <-timer.C: + t.Fatalf("Test timed out waiting for settings frames to be exchanged.") + default: + time.Sleep(50 * time.Millisecond) + continue + } + } + if !timer.Stop() { + <-timer.C + } + break + } +} From 7b94f2fbb4c0f71877cd9283edfaae46133156b9 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Wed, 3 May 2017 11:44:48 -0700 Subject: [PATCH 6/6] More deflaking --- transport/transport_test.go | 37 ++++++++++++++++++++++++++----------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/transport/transport_test.go b/transport/transport_test.go index bf7a80968526..7429f2e27457 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -1287,6 +1287,7 @@ func TestAccountCheckWindowSizeWithLargeWindow(t *testing.T) { func TestAccountCheckWindowSizeWithSmallWindow(t *testing.T) { wc := windowSizeConfig{ serverStream: defaultWindowSize, + // Note this is smaller than initialConnWindowSize which is the current default. serverConn: defaultWindowSize, clientStream: defaultWindowSize, clientConn: defaultWindowSize, @@ -1308,10 +1309,13 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { defer client.Close() // Wait for server conns to be populated with new server transport. - waitWhileTrue(t, func() bool { + waitWhileTrue(t, func() (bool, error) { server.mu.Lock() defer server.mu.Unlock() - return len(server.conns) == 0 + if len(server.conns) == 0 { + return true, fmt.Errorf("timed out waiting for server transport to be created") + } + return false, nil }) var st *http2Server server.mu.Lock() @@ -1325,15 +1329,23 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { t.Fatalf("Failed to create stream. Err: %v", err) } // Wait for server to receive headers. - waitWhileTrue(t, func() bool { + waitWhileTrue(t, func() (bool, error) { st.mu.Lock() defer st.mu.Unlock() - return len(st.activeStreams) == 0 + if len(st.activeStreams) == 0 { + return true, fmt.Errorf("timed out waiting for server to receive headers") + } + return false, nil }) + // Sleeping to make sure the settings are applied in case of negative test. + time.Sleep(time.Second) - if st.fc.limit != uint32(serverConfig.InitialConnWindowSize) { - t.Fatalf("Server transport flow control window size is %v, want %v", st.fc.limit, serverConfig.InitialConnWindowSize) - } + waitWhileTrue(t, func() (bool, error) { + if lim := st.fc.limit; lim != uint32(serverConfig.InitialConnWindowSize) { + return true, fmt.Errorf("Server transport flow control window size: got %v, want %v", lim, serverConfig.InitialConnWindowSize) + } + return false, nil + }) ctx, cancel := context.WithTimeout(context.Background(), time.Second) serverSendQuota, err := wait(ctx, nil, nil, nil, st.sendQuotaPool.acquire()) if err != nil { @@ -1381,15 +1393,18 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) { } } -func waitWhileTrue(t *testing.T, condition func() bool) { - wait := false +func waitWhileTrue(t *testing.T, condition func() (bool, error)) { + var ( + wait bool + err error + ) timer := time.NewTimer(time.Second * 5) for { - wait = condition() + wait, err = condition() if wait { select { case <-timer.C: - t.Fatalf("Test timed out waiting for settings frames to be exchanged.") + t.Fatalf(err.Error()) default: time.Sleep(50 * time.Millisecond) continue