Skip to content

Commit

Permalink
Add --udp_buffer_size params on both sides (#6)
Browse files Browse the repository at this point in the history
Co-authored-by: aquinaou-anssi <>
  • Loading branch information
aquinaou-anssi committed Dec 15, 2023
1 parent 5277a4f commit b3b09e7
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 2 deletions.
9 changes: 9 additions & 0 deletions doc/parameters.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ On the receiver side, the option:
defines ip and port to listen for incoming UDP packets, and should be set to the same value as `--to-udp`.

Optionally, you can set the size of the UDP socket buffers with following option:

.. code-block::
--udp_buffer_size <nb_bytes>
This option is available on both sides. Default value is 1073741824 which is the highest possible value.
The specified size is then doubled by the kernel (see https://man7.org/linux/man-pages/man7/socket.7.html).

Block and packet sizes
----------------------

Expand Down
12 changes: 12 additions & 0 deletions src/bin/diode-receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct Config {
nb_clients: u16,
encoding_block_size: u64,
repair_block_size: u32,
udp_buffer_size: u32,
flush_timeout: time::Duration,
nb_decoding_threads: u8,
to: ClientConfig,
Expand Down Expand Up @@ -87,6 +88,14 @@ fn command_args() -> Config {
.value_parser(clap::value_parser!(u32))
.help("Size of repair data in bytes"),
)
.arg(
Arg::new("udp_buffer_size")
.long("udp_buffer_size")
.value_name("nb_bytes")
.default_value("1073741823") // i32::MAX / 2
.value_parser(clap::value_parser!(u32).range(..1073741824))
.help("Size of UDP socket recv buffer"),
)
.arg(
Arg::new("flush_timeout")
.long("flush_timeout")
Expand Down Expand Up @@ -128,6 +137,7 @@ fn command_args() -> Config {
let nb_clients = *args.get_one::<u16>("nb_clients").expect("default");
let nb_decoding_threads = *args.get_one::<u8>("nb_decoding_threads").expect("default");
let encoding_block_size = *args.get_one::<u64>("encoding_block_size").expect("default");
let udp_buffer_size = *args.get_one::<u32>("udp_buffer_size").expect("default");
let repair_block_size = *args.get_one::<u32>("repair_block_size").expect("default");
let flush_timeout = time::Duration::from_millis(
args.get_one::<NonZeroU64>("flush_timeout")
Expand Down Expand Up @@ -159,6 +169,7 @@ fn command_args() -> Config {
nb_decoding_threads,
encoding_block_size,
repair_block_size,
udp_buffer_size,
flush_timeout,
to,
heartbeat,
Expand Down Expand Up @@ -226,6 +237,7 @@ fn main() {
nb_clients: config.nb_clients,
encoding_block_size: config.encoding_block_size,
repair_block_size: config.repair_block_size,
udp_buffer_size: config.udp_buffer_size,
flush_timeout: config.flush_timeout,
nb_decoding_threads: config.nb_decoding_threads,
heartbeat_interval: config.heartbeat,
Expand Down
12 changes: 12 additions & 0 deletions src/bin/diode-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct Config {
nb_clients: u16,
encoding_block_size: u64,
repair_block_size: u32,
udp_buffer_size: u32,
nb_encoding_threads: u8,
to_bind: net::SocketAddr,
to_udp: net::SocketAddr,
Expand Down Expand Up @@ -80,6 +81,14 @@ fn command_args() -> Config {
.value_parser(clap::value_parser!(u32))
.help("Size of repair data in bytes"),
)
.arg(
Arg::new("udp_buffer_size")
.long("udp_buffer_size")
.value_name("nb_bytes")
.default_value("1073741823") // i32::MAX / 2
.value_parser(clap::value_parser!(u32).range(..1073741824))
.help("Size of UDP socket send buffer"),
)
.arg(
Arg::new("to_bind")
.long("to_bind")
Expand Down Expand Up @@ -128,6 +137,7 @@ fn command_args() -> Config {
let nb_encoding_threads = *args.get_one::<u8>("nb_encoding_threads").expect("default");
let encoding_block_size = *args.get_one::<u64>("encoding_block_size").expect("default");
let repair_block_size = *args.get_one::<u32>("repair_block_size").expect("default");
let udp_buffer_size = *args.get_one::<u32>("udp_buffer_size").expect("default");
let to_bind = net::SocketAddr::from_str(args.get_one::<String>("to_bind").expect("default"))
.expect("invalid to_bind parameter");
let to_udp = net::SocketAddr::from_str(args.get_one::<String>("to_udp").expect("default"))
Expand All @@ -145,6 +155,7 @@ fn command_args() -> Config {
nb_clients,
nb_encoding_threads,
encoding_block_size,
udp_buffer_size,
repair_block_size,
to_bind,
to_udp,
Expand Down Expand Up @@ -231,6 +242,7 @@ fn main() {
nb_clients: config.nb_clients,
encoding_block_size: config.encoding_block_size,
repair_block_size: config.repair_block_size,
udp_buffer_size: config.udp_buffer_size,
nb_encoding_threads: config.nb_encoding_threads,
heartbeat_interval: config.heartbeat,
to_bind: config.to_bind,
Expand Down
1 change: 1 addition & 0 deletions src/receive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct Config {
pub nb_clients: u16,
pub encoding_block_size: u64,
pub repair_block_size: u32,
pub udp_buffer_size: u32,
pub flush_timeout: time::Duration,
pub nb_decoding_threads: u8,
pub heartbeat_interval: Option<time::Duration>,
Expand Down
2 changes: 1 addition & 1 deletion src/receive/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) fn start<F>(receiver: &receive::Receiver<F>) -> Result<(), receive::E
receiver.config.from_udp_mtu
);
let socket = net::UdpSocket::bind(receiver.config.from_udp)?;
sock_utils::set_socket_recv_buffer_size(&socket, i32::MAX)?;
sock_utils::set_socket_recv_buffer_size(&socket, receiver.config.udp_buffer_size as i32)?;
let sock_buffer_size = sock_utils::get_socket_recv_buffer_size(&socket)?;
log::info!("UDP socket receive buffer size set to {sock_buffer_size}");
if (sock_buffer_size as u64)
Expand Down
1 change: 1 addition & 0 deletions src/send/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub struct Config {
pub nb_clients: u16,
pub encoding_block_size: u64,
pub repair_block_size: u32,
pub udp_buffer_size: u32,
pub nb_encoding_threads: u8,
pub heartbeat_interval: Option<time::Duration>,
pub to_bind: net::SocketAddr,
Expand Down
2 changes: 1 addition & 1 deletion src/send/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ pub(crate) fn start<C>(sender: &send::Sender<C>) -> Result<(), send::Error> {
sender.config.to_bind
);
let socket = net::UdpSocket::bind(sender.config.to_bind)?;
sock_utils::set_socket_send_buffer_size(&socket, i32::MAX)?;
sock_utils::set_socket_send_buffer_size(&socket, sender.config.udp_buffer_size as i32)?;
let sock_buffer_size = sock_utils::get_socket_send_buffer_size(&socket)?;
log::info!("UDP socket send buffer size set to {sock_buffer_size}");
if (sock_buffer_size as u64)
Expand Down

0 comments on commit b3b09e7

Please sign in to comment.