Skip to content

Commit

Permalink
Version v1.0.2. Commit list:
Browse files Browse the repository at this point in the history
* dev:
  Bump version to v1.0.2.
  Help stomp 1.0 users that subscribe without a sub ID.
  Add subscription data area, which:
  Relax locking strategy for subscription locks.
  Correct variable typo.
  Add NMSGS to supported environment variables.
  Add STOMP_DEST to supported environment variables.
  Move environment get logic to individual methods.
  Enhange stompngo environment variable helper.
  Copyright date updates.
  Start next version work.
  • Loading branch information
gmallard committed Jul 5, 2016
2 parents a488ae2 + 42ac5b1 commit b1f1d96
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 57 deletions.
2 changes: 1 addition & 1 deletion connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func Connect(n net.Conn, h Headers) (*Connection, error) {
connected: false,
session: "",
protocol: SPL_10,
subs: make(map[string]chan MessageData),
subs: make(map[string]*subscription),
DisconnectReceipt: MessageData{},
scc: 1}

Expand Down
10 changes: 6 additions & 4 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ func (c *Connection) shutdown() {
// Stop writer go routine
c.wsd <- true
// Close all individual subscribe channels
// This is a write lock
c.subsLock.Lock()
for key := range c.subs {
close(c.subs[key])
close(c.subs[key].md)
}
c.connected = false
c.subsLock.Unlock()
Expand All @@ -220,13 +221,14 @@ func (c *Connection) handleReadError(md MessageData) {
// Notify any general subscriber of error
c.input <- md
// Notify all individual subscribers of error
c.subsLock.Lock()
// This is a read lock
c.subsLock.RLock()
if c.connected {
for key := range c.subs {
c.subs[key] <- md
c.subs[key].md <- md
}
}
c.subsLock.Unlock()
c.subsLock.RUnlock()
// Let further shutdown logic proceed normally.
return
}
10 changes: 8 additions & 2 deletions data.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ type Connection struct {
input chan MessageData
output chan wiredata
netconn net.Conn
subs map[string]chan MessageData
subsLock sync.Mutex
subs map[string]*subscription
subsLock sync.RWMutex
wsd chan bool // writer shutdown
rsd chan bool // reader shutdown
hbd *heartBeatData
Expand All @@ -131,6 +131,12 @@ type Connection struct {
discLock sync.Mutex // DISCONNECT lock
}

type subscription struct {
md chan MessageData // Subscription specific MessageData channel
id string // Subscription id (unique, self reference)
am string // ACK mode for this subscription
}

/*
Error definition.
*/
Expand Down
8 changes: 5 additions & 3 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,12 @@ func (c *Connection) reader() {
// Headers already decoded
c.mets.tbr += m.Size(false) // Total bytes read
d := MessageData{m, e}
// TODO ? Maybe ? Rethink this logic.
if sid, ok := f.Headers.Contains("subscription"); ok {
c.subsLock.Lock()
c.subs[sid] <- d
c.subsLock.Unlock()
// This is a read lock
c.subsLock.RLock()
c.subs[sid].md <- d
c.subsLock.RUnlock()
} else {
c.input <- d
}
Expand Down
89 changes: 69 additions & 20 deletions senv/senv.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright © 2014-2015 Guy M. Allard
// Copyright © 2014-2016 Guy M. Allard
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,20 +24,26 @@
package senv

import (
"fmt"
"os"
"strconv"
)

var (
host = "localhost" // default host
port = "61613" // default port
protocol = "1.2" // Default protocol level
login = "guest" // default login
passcode = "guest" // default passcode
vhost = "localhost" // default vhost
host = "localhost" // default host
port = "61613" // default port
protocol = "1.2" // Default protocol level
login = "guest" // default login
passcode = "guest" // default passcode
vhost = "localhost" // default vhost
heartbeats = "0,0" // default (no) heartbeats
dest = "/queue/sample.stomp.destination" // default destination
nmsgs = 1 // default number of messages (useful at times)
)

// Host returns a default connection hostname.
func Host() string {
// Host
he := os.Getenv("STOMP_HOST")
if he != "" {
host = he
Expand All @@ -47,9 +53,10 @@ func Host() string {

// Port returns a default connection port.
func Port() string {
pe := os.Getenv("STOMP_PORT")
if pe != "" {
port = pe
// Port
pt := os.Getenv("STOMP_PORT")
if pt != "" {
port = pt
}
return port
}
Expand All @@ -61,15 +68,17 @@ func HostAndPort() (string, string) {

// Protocol returns a default level.
func Protocol() string {
p := os.Getenv("STOMP_PROTOCOL")
if p != "" {
protocol = p
// Protocol
pr := os.Getenv("STOMP_PROTOCOL")
if pr != "" {
protocol = pr
}
return protocol
}

// Login returns a default login ID.
func Login() string {
// Login
l := os.Getenv("STOMP_LOGIN")
if l != "" {
login = l
Expand All @@ -82,21 +91,61 @@ func Login() string {

// Passcode returns a default passcode.
func Passcode() string {
p := os.Getenv("STOMP_PASSCODE")
if p != "" {
passcode = p
// Passcode
pc := os.Getenv("STOMP_PASSCODE")
if pc != "" {
passcode = pc
}
if p == "NONE" {
if pc == "NONE" {
passcode = ""
}
return passcode
}

// Vhost returns a default vhost name.
func Vhost() string {
ve := os.Getenv("STOMP_VHOST")
if ve != "" {
vhost = ve
// Vhost
vh := os.Getenv("STOMP_VHOST")
if vh != "" {
vhost = vh
} else {
vhost = Host()
}
return vhost
}

// Heartbeats returns client requested heart beat values.
func Heartbeats() string {
// Heartbeats
hb := os.Getenv("STOMP_HEARTBEATS")
if hb != "" {
heartbeats = hb
}
return heartbeats
}

// Destination
func Dest() string {
// Destination
de := os.Getenv("STOMP_DEST")
if de != "" {
dest = de
}
return dest
}

// Number of messages
func Nmsgs() int {
// Number of messages
ns := os.Getenv("STOMP_NMSGS")
if ns == "" {
return nmsgs
}
n, e := strconv.ParseInt(ns, 10, 0)
if e != nil {
fmt.Printf("NMSGS Conversion error: %v\n", e)
return nmsgs
}
nmsgs = int(n)
return nmsgs
}
14 changes: 13 additions & 1 deletion senv/senv_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright © 2014-2015 Guy M. Allard
// Copyright © 2014-2016 Guy M. Allard
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,4 +53,16 @@ func TestSenvDefaults(t *testing.T) {
if v != "localhost" {
t.Errorf("Senv Vhost, expected [%s], got [%s]\n", "localhost", v)
}
//
d := Dest()
if d != "/queue/sample.stomp.destination" {
t.Errorf("Senv Dest, expected [%s], got [%s]\n",
"/queue/sample.stomp.destination", d)
}
//
n := Nmsgs()
if n != 1 {
t.Errorf("Senv Nmsgs, expected [%d], got [%d]\n",
1, n)
}
}
35 changes: 24 additions & 11 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (c *Connection) Subscribe(h Headers) (<-chan MessageData, error) {
if _, ok := ch.Contains("ack"); !ok {
ch = append(ch, "ack", "auto")
}
s, e, ch := c.establishSubscription(ch)
sub, e, ch := c.establishSubscription(ch)
if e != nil {
return nil, e
}
Expand All @@ -75,13 +75,14 @@ func (c *Connection) Subscribe(h Headers) (<-chan MessageData, error) {
c.output <- wiredata{f, r}
e = <-r
c.log(SUBSCRIBE, "end", ch, c.Protocol())
return s, e
return sub.md, e
}

/*
Handle subscribe id.
*/
func (c *Connection) establishSubscription(h Headers) (<-chan MessageData, error, Headers) {
func (c *Connection) establishSubscription(h Headers) (*subscription, error, Headers) {
// This is a write lock
c.subsLock.Lock()
defer c.subsLock.Unlock()
//
Expand All @@ -99,21 +100,33 @@ func (c *Connection) establishSubscription(h Headers) (<-chan MessageData, error
}
//

sd := new(subscription) // New subscription data
lam := "auto" // Default/used ACK mode
if ham, ok := h.Contains("ack"); ok {
lam = ham // Reset (possible) used ack mode
}

sd.md = make(chan MessageData, c.scc) // Make subscription MD channel
sd.am = lam // Set subscription ack mode

if c.Protocol() == SPL_10 {
if hid { // If 1.0 client wants one, assign it.
c.subs[id] = make(chan MessageData, c.scc)
sd.id = id // Set subscription ID
} else {
return c.input, nil, h // 1.0 clients with no id take their own chances
// Try to help 1.0 clients that subscribe without using an 'id' header
ds, _ := h.Contains("destination") // Destination exists or we would not be here
nsid := Sha1(ds) // This will be unique for a given estination
sd.id = nsid // for 1.0 with no ID, allow 1 subscribe per destination
h = h.Add("id", nsid) // Add unique id to the headers
}
} else { // 1.1+
if hid { // Client specified id
c.subs[id] = make(chan MessageData, c.scc) // Assign subscription
sd.id = id // Set subscription ID
} else {
h = h.Add("id", uuid1)
c.subs[uuid1] = make(chan MessageData, c.scc) // Assign subscription
id = uuid1 // reset
h = h.Add("id", uuid1) // Add unique id to the headers
sd.id = uuid1 // Set subscription ID to that
}
}

return c.subs[id], nil, h
c.subs[sd.id] = sd // Add subscription to the connection subscription map
return sd, nil, h // Return the subscription pointer
}
13 changes: 7 additions & 6 deletions unsubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@ func (c *Connection) Unsubscribe(h Headers) error {

//
_, okd := h.Contains("destination")
sid, oki := h.Contains("id")
hid, oki := h.Contains("id")
if !okd && !oki {
return EREQDIUNS
}

c.subsLock.Lock()
_, p := c.subs[sid]
c.subsLock.Unlock()
// This is a read lock
c.subsLock.RLock()
_, p := c.subs[hid]
c.subsLock.RUnlock()

switch c.Protocol() {
case SPL_12:
Expand Down Expand Up @@ -87,9 +88,9 @@ func (c *Connection) Unsubscribe(h Headers) error {
}

if oki {
// This is a write lock
c.subsLock.Lock()
close(c.subs[sid])
delete(c.subs, sid)
delete(c.subs, hid)
c.subsLock.Unlock()
}
c.log(UNSUBSCRIBE, "end", h)
Expand Down
11 changes: 6 additions & 5 deletions utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,12 @@ func sendMultipleBytes(md multi_send_data) error {
Test helper.
*/
func getMessageData(c *Connection, s <-chan MessageData) (r MessageData) {
if os.Getenv("STOMP_TEST11p") == "" {
r = <-c.MessageData
} else {
r = <-s
}
//
// With other parts of this change, we should not see any data from the
// c.MessageData channel here. Attempting to read from that source will hang
// with a 1.0 client.
//
r = <-s
return r
}

Expand Down
8 changes: 4 additions & 4 deletions version.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
)

var (
pref = "v" // Prefix
major = "1" // Major
minor = "0" // Minor
patch = "1" // Patch
pref = "v" // Prefix
major = "1" // Major
minor = "0" // Minor
patch = "2" // Patch
)

func Version() string {
Expand Down

0 comments on commit b1f1d96

Please sign in to comment.