Skip to content

Commit

Permalink
While waiting for headers from the server, client should wait on erro…
Browse files Browse the repository at this point in the history
…r chans as well.
  • Loading branch information
MakMukhi committed Dec 13, 2017
1 parent dba60db commit 0a4e6fa
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,28 @@ type Stream struct {
unprocessed bool // set if the server sends a refused stream or GOAWAY including this stream
}

func (s *Stream) waitOnHeader() error {
wc := s.waiters
select {
case <-wc.tctx.Done():
return ErrConnClosing
case <-wc.ctx.Done():
return ContextErr(wc.ctx.Err())
case <-wc.goAway:
return errStreamDrain
case <-s.headerChan:
return nil
}
}

// RecvCompress returns the compression algorithm applied to the inbound
// message. It is empty string if there is no compression applied.
func (s *Stream) RecvCompress() string {
if s.headerChan != nil {
<-s.headerChan
if err := s.waitOnHeader(); err != nil {
fmt.Println(err)
return ""
}
}
return s.recvCompress
}
Expand All @@ -278,15 +295,7 @@ func (s *Stream) GoAway() <-chan struct{} {
// is available. It blocks until i) the metadata is ready or ii) there is no
// header metadata or iii) the stream is canceled/expired.
func (s *Stream) Header() (metadata.MD, error) {
var err error
select {
case <-s.ctx.Done():
err = ContextErr(s.ctx.Err())
case <-s.goAway:
err = errStreamDrain
case <-s.headerChan:
return s.header.Copy(), nil
}
err := s.waitOnHeader()
// Even if the stream is closed, header is returned if available.
select {
case <-s.headerChan:
Expand Down

0 comments on commit 0a4e6fa

Please sign in to comment.