Skip to content

Commit

Permalink
Close connection when inactivity (goburrow#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
nqv committed Mar 8, 2017
1 parent f7afd8d commit 9ffe737
Show file tree
Hide file tree
Showing 6 changed files with 276 additions and 80 deletions.
31 changes: 17 additions & 14 deletions asciiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"log"
"time"
)

const (
Expand All @@ -30,6 +30,8 @@ type ASCIIClientHandler struct {
func NewASCIIClientHandler(address string) *ASCIIClientHandler {
handler := &ASCIIClientHandler{}
handler.Address = address
handler.Timeout = serialTimeout
handler.IdleTimeout = serialIdleTimeout
return handler
}

Expand Down Expand Up @@ -156,23 +158,26 @@ func (mb *asciiPackager) Decode(adu []byte) (pdu *ProtocolDataUnit, err error) {
// asciiSerialTransporter implements Transporter interface.
type asciiSerialTransporter struct {
serialPort

Logger *log.Logger
}

func (mb *asciiSerialTransporter) Send(aduRequest []byte) (aduResponse []byte, err error) {
if !mb.isConnected {
if err = mb.Connect(); err != nil {
return
}
defer mb.Close()
}
if mb.Logger != nil {
mb.Logger.Printf("modbus: sending %q\n", aduRequest)
mb.serialPort.mu.Lock()
defer mb.serialPort.mu.Unlock()

// Make sure port is connected
if err = mb.serialPort.connect(); err != nil {
return
}
// Start the timer to close when idle
mb.serialPort.lastActivity = time.Now()
mb.serialPort.startCloseTimer()

// Send the request
mb.serialPort.logf("modbus: sending %q\n", aduRequest)
if _, err = mb.port.Write(aduRequest); err != nil {
return
}
// Get the response
var n int
var data [asciiMaxSize]byte
length := 0
Expand All @@ -192,9 +197,7 @@ func (mb *asciiSerialTransporter) Send(aduRequest []byte) (aduResponse []byte, e
}
}
aduResponse = data[:length]
if mb.Logger != nil {
mb.Logger.Printf("modbus: received %q\n", aduResponse)
}
mb.serialPort.logf("modbus: received %q\n", aduResponse)
return
}

Expand Down
28 changes: 13 additions & 15 deletions rtuclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/binary"
"fmt"
"io"
"log"
"time"
)

Expand All @@ -27,6 +26,8 @@ type RTUClientHandler struct {
func NewRTUClientHandler(address string) *RTUClientHandler {
handler := &RTUClientHandler{}
handler.Address = address
handler.Timeout = serialTimeout
handler.IdleTimeout = serialIdleTimeout
return handler
}

Expand Down Expand Up @@ -102,23 +103,22 @@ func (mb *rtuPackager) Decode(adu []byte) (pdu *ProtocolDataUnit, err error) {
return
}

// asciiSerialTransporter implements Transporter interface.
// rtuSerialTransporter implements Transporter interface.
type rtuSerialTransporter struct {
serialPort

Logger *log.Logger
}

func (mb *rtuSerialTransporter) Send(aduRequest []byte) (aduResponse []byte, err error) {
if !mb.isConnected {
if err = mb.Connect(); err != nil {
return
}
defer mb.Close()
}
if mb.Logger != nil {
mb.Logger.Printf("modbus: sending % x\n", aduRequest)
// Make sure port is connected
if err = mb.serialPort.connect(); err != nil {
return
}
// Start the timer to close when idle
mb.serialPort.lastActivity = time.Now()
mb.serialPort.startCloseTimer()

// Send the request
mb.serialPort.logf("modbus: sending %q\n", aduRequest)
if _, err = mb.port.Write(aduRequest); err != nil {
return
}
Expand Down Expand Up @@ -159,9 +159,7 @@ func (mb *rtuSerialTransporter) Send(aduRequest []byte) (aduResponse []byte, err
return
}
aduResponse = data[:n]
if mb.Logger != nil {
mb.Logger.Printf("modbus: received % x\n", aduResponse)
}
mb.serialPort.logf("modbus: received % x\n", aduResponse)
return
}

Expand Down
88 changes: 71 additions & 17 deletions serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,98 @@
package modbus

import (
"io"
"log"
"sync"
"time"

"github.com/goburrow/serial"
)

const (
// Default timeout
serialTimeoutMillis = 5000
serialTimeout = 5 * time.Second
serialIdleTimeout = 60 * time.Second
)

// serialPort has configuration and I/O controller.
type serialPort struct {
// Serial port configuration.
serial.Config

Logger *log.Logger
IdleTimeout time.Duration

mu sync.Mutex
// port is platform-dependent data structure for serial port.
port serial.Port
// Read timeout
isConnected bool
port io.ReadWriteCloser
lastActivity time.Time
closeTimer *time.Timer
}

func (mb *serialPort) Connect() (err error) {
if mb.isConnected {
return
}
mb.mu.Lock()
defer mb.mu.Unlock()

return mb.connect()
}

// connect connects to the serial port if it is not connected. Caller must hold the mutex.
func (mb *serialPort) connect() error {
if mb.port == nil {
mb.port, err = serial.Open(&mb.Config)
} else {
err = mb.port.Open(&mb.Config)
port, err := serial.Open(&mb.Config)
if err != nil {
return err
}
mb.port = port
}
if err == nil {
mb.isConnected = true
return nil
}

func (mb *serialPort) Close() (err error) {
mb.mu.Lock()
defer mb.mu.Unlock()

return mb.close()
}

// close closes the serial port if it is connected. Caller must hold the mutex.
func (mb *serialPort) close() (err error) {
if mb.port != nil {
err = mb.port.Close()
mb.port = nil
}
return
}

func (mb *serialPort) Close() (err error) {
if !mb.isConnected {
func (mb *serialPort) logf(format string, v ...interface{}) {
if mb.Logger != nil {
mb.Logger.Printf(format, v...)
}
}

func (mb *serialPort) startCloseTimer() {
if mb.IdleTimeout <= 0 {
return
}
err = mb.port.Close()
mb.isConnected = false
return
if mb.closeTimer == nil {
mb.closeTimer = time.AfterFunc(mb.IdleTimeout, mb.closeIdle)
} else {
mb.closeTimer.Reset(mb.IdleTimeout)
}
}

// closeIdle closes the connection if last activity is passed behind IdleTimeout.
func (mb *serialPort) closeIdle() {
mb.mu.Lock()
defer mb.mu.Unlock()

if mb.IdleTimeout <= 0 {
return
}
idle := time.Now().Sub(mb.lastActivity)
if idle >= mb.IdleTimeout {
mb.logf("modbus: closing connection due to idle timeout: %v", idle)
mb.close()
}
}
36 changes: 36 additions & 0 deletions serial_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package modbus

import (
"bytes"
"io"
"testing"
"time"
)

type nopCloser struct {
io.ReadWriter

closed bool
}

func (n *nopCloser) Close() error {
n.closed = true
return nil
}

func TestSerialCloseIdle(t *testing.T) {
port := &nopCloser{
ReadWriter: &bytes.Buffer{},
}
s := serialPort{
port: port,
IdleTimeout: 100 * time.Millisecond,
}
s.lastActivity = time.Now()
s.startCloseTimer()

time.Sleep(150 * time.Millisecond)
if !port.closed || s.port != nil {
t.Fatalf("serial port is not closed when inactivity: %+v", port)
}
}
Loading

0 comments on commit 9ffe737

Please sign in to comment.