Skip to content

Commit

Permalink
Fix publish for non-registered events
Browse files Browse the repository at this point in the history
Retry the connect() call until the client registers its token
  • Loading branch information
Praveenrajmani committed Feb 18, 2019
1 parent 572c08d commit febd502
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 6 deletions.
21 changes: 17 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,16 @@ func (c *client) Connect() Token {
c.ibound = make(chan packets.ControlPacket)

go func() {
c.persist.Open()

c.setConnected(connecting)
c.errors = make(chan error, 1)
c.stop = make(chan struct{})

var rc byte
protocolVersion := c.options.ProtocolVersion
RECONN:

c.persist.Open()

if len(c.options.Servers) == 0 {
t.setError(fmt.Errorf("No servers defined to connect to"))
Expand Down Expand Up @@ -284,6 +286,13 @@ func (c *client) Connect() Token {

if c.conn == nil {
ERROR.Println(CLI, "Failed to connect to a broker")
if c.options.ConnectionRetry {
time.Sleep(5 * time.Second)
c.setConnected(reconnecting)
c.persist.Close()
goto RECONN
}

c.setConnected(disconnected)
c.persist.Close()
t.returnCode = rc
Expand Down Expand Up @@ -554,7 +563,7 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
token := newToken(packets.Publish).(*PublishToken)
DEBUG.Println(CLI, "enter Publish")
switch {
case !c.IsConnected():
case !c.IsConnected() && !c.options.ConnectionRetry:
token.err = ErrNotConnected
token.flowComplete()
return token
Expand All @@ -578,13 +587,17 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
}

if pub.Qos != 0 && pub.MessageID == 0 {
pub.MessageID = c.getID(token)
if !c.options.CleanSession && c.options.ConnectionRetry {
pub.MessageID = c.getAsyncID(token, uint16(len(c.persist.All())+1))
} else {
pub.MessageID = c.getID(token)
}
token.messageID = pub.MessageID
}
persistOutbound(c.persist, pub)
if c.connectionStatus() == reconnecting {
DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
} else {
} else if c.IsConnected() {
DEBUG.Println(CLI, "sending publish message, topic:", topic)
c.obound <- &PacketAndToken{p: pub, t: token}
}
Expand Down
12 changes: 12 additions & 0 deletions messageids.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ func (mids *messageIds) getID(t tokenCompletor) uint16 {
return 0
}

func (mids *messageIds) getAsyncID(t tokenCompletor, min uint16) uint16 {
mids.Lock()
defer mids.Unlock()
for i := min; i < midMax; i++ {
if _, ok := mids.index[i]; !ok {
mids.index[i] = t
return i
}
}
return 0
}

func (mids *messageIds) getToken(id uint16) tokenCompletor {
mids.RLock()
defer mids.RUnlock()
Expand Down
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type ClientOptions struct {
MessageChannelDepth uint
ResumeSubs bool
HTTPHeaders http.Header
ConnectionRetry bool
}

// NewClientOptions will create a new ClientClientOptions type with some
Expand Down Expand Up @@ -112,6 +113,7 @@ func NewClientOptions() *ClientOptions {
MessageChannelDepth: 100,
ResumeSubs: false,
HTTPHeaders: make(map[string][]string),
ConnectionRetry: false,
}
return o
}
Expand Down Expand Up @@ -338,3 +340,9 @@ func (o *ClientOptions) SetHTTPHeaders(h http.Header) *ClientOptions {
o.HTTPHeaders = h
return o
}

// SetConnectionRetry retries the Connect() call until the client registers the token.
func (o *ClientOptions) SetConnectionRetry(retry bool) *ClientOptions {
o.ConnectionRetry = retry
return o
}
2 changes: 1 addition & 1 deletion trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type (
NOOPLogger struct{}
)

func (NOOPLogger) Println(v ...interface{}) {}
func (NOOPLogger) Println(v ...interface{}) {}
func (NOOPLogger) Printf(format string, v ...interface{}) {}

// Internal levels of library output that are initialised to not print
Expand Down
2 changes: 1 addition & 1 deletion unit_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,4 @@ func Test_isConnectionOpenNegative(t *testing.T) {
if c.IsConnectionOpen() {
t.Fail()
}
}
}

0 comments on commit febd502

Please sign in to comment.