-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmqttc.lua
946 lines (855 loc) · 24.2 KB
/
mqttc.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
-- MQTT 5.0 Client. Copyright Real Time Logic.
local fmt=string.format
local btaCreate,btaCopy,btah2n,btan2h,btaSize,btaSetsize,bta2string=
ba.bytearray.create,ba.bytearray.copy,ba.bytearray.h2n,ba.bytearray.n2h,
ba.bytearray.size,ba.bytearray.setsize,ba.bytearray.tostring
local sbyte,ssub=string.byte,string.sub
local tinsert,tsort=table.insert,table.sort
local startMQTT -- forward decl
local MQTT_CONNECT =0x01<<4
local MQTT_CONNACK =0x02<<4
local MQTT_PUBLISH =0x03<<4
local MQTT_PUBACK =0x04<<4
local MQTT_PUBREC =0x05<<4
local MQTT_PUBREL =0x06<<4
local MQTT_PUBCOMP =0x07<<4
local MQTT_SUBSCRIBE =0x08<<4
local MQTT_SUBACK =0x09<<4
local MQTT_UNSUBSCRIBE=0x0a<<4
local MQTT_UNSUBACK =0x0b<<4
local MQTT_PINGREQ =0x0c<<4
local MQTT_PINGRESP =0x0d<<4
local MQTT_DISCONNECT =0x0e<<4
local function fmtArgErr(argno,exp,got)
return fmt("bad argument #%d (%s expected, got %s)",argno,exp,type(got))
end
local function argchk(argno,exp,got,level)
if exp ~= type(got) then error(fmtArgErr(argno,exp,got),level or 2) end
end
local function typeChk(name,typename,val,level)
if type(val) == typename then return end
error(fmt("%s: expected %s, got %s",name,typename,type(val)),level or 3)
end
local function copyTab(t)
local nt={}
if t then
for k,v in pairs(t) do nt[k]=v end
end
return nt
end
local function copy2Tab(to,from)
for k,v in pairs(from) do to[k]=v end
end
local function getPacketId(self)
local id=self.packetId
id=id+1
self.packetId=id < 0xFFFF and id or 1
return self.packetId
end
local function getSubscriptionId(self)
local id=self.subscriptionId
id=id+1
self.subscriptionId=id < 268435455 and id or 1
return self.subscriptionId
end
local function insertRecQosT(self,pi,bta)
local cnt=self.recQosCounter
self.recQosCounter=cnt+1
self.recQosQT[pi]={bta=bta,counter=cnt}
end
local function insertSndQosT(self,pi,bta)
local cnt=self.sndQosCounter
self.sndQosCounter=cnt+1
self.sndQosQT[pi]={bta=bta,counter=cnt}
end
local function sortCosQT(qosQT)
local sortT={}
for _,v in pairs(qosQT) do tinsert(sortT,v) end
tsort(sortT,function(a,b) return a.counter < b.counter end)
return sortT
end
-- Encode Variable Byte Integer
local function encVBInt(bta,ix,len)
if bta then
local digit
while true do
digit=len % 0x80
len=len // 0x80
if len == 0 then break end
bta[ix]=digit | 0x80
ix=ix+1
end
bta[ix]=digit
return ix+1
end
while true do
len=len // 0x80
if len == 0 then break end
ix=ix+1
end
return ix+1
end
-- Encode byte
local function encByte(bta,ix,byte)
if bta then bta[ix]=byte end
return ix+1
end
-- Encode 2 byte integer
local function enc2BInt(bta,ix,number)
if bta then btah2n(bta,ix,2,number) end
return ix+2
end
-- Encode 4 byte integer
local function enc4BInt(bta,ix,number)
if bta then btah2n(bta,ix,4,number) end
return ix+4
end
-- Encode utf8 valid string
local function encString(bta,ix,str)
local len=#str
if bta then
btah2n(bta,ix,2,len)
bta[ix+2]=str
end
return ix+2+len
end
-- Encode Binary Data
local encBinData=encString
local function btaCreate2(packetLen)
return btaCreate(1+encVBInt(nil,0,packetLen)+packetLen)
end
local encPropT={
payloadformatindicator={1,encByte},--Payload Format Indicator
messageexpiryinterval={2,enc4BInt},--Message Expiry Interval
contenttype={3,encString},-- Content Type
responsetopic={8,encString},--Response Topic
correlationdata={9,encBinData},-- Correlation Data
zz_subid={11,encVBInt},--Subscription Identifier (private)
sessionexpiryinterval={17,enc4BInt},--Session Expiry Interval
requestprobleminformation={23,encByte},--Request Problem Information
willdelayinterval={24,enc4BInt},--Will Delay Interval
requestresponseinformation={25,encByte},--Request Response Information
["$reason"]={31,encString},--Reason String
-- 38 User Property
maximumpacketsize={39,enc4BInt} --Maximum Packet Size
}
local function encProp(bta,ix,name,val)
local propA=encPropT[name]
if propA then
ix=encByte(bta,ix,propA[1])
return propA[2](bta,ix,val) -- ret ix
end
-- User Property: 38
ix=encByte(bta,ix,38)
ix=encString(bta,ix,name)
return encString(bta,ix,val) -- ret ix
end
local function encPropTab(bta,ix,propT)
if not propT then return ix end
for name,val in pairs(propT) do
ix=encProp(bta,ix,name,val)
end
return ix
end
local function checkWill(w,level)
w.qos=w.qos or 0
typeChk("opt.will", "table",w,level)
typeChk("opt.will.topic", "string",w.topic,level)
if w.prop then typeChk("opt.will.prop", "table",w.prop,level) end
typeChk("opt.will.payload", "string",w.payload,level)
end
local function encConnect(self,cleanStart)
local opt=self.opt
local prop=self.prop
local w -- will
if not prop.sessionexpiryinterval or prop.sessionexpiryinterval==0 then
cleanStart=true
end
-- Calculate total packet len
-- 10=2+4+1+1+2: protlen+'MQTT'+version+flags+keepalive
local wPropLen
local ix=encString(nil,10,opt.clientidentifier)
local propLen=encPropTab(nil,0,prop)
ix=encVBInt(nil,ix+propLen,propLen)
if opt.will then
w=opt.will
checkWill(w,5)
wPropLen=encPropTab(nil,0,w.prop)
ix=encVBInt(nil,ix+wPropLen,wPropLen)
ix=encString(nil,ix,w.topic)
ix=encString(nil,ix,w.payload or "")
end
if opt.username then
typeChk("opt.username", "string",opt.username,4)
ix=encString(nil,ix,opt.username)
end
if opt.password then
typeChk("opt.password", "string",opt.password,4)
ix=encString(nil,ix,opt.password)
end
-- Create and format packet
local packetLen=ix
local bta=btaCreate2(packetLen)
ix=encByte(bta,1,MQTT_CONNECT)
ix=encVBInt(bta,ix,packetLen)
ix=encString(bta,ix,"MQTT")
ix=encByte(bta,ix,5) -- version
local flags=(opt.username and 0x80 or 0) |
(opt.password and 0x40 or 0) |
(cleanStart and 0x02 or 0)
if w then
flags=flags |
(w.retain and 0x20 or 0) |
(w.qos<<2) |
0x04
end
ix=encByte(bta,ix,flags)
ix=enc2BInt(bta,ix,opt.keepalive)
ix=encVBInt(bta,ix,propLen)
ix=encPropTab(bta,ix,prop)
ix=encString(bta,ix,opt.clientidentifier)
if w then -- Will
ix=encVBInt(bta,ix,wPropLen)
ix=encPropTab(bta,ix,w.prop)
ix=encString(bta,ix,w.topic)
ix=encBinData(bta,ix,w.payload)
end
if opt.username then ix=encString(bta,ix,opt.username) end
if opt.password then ix=encString(bta,ix,opt.password) end
if (ix-1) ~= ba.bytearray.size(bta) then
error(fmt("ix~=bta size: %d ~= %d",ix-1,ba.bytearray.size(bta)))
end
return bta
end
-- Decode Variable Byte Integer
local function decVBInt(bta,ix)
local mult,len=1,0
repeat
local digit=bta[ix]
len=len+(digit&0x7F)*mult
mult=mult*0x80
ix=ix+1
until digit < 0x80
return len,ix
end
-- Decode byte
local function decByte(bta,ix)
return bta[ix],ix+1
end
-- Decode 2 byte integer
local function dec2BInt(bta,ix)
return btan2h(bta,ix,2),ix+2
end
-- Decode 4 byte integer
local function dec4BInt(bta,ix)
return btan2h(bta,ix,4),ix+4
end
-- Decode utf8 string
local function decString(bta,ix)
local len=btan2h(bta,ix,2)
ix=ix+2
local endIx=ix+len
return bta2string(bta,ix,endIx-1),endIx
end
-- Decode Binary Data
local decBinData=decString
local decPropT={
[1]={"payloadformatindicator",decByte},
[2]={"messageexpiryinterval",dec4BInt},
[3]={"contenttype",decString},
[8]={"responsetopic",decString},
[9]={"correlationdata",decBinData},
[11]={"subscriptionidentifier",decVBInt},
[17]={"sessionexpiryinterval",dec4BInt},
[18]={"assignedclientidentifier",decString},
[19]={"serverkeepalive",dec2BInt},
[21]={"authenticationmethod",decString},
[22]={"authenticationdata",decBinData},
[26]={"responseinformation",decString},
[28]={"serverreference",decString},
[31]={"reasonstring",decString},
[33]={"receivemaximum",dec2BInt},
[34]={"topicaliasmaximum",dec2BInt},
[35]={"topicalias",dec2BInt},
[36]={"maximumqos",decByte},
[37]={"retainavailable",decByte},
[39]={"maximumpacketsize",dec4BInt},
[40]={"wildcardsubscriptionavailable",decByte},
[41]={"subscriptionidentifieravailable",decByte},
[42]={"sharedsubscriptionavailable",decByte},
}
local function decodeProp(bta,ix,propT)
local propId=bta[ix]
ix=ix+1
local propA=decPropT[propId]
if propA then
propT[propA[1]],ix= propA[2](bta,ix)
elseif 38==propId then
local key
key,ix=decString(bta,ix)
propT[key],ix=decString(bta,ix)
else
return nil
end
return ix
end
local function decodePropT(bta,propLen,ix)
local propT={}
while propLen > 0 do
local sIx=ix
ix=decodeProp(bta,ix,propT)
if not ix then return nil,1 end
propLen=propLen - (ix - sIx)
end
if propLen == 0 then return propT,ix end
return nil,ix -- prot err
end
-- Wait for next MQTT packet
-- returns cpt,payload or nil,err. cpt: Control Packet Type e.g. CONNACK
local function mqttRec(self)
local sock=self.sock
local data,msg,err
if self.recOverflowData then
data=self.recOverflowData
self.recOverflowData =nil
else
data=""
end
local mult,len,ix=1,0,1
repeat
ix=ix + 1
while #data < ix do
msg,err=sock:read()
if not msg then return nil,err end
data=data..msg
end
local digit=sbyte(data,ix)
len=len + (digit & 0x7F) * mult
mult=mult * 0x80
until digit < 0x80
local cpt=sbyte(data,1)
local bta
if len > 0 then
bta=btaCreate(len)
local overflow=btaCopy(bta,1,data,ix+1,-ix-1)
local plen=#data-ix
while plen < len do
data,err=sock:read()
if not data then return nil,err end
overflow=btaCopy(bta,1+plen,data)
plen=plen+#data
end
if overflow > 0 then
self.recOverflowData=ssub(data,#data-overflow+1)
end
end
return cpt,bta
end
-- An unconnected cosocket, which removes from tail and sends msg
local function sndCosock(sock,self)
local sndQT=self.sndQT
while true do
local sndQTail=self.sndQTail
if self.sndQHead == sndQTail then sock:disable() end
if not self.connected then return end
assert(self.sndQHead ~= sndQTail)
local bta=sndQT[sndQTail]
assert(bta)
if not self.sock:write(bta) then
self.connected=false
return
end
sndQT[sndQTail]=nil
self.sndQTail=sndQTail+1
self.sndQElems=self.sndQElems-1
end
end
-- Queue at head and enable 'sndCosock()', which may already be
-- enabled and that is OK.
local function sendMsg(self,bta)
local sndQHead=self.sndQHead
assert(nil == self.sndQT[sndQHead])
self.sndQT[sndQHead]=bta
self.sndQHead=sndQHead+1
self.sndQElems=self.sndQElems+1
if self.connected then self.sndCosock:enable() end
end
local function sendPing(self)
local pingCounter=self.pingCounter
pingCounter=pingCounter+1
if pingCounter == 2 then
local bta=btaCreate2(0)
bta[1]=MQTT_PINGREQ
bta[2]=0 -- packet len
sendMsg(self,bta)
elseif pingCounter > 2 then
self.lasterror={etype="mqtt",status="pingtimeout"}
self.connected=false
self.sock:close()
end
self.pingCounter=pingCounter
end
-- used by recPubrec,recPubrel
local function sendAckResp(self,cpt,pi,reason)
local bta=btaCreate2(4)
bta[1]=cpt
bta[2]=4 -- packet len
btah2n(bta,3,2,pi)
bta[5]=reason
bta[6]=0 -- prop len
sendMsg(self,bta)
return bta
end
-- used by recPuback,recPubrec,recPubrel,recPubcomp
local function decAck(bta)
local reason=btaSize(bta) == 2 and 0 or bta[3]
return btan2h(bta,1,2),reason
end
local function recPublish(self,bta,cpt)
local propT,pid
local topic,ix=decString(bta,1)
if topic then
local qos=(cpt>>1)&3
if qos > 0 then pid,ix=dec2BInt(bta,ix) end
propT,ix=decodePropT(bta,decVBInt(bta,ix))
if propT then
local qosQT=self.recQosQT
if not qosQT[pid] then -- nil if pid is nil; if not a dup
local onpub=self.onpubT[propT.subscriptionidentifier] or self.onpub
btaSetsize(bta,ix)
onpub(topic,self.recbta and bta or bta2string(bta),propT,cpt)
end
-- else drop dup msg
if qos == 0 then return true end
bta=sendAckResp(self,qos==1 and MQTT_PUBACK or MQTT_PUBREC,pid,0)
if qos==2 then insertRecQosT(self,pid,bta) end
return true
end
end
return nil,"mqtt","protocolerror"
end
local function recPuback(self,bta)
local pid=decAck(bta)
self.sndQosQT[pid]=nil
return true
end
local function recPubrec(self,bta)
local qT=self.sndQosQT
local pid,reason=decAck(bta)
if reason < 0x80 then
reason=qT[pid] and 0 or 146
bta=sendAckResp(self,MQTT_PUBREL|2,pid,reason)
if 0 == reason then
insertSndQosT(self,pid,bta)
return true
end
end
qT[pid]=nil
return true
end
local function recPubrel(self,bta)
local pid,reason=decAck(bta)
self.recQosQT[pid]=nil
sendAckResp(self,MQTT_PUBCOMP,pid,reason)
return true
end
local function recPubcomp(self,bta)
local pid=decAck(bta)
self.sndQosQT[pid]=nil
return true
end
local function removeOnpubInfo(self,topic)
local subid=self.topicT[topic]
if subid then
self.topicT[topic]=nil
self.onpubT[subid]=nil
end
end
local function recSuback(self,bta)
local pi,ix=dec2BInt(bta,1)
local propT
propT,ix=decodePropT(bta,decVBInt(bta,ix))
if not propT then return nil,"mqtt","protocolerror" end
local reason=bta[ix]
local t=self.subackQT[pi]
self.sndQosQT[pi]=nil
if t then
self.subackQT[pi]=nil
removeOnpubInfo(self,t.topic) -- dups, if any
if t.onsuback then t.onsuback(t.topic,reason,propT) end
if reason < 0x80 then
if t.onpub then
self.onpubT[t.subid]=t.onpub
self.topicT[t.topic]=t.subid
end
end
end
return true
end
local function recUnsuback(self,bta)
local pi,ix=dec2BInt(bta,1)
local propT
propT,ix=decodePropT(bta,decVBInt(bta,ix))
if not propT then return nil,"mqtt","protocolerror" end
local reason=bta[ix]
local t=self.subackQT[pi]
self.sndQosQT[pi]=nil
if t then
self.subackQT[pi]=nil
removeOnpubInfo(self,t.topic) -- dups, if any
if t.onunsubscribe then t.onunsubscribe(t.topic,reason,propT) end
end
return true
end
local function recPingresp(self)
self.pingCounter=0
return true
end
local function recDisconnect(_,bta)
local propT=decodePropT(bta,decVBInt(bta,2))
if propT then
local statusT={reasoncode=bta[1],properties=propT}
return nil,"mqtt","disconnect",statusT
end
return nil,"mqtt","protocolerror"
end
local recCpT={
[MQTT_PUBLISH]=recPublish,
[MQTT_PUBACK]=recPuback,
[MQTT_PUBREC]=recPubrec,
[MQTT_PUBREL]=recPubrel,
[MQTT_PUBCOMP]=recPubcomp,
[MQTT_SUBACK]=recSuback,
[MQTT_UNSUBACK]=recUnsuback,
[MQTT_PINGRESP]=recPingresp,
[MQTT_DISCONNECT]=recDisconnect
}
local function resetQueues(self,save) -- Call at start or when not clean restart
if save then
self.savedT={
recQosQT=self.recQosQT,sndQosQT=self.sndQosQT,subackQT=
self.subackQT,onpubT=self.onpubT,topicT=self.topicT,
}
end
self.recQosQT,self.sndQosQT,self.subackQT,self.onpubT,self.topicT=
{},{},{},{},{}
end
local function restoreQueues(self,session)
if session then
local t=self.savedT
copy2Tab(self.recQosQT,t.recQosQT)
copy2Tab(self.sndQosQT,t.sndQosQT)
copy2Tab(self.subackQT,t.subackQT)
copy2Tab(self.onpubT,t.onpubT)
copy2Tab(self.topicT,t.topicT)
local qT=self.recQosQT
if next(qT) then -- if not empty
for _,x in pairs(sortCosQT(qT)) do sendMsg(self,x.bta) end
end
qT=self.sndQosQT
if next(qT) then -- if not empty
for _,x in pairs(sortCosQT(qT)) do
local bta=x.bta
local cpt=bta[1]
if (cpt & 0xF0) == MQTT_PUBLISH then bta[1]=cpt|0x08 end --DUP flag
sendMsg(self,bta)
end
end
end
self.savedT=nil
end
local function onErrStatus(self,etype,code)
local reconn=self.onstatus(etype,code)
if reconn then
self.reconTimeout="number" == type(reconn) and reconn
end
return reconn
end
local function coMqttRun(self)
local ok,etype,status
while self.connected do
local cpt,bta=mqttRec(self)
if not cpt then status=bta break end
local func=recCpT[cpt&0xF0]
if not func then etype,status="mqtt","protocolerror" break end
ok,etype,status=func(self,bta,cpt)
if not ok then break end
etype,status=nil,nil
end
local lasterror=self.lasterror
self.connected,self.lasterror,self.recOverflowData=false,nil,nil
if self.pingTimer then self.pingTimer:cancel() end
resetQueues(self,true)
if not etype then
if lasterror then
etype,status=lasterror.etype,lasterror.status
else
status=status or ""
etype="sock"
end
end
if not self.disconnected and onErrStatus(self,etype,status) then
self.connectTime=nil
if "sysshutdown" ~= status then
startMQTT(self,encConnect(self,false))
end
end
end
local function coMqttConnect(sock,self,conbta)
local reconnect
sock:write(conbta)
local cpt,bta=mqttRec(self)
if cpt then
local perr=true
if (cpt&0xF0) == MQTT_CONNACK then
local ackProp=#bta > 2 and decodePropT(bta,decVBInt(bta,3))
if ackProp then
perr=false
local session=(bta[1] & 1) == 1 and true or false
local reason=bta[2]
reconnect=self.onstatus("mqtt","connect",{
sessionpresent=session,reasoncode=reason,properties=ackProp})
if reason < 128 and reconnect then
local opt=self.opt
if ackProp.serverkeepalive and
ackProp.serverkeepalive ~= opt.keepalive then
opt.keepalive=ackProp.serverkeepalive
end
if opt.keepalive ~= 0 then
self.pingCounter=0
self.pingTimer=ba.timer(function() sendPing(self) return true end)
self.pingTimer:set(opt.keepalive*1000//2)
end
local prop=self.prop
if ackProp.sessionexpiryinterval and
ackProp.sessionexpiryinterval ~= prop.sessionexpiryinterval then
prop.sessionexpiryinterval=ackProp.sessionexpiryinterval
end
self.connected=true
restoreQueues(self,session)
self.sndCosock=ba.socket.event(sndCosock,self)
coMqttRun(self)
return -- done
end
end
end
if perr then
reconnect=onErrStatus(self,"mqtt","protocolerror")
end
else
reconnect=onErrStatus(self,"sock",bta)
end
sock:close()
if reconnect then
startMQTT(self,conbta,false)
end
end
local function coSockConnect(_,self,conbta)
self.connectTime=ba.clock()//1000
local sock,err=self.connect(self,self.opt)
if self.disconnected then
if sock then sock:close() end
return
end
self.sock=sock
if not sock then
if onErrStatus(self,"sock",err) then
-- Avoid recursion
startMQTT(self,conbta,true)
end
else
coMqttConnect(sock,self,conbta)
end
end
local C={} -- MQTT Client
C.__index=C
function C:publish(topic,msg,opt,prop)
opt=opt or {}
local qos=opt.qos or 0
qos=(qos&3)<<1
local retain=opt.retain and 1 or 0
-- Calc
local propLen=encPropTab(nil,0,prop)
local ix=encVBInt(nil,(qos>0 and 3 or 1) + propLen,propLen)
ix=encString(nil,ix,topic)
local packetLen=ix+#msg-1
-- Create
local bta=btaCreate2(packetLen)
bta[1]=MQTT_PUBLISH | qos | retain
ix=encVBInt(bta,2,packetLen)
ix=encString(bta,ix,topic)
if qos>0 then
local pi=getPacketId(self)
ix=enc2BInt(bta,ix,pi)
insertSndQosT(self,pi,bta)
end
ix=encVBInt(bta,ix,propLen)
ix=encPropTab(bta,ix,prop)
bta[ix]=msg
sendMsg(self,bta)
return self.connected
end
local function sendSubOrUnsub(self,topic,prop,subOptions)
local pi=getPacketId(self)
-- Calc size
local ix=subOptions and 2 or 1
local propLen=encPropTab(nil,0,prop)
ix=encVBInt(nil,ix+propLen,0)
ix=enc2BInt(nil,ix,pi)
ix=encString(nil,ix,topic)
--Encode
local packetLen=ix-1
local bta=btaCreate2(packetLen)
ix=encByte(bta,1,(subOptions and MQTT_SUBSCRIBE or MQTT_UNSUBSCRIBE) | 2)
ix=encVBInt(bta,ix,packetLen)
ix=enc2BInt(bta,ix,pi)
ix=encVBInt(bta,ix,propLen)
ix=encPropTab(bta,ix,prop)
ix=encString(bta,ix,topic)
if subOptions then encByte(bta,ix,subOptions) end -- subscribe
insertSndQosT(self,pi,bta)
sendMsg(self,bta)
return pi
end
function C:subscribe(topic,onsuback,opt,prop)
if "table" == type(onsuback) then
prop=opt
opt=onsuback
onsuback=nil
end
opt=opt or {}
prop=copyTab(prop)
if opt.onpub then
prop.zz_subid=getSubscriptionId(self)
end
local retain=opt.retainaspublished==true and 8 or 0
local retainhandling=0~=retain and ((opt.retainhandling or 0)<<4) or 0
local nolocal=opt.nolocal and 4 or 0
local qos=opt.qos or 0
qos=qos&3
local subOptions=retainhandling | retain | nolocal | qos
local pi=sendSubOrUnsub(self,topic,prop,subOptions)
self.subackQT[pi]={topic=topic,onsuback=onsuback,onpub=opt.onpub,subid=prop.zz_subid}
return self.connected
end
function C:unsubscribe(topic,onunsubscribe,prop)
local pi=sendSubOrUnsub(self,topic,prop)
self.subackQT[pi]={topic=topic,onunsubscribe=onunsubscribe}
return self.connected
end
function C:disconnect(reason)
local retv=self.connected
local disconnected=self.disconnected
self.disconnected=true
if not disconnected then
local bta=btaCreate2(2)
bta[1]=MQTT_DISCONNECT
bta[2]=2 -- packet len
bta[3]=reason or 0
bta[4]=0 -- prop len
sendMsg(self,bta)
if self.sock then self.sock:close() end
end
self.connected=false
return retv
end
function C:close() pcall(function() self:disconnect() end) end
C.__gc=C.close
C.__close=C.close
function C:setwill(w)
checkWill(w,3)
self.opt.will=w
end
function C:status()
return self.sndQElems,self.connected,(self.disconnected and true or false)
end
startMQTT=function(self,conbta,defer)
local function conn() ba.socket.event(coSockConnect,self,conbta) end
if self.connectTime then
local timeout=self.reconTimeout or 5
local delta=ba.clock()//1000 - self.connectTime
if delta > 0 and delta < timeout then timeout=timeout - delta end
if timeout > 0 then
ba.timer(conn):set(timeout*1000,true)
return
end
end
if defer then
ba.thread.run(conn)
else
conn()
end
end
local function connect2addr(self,opt)
if not opt.timeout then opt.timeout=5000 end
local sock,err=ba.socket.connect(
self.addr,opt.port or (opt.shark and 8883 or 1883),opt)
if not sock then return nil,err end
if opt.shark and not opt.nocheck then
local trusted,status=sock:trusted(self.addr)
if not trusted then return nil,status end
end
return sock
end
local function create(addr,onstatus,onpub,opt,prop)
if "function" ~= type(addr) and "string" ~= type(addr) then
error(fmtArgErr(1,"string | function",addr),2)
end
argchk(2,"function",onstatus)
if "table" == type(onpub) then
prop=opt
opt=onpub
onpub=nil
end
if onpub then
argchk(3,"function",onpub)
else
onpub=function(topic) trace("Received unhandled MQTT topic",topic) end
end
opt=opt or {}
prop=prop or {}
argchk(4,"table",opt)
argchk(4,"table",prop)
opt=copyTab(opt)
local self={
connected=false,
recQosCounter=0,sndQosCounter=0,
packetId=0,subscriptionId=0,
sndQT={},sndQHead=1,sndQTail=1,sndQElems=0,
onstatus=onstatus,onpub=onpub,opt=opt,
connect="function" == type(addr) and addr or connect2addr
}
self.recbta=opt.recbta~=false -- default true
resetQueues(self)
if "function" == type(addr) then
self.connect=addr
else
self.connect=connect2addr
self.addr=addr
end
if prop then self.prop=copyTab(prop) end
opt.clientidentifier=opt.clientidentifier or ba.b64urlencode(ba.rndbs(15))
if opt.keepalive then
typeChk("opt.keepalive", "number",opt.keepalive)
else
opt.keepalive=0
end
if opt.secure then
if opt.secure == true then
opt.shark=ba.sharkclient()
else
typeChk("opt.secure", "userdata",opt.secure)
opt.shark=opt.secure
end
opt.secure=nil
end
startMQTT(self,encConnect(self,true))
return setmetatable(self,C)
end
local function backwardCompat(...)
return require"mqtt3c".connect(...)
end
return {
create=create,
connect=backwardCompat
}