Skip to content
Browse files

Extract common stopping logic

  • Loading branch information...
1 parent bb91f41 commit 8298c28cd98f7bd9b60f92fb8265f8d20151581e @pietern pietern committed
Showing with 73 additions and 84 deletions.
  1. +3 −42 client.go
  2. +3 −42 connection.go
  3. +67 −0 stopper.go
View
45 client.go
@@ -200,12 +200,9 @@ func (sr *subscriptionRegistry) Deliver(m *readMessage) {
type Client struct {
subscriptionRegistry
+ Stopper
cc chan *Connection
-
- // Stop channel channel, stop acknowledgement channel
- scc chan chan bool
- sackc chan bool
}
func NewClient() *Client {
@@ -215,9 +212,6 @@ func NewClient() *Client {
t.cc = make(chan *Connection)
- t.scc = make(chan chan bool, 1)
- t.sackc = nil
-
return t
}
@@ -283,25 +277,6 @@ func (t *Client) PublishAndConfirm(s string, m []byte) bool {
return t.publish(s, m, true)
}
-func (t *Client) Stop() {
- var sc chan bool
-
- select {
- case sc = <-t.scc:
- default:
- }
-
- if sc == nil {
- return
- }
-
- // Trigger stop
- close(sc)
-
- // Wait for acknowledgement
- <-t.sackc
-}
-
func (t *Client) runConnection(n net.Conn, sc chan bool) error {
var e error
var c *Connection
@@ -349,15 +324,8 @@ func (t *Client) Run(d Dialer, h Handshaker) error {
// There will not be more messages after Run returns
defer t.subscriptionRegistry.teardown()
- // Create stop acknowledgement channel
- // This doesn't need a lock because it can only be used after Stop() has
- // acquired the stop channel, which is not yet available at this point.
- t.sackc = make(chan bool)
- defer close(t.sackc)
-
- // Create stop channel
- var sc = make(chan bool)
- t.scc <- sc
+ var sc = t.MarkStart()
+ defer t.MarkStop()
var n net.Conn
var e error
@@ -382,13 +350,6 @@ func (t *Client) Run(d Dialer, h Handshaker) error {
}
}
- // Close stop channel if it is still available
- select {
- case sc = <-t.scc:
- close(sc)
- default:
- }
-
return nil
}
View
45 connection.go
@@ -19,9 +19,7 @@ type Connection struct {
rLock sync.Mutex
wLock sync.Mutex
- // Stop channel channel, stop acknowledgement channel
- scc chan chan bool
- sackc chan bool
+ Stopper
// Sequencer for PINGs/receiving corresponding PONGs
ps textproto.Pipeline
@@ -43,9 +41,6 @@ func NewConnection(rw io.ReadWriteCloser) *Connection {
c.rec = make(chan error, 1)
c.wec = make(chan error, 1)
- c.scc = make(chan chan bool, 1)
- c.sackc = nil
-
c.pc = make(chan bool)
c.oc = make(chan readObject)
@@ -189,43 +184,16 @@ func (c *Connection) WriteAndPing(o writeObject) bool {
return c.pingAndWaitForPong(w)
}
-func (c *Connection) Stop() {
- var sc chan bool
-
- select {
- case sc = <-c.scc:
- default:
- }
-
- if sc == nil {
- return
- }
-
- // Trigger stop
- close(sc)
-
- // Wait for acknowledgement
- <-c.sackc
-}
-
func (c *Connection) Run() error {
var r *bufio.Reader
var rc chan readObject
- var sc chan bool
r = c.acquireReader()
defer c.releaseReader()
rc = make(chan readObject)
- // Create stop acknowledgement channel
- // This doesn't need a lock because it can only be used after Stop() has
- // acquired the stop channel, which is not yet available at this point.
- c.sackc = make(chan bool)
- defer close(c.sackc)
-
- // Create stop channel
- sc = make(chan bool)
- c.scc <- sc
+ var sc = c.MarkStart()
+ defer c.MarkStop()
go func() {
var stop bool
@@ -287,13 +255,6 @@ func (c *Connection) Run() error {
// Close connection
c.rw.Close()
- // Close stop channel if it is still available
- select {
- case sc = <-c.scc:
- close(sc)
- default:
- }
-
// Can't receive more PONGs
close(c.pc)
View
67 stopper.go
@@ -0,0 +1,67 @@
+package nats
+
+import (
+ "sync"
+)
+
+type Stopper struct {
+ sync.Once
+
+ // Stop channel channel, stop acknowledgement channel
+ scc chan chan bool
+ sackc chan bool
+}
+
+func (s *Stopper) Init() {
+ s.Do(func() { s.scc = make(chan chan bool, 1) })
+}
+
+func (s *Stopper) Stop() {
+ var sc chan bool
+
+ s.Init()
+
+ select {
+ case sc = <-s.scc:
+ // Trigger stop
+ close(sc)
+
+ // Wait for acknowledgement
+ <-s.sackc
+
+ default:
+ return
+ }
+}
+
+func (s *Stopper) MarkStart() chan bool {
+ var sc chan bool
+
+ s.Init()
+
+ // Create stop acknowledgement channel
+ // This doesn't need a lock because it can only be used after Stop() has
+ // acquired the stop channel, which is not yet available at this point.
+ s.sackc = make(chan bool)
+
+ // Create stop channel
+ sc = make(chan bool)
+ s.scc <- sc
+
+ return sc
+}
+
+func (s *Stopper) MarkStop() {
+ var sc chan bool
+
+ // Close stop channel if it is still available
+ select {
+ case sc = <-s.scc:
+ close(sc)
+
+ default:
+ }
+
+ // Acknowledge stop by closing the acknowledgement channel
+ close(s.sackc)
+}

0 comments on commit 8298c28

Please sign in to comment.
Something went wrong with that request. Please try again.