Skip to content

Commit

Permalink
Merge branch 'iss31' into dev
Browse files Browse the repository at this point in the history
* iss31:
  Initial implementation of optional write deadlines.
  Merge wire writes into one call per frame.
  • Loading branch information
gmallard committed Apr 22, 2017
2 parents dd3ebeb + ef5a56e commit 02885f8
Show file tree
Hide file tree
Showing 9 changed files with 223 additions and 47 deletions.
3 changes: 2 additions & 1 deletion connect.go
Expand Up @@ -78,7 +78,8 @@ func Connect(n net.Conn, h Headers) (*Connection, error) {
DisconnectReceipt: MessageData{},
ssdc: make(chan struct{}),
wtrsdc: make(chan struct{}),
scc: 1}
scc: 1,
dld: &deadlineData{}}

// Bsaic metric data
c.mets = &metrics{st: time.Now()}
Expand Down
12 changes: 9 additions & 3 deletions data.go
Expand Up @@ -126,9 +126,10 @@ type Connection struct {
Hbrf bool // Indicates a heart beat read/receive failure, which is possibly transient. Valid for 1.1+ only.
Hbsf bool // Indicates a heart beat send failure, which is possibly transient. Valid for 1.1+ only.
logger *log.Logger
mets *metrics // Client metrics
scc int // Subscribe channel capacity
discLock sync.Mutex // DISCONNECT lock
mets *metrics // Client metrics
scc int // Subscribe channel capacity
discLock sync.Mutex // DISCONNECT lock
dld *deadlineData // Deadline data
}

