Skip to content

Commit

Permalink
Merge pull request #50 from Annopaolo/dont-send-without-introspection
Browse files Browse the repository at this point in the history
Do not send data before introspection during initialisation
  • Loading branch information
bettio committed Jun 23, 2022
2 parents c4658c4 + f5434d3 commit 9e1009a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 7 deletions.
14 changes: 12 additions & 2 deletions device/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"
"path/filepath"
"runtime"
"sync"

"github.com/astarte-platform/astarte-go/client"
"github.com/astarte-platform/astarte-go/interfaces"
Expand All @@ -34,6 +35,11 @@ const (
DefaultInitialConnectionAttempts = 10
)

type messageQueue struct {
sync.Mutex
queue chan astarteMessageInfo
}

// Device is the base struct for Astarte Devices
type Device struct {
deviceID string
Expand All @@ -43,7 +49,7 @@ type Device struct {
astarteAPIClient *client.Client
brokerURL string
db *gorm.DB
messageQueue chan astarteMessageInfo
inflightMessages messageQueue
isSendingStoredMessages bool
volatileMessages []astarteMessageInfo
lastSentIntrospection string
Expand Down Expand Up @@ -232,7 +238,11 @@ func (d *Device) Connect(result chan<- error) {
}

// Now that the client is up and running, we can start sending messages
d.messageQueue = make(chan astarteMessageInfo, d.opts.MaxInflightMessages)
d.inflightMessages.queue = make(chan astarteMessageInfo, d.opts.MaxInflightMessages)
// When initialized, mutexes are unlocked (see https://pkg.go.dev/sync#Mutex),
// so we lock it in order to allow publishing messages
// only if introspection has already been sent
d.inflightMessages.Lock()
go d.sendLoop()

// All good: notify, and our routine is over.
Expand Down
13 changes: 10 additions & 3 deletions device/protocol_mqtt_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func (d *Device) initializeMQTTClient() error {
})

opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
// If connection is lost, we should stop dequeuing messages from the channel
d.inflightMessages.Lock()
if d.OnErrors != nil {
d.OnErrors(d, err)
}
Expand Down Expand Up @@ -257,6 +259,9 @@ func astarteOnConnectHandler(d *Device, sessionPresent bool) {
}
}

// Since control messages have been sent, we allow to send data
d.inflightMessages.Unlock()

// If some messages must be retried, do so
d.resendFailedMessages()

Expand Down Expand Up @@ -388,8 +393,10 @@ func (d *Device) UnsetProperty(interfaceName, path string) error {

// The main publishing loop: retrieves messages from the publishing channel and sends them one at a time, in order
func (d *Device) sendLoop() {
for next := range d.messageQueue {
d.publishMessage(next)
for {
d.inflightMessages.Lock()
d.publishMessage(<-d.inflightMessages.queue)
d.inflightMessages.Unlock()
}
}

Expand Down Expand Up @@ -467,7 +474,7 @@ func (d *Device) enqueueRawMqttV1Message(astarteInterface interfaces.AstarteInte
fmt.Println("Sending previously stored messages with non-discard retention, the current message may be scheduled later")
}
message := makeAstarteMessageInfo(expiry, retention, astarteInterface.Name, interfacePath, astarteInterface.MajorVersion, qos, bsonPayload)
d.messageQueue <- message
d.inflightMessages.queue <- message

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions device/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (d *Device) resendStoredMessages() {
for _, message := range messages {
if !isStoredMessageExpired(message) && !d.isInterfaceOutdatedInIntrospection(message.InterfaceName, message.InterfaceMajor) {
// if the message is not expired, try resending it
d.messageQueue <- message
d.inflightMessages.queue <- message
} else {
// else, it can be removed
d.removeFailedMessageFromStorage(message.StorageId)
Expand All @@ -135,7 +135,7 @@ func (d *Device) resendVolatileMessages() {
d.volatileMessages = d.volatileMessages[1:]
// try resending the message only if it is not expired
if !isStoredMessageExpired(message) && !d.isInterfaceOutdatedInIntrospection(message.InterfaceName, message.InterfaceMajor) {
d.messageQueue <- message
d.inflightMessages.queue <- message
}
}
}
Expand Down

0 comments on commit 9e1009a

Please sign in to comment.