Skip to content

Commit

Permalink
socket: support network IO deadline (#339)
Browse files Browse the repository at this point in the history
This resolves #339 by setting an r/w timeout for pipe reads/writes
(if present). This allows listening sockets to recover from bad
incoming connections, and also goroutines suffering from broken
connections, to recover.
  • Loading branch information
Gerrit Renker authored and Gerrit Renker committed Jan 30, 2019
1 parent 23b260d commit 33ba76a
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
26 changes: 26 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io"
"net"
"sync"
"time"
)

// conn implements the Pipe interface on top of net.Conn. The
Expand Down Expand Up @@ -49,6 +50,10 @@ func (p *conn) Recv() (*Message, error) {
var err error
var msg *Message

if err = p.setNetworkTimeout(); err != nil {
return nil, err
}

if err = binary.Read(p.c, binary.BigEndian, &sz); err != nil {
return nil, err
}
Expand Down Expand Up @@ -78,6 +83,10 @@ func (p *conn) Send(msg *Message) error {
return nil
}

if err := p.setNetworkTimeout(); err != nil {
return err
}

// send length header
if err := binary.Write(p.c, binary.BigEndian, l); err != nil {
return err
Expand Down Expand Up @@ -181,6 +190,10 @@ func (p *conn) handshake(props []interface{}) error {
p.maxrx = int64(v.(int))
}

if err = p.setNetworkTimeout(); err != nil {
return err
}

h := connHeader{S: 'S', P: 'P', Proto: p.proto.Number()}
if err = binary.Write(p.c, binary.BigEndian, &h); err != nil {
return err
Expand All @@ -207,3 +220,16 @@ func (p *conn) handshake(props []interface{}) error {
p.open = true
return nil
}

// setNetworkTimeout sets the network I/O timeout on the connection.
func (p *conn) setNetworkTimeout() error {
v, err := p.sock.GetOption(OptionNetworkIoDeadline)
if err != nil {
return err
}
// Socket implementation ensures that option type is time.Duration.
if iotimeout := v.(time.Duration); iotimeout > 0 {
return p.c.SetDeadline(time.Now().Add(iotimeout))
}
return nil
}
13 changes: 11 additions & 2 deletions core.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ type socket struct {
recverr error // error to return on attempts to Recv()
senderr error // error to return on attempts to Send()

rdeadline time.Duration
wdeadline time.Duration
rdeadline time.Duration // mangos socket read deadline
wdeadline time.Duration // mangos socket write deadline
iodeadline time.Duration // IO timeout on an established connection/pipe
reconntime time.Duration // reconnect time after error or disconnect
reconnmax time.Duration // max reconnect interval
linger time.Duration
Expand Down Expand Up @@ -437,6 +438,10 @@ func (sock *socket) SetOption(name string, value interface{}) error {
sock.wdeadline = value.(time.Duration)
sock.Unlock()
return nil
case OptionNetworkIoDeadline:
sock.Lock()
sock.iodeadline = value.(time.Duration)
sock.Unlock()
case OptionLinger:
sock.Lock()
sock.linger = value.(time.Duration)
Expand Down Expand Up @@ -523,6 +528,10 @@ func (sock *socket) GetOption(name string) (interface{}, error) {
sock.Lock()
defer sock.Unlock()
return sock.wdeadline, nil
case OptionNetworkIoDeadline:
sock.Lock()
defer sock.Unlock()
return sock.iodeadline, nil
case OptionLinger:
sock.Lock()
defer sock.Unlock()
Expand Down
6 changes: 6 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ const (
// non-blocking operation. By default there is no timeout.
OptionSendDeadline = "SEND-DEADLINE"

// OptionNetworkIoDeadline is the time until a (read or write) network
// operation times out. The value is a time.Duration. Setting this
// option is recommended, since it helps listening servers to recover
// from broken connections (may otherwise hang indefinitely).
OptionNetworkIoDeadline = "NETWORK-IO-DEADLINE"

// OptionRetryTime is used by REQ. The argument is a time.Duration.
// When a request has not been replied to within the given duration,
// the request will automatically be resent to an available peer.
Expand Down

0 comments on commit 33ba76a

Please sign in to comment.