Skip to content

Commit

Permalink
add TransportReceiver and TransportSender traits; comply with new ver…
Browse files Browse the repository at this point in the history
…sion of libtransport
  • Loading branch information
Maxime2 committed Sep 9, 2019
1 parent a29ca25 commit e7f63b5
Show file tree
Hide file tree
Showing 3 changed files with 206 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ impl<Data> TCPtransportCfg<Data> {
}
}

//#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
/// Struct which will be implementing the Transport trait.
/// Requires transportconfig to be wrapped in a mutex + arc combination.
pub struct TCPtransport<Data> {
Expand All @@ -94,7 +93,7 @@ pub struct TCPtransport<Data> {
/// then execute method. If quit is received, then exit loop.
fn listener<Data: 'static>(cfg_mutexed: Arc<Mutex<TCPtransportCfg<Data>>>)
where
Data: Serialize + DeserializeOwned + Send + Clone,
Data: Send + Clone,
{
// FIXME: what we do with unwrap() in threads?
// Clone the inputted config struct.
Expand Down Expand Up @@ -269,6 +268,9 @@ where
}
}

pub mod receiver;
pub mod sender;

/// Tests
#[cfg(test)]
mod tests {
Expand Down
136 changes: 136 additions & 0 deletions src/receiver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
use crate::listener;
use crate::TCPtransportCfg;
use bincode::deserialize;
use buffer::ReadBuffer;
use futures::stream::Stream;
use futures::task::Context;
use futures::task::Poll;
use libcommon_rs::peer::{Peer, PeerId, PeerList};
use libtransport::errors::Result;
use libtransport::TransportReceiver;
use serde::de::DeserializeOwned;
use std::io;
use std::pin::Pin;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::JoinHandle;

/// Struct which will be implementing the Transport Receiver trait.
pub struct TCPreceiver<Data> {
// The configuration struct (defined above)
config: Arc<Mutex<TCPtransportCfg<Data>>>,
quit_tx: Sender<()>,
server_handle: Option<JoinHandle<()>>,
}

/// Specify what occurs when TCPreceiver is dropped.
impl<Data> Drop for TCPreceiver<Data> {
fn drop(&mut self) {
// Send quit message.
self.quit_tx.send(()).unwrap();
self.server_handle.take().unwrap().join().unwrap();
}
}

/// Implementation of the Transport trait.
impl<Id, Pe, Data: 'static, E, PL> TransportReceiver<Id, Data, E, PL> for TCPreceiver<Data>
where
Data: DeserializeOwned + Send + Clone,
Id: PeerId,
Pe: Peer<Id>,
PL: PeerList<Id, E, P = Pe>,
{
/// Create a new TCPtransport struct and configure its values.
fn new(bind_addr: String) -> Result<Self> {
// Create a new TCPtransport struct.
let mut cfg = TCPtransportCfg::<Data>::new(bind_addr)?;

// Create a new Multi Producer Single Consumer FIFO communications channel (with receiver
// and sender)
let (tx, rx) = mpsc::channel();
// Set the config's quit receiver as the returned receiver.
cfg.set_quit_rx(rx);
// Wrap config in an Arc<Mutex<>>
let cfg_mutexed = Arc::new(Mutex::new(cfg));
// Clone the variable defined above.
let config = Arc::clone(&cfg_mutexed);
// Pass the cloned variable to the listener of the server handle.
let handle = thread::spawn(|| listener(config));
// Create a new TCPtransport struct and pass cfg_mutexed (original mutex) to the struct.
Ok(TCPreceiver {
// quit_rx: rx,
quit_tx: tx,
server_handle: Some(handle),
config: cfg_mutexed,
})
}
}

/// Allow TCPreceiver to be store in Pin (for async usage)
impl<D> Unpin for TCPreceiver<D> {}

/// Implement stream trait to allow TCPreceiver to be used asynchronously. Allows for multiple
/// values to be yielded by a future.
impl<Data> Stream for TCPreceiver<Data>
where
Data: DeserializeOwned,
{
type Item = Data;
/// Attempts to resolve the next item in the stream.
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Gets a mutable reference to itself from a Pin value.
let myself = Pin::get_mut(self);
// Clones config for later use.
let config = Arc::clone(&myself.config);
// Lock config and get the bare mutex guard.
let mut cfg = config.lock().unwrap();
// Check the stream in config and process all messages.
for stream in cfg.listener.incoming() {
// Check what type of stream is incoming.
match stream {
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
// check if quit channel got message
match &cfg.quit_rx {
None => {}
Some(ch) => {
if ch.try_recv().is_ok() {
break; // meaning Poll::Pending as we are going down
}
}
}
}
Err(e) => panic!("error in accepting connection: {}", e),
// If we get a valid stream
Ok(mut stream) => {
// Create a buffer for the data
let mut buffer: Vec<u8> = Vec::with_capacity(4096);
loop {
// Check to see if we get valid data from the stream.
let n = match stream.read_buffer(&mut buffer) {
// FIXME: what we do with panics in threads?
Err(e) => panic!("error reading from a connection: {}", e),
Ok(x) => x.len(),
};
// Check if the data is not empty.
if n == 0 {
// FIXME: check correct work in case when TCP next block delivery timeout is
// greater than read_buffer() read timeout
break;
}
}
// FIXME: what should we return in case of deserialize() failure,
// Poll::Ready(None) or Poll::Pending instead of panic?
// Deserialize data and package for use.
let data: Data = deserialize::<Data>(&buffer).unwrap();
// Return a ready status which contains the data.
return Poll::Ready(Some(data));
}
}
}
// Set the config's waker to the context's waker
cfg.waker = Some(cx.waker().clone());
// Return a 'Pending' Poll variant
Poll::Pending
}
}
66 changes: 66 additions & 0 deletions src/sender.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
extern crate libtransport;
use bincode::serialize;
use core::marker::PhantomData;
use libcommon_rs::peer::{Peer, PeerId, PeerList};
use libtransport::errors::{Error, Result};
use libtransport::TransportSender;
use serde::Serialize;
use std::io::Write;
use std::net::TcpStream;

