Skip to content
Permalink
Browse files Browse the repository at this point in the history
Improve uplink de-duplication.
This removes the de-duplication at the MQTT backend. Some MQTT brokers
provide shared subscription options in which case this additional
de-duplication is not needed.

For MQTT brokers that do not support shared subscriptions, the
collectAndCallOnce function will still filter out duplicated uplinks.

By adding the TXInfo object (hex encoded) to the de-duplication key,
uplink messages received on multiple channels will be handled
separately as they will result in different de-duplication keys.
  • Loading branch information
brocaar committed Aug 31, 2020
1 parent e6a941f commit f996bb0
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 19 deletions.
15 changes: 0 additions & 15 deletions internal/backend/gateway/mqtt/backend.go
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"encoding/hex"
"fmt"
"io/ioutil"
"strings"
Expand Down Expand Up @@ -249,20 +248,6 @@ func (b *Backend) rxPacketHandler(c paho.Client, msg paho.Message) {
"gateway_id": gatewayID,
}).Info("gateway/mqtt: uplink frame received")

// Since with MQTT all subscribers will receive the uplink messages sent
// by all the gateways, the first instance receiving the message must lock it,
// so that other instances can ignore the same message (from the same gw).
key := fmt.Sprintf("lora:ns:uplink:lock:%s:%d:%d:%d:%s", gatewayID, uplinkFrame.TxInfo.Frequency, uplinkFrame.RxInfo.Board, uplinkFrame.RxInfo.Antenna, hex.EncodeToString(uplinkFrame.PhyPayload))
if locked, err := b.isLocked(key); err != nil || locked {
if err != nil {
log.WithError(err).WithFields(log.Fields{
"uplink_id": uplinkID,
"key": key,
}).Error("gateway/mqtt: acquire lock error")
}
return
}

b.rxPacketChan <- uplinkFrame
}

Expand Down
14 changes: 10 additions & 4 deletions internal/uplink/collect.go
Expand Up @@ -19,8 +19,8 @@ import (

// Templates used for generating Redis keys
const (
CollectKeyTempl = "lora:ns:rx:collect:%s"
CollectLockKeyTempl = "lora:ns:rx:collect:%s:lock"
CollectKeyTempl = "lora:ns:rx:collect:%s:%s"
CollectLockKeyTempl = "lora:ns:rx:collect:%s:%s:lock"
)

// collectAndCallOnce collects the package, sleeps the configured duraction and
Expand All @@ -33,8 +33,14 @@ const (
// unique set per gateway MAC and packet MIC.
func collectAndCallOnce(rxPacket gw.UplinkFrame, callback func(packet models.RXPacket) error) error {
phyKey := hex.EncodeToString(rxPacket.PhyPayload)
key := fmt.Sprintf(CollectKeyTempl, phyKey)
lockKey := fmt.Sprintf(CollectLockKeyTempl, phyKey)
txInfoB, err := proto.Marshal(rxPacket.TxInfo)
if err != nil {
return errors.Wrap(err, "marshal protobuf error")
}
txInfoHEX := hex.EncodeToString(txInfoB)

key := fmt.Sprintf(CollectKeyTempl, txInfoHEX, phyKey)
lockKey := fmt.Sprintf(CollectLockKeyTempl, txInfoHEX, phyKey)

// this way we can set a really low DeduplicationDelay for testing, without
// the risk that the set already expired in redis on read
Expand Down

0 comments on commit f996bb0

Please sign in to comment.