From 70915955955130a3813cb8e1a8f3a3cc1c2499a1 Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 26 Mar 2018 08:56:40 -0500 Subject: [PATCH] Update yamux to pick up performance improvements --- vendor/github.com/hashicorp/yamux/session.go | 59 ++++++++++++++------ vendor/github.com/hashicorp/yamux/stream.go | 45 +++++++++------ vendor/github.com/hashicorp/yamux/util.go | 15 +++++ vendor/vendor.json | 2 +- 4 files changed, 86 insertions(+), 35 deletions(-) diff --git a/vendor/github.com/hashicorp/yamux/session.go b/vendor/github.com/hashicorp/yamux/session.go index e17981839f44..d8446fa65ee1 100644 --- a/vendor/github.com/hashicorp/yamux/session.go +++ b/vendor/github.com/hashicorp/yamux/session.go @@ -123,6 +123,12 @@ func (s *Session) IsClosed() bool { } } +// CloseChan returns a read-only channel which is closed as +// soon as the session is closed. +func (s *Session) CloseChan() <-chan struct{} { + return s.shutdownCh +} + // NumStreams returns the number of currently open streams func (s *Session) NumStreams() int { s.streamLock.Lock() @@ -323,8 +329,17 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error { // potential shutdown. Since there's the expectation that sends can happen // in a timely manner, we enforce the connection write timeout here. func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { - timer := time.NewTimer(s.config.ConnectionWriteTimeout) - defer timer.Stop() + t := timerPool.Get() + timer := t.(*time.Timer) + timer.Reset(s.config.ConnectionWriteTimeout) + defer func() { + timer.Stop() + select { + case <-timer.C: + default: + } + timerPool.Put(t) + }() ready := sendReady{Hdr: hdr, Body: body, Err: errCh} select { @@ -349,8 +364,17 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e // the send happens right here, we enforce the connection write timeout if we // can't queue the header to be sent. func (s *Session) sendNoWait(hdr header) error { - timer := time.NewTimer(s.config.ConnectionWriteTimeout) - defer timer.Stop() + t := timerPool.Get() + timer := t.(*time.Timer) + timer.Reset(s.config.ConnectionWriteTimeout) + defer func() { + timer.Stop() + select { + case <-timer.C: + default: + } + timerPool.Put(t) + }() select { case s.sendCh <- sendReady{Hdr: hdr}: @@ -408,11 +432,20 @@ func (s *Session) recv() { } } +// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type +var ( + handlers = []func(*Session, header) error{ + typeData: (*Session).handleStreamMessage, + typeWindowUpdate: (*Session).handleStreamMessage, + typePing: (*Session).handlePing, + typeGoAway: (*Session).handleGoAway, + } +) + // recvLoop continues to receive data until a fatal error is encountered func (s *Session) recvLoop() error { defer close(s.recvDoneCh) hdr := header(make([]byte, headerSize)) - var handler func(header) error for { // Read the header if _, err := io.ReadFull(s.bufRead, hdr); err != nil { @@ -428,22 +461,12 @@ func (s *Session) recvLoop() error { return ErrInvalidVersion } - // Switch on the type - switch hdr.MsgType() { - case typeData: - handler = s.handleStreamMessage - case typeWindowUpdate: - handler = s.handleStreamMessage - case typeGoAway: - handler = s.handleGoAway - case typePing: - handler = s.handlePing - default: + mt := hdr.MsgType() + if mt < typeData || mt > typeGoAway { return ErrInvalidMsgType } - // Invoke the handler - if err := handler(hdr); err != nil { + if err := handlers[mt](s, hdr); err != nil { return err } } diff --git a/vendor/github.com/hashicorp/yamux/stream.go b/vendor/github.com/hashicorp/yamux/stream.go index d216e281ca1d..aa2391973983 100644 --- a/vendor/github.com/hashicorp/yamux/stream.go +++ b/vendor/github.com/hashicorp/yamux/stream.go @@ -47,8 +47,8 @@ type Stream struct { recvNotifyCh chan struct{} sendNotifyCh chan struct{} - readDeadline time.Time - writeDeadline time.Time + readDeadline atomic.Value // time.Time + writeDeadline atomic.Value // time.Time } // newStream is used to construct a new stream within @@ -67,6 +67,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream { recvNotifyCh: make(chan struct{}, 1), sendNotifyCh: make(chan struct{}, 1), } + s.readDeadline.Store(time.Time{}) + s.writeDeadline.Store(time.Time{}) return s } @@ -122,8 +124,9 @@ START: WAIT: var timeout <-chan time.Time var timer *time.Timer - if !s.readDeadline.IsZero() { - delay := s.readDeadline.Sub(time.Now()) + readDeadline := s.readDeadline.Load().(time.Time) + if !readDeadline.IsZero() { + delay := readDeadline.Sub(time.Now()) timer = time.NewTimer(delay) timeout = timer.C } @@ -188,7 +191,7 @@ START: // Send the header s.sendHdr.encode(typeData, flags, s.id, max) - if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { + if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { return 0, err } @@ -200,8 +203,9 @@ START: WAIT: var timeout <-chan time.Time - if !s.writeDeadline.IsZero() { - delay := s.writeDeadline.Sub(time.Now()) + writeDeadline := s.writeDeadline.Load().(time.Time) + if !writeDeadline.IsZero() { + delay := writeDeadline.Sub(time.Now()) timeout = time.After(delay) } select { @@ -238,18 +242,25 @@ func (s *Stream) sendWindowUpdate() error { // Determine the delta update max := s.session.config.MaxStreamWindowSize - delta := max - atomic.LoadUint32(&s.recvWindow) + var bufLen uint32 + s.recvLock.Lock() + if s.recvBuf != nil { + bufLen = uint32(s.recvBuf.Len()) + } + delta := (max - bufLen) - s.recvWindow // Determine the flags if any flags := s.sendFlags() // Check if we can omit the update if delta < (max/2) && flags == 0 { + s.recvLock.Unlock() return nil } // Update our window - atomic.AddUint32(&s.recvWindow, delta) + s.recvWindow += delta + s.recvLock.Unlock() // Send the header s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta) @@ -392,16 +403,18 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { if length == 0 { return nil } - if remain := atomic.LoadUint32(&s.recvWindow); length > remain { - s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length) - return ErrRecvWindowExceeded - } // Wrap in a limited reader conn = &io.LimitedReader{R: conn, N: int64(length)} // Copy into buffer s.recvLock.Lock() + + if length > s.recvWindow { + s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length) + return ErrRecvWindowExceeded + } + if s.recvBuf == nil { // Allocate the receive buffer just-in-time to fit the full data frame. // This way we can read in the whole packet without further allocations. @@ -414,7 +427,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { } // Decrement the receive window - atomic.AddUint32(&s.recvWindow, ^uint32(length-1)) + s.recvWindow -= length s.recvLock.Unlock() // Unblock any readers @@ -435,13 +448,13 @@ func (s *Stream) SetDeadline(t time.Time) error { // SetReadDeadline sets the deadline for future Read calls. func (s *Stream) SetReadDeadline(t time.Time) error { - s.readDeadline = t + s.readDeadline.Store(t) return nil } // SetWriteDeadline sets the deadline for future Write calls func (s *Stream) SetWriteDeadline(t time.Time) error { - s.writeDeadline = t + s.writeDeadline.Store(t) return nil } diff --git a/vendor/github.com/hashicorp/yamux/util.go b/vendor/github.com/hashicorp/yamux/util.go index 5fe45afcdf2c..8a73e9249a61 100644 --- a/vendor/github.com/hashicorp/yamux/util.go +++ b/vendor/github.com/hashicorp/yamux/util.go @@ -1,5 +1,20 @@ package yamux +import ( + "sync" + "time" +) + +var ( + timerPool = &sync.Pool{ + New: func() interface{} { + timer := time.NewTimer(time.Hour * 1e6) + timer.Stop() + return timer + }, + } +) + // asyncSendErr is used to try an async send of an error func asyncSendErr(ch chan error, err error) { if ch == nil { diff --git a/vendor/vendor.json b/vendor/vendor.json index 42e31687b51d..f4cf9408582b 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -72,7 +72,7 @@ {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","comment":"v0.7.0-66-g6c4672d","revision":"b6017ae61f4420ed0c02d5eeeb9ff3fc02953f14","revisionTime":"2018-01-19T22:43:00Z"}, {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"QGImnWfhk0ILLZszcf3vRs/Ft7g=","comment":"v0.7.0-66-g6c4672d","revision":"b6017ae61f4420ed0c02d5eeeb9ff3fc02953f14","revisionTime":"2018-01-19T22:43:00Z"}, - {"path":"github.com/hashicorp/yamux","checksumSHA1":"ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=","revision":"d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd","revisionTime":"2016-07-20T23:31:40Z"}, + {"path":"github.com/hashicorp/yamux","checksumSHA1":"NnWv17i1tpvBNJtpdRRWpE6j4LY=","revision":"2658be15c5f05e76244154714161f17e3e77de2e","revisionTime":"2018-03-14T20:07:45Z"}, {"path":"github.com/mattn/go-isatty","checksumSHA1":"xZuhljnmBysJPta/lMyYmJdujCg=","revision":"66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8","revisionTime":"2016-08-06T12:27:52Z"}, {"path":"github.com/miekg/dns","checksumSHA1":"Jo+pItYOocIRdoFL0fc4nHhUEJY=","revision":"bbca4873b326f5dc54bfe31148446d4ed79a5a02","revisionTime":"2017-08-08T22:19:10Z"}, {"path":"github.com/mitchellh/cli","checksumSHA1":"GzfpPGtV2UJH9hFsKwzGjKrhp/A=","revision":"dff723fff508858a44c1f4bd0911f00d73b0202f","revisionTime":"2017-09-05T22:10:09Z"},