Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct/Simplify the Ack algorithm #177

Merged
merged 23 commits into from Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2c8327f
remove some warnings
jstnlef Apr 20, 2019
e8e0d9c
work in progress change. Includes reimplementation of the sequence bu…
jstnlef Apr 21, 2019
c1628ad
move sequence buffer into the module definition
jstnlef Apr 22, 2019
f698195
finish implementation of the new acking algorithm
jstnlef Apr 22, 2019
a66f4c7
modify the dropped packet algorithm slightly to prevent and extra vec…
jstnlef Apr 22, 2019
5243df6
fix typo in the name of the `acknowledgement` module
jstnlef Apr 22, 2019
56910ad
fix the tests to point to the right module and remove some print debu…
jstnlef Apr 22, 2019
b252f8a
clean up some compiler warnings
jstnlef Apr 22, 2019
8d9b013
standardize on a single spelling of acknowledgment
jstnlef Apr 24, 2019
b779c2f
remove the dropped packets property from Acknowledgment handler and h…
jstnlef Apr 25, 2019
d3f1d9a
ran a cargo fmt
jstnlef Apr 25, 2019
43ef7b1
fix clippy warnings
jstnlef Apr 25, 2019
21aa27f
add a couple more tests for the sequence buffer
jstnlef Apr 25, 2019
7a74da9
fix an off-by-one error I had in the new acking impl
jstnlef Apr 27, 2019
04f9f91
add some more checks and a comment around `acking_many_packets_with_p…
jstnlef Apr 27, 2019
87a2c0e
turns out testing is useful
jstnlef Apr 28, 2019
864e04d
stop sending duplicate packets until we have a valid message from the…
jstnlef Apr 28, 2019
04273a9
needed to run a fmt
jstnlef Apr 28, 2019
b2009d6
lets default this to some size. 256 should do
jstnlef Apr 29, 2019
b5f1fe6
fix a typo in should_send_packet comment
jstnlef Apr 29, 2019
04f98cc
move the processing of the packet to after the drops are processed
jstnlef Apr 29, 2019
001250d
added a couple more tests and an off by one in the remove_entries fun…
jstnlef Apr 29, 2019
6b9c9ac
ran another fmt
jstnlef Apr 29, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/server_client.rs
Expand Up @@ -3,7 +3,7 @@
//! Note that in practice you don't want to implement a chat client using UDP.
use std::io::stdin;

use laminar::{Config, ErrorKind, Packet, Socket, SocketEvent};
use laminar::{ErrorKind, Packet, Socket, SocketEvent};
use std::thread;

const SERVER: &str = "127.0.0.1:12351";
Expand Down
5 changes: 3 additions & 2 deletions examples/simple_udp.rs
Expand Up @@ -4,7 +4,7 @@
//! 3. serialize data to send and deserialize when received.
use bincode::{deserialize, serialize};
use crossbeam_channel::{Receiver, Sender};
use laminar::{Config, ErrorKind, Packet, Socket, SocketEvent};
use laminar::{ErrorKind, Packet, Socket, SocketEvent};
use serde_derive::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::{thread, time};
Expand Down Expand Up @@ -160,7 +160,8 @@ impl Client {
match serialized {
Ok(raw_data) => {
self.packet_sender
.send(Packet::reliable_unordered(server_address(), raw_data));
.send(Packet::reliable_unordered(server_address(), raw_data))
.unwrap();
}
Err(e) => println!("Some error occurred: {:?}", e),
}
Expand Down
2 changes: 1 addition & 1 deletion examples/udp.rs
Expand Up @@ -2,7 +2,7 @@
//! 1. sending data
//! 2. receiving data
//! 3. constructing the packet for sending.
use laminar::{Config, Packet, Socket, SocketEvent};
use laminar::{Packet, Socket, SocketEvent};

use std::net::SocketAddr;
use std::thread;
Expand Down
4 changes: 2 additions & 2 deletions src/config.rs
Expand Up @@ -14,7 +14,7 @@ pub struct Config {
///
/// Why can't I have more than 255 (u8)?
/// This is because you don't want to send more then 256 fragments over UDP, with high amounts of fragments the chance for an invalid packet is very high.
/// Use TCP instead (later we will probably support larger ranges but every fragment packet then needs to be resent if it doesn't get an acknowledgement).
/// Use TCP instead (later we will probably support larger ranges but every fragment packet then needs to be resent if it doesn't get an acknowledgment).
///
/// default: 16 but keep in mind that lower is better.
pub max_fragments: u8,
Expand All @@ -23,7 +23,7 @@ pub struct Config {
/// This is the maximum size of each fragment. It defaults to `1450` bytes, due to the default MTU on most network devices being `1500`.
pub fragment_size: u16,
/// Value which can specify the size of the buffer that queues up fragments ready to be reassembled once all fragments have arrived.```
pub fragment_reassembly_buffer_size: usize,
pub fragment_reassembly_buffer_size: u16,
/// Value that specifies the size of the buffer the UDP data will be read into. Defaults to `1450` bytes.
pub receive_buffer_max_size: usize,
/// Value which can specify the factor which will smooth out network jitter.
Expand Down
10 changes: 3 additions & 7 deletions src/infrastructure.rs
@@ -1,17 +1,13 @@
//! This module provides the logic around the processing of the packet.
//! Like ordering, sequencing, controlling congestion, fragmentation, and packet acknowledgment.

