Skip to content

Commit

Permalink
http2: make the Transport write request body data as it's available
Browse files Browse the repository at this point in the history
Unlike HTTP/1, we now permit streaming the write of a request body as
we read the response body, since HTTP/2's framing makes it possible.
Our behavior however is based on a heuristic: we always begin writing
the request body right away (like previously, and like HTTP/1), but if
we're still writing the request body and the server replies with a
status code over 299 (not 1xx and not 2xx), then we stop writing the
request body, assuming the server doesn't care about it. There is
currently no switch (and hopefully won't be) to force enable this
behavior. In the case where the server replied with a 1xx/2xx and
we're still writing the request body but the server doesn't want it,
the server can do a RST_STREAM, which we respect as before and stop
sending.

Also in this CL:

* adds an h2demo handler at https://http2.golang.org/ECHO to demo it

* fixes a potential flow control integer truncation bug

* start of clientTester type used for the tests in this CL, similar
  to the serverTester. It's still a bit cumbersome to write client
  tests, though.

* fix potential deadlock where awaitFlowControl could block while
  waiting a stream reset arrived. fix it by moving all checks into
  the sync.Cond loop, rather than having a sync.Cond check followed
  by a select. simplifies code, too.

* fix two data races in test-only code.

Updates golang/go#13444

Change-Id: Idfda6833a212a89fcd65293cdeb4169d1723724f
Reviewed-on: https://go-review.googlesource.com/17310
Reviewed-by: Blake Mizerany <blake.mizerany@gmail.com>
  • Loading branch information
bradfitz committed Dec 8, 2015
1 parent fa33dc7 commit 438097d
Show file tree
Hide file tree
Showing 4 changed files with 406 additions and 37 deletions.
36 changes: 36 additions & 0 deletions http2/h2demo/h2demo.go
Expand Up @@ -91,6 +91,7 @@ href="https://golang.org/issues">file a bug</a>.</p>
<li>GET <a href="/redirect">/redirect</a> to redirect back to / (this page)</li>
<li>GET <a href="/goroutines">/goroutines</a> to see all active goroutines in this server</li>
<li>PUT something to <a href="/crc32">/crc32</a> to get a count of number of bytes and its CRC-32</li>
<li>PUT something to <a href="/ECHO">/ECHO</a> and it will be streamed back to you capitalized</li>
</ul>
</body></html>`)
Expand Down Expand Up @@ -124,6 +125,40 @@ func crcHandler(w http.ResponseWriter, r *http.Request) {
}
}

type capitalizeReader struct {
r io.Reader
}

func (cr capitalizeReader) Read(p []byte) (n int, err error) {
n, err = cr.r.Read(p)
for i, b := range p[:n] {
if b >= 'a' && b <= 'z' {
p[i] = b - ('a' - 'A')
}
}
return
}

type flushWriter struct {
w io.Writer
}

func (fw flushWriter) Write(p []byte) (n int, err error) {
n, err = fw.w.Write(p)
if f, ok := fw.w.(http.Flusher); ok {
f.Flush()
}
return
}

func echoCapitalHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "PUT" {
http.Error(w, "PUT required.", 400)
return
}
io.Copy(flushWriter{w}, capitalizeReader{r.Body})
}

var (
fsGrp singleflight.Group
fsMu sync.Mutex // guards fsCache
Expand Down Expand Up @@ -217,6 +252,7 @@ func registerHandlers() {
mux2.Handle("/file/go.src.tar.gz", fileServer("https://storage.googleapis.com/golang/go1.4.1.src.tar.gz"))
mux2.HandleFunc("/reqinfo", reqInfoHandler)
mux2.HandleFunc("/crc32", crcHandler)
mux2.HandleFunc("/ECHO", echoCapitalHandler)
mux2.HandleFunc("/clockstream", clockStreamHandler)
mux2.Handle("/gophertiles", tiles)
mux2.HandleFunc("/redirect", func(w http.ResponseWriter, r *http.Request) {
Expand Down
6 changes: 3 additions & 3 deletions http2/server_test.go
Expand Up @@ -2213,6 +2213,9 @@ func testServerWithCurl(t *testing.T, permitProhibitedCipherSuites bool) {
t.Skip("skipping curl test in short mode")
}
requireCurl(t)
var gotConn int32
testHookOnConn = func() { atomic.StoreInt32(&gotConn, 1) }

const msg = "Hello from curl!\n"
ts := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Foo", "Bar")
Expand All @@ -2226,9 +2229,6 @@ func testServerWithCurl(t *testing.T, permitProhibitedCipherSuites bool) {
ts.StartTLS()
defer ts.Close()

var gotConn int32
testHookOnConn = func() { atomic.StoreInt32(&gotConn, 1) }

t.Logf("Running test server for curl to hit at: %s", ts.URL)
container := curl(t, "--silent", "--http2", "--insecure", "-v", ts.URL)
defer kill(container)
Expand Down
98 changes: 64 additions & 34 deletions http2/transport.go
Expand Up @@ -155,6 +155,7 @@ type clientStream struct {
inflow flow // guarded by cc.mu
bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
readErr error // sticky read error; owned by transportResponseBody.Read
stopReqBody bool // stop writing req body; guarded by cc.mu

peerReset chan struct{} // closed on peer reset
resetErr error // populated before peerReset is closed
Expand All @@ -171,6 +172,14 @@ func (cs *clientStream) checkReset() error {
}
}

func (cs *clientStream) abortRequestBodyWrite() {
cc := cs.cc
cc.mu.Lock()
cs.stopReqBody = true
cc.cond.Broadcast()
cc.mu.Unlock()
}

type stickyErrWriter struct {
w io.Writer
err *error
Expand Down Expand Up @@ -516,26 +525,33 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, werr
}

var bodyCopyErrc chan error
var gotResHeaders chan struct{} // closed on resheaders
var bodyCopyErrc chan error // result of body copy
if hasBody {
bodyCopyErrc = make(chan error, 1)
gotResHeaders = make(chan struct{})
go func() {
bodyCopyErrc <- cs.writeRequestBody(req.Body, gotResHeaders)
bodyCopyErrc <- cs.writeRequestBody(req.Body)
}()
}

for {
select {
case re := <-cs.resc:
if gotResHeaders != nil {
close(gotResHeaders)
res := re.res
if re.err != nil || res.StatusCode > 299 {
// On error or status code 3xx, 4xx, 5xx, etc abort any
// ongoing write, assuming that the server doesn't care
// about our request body. If the server replied with 1xx or
// 2xx, however, then assume the server DOES potentially
// want our body (e.g. full-duplex streaming:
// golang.org/issue/13444). If it turns out the server
// doesn't, they'll RST_STREAM us soon enough. This is a
// heuristic to avoid adding knobs to Transport. Hopefully
// we can keep it.
cs.abortRequestBodyWrite()
}
if re.err != nil {
return nil, re.err
}
res := re.res
res.Request = req
res.TLS = cc.tlsState
return res, nil
Expand All @@ -547,45 +563,56 @@ func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
}
}

var errServerResponseBeforeRequestBody = errors.New("http2: server sent response while still writing request body")
// errAbortReqBodyWrite is an internal error value.
// It doesn't escape to callers.
var errAbortReqBodyWrite = errors.New("http2: aborting request body write")

func (cs *clientStream) writeRequestBody(body io.Reader, gotResHeaders <-chan struct{}) error {
func (cs *clientStream) writeRequestBody(body io.ReadCloser) (err error) {
cc := cs.cc
sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
buf := cc.frameScratchBuffer()
defer cc.putFrameScratchBuffer(buf)

for !sentEnd {
var sawEOF bool
n, err := io.ReadFull(body, buf)
if err == io.ErrUnexpectedEOF {
defer func() {
// TODO: write h12Compare test showing whether
// Request.Body is closed by the Transport,
// and in multiple cases: server replies <=299 and >299
// while still writing request body
cerr := body.Close()
if err == nil {
err = cerr
}
}()

var sawEOF bool
for !sawEOF {
n, err := body.Read(buf)
if err == io.EOF {
sawEOF = true
err = nil
} else if err == io.EOF {
break
} else if err != nil {
return err
}

toWrite := buf[:n]
for len(toWrite) > 0 && err == nil {
remain := buf[:n]
for len(remain) > 0 && err == nil {
var allowed int32
allowed, err = cs.awaitFlowControl(int32(len(toWrite)))
allowed, err = cs.awaitFlowControl(len(remain))
if err != nil {
return err
}

cc.wmu.Lock()
select {
case <-gotResHeaders:
err = errServerResponseBeforeRequestBody
case <-cs.peerReset:
err = cs.resetErr
default:
data := toWrite[:allowed]
toWrite = toWrite[allowed:]
sentEnd = sawEOF && len(toWrite) == 0
err = cc.fr.WriteData(cs.ID, sentEnd, data)
data := remain[:allowed]
remain = remain[allowed:]
sentEnd = sawEOF && len(remain) == 0
err = cc.fr.WriteData(cs.ID, sentEnd, data)
if err == nil {
// TODO(bradfitz): this flush is for latency, not bandwidth.
// Most requests won't need this. Make this opt-in or opt-out?
// Use some heuristic on the body type? Nagel-like timers?
// Based on 'n'? Only last chunk of this for loop, unless flow control
// tokens are low? For now, always:
err = cc.bw.Flush()
}
cc.wmu.Unlock()
}
Expand All @@ -594,8 +621,6 @@ func (cs *clientStream) writeRequestBody(body io.Reader, gotResHeaders <-chan st
}
}

var err error

cc.wmu.Lock()
if !sentEnd {
err = cc.fr.WriteData(cs.ID, true, nil)
Expand All @@ -612,21 +637,25 @@ func (cs *clientStream) writeRequestBody(body io.Reader, gotResHeaders <-chan st
// control tokens from the server.
// It returns either the non-zero number of tokens taken or an error
// if the stream is dead.
func (cs *clientStream) awaitFlowControl(maxBytes int32) (taken int32, err error) {
func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
cc := cs.cc
cc.mu.Lock()
defer cc.mu.Unlock()
for {
if cc.closed {
return 0, errClientConnClosed
}
if cs.stopReqBody {
return 0, errAbortReqBodyWrite
}
if err := cs.checkReset(); err != nil {
return 0, err
}
if a := cs.flow.available(); a > 0 {
take := a
if take > maxBytes {
take = maxBytes
if int(take) > maxBytes {

take = int32(maxBytes) // can't truncate int; take is int32
}
if take > int32(cc.maxFrameSize) {
take = int32(cc.maxFrameSize)
Expand Down Expand Up @@ -1092,6 +1121,7 @@ func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
cs.resetErr = err
close(cs.peerReset)
cs.bufPipe.CloseWithError(err)
cs.cc.cond.Broadcast() // wake up checkReset via clientStream.awaitFlowControl
}
delete(rl.activeRes, cs.ID)
return nil
Expand Down

0 comments on commit 438097d

Please sign in to comment.