Skip to content

Commit

Permalink
feat: load saved inflight publishes on uplink restart (#307)
Browse files Browse the repository at this point in the history
* Implemeted reading from disk and publishing onto the network

* inflight persistence logic

* doc: make the code readable

---------

Co-authored-by: Devdutt Shenoi <devdutt@bytebeam.io>
  • Loading branch information
Vilayat-Ali and de-sh committed Dec 1, 2023
1 parent 5c85fd4 commit c4dc8b9
Showing 1 changed file with 35 additions and 3 deletions.
38 changes: 35 additions & 3 deletions uplink/src/base/serializer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
mod metrics;

use std::collections::{HashMap, VecDeque};
use std::io::{self, Write};
use std::fs::File;
use std::io::{self, Read, Write};
use std::{sync::Arc, time::Duration};

use bytes::Bytes;
use bytes::{Bytes, BytesMut};
use flume::{Receiver, RecvError, Sender};
use log::{debug, error, info, trace};
use lz4_flex::frame::FrameEncoder;
Expand Down Expand Up @@ -506,7 +507,7 @@ impl<C: MqttClient> Serializer<C> {

/// Starts operation of the uplink serializer, which can transition between the modes mentioned earlier.
pub async fn start(mut self) -> Result<(), Error> {
// check for and publish the packets in persistence/inflight file
self.reload_inflight().await?;
let mut status = Status::EventLoopReady;

loop {
Expand All @@ -520,6 +521,37 @@ impl<C: MqttClient> Serializer<C> {
status = next_status;
}
}

/// check for and publish the packets in persistence/inflight file
/// once done, delete the file.
async fn reload_inflight(&self) -> Result<(), Error> {
let mut path = self.config.persistence_path.clone();
path.push("inflight");

if !path.is_file() {
return Ok(());
}

let mut buf = Vec::new();
let mut inflight_file = File::open(&path)?;
inflight_file.read_to_end(&mut buf)?;
let mut buf = BytesMut::from(buf.as_slice());

while let Ok(packet) = read(&mut buf, self.config.mqtt.max_packet_size) {
match packet {
Packet::Publish(publish) => {
self.client
.publish(publish.topic, QoS::AtLeastOnce, false, publish.payload)
.await?;
}
packet => unreachable!("Unexpected packet: {:?}", packet),
}
}
info!("Read and published inflight packets; removing file: {}", path.display());
std::fs::remove_file(path)?;

Ok(())
}
}

async fn send_publish<C: MqttClient>(
Expand Down

0 comments on commit c4dc8b9

Please sign in to comment.