Skip to content

Commit

Permalink
remake into async/.awat framework; make compatible with latest versio…
Browse files Browse the repository at this point in the history
…n of Transport trait
  • Loading branch information
Maxime2 committed Aug 26, 2019
1 parent 57b6f67 commit 3c7432d
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 95 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ serde = "1.0.98"
serde_derive = "1.0.98"
buffer = "0.1.8"
os_pipe="0.8.0"
futures-preview = { version = "=0.3.0-alpha.17", features = ["compat"] }
180 changes: 85 additions & 95 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,69 +4,55 @@ extern crate serde_derive;

use bincode::{deserialize, serialize};
use buffer::ReadBuffer;
use futures::stream::Stream;
use futures::task::Context;
use futures::task::Poll;
use futures::task::Waker;
use libcommon_rs::peer::{Peer, PeerId, PeerList};
use libtransport::errors::{Error, Error::AtMaxVecCapacity, Result};
use libtransport::errors::{Error, Result};
use libtransport::{Transport, TransportConfiguration};
//use os_pipe::PipeWriter;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::io;
use std::io::Write;
use std::marker::PhantomData;
use std::net::{TcpListener, TcpStream};
use std::pin::Pin;
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;

pub struct TCPtransportCfg<Data> {
bind_net_addr: String,
channel_pool: Vec<Sender<Data>>,
//pipe_pool: Vec<PipeWriter>,
callback_pool: Vec<fn(Data) -> bool>,
callback_timeout: u64,
quit_rx: Option<Receiver<()>>,
listener: TcpListener,
waker: Option<Waker>,
phantom: PhantomData<Data>,
}

impl<Data> TransportConfiguration<Data> for TCPtransportCfg<Data> {
fn new(set_bind_net_addr: String) -> Self {
let listener = TcpListener::bind(set_bind_net_addr.clone()).unwrap();
listener
.set_nonblocking(true)
.expect("unable to set non-blocking");
TCPtransportCfg {
bind_net_addr: set_bind_net_addr,
channel_pool: Vec::with_capacity(1),
//pipe_pool: Vec::with_capacity(1),
callback_pool: Vec::with_capacity(1),
callback_timeout: 100, // 100 millisecond timeout by default
quit_rx: None,
listener,
waker: None,
phantom: PhantomData,
}
}
fn register_channel(&mut self, sender: Sender<Data>) -> Result<()> {
// Vec::push() panics when number of elements overflows `usize`
if self.channel_pool.len() == std::usize::MAX {
return Err(AtMaxVecCapacity);
}
self.channel_pool.push(sender);
Ok(())
}
//fn register_os_pipe(&mut self, sender: PipeWriter) -> Result<()> {
// // Vec::push() panics when number of elements overflows `usize`
// if self.pipe_pool.len() == std::usize::MAX {
// return Err(AtMaxVecCapacity);
// }
// self.pipe_pool.push(sender);
// Ok(())
//}
fn register_callback(&mut self, callback: fn(Data) -> bool) -> Result<()> {
// Vec::push() panics when number of elements overflows `usize`
if self.callback_pool.len() == std::usize::MAX {
return Err(AtMaxVecCapacity);
}
self.callback_pool.push(callback);
Ok(())
}
fn set_callback_timeout(&mut self, timeout: u64) {
self.callback_timeout = timeout;
}
fn set_bind_net_addr(&mut self, address: String) -> Result<()> {
self.bind_net_addr = address;
let listener = TcpListener::bind(self.bind_net_addr.clone()).unwrap();
listener
.set_nonblocking(true)
.expect("unable to set non-blocking");
use std::mem;
drop(mem::replace(&mut self.listener, listener));
Ok(())
}
}
Expand All @@ -84,68 +70,27 @@ pub struct TCPtransport<Data> {
server_handle: Option<JoinHandle<()>>,
}

