Skip to content

Commit

Permalink
Use single encoder/decoder and buffered io
Browse files Browse the repository at this point in the history
Update Send and Receive objects to lock access and save their encoder/decoder.  Additionally use a buffered writer on encode.

Signed-off-by: Derek McGowan <derek@mcgstyle.net> (github: dmcgowan)
  • Loading branch information
dmcgowan committed Feb 14, 2015
1 parent c2d7628 commit a44d6a0
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions spdy/session.go
@@ -1,7 +1,7 @@
package spdy

import (
"bytes"
"bufio"
"errors"
"io"
"net"
Expand Down Expand Up @@ -48,6 +48,11 @@ type channel struct {
stream *spdystream.Stream
session *Transport
direction direction
encodeLock sync.Mutex
encoder *msgpack.Encoder
decodeLock sync.Mutex
decoder *msgpack.Decoder
buffer *bufio.Writer
}

// NewClientTransport creates a new stream transport from the
Expand Down Expand Up @@ -126,6 +131,7 @@ func (s *Transport) newStreamHandler(stream *spdystream.Stream) {
stream: stream,
session: s,
}

s.channelC.L.Lock()
s.channels[referenceID] = c
s.channelC.Broadcast()
Expand Down Expand Up @@ -306,21 +312,19 @@ func (c *channel) Send(message interface{}) error {
return ErrWrongDirection
}

buf := bytes.NewBuffer(nil)
encoder := msgpack.NewEncoder(buf)
encoder.AddExtensions(c.initializeExtensions())
encodeErr := encoder.Encode(message)
if encodeErr != nil {
return encodeErr
c.encodeLock.Lock()
defer c.encodeLock.Unlock()
if c.encoder == nil {
c.buffer = bufio.NewWriter(c.stream)
c.encoder = msgpack.NewEncoder(c.buffer)
c.encoder.AddExtensions(c.initializeExtensions())
}

// TODO check length of buf
_, writeErr := c.stream.Write(buf.Bytes())
if writeErr != nil {
return writeErr
if err := c.encoder.Encode(message); err != nil {
return err
}

return nil
return c.buffer.Flush()
}

// Receive receives a message sent across the channel from
Expand All @@ -329,21 +333,20 @@ func (c *channel) Receive(message interface{}) error {
if c.direction == outbound {
return ErrWrongDirection
}
buf, readErr := c.stream.ReadData()
if readErr != nil {
if readErr == io.EOF {
c.stream.Close()
}
return readErr

c.decodeLock.Lock()
defer c.decodeLock.Unlock()
if c.decoder == nil {
c.decoder = msgpack.NewDecoder(c.stream)
c.decoder.AddExtensions(c.initializeExtensions())
}

decoder := msgpack.NewDecoder(bytes.NewReader(buf))
decoder.AddExtensions(c.initializeExtensions())
decodeErr := decoder.Decode(message)
if decodeErr != nil {
return decodeErr
decodeErr := c.decoder.Decode(message)
if decodeErr == io.EOF {
c.stream.Close()
c.decoder = nil
}
return nil
return decodeErr
}

func (c *channel) SendTo(dst libchan.Sender) (int, error) {
Expand Down

0 comments on commit a44d6a0

Please sign in to comment.