Skip to content

Commit

Permalink
Reset cursor of unacked packets in DataRequest (#534)
Browse files Browse the repository at this point in the history
* Reset cursor of unacked packets in DataRequest

* fix cursor indexes stored in inflight buffer

* fix clippy warnings and remove dbg!

* give more appropriate name to variable

* clean up and add unit test for retransmission_map

Co-authored-by: henil <dedaniahenil@gmail.com>
  • Loading branch information
tekjar and henil committed Dec 16, 2022
1 parent 4afbde0 commit d74a5ab
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 20 deletions.
48 changes: 47 additions & 1 deletion rumqttd/src/router/iobufs.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::{collections::VecDeque, sync::Arc};
use std::{
collections::{HashMap, VecDeque},
sync::Arc,
};

use flume::{Receiver, Sender};
use parking_lot::Mutex;
Expand Down Expand Up @@ -171,10 +174,53 @@ impl Outgoing {

Some(())
}

// Here we are assuming that the first unique filter_idx we find while iterating will have the
// least corresponding cursor because of the way we insert into the inflight_buffer
pub fn retransmission_map(&self) -> HashMap<FilterIdx, Cursor> {
let mut o = HashMap::new();
for (_, filter_idx, cursor) in self.inflight_buffer.iter() {
if !o.contains_key(filter_idx) {
o.insert(*filter_idx, *cursor);
}
}

o
}
}