fn handle_client<D: Clone>(cfg_mutexed: Arc<Mutex<TCPtransportCfg<D>>>, mut stream: TcpStream)
where
D: DeserializeOwned,
{
let mut buffer: Vec<u8> = Vec::with_capacity(4096);
loop {
let n = match stream.read_buffer(&mut buffer) {
// FIXME: what we do with panics in threads?
Err(e) => panic!("error reading from a connection: {}", e),
Ok(x) => x.len(),
};
if n == 0 {
// FIXME: check correct work in case when TCP next block delivery timeout is
// greater than read_buffer() read timeout
break;
}
}
let data: D = deserialize::<D>(&buffer).unwrap();
//dbg!(buffer);
let cfg = cfg_mutexed.lock().unwrap();
//dbg!(cfg.channel_pool.len());
for ch in cfg.channel_pool.iter() {
//println!("sending to channel.");
ch.send(data.clone()).unwrap();
}
}

fn listener<Data: 'static>(cfg_mutexed: Arc<Mutex<TCPtransportCfg<Data>>>)
where
Data: Serialize + DeserializeOwned + Send + Clone,
{
// FIXME: what we do with unwrap() in threads?
let config = Arc::clone(&cfg_mutexed);
let listener = {
let cfg = config.lock().unwrap();
TcpListener::bind(cfg.bind_net_addr.clone()).unwrap()
};
listener
.set_nonblocking(true)
.expect("unable to set non-blocking");
for stream in listener.incoming() {
match stream {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// check if quit channel got message
let cfg = config.lock().unwrap();
match &cfg.quit_rx {
None => {}
Some(ch) => {
if ch.try_recv().is_ok() {
break;
}
}
loop {
// check if quit channel got message
let mut cfg = config.lock().unwrap();
match &cfg.quit_rx {
None => {}
Some(ch) => {
if ch.try_recv().is_ok() {
break;
}
continue;
}
Err(e) => panic!("error in accepting connection: {}", e),
Ok(stream) => {
let config = Arc::clone(&cfg_mutexed);
// receive Data and push it into channels, pipes and call callbacks
thread::spawn(move || handle_client(config, stream));
}
}
// allow to pool again if waker is set
if let Some(waker) = cfg.waker.take() {
waker.wake()
}
}
}

Expand Down Expand Up @@ -203,12 +148,57 @@ where
}
Ok(())
}
}

impl<D> Unpin for TCPtransport<D> {}

// fn register_channel(&mut self, sender: Sender<Data>) -> Result<()> {
// let mut cfg = self.config.lock()?;
// cfg.register_channel(sender)?;
// Ok(())
// }
impl<Data> Stream for TCPtransport<Data>
where
Data: DeserializeOwned,
{
type Item = Data;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let myself = Pin::get_mut(self);
let config = Arc::clone(&myself.config);
let mut cfg = config.lock().unwrap();
for stream in cfg.listener.incoming() {
match stream {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// check if quit channel got message
match &cfg.quit_rx {
None => {}
Some(ch) => {
if ch.try_recv().is_ok() {
break; // meaning Poll::Pending as we are going down
}
}
}
}
Err(e) => panic!("error in accepting connection: {}", e),
Ok(mut stream) => {
let mut buffer: Vec<u8> = Vec::with_capacity(4096);
loop {
let n = match stream.read_buffer(&mut buffer) {
// FIXME: what we do with panics in threads?
Err(e) => panic!("error reading from a connection: {}", e),
Ok(x) => x.len(),
};
if n == 0 {
// FIXME: check correct work in case when TCP next block delivery timeout is
// greater than read_buffer() read timeout
break;
}
}
// FIXME: what should we return in case of deserialize() failure,
// Poll::Ready(None) or Poll::Pending instead of panic?
let data: Data = deserialize::<Data>(&buffer).unwrap();
return Poll::Ready(Some(data));
}
}
}
cfg.waker = Some(cx.waker().clone());
Poll::Pending
}
}

#[cfg(test)]
Expand Down

0 comments on commit 3c7432d

Please sign in to comment.