diff --git a/src/conf.rs b/src/conf.rs index c3195de..c74cfa8 100644 --- a/src/conf.rs +++ b/src/conf.rs @@ -1,20 +1,79 @@ // Config module -pub struct DAGconfig { +use libconsensus::errors::{Error, Error::AtMaxVecCapacity, Result}; +use libconsensus::ConsensusConfiguration; +use os_pipe::PipeWriter; +use std::sync::mpsc::{Receiver, Sender, TryRecvError}; + +pub struct DAGconfig { pub(crate) inner_port: u16, pub(crate) service_port: u16, pub(crate) callback_timeout: u64, // heartbeat duration in milliseconds pub(crate) heartbeat: u64, + callback_pool: Vec bool>, + channel_pool: Vec>, + pipe_pool: Vec, + quit_rx: Option>, } -impl Default for DAGconfig { - fn default() -> Self { +impl DAGconfig { + pub fn set_quit_rx(&mut self, rx: Receiver<()>) { + self.quit_rx = Some(rx); + } + pub fn check_quit(&mut self) -> bool { + match &self.quit_rx { + None => return false, + Some(ch) => match ch.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => return true, + Err(TryRecvError::Empty) => return false, + }, + } + } +} + +impl ConsensusConfiguration for DAGconfig { + fn new() -> Self { return DAGconfig { inner_port: 9000, service_port: 12000, callback_timeout: 100, heartbeat: 1000, + callback_pool: Vec::with_capacity(1), + channel_pool: Vec::with_capacity(1), + pipe_pool: Vec::with_capacity(1), + quit_rx: None, }; } + + fn register_channel(&mut self, sender: Sender) -> Result<()> { + // Vec::push() panics when number of elements overflows `usize` + if self.channel_pool.len() == std::usize::MAX { + return Err(AtMaxVecCapacity); + } + self.channel_pool.push(sender); + Ok(()) + } + + fn register_os_pipe(&mut self, sender: PipeWriter) -> Result<()> { + // Vec::push() panics when number of elements overflows `usize` + if self.pipe_pool.len() == std::usize::MAX { + return Err(AtMaxVecCapacity); + } + self.pipe_pool.push(sender); + Ok(()) + } + + fn register_callback(&mut self, callback: fn(data: Data) -> bool) -> Result<()> { + // Vec::push() panics when number of elements overflows `usize` + if self.callback_pool.len() == std::usize::MAX { + return Err(AtMaxVecCapacity); + } + self.callback_pool.push(callback); + Ok(()) + } + + fn set_callback_timeout(&mut self, timeout: u64) { + self.callback_timeout = timeout; + } } diff --git a/src/lib.rs b/src/lib.rs index 42a354c..e162f74 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,55 +19,29 @@ use std::time::Duration; // DAG node structure pub struct DAG { - conf: DAGconfig, + conf: DAGconfig, tx_pool: Vec, internal_tx_pool: Vec, - callback_pool: Vec bool>, - channel_pool: Vec>, - pipe_pool: Vec, - quit_rx: Receiver<()>, quit_tx: Sender<()>, lamport_time: LamportTime, current_frame: Frame, last_finalised_frame: Option, } -impl Default for DAG { - fn default() -> DAG { - let (tx, rx) = mpsc::channel(); - DAG { - conf: DAGconfig::default(), - tx_pool: Vec::with_capacity(1), - internal_tx_pool: Vec::with_capacity(1), - callback_pool: Vec::with_capacity(1), - channel_pool: Vec::with_capacity(1), - pipe_pool: Vec::with_capacity(1), - quit_rx: rx, - quit_tx: tx, - lamport_time: 0, - current_frame: 0, - last_finalised_frame: None, - } - } -} - impl Consensus for DAG where D: std::convert::AsRef, { - type Configuration = DAGconfig; + type Configuration = DAGconfig; type Data = D; - fn new(cfg: DAGconfig) -> DAG { + fn new(mut cfg: DAGconfig) -> DAG { let (tx, rx) = mpsc::channel(); + cfg.set_quit_rx(rx); return DAG { conf: cfg, tx_pool: Vec::with_capacity(1), internal_tx_pool: Vec::with_capacity(1), - callback_pool: Vec::with_capacity(1), - channel_pool: Vec::with_capacity(1), - pipe_pool: Vec::with_capacity(1), - quit_rx: rx, quit_tx: tx, lamport_time: 0, current_frame: 0, @@ -83,13 +57,10 @@ where // DAG0 procedure A loop loop { // check if shutdown() has been called - match self.quit_rx.try_recv() { - Ok(_) | Err(TryRecvError::Disconnected) => { - // terminating - // FIXME: need to be implemented - break; - } - Err(TryRecvError::Empty) => {} + if self.conf.check_quit() { + // terminating + // FIXME: need to be implemented + break; } // wait until hearbeat interval expires @@ -112,37 +83,6 @@ where self.tx_pool.push(data); true } - - fn register_callback(&mut self, callback: fn(data: Self::Data) -> bool) -> bool { - // Vec::push() panics when number of elements overflows `usize` - if self.callback_pool.len() == std::usize::MAX { - return false; - } - self.callback_pool.push(callback); - true - } - - fn set_callback_timeout(&mut self, timeout: u64) { - self.conf.callback_timeout = timeout; - } - - fn register_channel(&mut self, sender: Sender) -> bool { - // Vec::push() panics when number of elements overflows `usize` - if self.channel_pool.len() == std::usize::MAX { - return false; - } - self.channel_pool.push(sender); - true - } - - fn register_os_pipe(&mut self, sender: PipeWriter) -> bool { - // Vec::push() panics when number of elements overflows `usize` - if self.pipe_pool.len() == std::usize::MAX { - return false; - } - self.pipe_pool.push(sender); - true - } } impl DAG