mod acknowlegement;
mod acknowledgment;
mod congestion;
mod external_ack;
mod fragmenter;
mod local_ack;

pub mod arranging;

pub use self::acknowlegement::AcknowledgementHandler;
pub use self::acknowlegement::WaitingPacket;
pub use self::acknowledgment::AcknowledgmentHandler;
pub use self::acknowledgment::SentPacket;
pub use self::congestion::CongestionHandler;
pub use self::external_ack::ExternalAcks;
pub use self::fragmenter::Fragmentation;
pub use self::local_ack::LocalAckRecord;
280 changes: 280 additions & 0 deletions src/infrastructure/acknowledgment.rs
@@ -0,0 +1,280 @@
use crate::packet::OrderingGuarantee;
use crate::packet::SequenceNumber;
use crate::sequence_buffer::{sequence_less_than, SequenceBuffer};
use std::collections::HashMap;

const REDUNDANT_PACKET_ACKS_SIZE: u16 = 32;
const DEFAULT_SEND_PACKETS_SIZE: usize = 256;

/// Responsible for handling the acknowledgment of packets.
pub struct AcknowledgmentHandler {
// Local sequence number which we'll bump each time we send a new packet over the network
sequence_number: SequenceNumber,
// The last acked sequence number of the packets we've sent to the remote host.
remote_ack_sequence_num: SequenceNumber,
// Using a Hashmap to track every packet we send out so we can ensure that we can resend when
// dropped.
sent_packets: HashMap<u16, SentPacket>,
// However, we can only reasonably ack up to REDUNDANT_PACKET_ACKS_SIZE + 1 packets on each
// message we send so this should be that large
received_packets: SequenceBuffer<ReceivedPacket>,
}