pub struct TCPsender<Data> {
phantom: PhantomData<Data>,
}

impl<Id, Pe, Data: 'static, E, PL> TransportSender<Id, Data, E, PL> for TCPsender<Data>
where
Data: Serialize + Send + Clone,
Id: PeerId,
Pe: Peer<Id>,
PL: PeerList<Id, E, P = Pe>,
{
fn new() -> Result<Self> {
Ok(TCPsender {
phantom: PhantomData,
})
}
/// Sends data to a single, specified peer.
/// Requires the data to be sent, as well as the net address to be sent too.
fn send(&mut self, peer_address: String, data: Data) -> Result<()> {
// Create a TCPstream to the specified address.
let mut stream = TcpStream::connect(peer_address)?;
// Serialize data into bytes so that it can be transferred.
let bytes = serialize(&data)?;
// Write the byte data and send it through the stream.
let sent = stream.write(&bytes)?;
// Check if sent data is same as the serialized data.
if sent != bytes.len() {
return Err(Error::Incomplete);
}
// Shut down the stream once the message is sent.
stream.shutdown(std::net::Shutdown::Write)?;
Ok(())
}

/// Send a message to all peers in a PeerList.
/// Requires a PeerList and data struct.
fn broadcast(&mut self, peers: &mut PL, data: Data) -> Result<()> {
// Iterate over all peers
for p in peers.iter() {
//dbg!(p.get_net_addr());
// Create a TCP stream to the current net address.
let mut stream = TcpStream::connect(p.get_net_addr())?;
// Serialize data to a bytes.
let bytes = serialize(&data)?;
// Write bytes to the stream.
let sent = stream.write(&bytes)?;
// Check if sent data is same as the bytes initially made.
if sent != bytes.len() {
return Err(Error::Incomplete);
}
// Shut down the stream once the message has been sent.
stream.shutdown(std::net::Shutdown::Write)?;
}
Ok(())
}
}

0 comments on commit e7f63b5

Please sign in to comment.