Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore and retry Connect() for non-registered clients with an option #281

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 17 additions & 4 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,16 @@ func (c *client) Connect() Token {
c.ibound = make(chan packets.ControlPacket)

go func() {
c.persist.Open()

c.setConnected(connecting)
c.errors = make(chan error, 1)
c.stop = make(chan struct{})

var rc byte
protocolVersion := c.options.ProtocolVersion
RECONN:

c.persist.Open()

if len(c.options.Servers) == 0 {
t.setError(fmt.Errorf("No servers defined to connect to"))
Expand Down Expand Up @@ -284,6 +286,13 @@ func (c *client) Connect() Token {

if c.conn == nil {
ERROR.Println(CLI, "Failed to connect to a broker")
if c.options.ConnectionRetry {
time.Sleep(5 * time.Second)
c.setConnected(reconnecting)
c.persist.Close()
goto RECONN
}

c.setConnected(disconnected)
c.persist.Close()
t.returnCode = rc
Expand Down Expand Up @@ -554,7 +563,7 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
token := newToken(packets.Publish).(*PublishToken)
DEBUG.Println(CLI, "enter Publish")
switch {
case !c.IsConnected():
case !c.IsConnected() && !c.options.ConnectionRetry:
token.err = ErrNotConnected
token.flowComplete()
return token
Expand All @@ -578,13 +587,17 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
}

if pub.Qos != 0 && pub.MessageID == 0 {
pub.MessageID = c.getID(token)
if !c.options.CleanSession && c.options.ConnectionRetry {
pub.MessageID = c.getAsyncID(token, uint16(len(c.persist.All())+1))
} else {
pub.MessageID = c.getID(token)
}
token.messageID = pub.MessageID
}
persistOutbound(c.persist, pub)
if c.connectionStatus() == reconnecting {
DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
} else {
} else if c.IsConnected() {
DEBUG.Println(CLI, "sending publish message, topic:", topic)
c.obound <- &PacketAndToken{p: pub, t: token}
}
Expand Down
12 changes: 12 additions & 0 deletions messageids.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,18 @@ func (mids *messageIds) getID(t tokenCompletor) uint16 {
return 0
}

func (mids *messageIds) getAsyncID(t tokenCompletor, min uint16) uint16 {
mids.Lock()
defer mids.Unlock()
for i := min; i < midMax; i++ {
if _, ok := mids.index[i]; !ok {
mids.index[i] = t
return i
}
}
return 0
}

func (mids *messageIds) getToken(id uint16) tokenCompletor {
mids.RLock()
defer mids.RUnlock()
Expand Down
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type ClientOptions struct {
MessageChannelDepth uint
ResumeSubs bool
HTTPHeaders http.Header
ConnectionRetry bool
}

// NewClientOptions will create a new ClientClientOptions type with some
Expand Down Expand Up @@ -112,6 +113,7 @@ func NewClientOptions() *ClientOptions {
MessageChannelDepth: 100,
ResumeSubs: false,
HTTPHeaders: make(map[string][]string),
ConnectionRetry: false,
}
return o
}
Expand Down Expand Up @@ -338,3 +340,9 @@ func (o *ClientOptions) SetHTTPHeaders(h http.Header) *ClientOptions {
o.HTTPHeaders = h
return o
}

// SetConnectionRetry retries the Connect() call until the client registers the token.
func (o *ClientOptions) SetConnectionRetry(retry bool) *ClientOptions {
o.ConnectionRetry = retry
return o
}
2 changes: 1 addition & 1 deletion trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type (
NOOPLogger struct{}
)

func (NOOPLogger) Println(v ...interface{}) {}
func (NOOPLogger) Println(v ...interface{}) {}
func (NOOPLogger) Printf(format string, v ...interface{}) {}

// Internal levels of library output that are initialised to not print
Expand Down
2 changes: 1 addition & 1 deletion unit_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,4 @@ func Test_isConnectionOpenNegative(t *testing.T) {
if c.IsConnectionOpen() {
t.Fail()
}
}
}