type subscription struct {
Expand Down Expand Up @@ -368,3 +369,8 @@ const (
const (
StompPlusDrainAfter = "sng_drafter" // SUBSCRIBE Header
)

var (
LFB = []byte("\n")
ZRB = []byte{0}
)
70 changes: 70 additions & 0 deletions deadline_data.go
@@ -0,0 +1,70 @@
//
// Copyright © 2017 Guy M. Allard
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed, an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package stompngo

import "time"

/*
ExpiredNotification is a callback function, provided by the client
and called when a deadline expires. The err parameter will contain
the actual expired error. The rw parameter will be true if
the notification is for a write, and false otherwise.
*/
type ExpiredNotification func(err error, rw bool)

/*
DeadlineData controls the use of deadlines in network I/O.
*/
type deadlineData struct {
wde bool // Write deadline data enabled
wdld time.Duration // Write deadline duration
wds bool // True if duration has been set
//
dlnotify ExpiredNotification
dns bool // True if dlnotify has been set
}

/*
WriteDeadline sets the write deadline duration.
*/
func (c *Connection) WriteDeadline(d time.Duration) {
c.dld.wdld = d
c.dld.wds = true
}

/*
EnableWriteDeadline enables/disables the use of write deadlines.
*/
func (c *Connection) EnableWriteDeadline(e bool) {
c.dld.wde = e
}

/*
ExpiredNotification sets the expired notification callback function.
*/
func (c *Connection) ExpiredNotification(enf ExpiredNotification) {
c.dld.dlnotify = enf
c.dld.dns = true
}

/*
IsWriteDeadlineEnabled returns the current value of write deadline
enablement.
*/
func (c *Connection) IsWriteDeadlineEnabled() bool {
return c.dld.wde
}
45 changes: 45 additions & 0 deletions deadline_test.go
@@ -0,0 +1,45 @@
//
// Copyright © 2017 Guy M. Allard
//
// Licensed under the Apache License, Veridon 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permisidons and
// limitations under the License.
//

package stompngo

import (
"fmt"
"testing"
)

var _ = fmt.Println

/*
Test Deadline Enablement.
*/
func TestDeadlineEnablement(t *testing.T) {
n, _ = openConn(t)
ch := login_headers
conn, e = Connect(n, ch)
if e != nil {
t.Fatalf("TestDeadlineEnablement CONNECT expected nil, got %v\n", e)
}
//
dle := conn.IsWriteDeadlineEnabled()
if dle != wdleInit {
t.Errorf("TestDeadlineEnablement expected false, got true\n")
}
checkReceived(t, conn)
e = conn.Disconnect(empty_headers)
checkDisconnectError(t, e)
_ = closeConn(t, n)
}
36 changes: 36 additions & 0 deletions frame_methods.go
Expand Up @@ -16,6 +16,10 @@

package stompngo

import (
"bytes"
)

/*
Size returns the size of Frame on the wire, in bytes.
*/
Expand All @@ -24,3 +28,35 @@ func (f *Frame) Size(e bool) int64 {
r += int64(len(f.Command)) + 1 + f.Headers.Size(e) + 1 + int64(len(f.Body)) + 1
return r
}

/*
Bytes returns a byte slice of all frame data, ready for the wire
*/
func (f *Frame) Bytes(sclok bool) []byte {
b := make([]byte, 0, 8*1024)
b = append(b, f.Command+"\n"...)
hb := f.Headers.Bytes()
if len(hb) > 0 {
b = append(b, hb...)
}
b = append(b, "\n"...)
if len(f.Body) > 0 {
if sclok {
nz := bytes.IndexByte(f.Body, 0)
// fmt.Printf("WDBG41 ok:%v\n", nz)
if nz == 0 {
f.Body = []byte{}
// fmt.Printf("WDBG42 body:%v bodystring: %v\n", f.Body, string(f.Body))
} else if nz > 0 {
f.Body = f.Body[0:nz]
// fmt.Printf("WDBG43 body:%v bodystring: %v\n", f.Body, string(f.Body))
}
}
if len(f.Body) > 0 {
b = append(b, f.Body...)
}
}
b = append(b, ZRB...)
//
return b
}
11 changes: 11 additions & 0 deletions header_methods.go
Expand Up @@ -171,3 +171,14 @@ func (h Headers) String() string {
}
return string(b)
}

/*
Bytes returns a byte slice of the headers
*/
func (h Headers) Bytes() []byte {
b := make([]byte, 0, 1024)
for i := 0; i < len(h); i += 2 {
b = append(b, h[i]+":"+h[i+1]+"\n"...)
}
return b
}
21 changes: 21 additions & 0 deletions testdata_test.go
Expand Up @@ -889,6 +889,27 @@ const (
// None at present.
)

//=============================================================================
//= deadline_test type ========================================================
//=============================================================================
type (
// None at present.
)

//=============================================================================
//= deadline_test var =========================================================
//=============================================================================
var (
wdleInit = false // Enabled just after init
)

//=============================================================================
//= deadline_test const =======================================================
//=============================================================================
const (
// None at present.
)

//=============================================================================
//= for use by all type =======================================================
//=============================================================================
Expand Down
4 changes: 2 additions & 2 deletions version.go
Expand Up @@ -35,9 +35,9 @@ var (

minor = "0" // Minor

patch = "7" // Patch
//patch = "7" // Patch

// patch = "7.plvl.001" // Patch
patch = "7.plvl.001" // Patch
)

func Version() string {
Expand Down
68 changes: 27 additions & 41 deletions writer.go
Expand Up @@ -18,14 +18,15 @@ package stompngo

import (
"bufio"
"bytes"
"net"
// "bytes"
"strconv"
"time"
)

/*
Logical network writer. Read wiredata structures from the communication
channel, and put them on the wire.
channel, and put the frame on the wire.
*/
func (c *Connection) writer() {
writerLoop:
Expand Down Expand Up @@ -59,23 +60,28 @@ func (c *Connection) wireWrite(d wiredata) {
// fmt.Printf("WWD01 f:[%v]\n", f)
switch f.Command {
case "\n": // HeartBeat frame
if _, e := c.wtr.WriteString(f.Command); e != nil {
if c.dld.wde && c.dld.dns {
_ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
}
_, e := c.wtr.WriteString(f.Command)
if e != nil {
if e.(net.Error).Timeout() {
if c.dld.dns {
c.dld.dlnotify(e, true)
}
}
d.errchan <- e
return
}
default: // Other frames
if e := f.writeFrame(c.wtr, c.Protocol()); e != nil {
if e := f.writeFrame(c.wtr, c); e != nil {
d.errchan <- e
return
}
if e := c.wtr.Flush(); e != nil {
d.errchan <- e
return
}
if e := c.wtr.WriteByte('\x00'); e != nil {
d.errchan <- e
return
}
}
if e := c.wtr.Flush(); e != nil {
d.errchan <- e
Expand All @@ -95,13 +101,9 @@ func (c *Connection) wireWrite(d wiredata) {
}

/*
Frame physical write.
Physical frame write to the wire.
*/
func (f *Frame) writeFrame(w *bufio.Writer, l string) error {
// Write the frame Command
if _, e := w.WriteString(f.Command + "\n"); e != nil {
return e
}
func (f *Frame) writeFrame(w *bufio.Writer, c *Connection) error {

var sctok bool
// Content type. Always add it if the client does not suppress and does not
Expand All @@ -124,39 +126,23 @@ func (f *Frame) writeFrame(w *bufio.Writer, l string) error {
}
}

// Write the frame Headers
for i := 0; i < len(f.Headers); i += 2 {
if l > SPL_10 && f.Command != CONNECT {
if c.Protocol() > SPL_10 && f.Command != CONNECT {
for i := 0; i < len(f.Headers); i += 2 {
f.Headers[i] = encode(f.Headers[i])
f.Headers[i+1] = encode(f.Headers[i+1])
}
_, e := w.WriteString(f.Headers[i] + ":" + f.Headers[i+1] + "\n")
if e != nil {
return e
}
}
// Write the last Header LF
if e := w.WriteByte('\n'); e != nil {
return e
if c.dld.wde && c.dld.dns {
_ = c.netconn.SetWriteDeadline(time.Now().Add(c.dld.wdld))
}
// fmt.Printf("WDBG40 ok:%v\n", sclok)
if sclok {
nz := bytes.IndexByte(f.Body, 0)
// fmt.Printf("WDBG41 ok:%v\n", nz)
if nz == 0 {
f.Body = []byte{}
// fmt.Printf("WDBG42 body:%v bodystring: %v\n", f.Body, string(f.Body))
} else if nz > 0 {
f.Body = f.Body[0:nz]
// fmt.Printf("WDBG43 body:%v bodystring: %v\n", f.Body, string(f.Body))
}
}
// Write the body
if len(f.Body) != 0 { // Foolish to write 0 length data
// fmt.Printf("WDBG99 body:%v bodystring: %v\n", f.Body, string(f.Body))
if _, e := w.Write(f.Body); e != nil {
return e
_, e := w.Write(f.Bytes(sclok))
if e != nil {
if e.(net.Error).Timeout() {
if c.dld.dns {
c.dld.dlnotify(e, true)
}
}
return e
}
return nil
}

0 comments on commit 02885f8

Please sign in to comment.