Skip to content

Commit

Permalink
implement ConsensusConfiguration trait; update to the latest version …
Browse files Browse the repository at this point in the history
…of libconsensus
  • Loading branch information
Maxime2 committed Aug 21, 2019
1 parent ab83eb4 commit b2280a3
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 71 deletions.
65 changes: 62 additions & 3 deletions src/conf.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,79 @@
// Config module

pub struct DAGconfig {
use libconsensus::errors::{Error, Error::AtMaxVecCapacity, Result};
use libconsensus::ConsensusConfiguration;
use os_pipe::PipeWriter;
use std::sync::mpsc::{Receiver, Sender, TryRecvError};

pub struct DAGconfig<D> {
pub(crate) inner_port: u16,
pub(crate) service_port: u16,
pub(crate) callback_timeout: u64,
// heartbeat duration in milliseconds
pub(crate) heartbeat: u64,
callback_pool: Vec<fn(data: D) -> bool>,
channel_pool: Vec<Sender<D>>,
pipe_pool: Vec<PipeWriter>,
quit_rx: Option<Receiver<()>>,
}

impl Default for DAGconfig {
fn default() -> Self {
impl<Data> DAGconfig<Data> {
pub fn set_quit_rx(&mut self, rx: Receiver<()>) {
self.quit_rx = Some(rx);
}
pub fn check_quit(&mut self) -> bool {
match &self.quit_rx {
None => return false,
Some(ch) => match ch.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => return true,
Err(TryRecvError::Empty) => return false,
},
}
}
}

impl<Data> ConsensusConfiguration<Data> for DAGconfig<Data> {
fn new() -> Self {
return DAGconfig {
inner_port: 9000,
service_port: 12000,
callback_timeout: 100,
heartbeat: 1000,
callback_pool: Vec::with_capacity(1),
channel_pool: Vec::with_capacity(1),
pipe_pool: Vec::with_capacity(1),
quit_rx: None,
};
}

fn register_channel(&mut self, sender: Sender<Data>) -> Result<()> {
// Vec::push() panics when number of elements overflows `usize`
if self.channel_pool.len() == std::usize::MAX {
return Err(AtMaxVecCapacity);
}
self.channel_pool.push(sender);
Ok(())
}

fn register_os_pipe(&mut self, sender: PipeWriter) -> Result<()> {
// Vec::push() panics when number of elements overflows `usize`
if self.pipe_pool.len() == std::usize::MAX {
return Err(AtMaxVecCapacity);
}
self.pipe_pool.push(sender);
Ok(())
}

fn register_callback(&mut self, callback: fn(data: Data) -> bool) -> Result<()> {
// Vec::push() panics when number of elements overflows `usize`
if self.callback_pool.len() == std::usize::MAX {
return Err(AtMaxVecCapacity);
}
self.callback_pool.push(callback);
Ok(())
}

fn set_callback_timeout(&mut self, timeout: u64) {
self.callback_timeout = timeout;
}
}
76 changes: 8 additions & 68 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,29 @@ use std::time::Duration;

// DAG node structure
pub struct DAG<T> {
conf: DAGconfig,
conf: DAGconfig<T>,
tx_pool: Vec<T>,
internal_tx_pool: Vec<InternalTransaction>,
callback_pool: Vec<fn(data: T) -> bool>,
channel_pool: Vec<Sender<T>>,
pipe_pool: Vec<PipeWriter>,
quit_rx: Receiver<()>,
quit_tx: Sender<()>,
lamport_time: LamportTime,
current_frame: Frame,
last_finalised_frame: Option<Frame>,
}

impl<D> Default for DAG<D> {
fn default() -> DAG<D> {
let (tx, rx) = mpsc::channel();
DAG {
conf: DAGconfig::default(),
tx_pool: Vec::with_capacity(1),
internal_tx_pool: Vec::with_capacity(1),
callback_pool: Vec::with_capacity(1),
channel_pool: Vec::with_capacity(1),
pipe_pool: Vec::with_capacity(1),
quit_rx: rx,
quit_tx: tx,
lamport_time: 0,
current_frame: 0,
last_finalised_frame: None,
}
}
}

impl<D> Consensus for DAG<D>
where
D: std::convert::AsRef<u8>,
{
type Configuration = DAGconfig;
type Configuration = DAGconfig<D>;
type Data = D;

fn new(cfg: DAGconfig) -> DAG<D> {
fn new(mut cfg: DAGconfig<D>) -> DAG<D> {
let (tx, rx) = mpsc::channel();
cfg.set_quit_rx(rx);
return DAG {
conf: cfg,
tx_pool: Vec::with_capacity(1),
internal_tx_pool: Vec::with_capacity(1),
callback_pool: Vec::with_capacity(1),
channel_pool: Vec::with_capacity(1),
pipe_pool: Vec::with_capacity(1),
quit_rx: rx,
quit_tx: tx,
lamport_time: 0,
current_frame: 0,
Expand All @@ -83,13 +57,10 @@ where
// DAG0 procedure A loop
loop {
// check if shutdown() has been called
match self.quit_rx.try_recv() {
Ok(_) | Err(TryRecvError::Disconnected) => {
// terminating
// FIXME: need to be implemented
break;
}
Err(TryRecvError::Empty) => {}
if self.conf.check_quit() {
// terminating
// FIXME: need to be implemented
break;
}

// wait until hearbeat interval expires
Expand All @@ -112,37 +83,6 @@ where
self.tx_pool.push(data);
true
}

fn register_callback(&mut self, callback: fn(data: Self::Data) -> bool) -> bool {
// Vec::push() panics when number of elements overflows `usize`
if self.callback_pool.len() == std::usize::MAX {
return false;
}
self.callback_pool.push(callback);
true
}

fn set_callback_timeout(&mut self, timeout: u64) {
self.conf.callback_timeout = timeout;
}

fn register_channel(&mut self, sender: Sender<Self::Data>) -> bool {
// Vec::push() panics when number of elements overflows `usize`
if self.channel_pool.len() == std::usize::MAX {
return false;
}
self.channel_pool.push(sender);
true
}

fn register_os_pipe(&mut self, sender: PipeWriter) -> bool {
// Vec::push() panics when number of elements overflows `usize`
if self.pipe_pool.len() == std::usize::MAX {
return false;
}
self.pipe_pool.push(sender);
true
}
}

impl<D> DAG<D>
Expand Down

0 comments on commit b2280a3

Please sign in to comment.