diff --git a/client.go b/client.go index d2a347d0..cb9a7ba6 100644 --- a/client.go +++ b/client.go @@ -206,8 +206,8 @@ func (c *client) setConnected(status uint32) { atomic.StoreUint32(&c.status, status) } -//ErrNotConnected is the error returned from function calls that are -//made when the client is not connected to a broker +// ErrNotConnected is the error returned from function calls that are +// made when the client is not connected to a broker var ErrNotConnected = errors.New("not Connected") // Connect will create a connection to the message broker, by default @@ -438,7 +438,8 @@ func (c *client) forceDisconnect() { func (c *client) disconnect() { done := c.stopCommsWorkers() if done != nil { - <-done // Wait until the disconect is complete (to limit chance that another connection will be started) + <-done // Wait until the disconnect is complete (to limit chance that another connection will be started) + DEBUG.Println(CLI, "forcefully disconnecting") c.messageIds.cleanUp() DEBUG.Println(CLI, "disconnected") c.persist.Close() @@ -498,12 +499,11 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet go keepalive(c, conn) } + // matchAndDispatch will process messages received from the network. It may generate acknowledgements + // It will complete when incomingPubChan is closed and will close ackOut prior to exiting incomingPubChan := make(chan *packets.PublishPacket) - c.workers.Add(1) - go func() { - c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c) - c.workers.Done() - }() + c.workers.Add(1) // Done will be called when ackOut is closed + ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c) c.setConnected(connected) DEBUG.Println(CLI, "client is connected/reconnected") @@ -526,7 +526,20 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet commsoboundP <- msg case msg := <-c.obound: commsobound <- msg + case msg, ok := <-ackOut: + if !ok { + ackOut = nil // ignore channel going forward + c.workers.Done() // matchAndDispatch has completed + } + commsoboundP <- msg case <-c.stop: + // Attempt to transmit any outstanding acknowledgements (this may well fail but should work if this is a clean disconnect) + if ackOut != nil { + for msg := range ackOut { + commsoboundP <- msg + } + c.workers.Done() // matchAndDispatch has completed + } close(commsoboundP) // Nothing sending to these channels anymore so close them and allow comms routines to exit close(commsobound) DEBUG.Println(CLI, "startCommsWorkers output redirector finnished") @@ -535,19 +548,19 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet } }() - commsIncommingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound) + commsIncomingPub, commsErrors := startComms(c.conn, c, inboundFromStore, commsoboundP, commsobound) c.commsStopped = make(chan struct{}) go func() { for { - if commsIncommingPub == nil && commsErrors == nil { + if commsIncomingPub == nil && commsErrors == nil { break } select { - case pub, ok := <-commsIncommingPub: + case pub, ok := <-commsIncomingPub: if !ok { - // Incomming comms has shutdown + // Incoming comms has shutdown close(incomingPubChan) // stop the router - commsIncommingPub = nil + commsIncomingPub = nil continue } incomingPubChan <- pub @@ -561,7 +574,7 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet continue } } - DEBUG.Println(CLI, "comms goroutine done") + DEBUG.Println(CLI, "incoming comms goroutine done") close(c.commsStopped) }() DEBUG.Println(CLI, "startCommsWorkers done") @@ -999,8 +1012,8 @@ func (c *client) OptionsReader() ClientOptionsReader { return r } -//DefaultConnectionLostHandler is a definition of a function that simply -//reports to the DEBUG log the reason for the client losing a connection. +// DefaultConnectionLostHandler is a definition of a function that simply +// reports to the DEBUG log the reason for the client losing a connection. func DefaultConnectionLostHandler(client Client, reason error) { DEBUG.Println("Connection lost:", reason.Error()) } diff --git a/router.go b/router.go index 947b7b70..994c0271 100644 --- a/router.go +++ b/router.go @@ -131,47 +131,52 @@ func (r *router) setDefaultHandler(handler MessageHandler) { // takes messages off the channel, matches them against the internal route list and calls the // associated callback (or the defaultHandler, if one exists and no other route matched). If // anything is sent down the stop channel the function will end. -func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) { - for message := range messages { - // DEBUG.Println(ROU, "matchAndDispatch received message") - sent := false - r.RLock() - m := messageFromPublish(message, ackFunc(client.oboundP, client.persist, message)) - handlers := []MessageHandler{} - for e := r.routes.Front(); e != nil; e = e.Next() { - if e.Value.(*route).match(message.TopicName) { - if order { - handlers = append(handlers, e.Value.(*route).callback) - } else { - hd := e.Value.(*route).callback - go func() { - hd(client, m) - m.Ack() - }() +func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken { + ackChan := make(chan *PacketAndToken) + go func() { + for message := range messages { + // DEBUG.Println(ROU, "matchAndDispatch received message") + sent := false + r.RLock() + m := messageFromPublish(message, ackFunc(ackChan, client.persist, message)) + handlers := []MessageHandler{} + for e := r.routes.Front(); e != nil; e = e.Next() { + if e.Value.(*route).match(message.TopicName) { + if order { + handlers = append(handlers, e.Value.(*route).callback) + } else { + hd := e.Value.(*route).callback + go func() { + hd(client, m) + m.Ack() + }() + } + sent = true } - sent = true } - } - if !sent { - if r.defaultHandler != nil { - if order { - handlers = append(handlers, r.defaultHandler) + if !sent { + if r.defaultHandler != nil { + if order { + handlers = append(handlers, r.defaultHandler) + } else { + go func() { + r.defaultHandler(client, m) + m.Ack() + }() + } } else { - go func() { - r.defaultHandler(client, m) - m.Ack() - }() + DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.") } - } else { - DEBUG.Println(ROU, "matchAndDispatch received message and no handler was available. Message will NOT be acknowledged.") } + r.RUnlock() + for _, handler := range handlers { + handler(client, m) + m.Ack() + } + // DEBUG.Println(ROU, "matchAndDispatch handled message") } - r.RUnlock() - for _, handler := range handlers { - handler(client, m) - m.Ack() - } - // DEBUG.Println(ROU, "matchAndDispatch handled message") - } - DEBUG.Println(ROU, "matchAndDispatch exiting") + close(ackChan) + DEBUG.Println(ROU, "matchAndDispatch exiting") + }() + return ackChan }