forked from eclipse/paho.golang
/
error.go
99 lines (84 loc) · 3.32 KB
/
error.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package autopaho
import (
"fmt"
"sync"
"github.com/imkos/paho.golang/paho"
)
// errorHandler provides the onClientError callback function that will be called by the Paho library. The sole aim
// of this is to pass a single error onto the error channel (the library may send multiple errors; only the first
// will be processed).
// The callback userOnClientError will be called a maximum of one time. If userOnServerDisconnect is called, then
// userOnClientError will not be called (but there is a small chance that userOnClientError will be called followed
// by userOnServerDisconnect (if we encounter an error sending but there is a DISCONNECT in the queue).
type errorHandler struct {
debug paho.Logger
mu sync.Mutex
errChan chan error // receives connection errors
userOnClientError func(error) // User provided onClientError function
userOnServerDisconnect func(*paho.Disconnect) // User provided OnServerDisconnect function
}
// shutdown prevents any further calls from emitting a message
func (e *errorHandler) shutdown() {
e.mu.Lock()
defer e.mu.Unlock()
e.errChan = nil
}
// onClientError called by the paho library when an error occurs. We assume that the error is always fatal
func (e *errorHandler) onClientError(err error) {
if e.handleError(err) && e.userOnClientError != nil {
go e.userOnClientError(err)
}
}
// onClientError called by the paho library when the server requests a disconnection (for example, as part of a
// clean broker shutdown). We want to begin attempting to reconnect when this occurs (and pass a detectable error
// to the user)
func (e *errorHandler) onServerDisconnect(d *paho.Disconnect) {
e.handleError(&DisconnectError{err: fmt.Sprintf("server requested disconnect (reason: %d)", d.ReasonCode)})
if e.userOnServerDisconnect != nil {
go e.userOnServerDisconnect(d)
}
}
// handleError ensures that only a single error is sent to the channel (all errors go to the users OnClientError function)
// Returns true if the error was sent to the channel (i.e. this is the first error we have seen)
func (e *errorHandler) handleError(err error) bool {
e.mu.Lock()
errChan := e.errChan // prevent any chance of deadlock with concurrent call to e.shutdown
e.errChan = nil
e.mu.Unlock()
if errChan != nil {
e.debug.Printf("received error: %s", err)
errChan <- err
return true
}
e.debug.Printf("received extra error: %s", err)
return false
}
// DisconnectError will be passed when the server requests disconnection (allows this error type to be detected)
type DisconnectError struct{ err string }
func (d *DisconnectError) Error() string {
return d.err
}
// ConnackError will be passed when the server denies connection in CONNACK packet
type ConnackError struct {
ReasonCode byte // CONNACK reason code
Reason string // CONNACK Reason string from properties
Err error // underlying error
}
// NewConnackError returns a new ConnackError
func NewConnackError(err error, connack *paho.Connack) *ConnackError {
reason := ""
if connack.Properties != nil {
reason = connack.Properties.ReasonString
}
return &ConnackError{
ReasonCode: connack.ReasonCode,
Reason: reason,
Err: err,
}
}
func (c *ConnackError) Error() string {
return fmt.Sprintf("server denied connect (reason: %d): %s", c.ReasonCode, c.Err)
}
func (c *ConnackError) Unwrap() error {
return c.Err
}