Skip to content

Commit

Permalink
http2: Discard DATA frames from the server after the response body is…
Browse files Browse the repository at this point in the history
… closed

After a response body is closed, we keep writing to the bufPipe. This
accumulates bytes that will never be read, wasting memory. The fix is to
discard the buffer on pipe.BreakWithError.

Updates golang/go#20448

Change-Id: Ia2cf46cb8c401fd8091ef3785eb48fe7b188bb57
Reviewed-on: https://go-review.googlesource.com/43810
Run-TryBot: Tom Bergan <tombergan@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
  • Loading branch information
tombergan committed May 23, 2017
1 parent fd1b920 commit f9f5bca
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 10 deletions.
14 changes: 12 additions & 2 deletions pipe.go
Expand Up @@ -15,8 +15,8 @@ import (
// underlying buffer is an interface. (io.Pipe is always unbuffered)
type pipe struct {
mu sync.Mutex
c sync.Cond // c.L lazily initialized to &p.mu
b pipeBuffer
c sync.Cond // c.L lazily initialized to &p.mu
b pipeBuffer // nil when done reading
err error // read error once empty. non-nil means closed.
breakErr error // immediate read error (caller doesn't see rest of b)
donec chan struct{} // closed on error
Expand All @@ -32,6 +32,9 @@ type pipeBuffer interface {
func (p *pipe) Len() int {
p.mu.Lock()
defer p.mu.Unlock()
if p.b == nil {
return 0
}
return p.b.Len()
}

Expand All @@ -55,6 +58,7 @@ func (p *pipe) Read(d []byte) (n int, err error) {
p.readFn() // e.g. copy trailers
p.readFn = nil // not sticky like p.err
}
p.b = nil
return 0, p.err
}
p.c.Wait()
Expand All @@ -75,6 +79,9 @@ func (p *pipe) Write(d []byte) (n int, err error) {
if p.err != nil {
return 0, errClosedPipeWrite
}
if p.breakErr != nil {
return len(d), nil // discard when there is no reader
}
return p.b.Write(d)
}

Expand Down Expand Up @@ -109,6 +116,9 @@ func (p *pipe) closeWithError(dst *error, err error, fn func()) {
return
}
p.readFn = fn
if dst == &p.breakErr {
p.b = nil
}
*dst = err
p.closeDoneLocked()
}
Expand Down
14 changes: 14 additions & 0 deletions pipe_test.go
Expand Up @@ -92,6 +92,10 @@ func TestPipeCloseWithError(t *testing.T) {
if err != a {
t.Logf("read error = %v, %v", err, a)
}
// Write should fail.
if n, err := p.Write([]byte("abc")); err != errClosedPipeWrite || n != 0 {
t.Errorf("Write(abc) after close\ngot =%v, %v\nwant 0, %v", n, err, errClosedPipeWrite)
}
}

func TestPipeBreakWithError(t *testing.T) {
Expand All @@ -106,4 +110,14 @@ func TestPipeBreakWithError(t *testing.T) {
if err != a {
t.Logf("read error = %v, %v", err, a)
}
if p.b != nil {
t.Errorf("buffer should be nil after BreakWithError")
}
// Write should succeed silently.
if n, err := p.Write([]byte("abc")); err != nil || n != 3 {
t.Errorf("Write(abc) after break\ngot =%v, %v\nwant 0, nil", n, err)
}
if p.b != nil {
t.Errorf("buffer should be nil after Write")
}
}
7 changes: 1 addition & 6 deletions transport.go
Expand Up @@ -1655,6 +1655,7 @@ func (b transportResponseBody) Close() error {
cc.wmu.Lock()
if !serverSentStreamEnd {
cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
cs.didReset = true
}
// Return connection-level flow control.
if unread > 0 {
Expand Down Expand Up @@ -1702,12 +1703,6 @@ func (rl *clientConnReadLoop) processData(f *DataFrame) error {
return nil
}
if f.Length > 0 {
if len(data) > 0 && cs.bufPipe.b == nil {
// Data frame after it's already closed?
cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
return ConnectionError(ErrCodeProtocol)
}

// Check connection-level flow control.
cc.mu.Lock()
if cs.inflow.available() >= int32(f.Length) {
Expand Down
3 changes: 1 addition & 2 deletions transport_test.go
Expand Up @@ -2959,8 +2959,7 @@ func TestTransportAllocationsAfterResponseBodyClose(t *testing.T) {
t.Fatalf("res.Body = %T; want transportResponseBody", res.Body)
}
if trb.cs.bufPipe.b != nil {
// TODO(tombergan,bradfitz): turn this into an error:
t.Logf("response body pipe is still open")
t.Errorf("response body pipe is still open")
}

gotErr := <-writeErr
Expand Down

0 comments on commit f9f5bca

Please sign in to comment.