impl AcknowledgmentHandler {
/// Constructs a new `AcknowledgmentHandler` with which you can perform acknowledgment operations.
pub fn new() -> Self {
AcknowledgmentHandler {
sequence_number: 0,
remote_ack_sequence_num: u16::max_value(),
sent_packets: HashMap::with_capacity(DEFAULT_SEND_PACKETS_SIZE),
received_packets: SequenceBuffer::with_capacity(REDUNDANT_PACKET_ACKS_SIZE + 1),
}
}

/// Returns the next sequence number to send.
pub fn local_sequence_num(&self) -> SequenceNumber {
self.sequence_number
}

/// Returns the last sequence number received from the remote host (+1)
pub fn remote_sequence_num(&self) -> SequenceNumber {
self.received_packets.sequence_num().wrapping_sub(1)
}

/// Returns the ack_bitfield corresponding to which of the past 32 packets we've
/// successfully received.
pub fn ack_bitfield(&self) -> u32 {
let most_recent_remote_seq_num: u16 = self.remote_sequence_num();
let mut ack_bitfield: u32 = 0;
let mut mask: u32 = 1;

// Iterate the past REDUNDANT_PACKET_ACKS_SIZE received packets and set the corresponding
// bit for each packet which exists in the buffer.
for i in 1..=REDUNDANT_PACKET_ACKS_SIZE {
let sequence = most_recent_remote_seq_num.wrapping_sub(i);
if self.received_packets.exists(sequence) {
ack_bitfield |= mask;
}
mask <<= 1;
}

ack_bitfield
}

/// Process the incoming sequence number.
///
/// - Acknowledge the incoming sequence number
/// - Update dropped packets
pub fn process_incoming(
&mut self,
remote_seq_num: u16,
remote_ack_seq: u16,
mut remote_ack_field: u32,
) {
self.remote_ack_sequence_num = remote_ack_seq;
self.received_packets
.insert(remote_seq_num, ReceivedPacket {});

// The current remote_ack_seq was (clearly) received so we should remove it.
self.sent_packets.remove(&remote_ack_seq);

// The remote_ack_field is going to include whether or not the past 32 packets have been
// received successfully. If so, we have no need to resend old packets.
for i in 1..=REDUNDANT_PACKET_ACKS_SIZE {
let ack_sequence = remote_ack_seq.wrapping_sub(i);
if remote_ack_field & 1 == 1 {
self.sent_packets.remove(&ack_sequence);
}
remote_ack_field >>= 1;
}
}

/// Enqueue the outgoing packet for acknowledgment.
pub fn process_outgoing(&mut self, payload: &[u8], ordering_guarantee: OrderingGuarantee) {
self.sent_packets.insert(
self.sequence_number,
SentPacket {
payload: Box::from(payload),
ordering_guarantee,
},
);

// Bump the local sequence number for the next outgoing packet.
self.sequence_number = self.sequence_number.wrapping_add(1);
}

/// Returns a `Vec` of packets we believe have been dropped.
pub fn dropped_packets(&mut self) -> Vec<SentPacket> {
let mut sent_sequences: Vec<SequenceNumber> = self.sent_packets.keys().cloned().collect();
sent_sequences.sort();

let remote_ack_sequence = self.remote_ack_sequence_num;
sent_sequences
.into_iter()
.filter(|s| {
if sequence_less_than(*s, remote_ack_sequence) {
remote_ack_sequence.wrapping_sub(*s) > REDUNDANT_PACKET_ACKS_SIZE
} else {
false
}
})
.flat_map(|s| self.sent_packets.remove(&s))
.collect()
}
}

#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct SentPacket {
pub payload: Box<[u8]>,
pub ordering_guarantee: OrderingGuarantee,
}

// TODO: At some point we should put something useful here. Possibly timing information or total
// bytes sent for metrics tracking.
#[derive(Clone, Default)]
pub struct ReceivedPacket;