#[cfg(test)]
mod test {
use super::*;

#[test]
fn retransmission_map_is_calculated_accurately() {
let (mut outgoing, _) = Outgoing::new("retransmission-test".to_string());
let mut result = HashMap::new();

result.insert(0, (0, 8));
result.insert(1, (0, 1));
result.insert(2, (1, 1));
result.insert(3, (1, 0));

let buf = vec![
(1, 0, (0, 8)),
(1, 0, (0, 10)),
(1, 1, (0, 1)),
(3, 1, (0, 4)),
(2, 2, (1, 1)),
(1, 2, (2, 6)),
(1, 2, (2, 1)),
(1, 3, (1, 0)),
(1, 3, (1, 1)),
(1, 3, (1, 3)),
(1, 3, (1, 3)),
];

outgoing.inflight_buffer.extend(buf);
assert_eq!(outgoing.retransmission_map(), result);
}

// use super::{Outgoing, MAX_INFLIGHT};
// use crate::protocol::{Publish, QoS};
// use crate::router::Forward;
Expand Down
2 changes: 1 addition & 1 deletion rumqttd/src/router/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ impl DataLog {
filter_idx: FilterIdx,
offset: Offset,
len: u64,
) -> io::Result<(Position, Vec<Publish>)> {
) -> io::Result<(Position, Vec<(Publish, Offset)>)> {
// unwrap to get index of `self.native` is fine here, because when a new subscribe packet
// arrives in `Router::handle_device_payload`, it first calls the function
// `next_native_offset` which creates a new commitlog if one doesn't exist. So any new
Expand Down
13 changes: 10 additions & 3 deletions rumqttd/src/router/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ impl Router {
// Remove connection from router
let mut connection = self.connections.remove(id);
let _incoming = self.ibufs.remove(id);
let _outgoing = self.obufs.remove(id);
let outgoing = self.obufs.remove(id);
let mut tracker = self.scheduler.remove(id);
self.connection_map.remove(&client_id);
self.ackslog.remove(id);
Expand All @@ -339,6 +339,7 @@ impl Router {
// self.readyqueue.remove(id);

let inflight_data_requests = self.datalog.clean(id);
let retransmissions = outgoing.retransmission_map();

// Remove this connection from subscriptions
for filter in connection.subscriptions.iter() {
Expand Down Expand Up @@ -367,6 +368,12 @@ impl Router {
.into_iter()
.for_each(|r| tracker.register_data_request(r));

for request in tracker.data_requests.iter_mut() {
if let Some(cursor) = retransmissions.get(&request.filter_idx) {
request.cursor = *cursor;
}
}

self.graveyard
.save(tracker, connection.subscriptions, connection.events);
} else {
Expand Down Expand Up @@ -1058,10 +1065,10 @@ fn forward_device_data(
}

// Fill and notify device data
let forwards = publishes.into_iter().map(|mut publish| {
let forwards = publishes.into_iter().map(|(mut publish, offset)| {
publish.qos = protocol::qos(qos).unwrap();
Forward {
cursor: next,
cursor: offset,
size: 0,
publish,
}
Expand Down
11 changes: 6 additions & 5 deletions rumqttd/src/segments/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::Offset;
use std::usize;
use std::{collections::VecDeque, io};

Expand Down Expand Up @@ -186,7 +187,7 @@ where
&self,
mut start: (u64, u64),
mut len: u64,
out: &mut Vec<T>,
out: &mut Vec<(T, Offset)>,
) -> io::Result<Position> {
let mut cursor = start;
let _orig_cursor = cursor;
Expand Down Expand Up @@ -221,7 +222,7 @@ where
// `Segment::readv` handles conversion from absolute index to relative
// index and it returns the absolute offset.
// absolute cursor not to be confused with absolute offset
match curr_segment.readv(cursor.1, len, out)? {
match curr_segment.readv(cursor, len, out)? {
// an offset returned -> we didn't read till end -> len fulfilled -> return
SegmentPosition::Next(offset) => {
return Ok(Position::Next {
Expand Down Expand Up @@ -261,7 +262,7 @@ where
// segment's `readv` then we should return `None` as well as not possible to read further,
// whereas for older segments we simply jump onto the new one to read more.

match curr_segment.readv(cursor.1, len, out)? {
match curr_segment.readv(cursor, len, out)? {
SegmentPosition::Next(v) => {
// debug!("start: {:?}, end: ({}, {})", orig_cursor, cursor.0, cursor.1 + v - 1);
Ok(Position::Next {
Expand Down Expand Up @@ -290,10 +291,10 @@ mod tests {
Bytes::from(vec![id; size as usize])
}

fn verify(expected_id: usize, expected_size: u64, out: Bytes) {
fn verify(expected_id: usize, expected_size: u64, out: (Bytes, Offset)) {
let expected = Bytes::from(vec![expected_id as u8; expected_size as usize]);
// dbg!(expected_id, &expected);
assert_eq!(out, expected);
assert_eq!(out.0, expected);
}

#[test]
Expand Down
30 changes: 20 additions & 10 deletions rumqttd/src/segments/segment.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::{Cursor, Offset};

use super::Storage;
use std::io;

Expand Down Expand Up @@ -57,13 +59,13 @@ where
/// relative offset to absolute offset and vice-versa.
pub(crate) fn readv(
&self,
absolute_index: u64,
cursor: Cursor,
len: u64,
out: &mut Vec<T>,
out: &mut Vec<(T, Offset)>,
) -> io::Result<SegmentPosition> {
// this substraction can never overflow as checking of offset happens at
// `CommitLog::readv`.
let idx = absolute_index - self.absolute_offset;
let idx = cursor.1 - self.absolute_offset;

let mut ret: Option<u64>;

Expand All @@ -78,7 +80,12 @@ where
ret = None;
limit = self.len();
}
out.extend(self.data[idx as usize..limit as usize].iter().cloned());
let offsets = std::iter::repeat(cursor.0).zip(cursor.1..cursor.1 + limit);
let o = self.data[idx as usize..limit as usize]
.iter()
.cloned()
.zip(offsets);
out.extend(o);
}

match ret {
Expand Down Expand Up @@ -135,11 +142,14 @@ mod tests {
segment.push(Bytes::from_static(b"test9"));
assert_eq!(segment.len(), 9);

let mut out: Vec<Bytes> = Vec::new();
let _ = segment.readv(0, 2, &mut out).unwrap();
let mut out: Vec<(Bytes, Offset)> = Vec::new();
let _ = segment.readv((0, 0), 2, &mut out).unwrap();
assert_eq!(
out,
vec![Bytes::from_static(b"test1"), Bytes::from_static(b"test2")]
vec![
(Bytes::from_static(b"test1"), (0, 0)),
(Bytes::from_static(b"test2"), (0, 1))
]
);
}

Expand All @@ -157,8 +167,8 @@ mod tests {
segment.push(vec![9u8]);
assert_eq!(segment.len(), 9);

let mut out: Vec<Vec<u8>> = Vec::new();
let _ = segment.readv(0, 2, &mut out).unwrap();
assert_eq!(out, vec![vec![1u8], vec![2u8]]);
let mut out: Vec<(Vec<u8>, Offset)> = Vec::new();
let _ = segment.readv((0, 0), 2, &mut out).unwrap();
assert_eq!(out, vec![(vec![1u8], (0, 0)), (vec![2u8], (0, 1))]);
}
}

0 comments on commit d74a5ab

Please sign in to comment.