Skip to content

Commit

Permalink
refactor(mqtt): use atomic bool to keep running state of listener
Browse files Browse the repository at this point in the history
  • Loading branch information
gsalomao committed Apr 23, 2023
1 parent 044f47c commit b66e2b7
Showing 1 changed file with 5 additions and 19 deletions.
24 changes: 5 additions & 19 deletions internal/mqtt/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"net"
"sync"
"sync/atomic"

"github.com/gsalomao/maxmq/internal/logger"
"github.com/gsalomao/maxmq/internal/mqtt/handler"
Expand All @@ -38,8 +39,7 @@ type Listener struct {
connectionMgr *connectionManager
conf handler.Configuration
wg sync.WaitGroup
running bool
mtx sync.Mutex
running atomic.Bool
}

// NewListener creates a new MQTT Listener with the given options.
Expand Down Expand Up @@ -73,7 +73,7 @@ func (l *Listener) Start() error {

l.tcpLsn = lsn
l.connectionMgr.start()
l.setRunningState(true)
l.running.Store(true)
l.wg.Add(1)

l.log.Info().Msg("Listening on " + lsn.Addr().String())
Expand All @@ -89,7 +89,7 @@ func (l *Listener) Start() error {

tcpConn, err = lsn.Accept()
if err != nil {
if !l.isRunning() {
if !l.running.Load() {
break
}

Expand Down Expand Up @@ -117,7 +117,7 @@ func (l *Listener) Start() error {
func (l *Listener) Stop() {
l.log.Debug().Msg("Stopping listener")

l.setRunningState(false)
l.running.Store(false)
_ = l.tcpLsn.Close()
l.connectionMgr.stop()

Expand All @@ -128,17 +128,3 @@ func (l *Listener) Stop() {
func (l *Listener) Wait() {
l.wg.Wait()
}

func (l *Listener) setRunningState(st bool) {
l.mtx.Lock()
defer l.mtx.Unlock()

l.running = st
}

func (l *Listener) isRunning() bool {
l.mtx.Lock()
defer l.mtx.Unlock()

return l.running
}

0 comments on commit b66e2b7

Please sign in to comment.