forked from giabao/paho-akka
-
Notifications
You must be signed in to change notification settings - Fork 0
/
MqttPubSub.scala
265 lines (222 loc) · 8.67 KB
/
MqttPubSub.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
package com.sandinh.paho.akka
import java.net.{URLDecoder, URLEncoder}
import akka.actor._
import org.eclipse.paho.client.mqttv3._
import MqttConnectOptions.CLEAN_SESSION_DEFAULT
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration._
object MqttPubSub {
private val logger = org.log4s.getLogger
//++++ public message classes ++++//
/** A request to publish `payload` to `topic` at the MQTT quality of service `qos`.
  * Only `topic` is exposed as a member; the payload and qos are consumed by `message()`.
  */
class Publish(val topic: String, payload: Array[Byte], qos: Int = 0) {
  /** Builds a new `MqttMessage` wrapping `payload`, with its QoS set to `qos`. */
  def message() = {
    val mqttMessage = new MqttMessage(payload)
    mqttMessage.setQos(qos)
    mqttMessage
  }
}
/** Ask the MqttPubSub actor to subscribe to `topic` at `qos` and forward the arriving messages to `ref`.
  * TODO support wildcards subscription
  */
case class Subscribe(topic: String, ref: ActorRef, qos: Int = 0)
/** Sent by the Topic actor to the subscriber once it has been added to the topic's subscriber set. */
case class SubscribeAck(subscribe: Subscribe)
/** Sent by SubscribeListener to its owner when the client reports that the subscribe succeeded. */
case object SubscribeSuccess
/** Sent by SubscribeListener to its owner when the client reports that the subscribe failed with `e`. */
case class SubscribeFailure(e: Throwable)
/** Sent to the optional `connectListener` of MqttPubSub each time the FSM receives `Connected`. */
case object ConnectedToBroker
/** A message that arrived on `topic`; built from the raw payload in PubSubMqttCallback.messageArrived. */
class Message(val topic: String, val payload: Array[Byte])
//++++ FSM stuff ++++//
/** State type of the MqttPubSub FSM. */
sealed trait S
/** Initial state. Publish and Subscribe messages received in this state are stashed. */
case object SDisconnected extends S
/** State entered when `Connected` is received; Publish and Subscribe are passed to the client. */
case object SConnected extends S
//++++ internal FSM event messages ++++//
/** Triggers a connect attempt; sent to self on construction and by the "reconnect" timer. */
private case object Connect
/** Sent by ConnListener.onSuccess. */
private case object Connected
/** Sent by ConnListener.onFailure and by PubSubMqttCallback.connectionLost. */
private case object Disconnected
/** Child actor of the MqttPubSub FSM that keeps the subscriber set of one topic.
  * The FSM creates it under a name derived from the topic (the URL-encoded topic string).
  */
private class Topic extends Actor {
  // actors that currently receive the messages of this topic
  private[this] var subscribers = Set.empty[ActorRef]

  def receive = {
    // a message arrived on this topic: fan it out to every subscriber
    case incoming: Message =>
      subscribers.foreach { subscriber => subscriber ! incoming }

    // death-watch the new subscriber, remember it, then acknowledge the registration
    case request: Subscribe =>
      val subscriber = request.ref
      context.watch(subscriber)
      subscribers = subscribers + subscriber
      subscriber ! SubscribeAck(request)

    // a watched subscriber terminated: forget it and stop once nobody is left
    case Terminated(gone) =>
      subscribers = subscribers - gone
      if (subscribers.isEmpty) context.stop(self)
  }
}
/** Paho callback that relays client events to `owner` as actor messages (sent without a sender). */
private class PubSubMqttCallback(owner: ActorRef) extends MqttCallback {
  /** Logs the cause and tells `owner` that the connection is gone. */
  def connectionLost(cause: Throwable): Unit = {
    logger.error(cause)("connection lost")
    owner.tell(Disconnected, ActorRef.noSender)
  }

  /** only logging */
  def deliveryComplete(token: IMqttDeliveryToken): Unit =
    logger.debug(s"delivery complete ${java.util.Arrays.toString(token.getTopics.asInstanceOf[Array[AnyRef]])}")

  /** Logs the topic and hands the payload to `owner` wrapped in a `Message`. */
  def messageArrived(topic: String, message: MqttMessage): Unit = {
    logger.debug("message arrived " + topic)
    val incoming = new Message(topic, message.getPayload)
    owner.tell(incoming, ActorRef.noSender)
  }
}
/** Listener for the connect action: reports the outcome to `owner` as `Connected` or `Disconnected`. */
private class ConnListener(owner: ActorRef) extends IMqttActionListener {
  /** The connect succeeded: log it and notify `owner`. */
  def onSuccess(asyncActionToken: IMqttToken): Unit = {
    logger.info("connected")
    owner.tell(Connected, ActorRef.noSender)
  }

