/
broker_publish_transaction.go
105 lines (92 loc) · 2.68 KB
/
broker_publish_transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package gateway
import (
"fmt"
mqPkts "github.com/eclipse/paho.mqtt.golang/packets"
snPkts "github.com/energomonitor/bisquitt/packets"
snPkts1 "github.com/energomonitor/bisquitt/packets1"
"github.com/energomonitor/bisquitt/transactions"
"github.com/energomonitor/bisquitt/util"
)
// Transactions states constants
type transactionState int
const (
transactionDone transactionState = iota
awaitingRegack
awaitingPuback
awaitingPubrec
awaitingPubrel
awaitingPubcomp
)
type transactionWithRegack interface {
Regack(snRegack *snPkts1.Regack) error
}
type brokerPublishTransaction interface {
transactions.StatefulTransaction
SetSNPublish(*snPkts1.Publish)
ProceedSN(newState transactionState, snPkt snPkts.Packet) error
ProceedMQTT(newState transactionState, mqPkt mqPkts.ControlPacket) error
}
type brokerPublishTransactionBase struct {
*transactions.RetryTransaction
log util.Logger
snPublish *snPkts1.Publish
handler *handler1
}
func (t *brokerPublishTransactionBase) SetSNPublish(snPublish *snPkts1.Publish) {
t.snPublish = snPublish
}
func (t *brokerPublishTransactionBase) regack(snRegack *snPkts1.Regack, newState transactionState) error {
if t.State != awaitingRegack {
t.log.Debug("Unexpected packet in %d: %v", t.State, snRegack)
return nil
}
if snRegack.ReturnCode != snPkts1.RC_ACCEPTED {
t.Fail(fmt.Errorf("REGACK return code: %d", snRegack.ReturnCode))
return nil
}
snRegister := t.Data.(*snPkts1.Register)
t.handler.registeredTopics.Store(snRegister.TopicID, snRegister.TopicName)
return t.ProceedSN(newState, t.snPublish)
}
func (t *brokerPublishTransactionBase) ProceedSN(newState transactionState, snPkt snPkts.Packet) error {
t.Proceed(newState, snPkt)
if err := t.handler.snSend(snPkt); err != nil {
t.Fail(err)
return err
}
if newState == transactionDone {
t.Success()
}
return nil
}
func (t *brokerPublishTransactionBase) ProceedMQTT(newState transactionState, mqPkt mqPkts.ControlPacket) error {
t.Proceed(newState, mqPkt)
if err := t.handler.mqttSend(mqPkt); err != nil {
t.Fail(err)
return err
}
if newState == transactionDone {
t.Success()
}
return nil
}
// Resend MQTT or MQTT-SN packet.
func (t *brokerPublishTransactionBase) resend(pktx interface{}) error {
t.log.Debug("Resend.")
switch pkt := pktx.(type) {
case snPkts.Packet:
// Set DUP if applicable.
if dupPkt, ok := pkt.(snPkts.PacketWithDUP); ok {
dupPkt.SetDUP(true)
}
return t.handler.snSend(pkt)
case mqPkts.ControlPacket:
// PUBLISH is the only packet with DUP in MQTT.
if publish, ok := pkt.(*mqPkts.PublishPacket); ok {
publish.Dup = true
}
return t.handler.mqttSend(pkt)
default:
return fmt.Errorf("invalid package type (%T): %v", pktx, pktx)
}
}