feat(rumqttd): retained messages (#683)

swanandx committed Aug 17, 2023
1 parent b7c3086 commit 3577628
Showing 6 changed files with 91 additions and 38 deletions.
1 change: 1 addition & 0 deletions rumqttd/CHANGELOG.md
@@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Removed

### Fixed
- Retained Messages

### Security

2 changes: 1 addition & 1 deletion rumqttd/src/protocol/v4/subscribe.rs
@@ -61,7 +61,7 @@ mod filter {
qos: qos(requested_qos).ok_or(Error::InvalidQoS(requested_qos))?,
nolocal: false,
preserve_retain: false,
retain_forward_rule: RetainForwardRule::Never,
retain_forward_rule: RetainForwardRule::OnEverySubscribe,
});
}

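For context, the v3.1.1 (v4) SUBSCRIBE packet has no retain-handling option, so the parser fills in a broker-side default. The sketch below maps the rule onto MQTT v5 "Retain Handling" values; the enum is written out locally for illustration and its variant names are only assumed to match rumqttd's `RetainForwardRule`.

```rust
// Illustrative stand-in for rumqttd's RetainForwardRule (variant names assumed),
// mapped onto MQTT v5 "Retain Handling" subscription-option values.
#[derive(Debug)]
enum RetainForwardRule {
    OnEverySubscribe, // 0: send retained messages on every subscribe
    OnNewSubscribe,   // 1: send them only if the subscription did not exist before
    Never,            // 2: never send retained messages on subscribe
}

fn main() {
    // MQTT 3.1.1 subscribers cannot express a preference, so the parser's default
    // decides whether they ever see retained messages; this commit switches that
    // default from Never to OnEverySubscribe.
    let default_for_v4 = RetainForwardRule::OnEverySubscribe;
    println!("default v4 rule: {default_for_v4:?}");
}
```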
29 changes: 15 additions & 14 deletions rumqttd/src/router/iobufs.rs
@@ -59,7 +59,7 @@ pub struct Outgoing {
/// Handle which is given to router to allow router to communicate with this connection
pub(crate) handle: Sender<()>,
/// The buffer to keep track of inflight packets.
inflight_buffer: VecDeque<(u16, FilterIdx, Cursor)>,
inflight_buffer: VecDeque<(u16, FilterIdx, Option<Cursor>)>,
/// PubRels waiting for PubComp
pub(crate) unacked_pubrels: VecDeque<u16>,
/// Last packet id
@@ -208,8 +208,9 @@ impl Outgoing {
pub fn retransmission_map(&self) -> HashMap<FilterIdx, Cursor> {
let mut o = HashMap::new();
for (_, filter_idx, cursor) in self.inflight_buffer.iter() {
if !o.contains_key(filter_idx) {
o.insert(*filter_idx, *cursor);
// if cursor is None, it means it was a retained publish
if !o.contains_key(filter_idx) && cursor.is_some() {
o.insert(*filter_idx, cursor.unwrap());
}
}

@@ -232,17 +233,17 @@ mod test {
result.insert(3, (1, 0));

let buf = vec![
(1, 0, (0, 8)),
(1, 0, (0, 10)),
(1, 1, (0, 1)),
(3, 1, (0, 4)),
(2, 2, (1, 1)),
(1, 2, (2, 6)),
(1, 2, (2, 1)),
(1, 3, (1, 0)),
(1, 3, (1, 1)),
(1, 3, (1, 3)),
(1, 3, (1, 3)),
(1, 0, Some((0, 8))),
(1, 0, Some((0, 10))),
(1, 1, Some((0, 1))),
(3, 1, Some((0, 4))),
(2, 2, Some((1, 1))),
(1, 2, Some((2, 6))),
(1, 2, Some((2, 1))),
(1, 3, Some((1, 0))),
(1, 3, Some((1, 1))),
(1, 3, Some((1, 3))),
(1, 3, Some((1, 3))),
];

outgoing.inflight_buffer.extend(buf);
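A standalone sketch of the retransmission-map rule exercised by the test above; the type aliases are stand-ins for rumqttd's `FilterIdx`/`Cursor`, not the real definitions. The first in-flight cursor per filter wins, and retained publishes are skipped because they carry no commitlog cursor.

```rust
use std::collections::{HashMap, VecDeque};

type FilterIdx = usize;   // stand-in alias
type Cursor = (u64, u64); // stand-in alias: (segment, offset)

// Build the per-filter retransmission cursor map. Entries with cursor == None
// came from retained publishes and have nothing to re-read from the commitlog.
fn retransmission_map(inflight: &VecDeque<(u16, FilterIdx, Option<Cursor>)>) -> HashMap<FilterIdx, Cursor> {
    let mut map = HashMap::new();
    for (_pkid, filter_idx, cursor) in inflight {
        if let Some(cursor) = cursor {
            map.entry(*filter_idx).or_insert(*cursor);
        }
    }
    map
}

fn main() {
    let mut inflight = VecDeque::new();
    inflight.push_back((1, 0, Some((0, 8))));
    inflight.push_back((2, 0, Some((0, 10)))); // later cursor for filter 0 is ignored
    inflight.push_back((3, 1, None));          // retained publish: skipped
    let map = retransmission_map(&inflight);
    assert_eq!(map.get(&0), Some(&(0, 8)));
    assert_eq!(map.get(&1), None);
}
```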
44 changes: 32 additions & 12 deletions rumqttd/src/router/logs.rs
@@ -269,22 +269,42 @@ impl DataLog {
self.retained_publishes.remove(&topic);
}

pub fn handle_retained_messages(
    &mut self,
    filter: &str,
    notifications: &mut VecDeque<(ConnectionId, DataRequest)>,
) {
    trace!(info = "retain-msg", filter = &filter);
    let idx = self.filter_indexes.get(filter).unwrap();
    let datalog = self.native.get_mut(*idx).unwrap();
    for (topic, publish) in self.retained_publishes.iter_mut() {
        if matches(topic, filter) {
            datalog.append(publish.clone(), notifications);
        }
    }
}
pub fn read_retained_messages(&mut self, filter: &str) -> Vec<PubWithProp> {
    trace!(info = "reading retain msg", filter = &filter);
    let now = Instant::now();

    // discard expired retained messages
    self.retained_publishes.retain(|_, pubdata| {
        // Keep data if no properties exist, which implies no message expiry!
        let Some(properties) = pubdata.properties.as_mut() else {
            return true
        };

        // Keep data if there is no message_expiry_interval
        let Some(message_expiry_interval) = properties.message_expiry_interval.as_mut() else {
            return true
        };

        let time_spent = (now - pubdata.timestamp).as_secs() as u32;
        let is_valid = time_spent < *message_expiry_interval;

        // ignore expired messages
        if is_valid {
            // set message_expiry_interval to (original value - time spent waiting in server)
            // ref: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901112
            *message_expiry_interval -= time_spent;
        }

        is_valid
    });

    // no need to include timestamp when returning
    self.retained_publishes
        .iter()
        .filter(|(topic, _)| matches(topic, filter))
        .map(|(_, p)| (p.publish.clone(), p.properties.clone()))
        .collect()
}
}

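The expiry handling above follows MQTT v5 message expiry (see the spec link in the comment): a retained message is dropped once it has waited longer than its `message_expiry_interval`, and the interval forwarded to subscribers is reduced by the time spent waiting in the broker. Below is a minimal standalone sketch of that arithmetic; the function and its return shape are illustrative, not rumqttd API.

```rust
use std::time::Instant;

// Returns None if the message has expired, otherwise the (possibly reduced)
// expiry interval to forward: Some(None) means "no expiry property, keep as is".
fn remaining_expiry(stored_at: Instant, expiry_interval: Option<u32>) -> Option<Option<u32>> {
    let Some(interval) = expiry_interval else {
        return Some(None); // no message_expiry_interval: never expires
    };
    let waited = stored_at.elapsed().as_secs() as u32;
    if waited < interval {
        Some(Some(interval - waited)) // deduct time spent waiting in the broker
    } else {
        None // expired: drop the retained message
    }
}

fn main() {
    let stored_at = Instant::now();
    assert_eq!(remaining_expiry(stored_at, None), Some(None));
    assert_eq!(remaining_expiry(stored_at, Some(60)), Some(Some(60)));
}
```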
3 changes: 2 additions & 1 deletion rumqttd/src/router/mod.rs
@@ -110,7 +110,7 @@ impl From<Notification> for MaybePacket {

#[derive(Debug, Clone)]
pub struct Forward {
pub cursor: (u64, u64),
pub cursor: Option<(u64, u64)>,
pub size: usize,
pub publish: Publish,
pub properties: Option<PublishProperties>,
@@ -192,6 +192,7 @@ pub struct DataRequest {
pub read_count: usize,
/// Maximum count of payload buffer per replica
max_count: usize,
pub(crate) forward_retained: bool,
pub(crate) group: Option<String>,
}

50 changes: 40 additions & 10 deletions rumqttd/src/router/routing.rs
@@ -541,7 +541,7 @@ impl Router {

for packet in packets.drain(0..) {
match packet {
Packet::Publish(mut publish, properties) => {
Packet::Publish(publish, properties) => {
let span = tracing::error_span!("publish", topic = ?publish.topic, pkid = publish.pkid);
let _guard = span.enter();

@@ -591,11 +591,6 @@ impl Router {

self.router_meters.total_publishes += 1;

// Ignore retained messages
if publish.retain {
publish.retain = false;
}

// Try to append publish to commitlog
match append_to_commitlog(
id,
@@ -678,8 +673,6 @@ impl Router {
// and create DataRequest, while using the same datalog of "topic"
// NOTE: topic & $share/group/topic will have same filteridx!
self.prepare_filter(id, cursor, idx, f, group, subscription_id);
self.datalog
.handle_retained_messages(&filter, &mut self.notifications);

let code = match f.qos {
QoS::AtMostOnce => SubscribeReasonCode::QoS0,
@@ -898,6 +891,7 @@ impl Router {
}

/// Apply filter and prepare this connection to receive subscription data
/// Handle retained messages as per subscription options!
fn prepare_filter(
&mut self,
id: ConnectionId,
@@ -946,6 +940,13 @@ impl Router {
.insert(filter_path.clone(), subscription_id);
}

// check if group is None because retained messages aren't sent
// for shared subscriptions
// TODO: use retain forward rules
let forward_retained = group.is_none();

// call to `insert(_)` returns `true` if it didn't contain the filter_path already
// i.e. it's a new subscription
if connection.subscriptions.insert(filter_path.clone()) {
let request = DataRequest {
filter: filter_path.clone(),
@@ -954,6 +955,8 @@
cursor,
read_count: 0,
max_count: 100,
// set true for new subscriptions
forward_retained,
group,
};

@@ -962,6 +965,10 @@
debug_assert!(self.scheduler.check_tracker_duplicates(id).is_none())
}

// TODO: figure out how we can update existing DataRequest
// helpful in re-subscriptions and forwarding retained messages on
// every subscribe

let meter = &mut self.ibufs.get_mut(id).unwrap().meter;
meter.register_subscription(filter_path.clone());
}
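In short, retained messages are only scheduled for brand-new, non-shared subscriptions: `forward_retained` is true only when `group` is `None`, and it only reaches a `DataRequest` when the filter was not already in `connection.subscriptions`. A tiny sketch of that decision (hypothetical helper, not rumqttd code):

```rust
// Hypothetical helper capturing the condition above.
fn should_forward_retained(is_new_subscription: bool, shared_group: Option<&str>) -> bool {
    is_new_subscription && shared_group.is_none()
}

fn main() {
    assert!(should_forward_retained(true, None));        // new normal subscription
    assert!(!should_forward_retained(true, Some("g1"))); // shared subscription
    assert!(!should_forward_retained(false, None));      // re-subscription (still a TODO in the commit)
}
```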
@@ -1193,10 +1200,11 @@ fn append_to_commitlog(
if publish.payload.is_empty() {
datalog.remove_from_retained_publishes(topic.to_owned());
} else if publish.retain {
error!("Unexpected: retain field was not unset");
datalog.insert_to_retained_publishes(publish.clone(), properties.clone(), topic.to_owned());
}

// after recording the retained message, we also send it to existing subscribers
// as a normal publish message, therefore we set retain to false
publish.retain = false;
let pkid = publish.pkid;
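As far as the lines shown in this hunk go (any enclosing checks are outside the diff), the retained store is updated like this: an empty payload clears the retained entry, a non-empty retained publish replaces it, and the retain flag is then cleared so the message fans out to current subscribers as a normal publish. A simplified sketch with a plain map standing in for the datalog:

```rust
use std::collections::HashMap;

// Simplified stand-in for the datalog's retained-publish store.
fn update_retained_store(
    store: &mut HashMap<String, Vec<u8>>,
    topic: &str,
    payload: &[u8],
    retain: &mut bool,
) {
    if payload.is_empty() {
        store.remove(topic); // empty payload clears the retained message
    } else if *retain {
        store.insert(topic.to_owned(), payload.to_vec()); // replace retained message
    }
    // forward to existing subscribers as a normal publish
    *retain = false;
}

fn main() {
    let mut store = HashMap::new();
    let mut retain = true;
    update_retained_store(&mut store, "a/b", b"hello", &mut retain);
    assert!(store.contains_key("a/b") && !retain);
    update_retained_store(&mut store, "a/b", b"", &mut retain);
    assert!(store.is_empty());
}
```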

@@ -1358,7 +1366,23 @@
inflight_slots = 1;
}

let (next, publishes) =
let mut publishes = Vec::new();

if request.forward_retained {
// NOTE: ideally we want to limit the number of messages read here
// and skip the previously read messages on the next read,
// but for now we just read all of them and drop the excess ones
let mut retained_publishes = datalog.read_retained_messages(&request.filter);
retained_publishes.truncate(inflight_slots as usize);

publishes.extend(retained_publishes.into_iter().map(|p| (p, None)));
inflight_slots -= publishes.len() as u64;

// we only want to forward retained messages once
request.forward_retained = false;
}

let (next, publishes_from_datalog) =
match datalog.native_readv(request.filter_idx, request.cursor, inflight_slots) {
Ok(v) => v,
Err(e) => {
@@ -1367,6 +1391,12 @@
}
};

publishes.extend(
publishes_from_datalog
.into_iter()
.map(|(p, offset)| (p, Some(offset))),
);

let (start, next, caughtup) = match next {
Position::Next { start, end } => (start, end, false),
Position::Done { start, end } => (start, end, true),
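Putting the forwarding path together: retained publishes (which carry no commitlog cursor, hence `None`) are prepended at most once per `DataRequest`, consume inflight slots first, and the remaining slots are filled by the regular commitlog read, whose publishes keep their cursors. A self-contained sketch of that merge; the names and the `String` payload stand in for rumqttd's real types, and the commitlog read limit is modelled with a simple `take`.

```rust
type Cursor = (u64, u64); // stand-in alias

struct Request {
    forward_retained: bool,
}

// Merge retained publishes (no cursor) with commitlog publishes (with cursor),
// respecting the number of free inflight slots.
fn merge_publishes(
    request: &mut Request,
    mut retained: Vec<String>,
    from_log: Vec<(String, Cursor)>,
    mut slots: u64,
) -> Vec<(String, Option<Cursor>)> {
    let mut out = Vec::new();
    if request.forward_retained {
        // excess retained messages are simply dropped for now
        retained.truncate(slots as usize);
        slots -= retained.len() as u64;
        out.extend(retained.into_iter().map(|p| (p, None)));
        request.forward_retained = false; // forward retained messages only once
    }
    out.extend(
        from_log
            .into_iter()
            .take(slots as usize)
            .map(|(p, c)| (p, Some(c))),
    );
    out
}

fn main() {
    let mut req = Request { forward_retained: true };
    let out = merge_publishes(
        &mut req,
        vec!["retained".into()],
        vec![("log-1".into(), (0, 1)), ("log-2".into(), (0, 2))],
        2,
    );
    assert_eq!(out.len(), 2); // 1 retained + 1 from the commitlog
    assert!(!req.forward_retained);
}
```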