  /** The connect failed: log the error and notify `owner` so it can schedule a retry. */
  def onFailure(asyncActionToken: IMqttToken, e: Throwable): Unit = {
    logger.error(e)("connect failed")
    owner.tell(Disconnected, ActorRef.noSender)
  }
}
/** Listener for a subscribe action: reports the outcome to `owner` as
  * `SubscribeSuccess` or `SubscribeFailure`.
  */
private class SubscribeListener(owner: ActorRef) extends IMqttActionListener {
  // renders the topics of the token as "[a,b]" for the log lines below
  private[this] def topicList(token: IMqttToken): String =
    token.getTopics.mkString("[", ",", "]")

  def onSuccess(asyncActionToken: IMqttToken): Unit = {
    logger.info(s"subscribed to ${topicList(asyncActionToken)}")
    owner ! SubscribeSuccess
  }

  def onFailure(asyncActionToken: IMqttToken, e: Throwable): Unit = {
    logger.error(e)(s"subscribe failed to ${topicList(asyncActionToken)}")
    owner ! SubscribeFailure(e)
  }
}
//utilities
/** URL-encodes `s` as utf-8; used to turn a topic into a child actor name. */
@inline private def urlEnc(s: String): String = {
  URLEncoder.encode(s, "utf-8")
}

/** Reverses `urlEnc`: URL-decodes `s` as utf-8. */
@inline private def urlDec(s: String): String = {
  URLDecoder.decode(s, "utf-8")
}
/** Settings of the MqttPubSub actor.
  *
  * @param brokerUrl ex tcp://test.mosquitto.org:1883
  * @param userName nullable; only applied to the connect options when non-null
  * @param password nullable; only applied to the connect options when non-null
  * @param stashTimeToLive Publish/Subscribe messages received while disconnected are stashed.
  *                  Stashed messages older than stashTimeToLive are discarded instead of being
  *                  replayed once connected. See also `stashCapacity`
  * @param stashCapacity once the stash grows past this size, roughly its oldest half is dropped
  * @param reconnectDelayMin when a Disconnected event is received, we first wait reconnectDelayMin before trying to Connect.
  *                  + if the connect succeeds => connectCount is reset
  *                  + else => ConnListener.onFailure sends Disconnected to the FSM =>
  *                  Connect is re-scheduled with {{{delay = reconnectDelayMin * 2^connectCount}}}
  * @param reconnectDelayMax max delay to retry connecting
  * @param cleanSession Sets whether the client and server should remember state across restarts and reconnects.
  */
case class PSConfig(
  brokerUrl: String,
  userName: String = null,
  password: String = null,
  stashTimeToLive: FiniteDuration = 1.minute,
  stashCapacity: Int = 8000,
  reconnectDelayMin: FiniteDuration = 10.millis,
  reconnectDelayMax: FiniteDuration = 30.seconds,
  cleanSession: Boolean = CLEAN_SESSION_DEFAULT
) {
  //pre-calculated connectCount limit such that: reconnectDelayMin * 2^connectCountMax ~ reconnectDelayMax
  val connectCountMax = {
    val delayRatio = reconnectDelayMax / reconnectDelayMin
    (math.log(delayRatio) / math.log(2)).floor.toInt
  }

  /** Delay before the next connect attempt: doubles with every attempt, capped at reconnectDelayMax. */
  def connectDelay(connectCount: Int) =
    if (connectCount < connectCountMax) reconnectDelayMin * (1L << connectCount)
    else reconnectDelayMax

  /** MqttConnectOptions built from the credentials and the cleanSession flag */
  lazy val conOpt = {
    val options = new MqttConnectOptions
    Option(userName).foreach(name => options.setUserName(name))
    Option(password).foreach(secret => options.setPassword(secret.toCharArray))
    options.setCleanSession(cleanSession)
    options
  }
}
}
import MqttPubSub._
/** Notes:
* 1. MqttClientPersistence will be set to null. @see org.eclipse.paho.client.mqttv3.MqttMessage#setQos(int)
* 2. MQTT client will auto-reconnect
*/
class MqttPubSub(cfg: PSConfig, connectListener: Option[ActorRef] = None) extends FSM[S, Unit] {
//MqttAsyncClient with a generated client id and no MqttClientPersistence (null);
//its callbacks are relayed to this actor through PubSubMqttCallback
//NOTE(review): nothing in this class disconnects or closes the client when the actor stops - confirm whether a postStop hook is needed
private[this] val client = {
  val asyncClient = new MqttAsyncClient(cfg.brokerUrl, MqttAsyncClient.generateClientId(), null)
  asyncClient.setCallback(new PubSubMqttCallback(self))
  asyncClient
}
//setup Connection IMqttActionListener: reports the outcome of client.connect back to this actor
private[this] val conListener = new ConnListener(self)

//use to stash the pub-sub messages when disconnected; each entry carries the deadline after which it is stale
//note that we do NOT store the sender() in to the stash as in akka.actor.StashSupport#theStash
private[this] val pubSubStash = ListBuffer.empty[(Deadline, Any)]

//reconnect attempt count, reset when connect success
private[this] var connectCount = 0

//++++ FSM logic ++++
//the state data type is Unit, so pass the unit value `()` rather than the `Unit` companion object
//(the companion only type-checked through value discarding)
startWith(SDisconnected, ())
//Behaviour while there is no broker connection: try to connect, and stash Publish/Subscribe requests.
when(SDisconnected) {
  case Event(Connect, _) =>
    logger.info(s"connecting to ${cfg.brokerUrl}..")
    //count this attempt up front, so that a synchronous failure (the catch below) backs off by the
    //same amount as an asynchronous one reported through ConnListener.onFailure
    connectCount += 1
    //only receive Connect when client.isConnected == false so its safe here to call client.connect
    try {
      client.connect(cfg.conOpt, null, conListener)
    } catch {
      case e: Exception =>
        //log only the broker url: `cfg` is a case class, its toString would include the password
        logger.error(e)(s"can't connect to ${cfg.brokerUrl}")
        delayConnect()
    }
    stay()

  case Event(Connected, _) =>
    connectCount = 0
    //replay the stashed requests that are still within their time-to-live, drop the rest
    //NOTE(review): existing Topic children are not re-subscribed on the broker here - confirm this is fine when cfg.cleanSession is true
    for ((deadline, x) <- pubSubStash if deadline.hasTimeLeft()) self ! x
    pubSubStash.clear()
    connectListener.foreach { listener => listener ! ConnectedToBroker }
    goto(SConnected)

  case Event(x @ (_: Publish | _: Subscribe), _) =>
    if (pubSubStash.length > cfg.stashCapacity && pubSubStash.nonEmpty) {
      //drop the oldest half of the capacity, but always at least one entry (capacity <= 1 would
      //otherwise drop nothing and let the stash grow without bound) and never more than we hold
      val dropCount = math.min(pubSubStash.length, math.max(1, cfg.stashCapacity / 2))
      pubSubStash.remove(0, dropCount)
    }
    pubSubStash += Tuple2(Deadline.now + cfg.stashTimeToLive, x)
    stay()
}
//Behaviour while connected: publish immediately, and route subscriptions through a per-topic child actor.
when(SConnected) {
  case Event(publish: Publish, _) =>
    try {
      client.publish(publish.topic, publish.message())
    } catch {
      case e: Exception => logger.error(e)(s"can't publish to ${publish.topic}")
    }
    stay()

  case Event(request: Subscribe, _) =>
    //the child actor is named after the url-encoded topic; reuse it when it already exists
    val childName = urlEnc(request.topic)
    val topicActor = context.child(childName).getOrElse(context.actorOf(Props[Topic], name = childName))
    topicActor ! request
    subscribe(request.topic, request.ref, request.qos, topicActor)
    stay()
}
/** Death-watches the topic actor `t` and asks the client to subscribe to `topic` at `qos`.
  * The outcome is reported to `ref` by a SubscribeListener; an exception thrown by the
  * subscribe call itself is only logged.
  */
def subscribe(topic: String, ref: ActorRef, qos: Int, t: ActorRef): Any = {
  context.watch(t)
  //FIXME we should store the current qos that client subscribed to topic (in `case Some(t)` above)
  //then, when received a new Subscribe msg if msg.qos > current qos => need re-subscribe
  val outcomeListener = new SubscribeListener(ref)
  try {
    client.subscribe(topic, qos, null, outcomeListener)
  } catch {
    case e: Exception => logger.error(e)(s"can't subscribe to $topic")
  }
}
//Events handled the same way in every state.
whenUnhandled {
  //route an arrived message to the child actor of its topic, if there is one
  case Event(incoming: Message, _) =>
    val topicActor = context.child(urlEnc(incoming.topic))
    topicActor.foreach { child => child ! incoming }
    stay()

  //a topic actor stopped: unsubscribe from the topic its name encodes
  case Event(Terminated(topicRef), _) =>
    val encodedName = topicRef.path.name
    try {
      client.unsubscribe(urlDec(encodedName))
    } catch {
      case e: Exception => logger.error(e)(s"can't unsubscribe from $encodedName")
    }
    stay()

  //connection lost or connect failed: schedule a retry
  case Event(Disconnected, _) =>
    delayConnect()
    goto(SDisconnected)
}
/** Schedules a `Connect` through the "reconnect" timer, after the delay that
  * `cfg.connectDelay` gives for the current `connectCount`.
  */
private def delayConnect(): Unit = {
  val backoff = cfg.connectDelay(connectCount)
  logger.info(s"delay $backoff before reconnect")
  setTimer("reconnect", Connect, backoff)
}
//finish the FSM set-up, then trigger the first connect attempt
initialize()
self.tell(Connect, self)
}