#[cfg(test)]
mod test {
use crate::infrastructure::acknowledgment::ReceivedPacket;
use crate::infrastructure::{AcknowledgmentHandler, SentPacket};
use crate::packet::OrderingGuarantee;
use log::debug;

#[test]
fn increment_local_seq_num_on_process_outgoing() {
let mut handler = AcknowledgmentHandler::new();
assert_eq!(handler.local_sequence_num(), 0);
for i in 0..10 {
handler.process_outgoing(vec![].as_slice(), OrderingGuarantee::None);
assert_eq!(handler.local_sequence_num(), i + 1);
}
}

#[test]
fn local_seq_num_wraps_on_overflow() {
let mut handler = AcknowledgmentHandler::new();
handler.sequence_number = u16::max_value();
handler.process_outgoing(vec![].as_slice(), OrderingGuarantee::None);
assert_eq!(handler.local_sequence_num(), 0);
}

#[test]
fn ack_bitfield_with_empty_receive() {
let handler = AcknowledgmentHandler::new();
assert_eq!(handler.ack_bitfield(), 0)
}

#[test]
fn ack_bitfield_with_some_values() {
let mut handler = AcknowledgmentHandler::new();
handler
.received_packets
.insert(0, ReceivedPacket::default());
handler
.received_packets
.insert(1, ReceivedPacket::default());
handler
.received_packets
.insert(3, ReceivedPacket::default());
assert_eq!(handler.remote_sequence_num(), 3);
assert_eq!(handler.ack_bitfield(), 0b110)
}

#[test]
fn packet_is_not_acked() {
let mut handler = AcknowledgmentHandler::new();

handler.sequence_number = 0;
handler.process_outgoing(vec![1, 2, 3].as_slice(), OrderingGuarantee::None);
handler.sequence_number = 40;
handler.process_outgoing(vec![1, 2, 4].as_slice(), OrderingGuarantee::None);

static ARBITRARY: u16 = 23;
handler.process_incoming(ARBITRARY, 40, 0);

assert_eq!(
handler.dropped_packets(),
vec![SentPacket {
payload: vec![1, 2, 3].into_boxed_slice(),
ordering_guarantee: OrderingGuarantee::None,
}]
);
}

#[test]
fn acking_500_packets_without_packet_drop() {
let mut handler = AcknowledgmentHandler::new();
let mut other = AcknowledgmentHandler::new();

for i in 0..500 {
handler.sequence_number = i;
handler.process_outgoing(vec![1, 2, 3].as_slice(), OrderingGuarantee::None);

other.process_incoming(i, handler.remote_sequence_num(), handler.ack_bitfield());
handler.process_incoming(i, other.remote_sequence_num(), other.ack_bitfield());
}

assert_eq!(handler.dropped_packets().len(), 0);
}

#[test]
fn acking_many_packets_with_packet_drop() {
let mut handler = AcknowledgmentHandler::new();
let mut other = AcknowledgmentHandler::new();

let mut drop_count = 0;

for i in 0..100 {
handler.process_outgoing(vec![1, 2, 3].as_slice(), OrderingGuarantee::None);
handler.sequence_number = i;

// dropping every 4th with modulo's
if i % 4 == 0 {
debug!("Dropping packet: {}", drop_count);
drop_count += 1;
} else {
// We send them a packet
other.process_incoming(i, handler.remote_sequence_num(), handler.ack_bitfield());
// Skipped: other.process_outgoing
// And it makes it back
handler.process_incoming(i, other.remote_sequence_num(), other.ack_bitfield());
}
}
assert_eq!(drop_count, 25);
assert_eq!(handler.remote_sequence_num(), 99);
// Ack reads from right to left. So we know we have 99 since it's the last one we received.
// Then, the first bit is acking 98, then 97, then we're missing 96 which makes sense
// because 96 is evenly divisible by 4 and so on...
assert_eq!(handler.ack_bitfield(), 0b10111011101110111011101110111011);
assert_eq!(handler.dropped_packets().len(), 17);
}

#[test]
fn remote_seq_num_will_be_updated() {
let mut handler = AcknowledgmentHandler::new();
assert_eq!(handler.remote_sequence_num(), 65535);
handler.process_incoming(0, 0, 0);
assert_eq!(handler.remote_sequence_num(), 0);
handler.process_incoming(1, 0, 0);
assert_eq!(handler.remote_sequence_num(), 1);
}

#[test]
fn processing_a_full_set_of_packets() {
let mut handler = AcknowledgmentHandler::new();
for i in 0..33 {
handler.process_incoming(i, 0, 0);
}
assert_eq!(handler.remote_sequence_num(), 32);
assert_eq!(handler.ack_bitfield(), !0);
}

#[test]
fn test_process_outgoing() {
let mut handler = AcknowledgmentHandler::new();
handler.process_outgoing(vec![1, 2, 3].as_slice(), OrderingGuarantee::None);
assert_eq!(handler.sent_packets.len(), 1);
assert_eq!(handler.local_sequence_num(), 1);
}
}