Skip to content

Commit

Permalink
[internal-branch.go1.17-vendor] http2: shut down idle Transport conne…
Browse files Browse the repository at this point in the history
…ctions after protocol errors

Updates golang/go#49077

Change-Id: Ic4e85cdc75b4baef7cd61a65b1b09f430a2ffc4b
Reviewed-on: https://go-review.googlesource.com/c/net/+/352449
Trust: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-on: https://go-review.googlesource.com/c/net/+/357680
Trust: Damien Neil <dneil@google.com>
Run-TryBot: Damien Neil <dneil@google.com>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
  • Loading branch information
bradfitz authored and dmitshur committed Oct 29, 2021
1 parent 2e3989c commit 248c63b
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 79 deletions.
19 changes: 15 additions & 4 deletions transport.go
Expand Up @@ -862,6 +862,12 @@ func (cc *ClientConn) closeIfIdle() {
cc.tconn.Close()
}

func (cc *ClientConn) isDoNotReuseAndIdle() bool {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.doNotReuse && len(cc.streams) == 0
}

var shutdownEnterWaitStateHook = func() {}

// Shutdown gracefully close the client connection, waiting for running streams to complete.
Expand Down Expand Up @@ -2267,6 +2273,9 @@ func (b transportResponseBody) Close() error {
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
if f.StreamEnded() && cc.isDoNotReuseAndIdle() {
rl.closeWhenIdle = true
}
data := f.Data()
if cs == nil {
cc.mu.Lock()
Expand Down Expand Up @@ -2513,11 +2522,15 @@ func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
}

func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
cs := rl.cc.streamByID(f.StreamID, true)
cc := rl.cc
cs := cc.streamByID(f.StreamID, true)
if cs == nil {
// TODO: return error if server tries to RST_STEAM an idle stream
return nil
}
if cc.isDoNotReuseAndIdle() {
rl.closeWhenIdle = true
}
select {
case <-cs.peerReset:
// Already reset.
Expand All @@ -2529,9 +2542,7 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
if f.ErrCode == ErrCodeProtocol {
rl.cc.SetDoNotReuse()
serr.Cause = errFromPeer
// TODO(bradfitz): increment a varz here, once Transport
// takes an optional interface-typed field that expvar.Map.Add
// implements.
rl.closeWhenIdle = true
}
cs.resetErr = serr
close(cs.peerReset)
Expand Down
75 changes: 0 additions & 75 deletions transport_test.go
Expand Up @@ -5296,78 +5296,3 @@ func (p *collectClientsConnPool) GetClientConn(req *http.Request, addr string) (
func (p *collectClientsConnPool) MarkDead(cc *ClientConn) {
p.lower.MarkDead(cc)
}

func TestTransportRetriesOnStreamProtocolError(t *testing.T) {
ct := newClientTester(t)
pool := &collectClientsConnPool{
lower: &clientConnPool{t: ct.tr},
}
ct.tr.ConnPool = pool
done := make(chan struct{})
ct.client = func() error {
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
res, err := ct.tr.RoundTrip(req)
const want = "only one dial allowed in test mode"
if got := fmt.Sprint(err); got != want {
t.Errorf("didn't dial again: got %#q; want %#q", got, want)
}
close(done)
ct.sc.Close()
if res != nil {
res.Body.Close()
}

pool.mu.Lock()
defer pool.mu.Unlock()
if pool.getErrs != 1 {
t.Errorf("pool get errors = %v; want 1", pool.getErrs)
}
if len(pool.got) == 1 {
cc := pool.got[0]
cc.mu.Lock()
if !cc.doNotReuse {
t.Error("ClientConn not marked doNotReuse")
}
cc.mu.Unlock()
} else {
t.Errorf("pool get success = %v; want 1", len(pool.got))
}
return nil
}
ct.server = func() error {
ct.greet()
var sentErr bool
for {
f, err := ct.fr.ReadFrame()
if err != nil {
select {
case <-done:
return nil
default:
return err
}
}
switch f := f.(type) {
case *WindowUpdateFrame, *SettingsFrame:
case *HeadersFrame:
if !sentErr {
sentErr = true
ct.fr.WriteRSTStream(f.StreamID, ErrCodeProtocol)
continue
}
var buf bytes.Buffer
enc := hpack.NewEncoder(&buf)
// send headers without Trailer header
enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
ct.fr.WriteHeaders(HeadersFrameParam{
StreamID: f.StreamID,
EndHeaders: true,
EndStream: true,
BlockFragment: buf.Bytes(),
})
}
}
return nil
}
ct.run()
}

0 comments on commit 248c63b

Please sign in to comment.