Skip to content

Commit

Permalink
support inbound flow control checking to protect against misbehaved p…
Browse files Browse the repository at this point in the history
…eers
  • Loading branch information
iamqizhao committed Apr 3, 2015
1 parent c7b9fa2 commit 4320b5b
Show file tree
Hide file tree
Showing 5 changed files with 318 additions and 53 deletions.
65 changes: 65 additions & 0 deletions transport/control.go
Expand Up @@ -34,6 +34,7 @@
package transport

import (
"fmt"
"sync"

"github.com/bradfitz/http2"
Expand Down Expand Up @@ -151,3 +152,67 @@ func (qb *quotaPool) reset(v int) {
func (qb *quotaPool) acquire() <-chan int {
return qb.c
}

type inFlow struct {
limit uint32
conn *inFlow

mu sync.Mutex
pendingData uint32
// The amount of data user has consumed but grpc has not sent window update
// for them. Used to reduce window update frequency. It is always part of
// pendingData.
pendingUpdate uint32
}

func (f *inFlow) onData(n uint32) error {
if n == 0 {
return nil
}
f.mu.Lock()
defer f.mu.Unlock()
if f.pendingData+n > f.limit {
return fmt.Errorf("recieved %d-bytes data exceeding the limit %d bytes", f.pendingData+n, f.limit)
}
if f.conn != nil {
if err := f.conn.onData(n); err != nil {
return ConnectionErrorf("%v", err)
}
}
f.pendingData += n
return nil
}

func (f *inFlow) onRead(n uint32) uint32 {
if n == 0 {
return 0
}
f.mu.Lock()
defer f.mu.Unlock()
f.pendingUpdate += n
if f.pendingUpdate >= f.limit/4 {
ret := f.pendingUpdate
f.pendingData -= ret
f.pendingUpdate = 0
return ret
}
return 0
}

func (f *inFlow) restoreConn() uint32 {
if f.conn == nil {
return 0
}
f.mu.Lock()
defer f.mu.Unlock()
ret := f.pendingData
f.conn.mu.Lock()
f.conn.pendingData -= ret
if f.conn.pendingUpdate > f.conn.pendingData {
f.conn.pendingUpdate = f.conn.pendingData
}
f.conn.mu.Unlock()
f.pendingData = 0
f.pendingUpdate = 0
return ret
}
52 changes: 34 additions & 18 deletions transport/http2_client.go
Expand Up @@ -76,8 +76,7 @@ type http2Client struct {
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf *recvBuffer
// The inbound quota being set
recvQuota uint32
fc *inFlow
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool

Expand All @@ -91,8 +90,6 @@ type http2Client struct {
activeStreams map[uint32]*Stream
// The max number of concurrent streams
maxStreams uint32
// The accumulated inbound quota pending for window update.
updateQuota uint32
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
}
Expand Down Expand Up @@ -164,7 +161,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
controlBuf: newRecvBuffer(),
recvQuota: initialConnWindowSize,
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
scheme: scheme,
state: reachable,
Expand All @@ -184,12 +181,16 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
}

func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
fc := &inFlow{
limit: initialWindowSize,
conn: t.fc,
}
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
s := &Stream{
id: t.nextID,
method: callHdr.Method,
buf: newRecvBuffer(),
recvQuota: initialWindowSize,
fc: fc,
sendQuotaPool: newQuotaPool(int(t.streamSendQuota)),
headerChan: make(chan struct{}),
}
Expand Down Expand Up @@ -311,6 +312,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
delete(t.activeStreams, s.id)
t.mu.Unlock()
s.mu.Lock()
if q := s.fc.restoreConn(); q > 0 {
t.controlBuf.put(&windowUpdate{0, q})
}
if s.state == streamDone {
s.mu.Unlock()
return
Expand Down Expand Up @@ -475,18 +479,11 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
// Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
func (t *http2Client) updateWindow(s *Stream, n uint32) {
t.mu.Lock()
t.updateQuota += n
if t.updateQuota >= t.recvQuota/4 {
t.controlBuf.put(&windowUpdate{0, t.updateQuota})
t.updateQuota = 0
if q := t.fc.onRead(n); q > 0 {
t.controlBuf.put(&windowUpdate{0, q})
}
t.mu.Unlock()

s.updateQuota += n
if s.updateQuota >= s.recvQuota/4 {
t.controlBuf.put(&windowUpdate{s.id, s.updateQuota})
s.updateQuota = 0
if q := s.fc.onRead(n); q > 0 {
t.controlBuf.put(&windowUpdate{s.id, q})
}
}

Expand All @@ -496,10 +493,29 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
if !ok {
return
}
size := len(f.Data())
if err := s.fc.onData(uint32(size)); err != nil {
if _, ok := err.(ConnectionError); ok {
t.notifyError(err)
return
}
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
return
}
s.state = streamDone
s.statusCode = codes.ResourceExhausted
s.statusDesc = err.Error()
s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
data := make([]byte, len(f.Data()))
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
}
Expand Down
50 changes: 29 additions & 21 deletions transport/http2_server.go
Expand Up @@ -75,16 +75,13 @@ type http2Server struct {
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf *recvBuffer
// The inbound quota being set
recvQuota uint32
fc *inFlow
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool

mu sync.Mutex // guard the following
state transportState
activeStreams map[uint32]*Stream
// The accumulated inbound quota pending for window update.
updateQuota uint32
// the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32
}
Expand Down Expand Up @@ -124,7 +121,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32) (_ ServerTransport, err er
hEnc: hpack.NewEncoder(&buf),
maxStreams: maxStreams,
controlBuf: newRecvBuffer(),
recvQuota: initialConnWindowSize,
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
state: reachable,
writableChan: make(chan int, 1),
Expand Down Expand Up @@ -256,11 +253,15 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
}
t.maxStreamID = id
buf := newRecvBuffer()
fc := &inFlow{
limit: initialWindowSize,
conn: t.fc,
}
curStream = &Stream{
id: frame.Header().StreamID,
st: t,
buf: buf,
recvQuota: initialWindowSize,
id: frame.Header().StreamID,
st: t,
buf: buf,
fc: fc,
}
endStream := frame.Header().Flags.Has(http2.FlagHeadersEndStream)
curStream = t.operateHeaders(hDec, curStream, frame, endStream, handle, &wg)
Expand Down Expand Up @@ -304,18 +305,11 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
// Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) {
t.mu.Lock()
t.updateQuota += n
if t.updateQuota >= t.recvQuota/4 {
t.controlBuf.put(&windowUpdate{0, t.updateQuota})
t.updateQuota = 0
if q := t.fc.onRead(n); q > 0 {
t.controlBuf.put(&windowUpdate{0, q})
}
t.mu.Unlock()

s.updateQuota += n
if s.updateQuota >= s.recvQuota/4 {
t.controlBuf.put(&windowUpdate{s.id, s.updateQuota})
s.updateQuota = 0
if q := s.fc.onRead(n); q > 0 {
t.controlBuf.put(&windowUpdate{s.id, q})
}
}

Expand All @@ -325,10 +319,21 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
if !ok {
return
}
size := len(f.Data())
if err := s.fc.onData(uint32(size)); err != nil {
if _, ok := err.(ConnectionError); ok {
log.Printf("transport: http2Server %v", err)
t.Close()
return
}
t.closeStream(s)
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return
}
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
data := make([]byte, len(f.Data()))
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
if f.Header().Flags.Has(http2.FlagDataEndStream) {
Expand Down Expand Up @@ -643,6 +648,9 @@ func (t *http2Server) closeStream(s *Stream) {
t.mu.Lock()
delete(t.activeStreams, s.id)
t.mu.Unlock()
if q := s.fc.restoreConn(); q > 0 {
t.controlBuf.put(&windowUpdate{0, q})
}
s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
Expand Down
7 changes: 4 additions & 3 deletions transport/transport.go
Expand Up @@ -173,7 +173,7 @@ type Stream struct {
buf *recvBuffer
dec io.Reader

// The inbound quota being set
fc *inFlow
recvQuota uint32
// The accumulated inbound quota pending for window update.
updateQuota uint32
Expand All @@ -197,8 +197,9 @@ type Stream struct {
// multiple times.
headerDone bool
// the status received from the server.
statusCode codes.Code
statusDesc string
statusCode codes.Code
statusDesc string
pendingData uint32
}

// Header acquires the key-value pairs of header metadata once it
Expand Down

0 comments on commit 4320b5b

Please sign in to comment.