Skip to content

Commit

Permalink
initial TCP future impl
Browse files Browse the repository at this point in the history
  • Loading branch information
bluejekyll committed Nov 20, 2016
1 parent 0a44356 commit 16cb3c3
Show file tree
Hide file tree
Showing 14 changed files with 453 additions and 206 deletions.
1 change: 0 additions & 1 deletion client/src/client/client_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use ::op::{Message, MessageType, OpCode, Query, UpdateMessage};
use ::rr::{domain, DNSClass, RData, Record, RecordType};
use ::rr::dnssec::Signer;
use ::rr::rdata::NULL;
use ::udp::UdpClientStreamHandle;

const QOS_MAX_RECEIVE_MSGS: usize = 100; // max number of messages to receive from the UDP socket

Expand Down
18 changes: 15 additions & 3 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,31 @@ use futures::stream::Stream;
use tokio_core::channel::Sender;

use op::Message;
use client::ClientStreamHandle;

/// A stream of serialize DNS Messages
pub type BufferStream = Stream<Item=(Vec<u8>, SocketAddr), Error=io::Error>;
/// A stream of serialized DNS Messages
pub type BufStream = Stream<Item=(Vec<u8>, SocketAddr), Error=io::Error>;

/// A sender to which serialized DNS Messages can be sent
pub type BufferStreamHandle = Sender<(Vec<u8>, SocketAddr)>;
pub type BufStreamHandle = Sender<(Vec<u8>, SocketAddr)>;

/// A stream of messsages
pub type MessageStream = Stream<Item=Message, Error=io::Error>;

/// A sender to which a Message can be sent
pub type MessageStreamHandle = Sender<Message>;

pub struct BufClientStreamHandle {
name_server: SocketAddr,
sender: BufStreamHandle,
}

impl ClientStreamHandle for BufClientStreamHandle {
fn send(&self, buffer: Vec<u8>) -> io::Result<()> {
self.sender.send((buffer, self.name_server))
}
}

/// this exposes a version function which gives access to the access
include!(concat!(env!("OUT_DIR"), "/version.rs"));

Expand Down
2 changes: 2 additions & 0 deletions client/src/tcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
mod handler;
mod tcp_client_connection;
mod tcp_client_stream;
mod tcp_stream;

pub use self::handler::TcpHandler;
pub use self::handler::TcpState;
pub use self::tcp_client_connection::TcpClientConnection;
pub use self::tcp_client_stream::TcpClientStream;
pub use self::tcp_stream::TcpStream;
180 changes: 22 additions & 158 deletions client/src/tcp/tcp_client_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,58 +5,39 @@
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

use std::mem;
use std::net::SocketAddr;
use std::io;
use std::io::{Read, Write};

use futures::{Async, Future, Poll};
use futures::stream::{Fuse, Peekable, Stream};
use tokio_core::net::TcpStream as TokioTcpStream;
use tokio_core::channel::{channel, Sender, Receiver};
use futures::stream::Stream;
use tokio_core::reactor::{Handle};

use ::BufClientStreamHandle;
use ::tcp::TcpStream;
use ::client::ClientStreamHandle;

enum WriteTcpState {
LenBytes{ pos: usize, length: [u8; 2], bytes: Vec<u8> },
Bytes{ pos: usize, bytes: Vec<u8> },
}

enum ReadTcpState {
LenBytes{ pos: usize, bytes: [u8; 2] },
Bytes{ pos: usize, bytes: Vec<u8> },
}

#[must_use = "futures do nothing unless polled"]
pub struct TcpClientStream {
socket: TokioTcpStream,
outbound_messages: Peekable<Fuse<Receiver<Vec<u8>>>>,
send_state: Option<WriteTcpState>,
read_state: ReadTcpState,
tcp_stream: TcpStream,
}

impl TcpClientStream {
/// it is expected that the resolver wrapper will be responsible for creating and managing
/// new TcpClients such that each new client would have a random port (reduce chance of cache
/// poisoning)
pub fn new(name_server: SocketAddr, loop_handle: Handle) -> (Box<Future<Item=TcpClientStream, Error=io::Error>>, Box<ClientStreamHandle>) {
let (message_sender, outbound_messages) = channel(&loop_handle).expect("somethings wrong with the event loop");
let tcp = TokioTcpStream::connect(&name_server, &loop_handle);
let (stream_future, sender) = TcpStream::new(name_server, loop_handle);

// This set of futures collapses the next tcp socket into a stream which can be used for
// sending and receiving tcp packets.
let stream: Box<Future<Item=TcpClientStream, Error=io::Error>> = Box::new(tcp
.map(move |tcp_stream| {
let new_future: Box<Future<Item=TcpClientStream, Error=io::Error>> =
Box::new(stream_future.map(move |tcp_stream| {
TcpClientStream {
socket: tcp_stream,
outbound_messages: outbound_messages.fuse().peekable(),
send_state: None,
read_state: ReadTcpState::LenBytes { pos: 0, bytes: [0u8; 2] },
tcp_stream: tcp_stream,
}
}));

(stream, Box::new(message_sender))
let sender = Box::new(BufClientStreamHandle{ name_server: name_server, sender: sender });

(new_future, sender)
}
}

Expand All @@ -65,143 +46,26 @@ impl Stream for TcpClientStream {
type Error = io::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
debug!("being polled");

// this will not accept incoming data while there is data to send
// makes this self throttling.
// TODO: it might be interesting to try and split the sending and receiving futures.
loop {
// in the case we are sending, send it all?
if self.send_state.is_some() {
// sending...
match self.send_state {
Some(WriteTcpState::LenBytes{ ref mut pos, ref length, .. }) => {
let wrote = try_nb!(self.socket.write(&length[*pos..]));
*pos += wrote;
},
Some(WriteTcpState::Bytes{ ref mut pos, ref bytes }) => {
let wrote = try_nb!(self.socket.write(&bytes[*pos..]));
*pos += wrote;
},
_ => (),
match try_ready!(self.tcp_stream.poll()) {
Some((buffer, src_addr)) => {
// this is busted if the tcp connection doesn't have a peer
let peer = try!(self.tcp_stream.peer_addr());
if src_addr != peer {
// FIXME: this should be an error...
warn!("{} does not match name_server: {}", src_addr, peer)
}

// get current state
let current_state = mem::replace(&mut self.send_state, None);

// switch states
match current_state {
Some(WriteTcpState::LenBytes{ pos, length, bytes }) => {
if pos < length.len() {
mem::replace(&mut self.send_state, Some(WriteTcpState::LenBytes{ pos: pos, length: length, bytes: bytes }));
} else{
mem::replace(&mut self.send_state, Some(WriteTcpState::Bytes{ pos: 0, bytes: bytes }));
}
},
Some(WriteTcpState::Bytes{ pos, bytes }) => {
if pos < bytes.len() {
mem::replace(&mut self.send_state, Some(WriteTcpState::Bytes{ pos: pos, bytes: bytes }));
} else {
mem::replace(&mut self.send_state, None);
}
},
None => (),
};
} else {
// then see if there is more to send
match try!(self.outbound_messages.poll()) {
// already handled above, here to make sure the poll() pops the next message
Async::Ready(Some(buffer)) => {
// will return if the socket will block
debug!("received buffer, sending");

// the length is 16 bits
let len: [u8; 2] = [(buffer.len() >> 8 & 0xFF) as u8,
(buffer.len() & 0xFF) as u8];

self.send_state = Some(WriteTcpState::LenBytes{ pos: 0, length: len, bytes: buffer });
},
// now we get to drop through to the receives...
// TODO: should we also return None if there are no more messages to send?
Async::NotReady | Async::Ready(None) => { debug!("no messages to send"); break },
}
Ok(Async::Ready(Some(buffer)))
}
}

debug!("continuing to read");
let mut ret_buf: Option<Vec<u8>> = None;

// this will loop while there is data to read, or the data has been read, or an IO
// event would block
while ret_buf.is_none() {
// Evaluates the next state. If None is the result, then no state change occurs,
// if Some(_) is returned, then that will be used as the next state.
let new_state: Option<ReadTcpState> = match self.read_state {
ReadTcpState::LenBytes { ref mut pos, ref mut bytes } => {
debug!("in ReadTcpState::LenBytes: {}", pos);

// debug!("reading length {}", bytes.len());
let read = try_nb!(self.socket.read(&mut bytes[*pos..]));
*pos += read;

if *pos < bytes.len() {
debug!("remain ReadTcpState::LenBytes: {}", pos);
None
} else {
let length = (bytes[0] as u16) << 8 & 0xFF00 | bytes[1] as u16 & 0x00FF;
debug!("got length: {}", length);
let mut bytes = Vec::with_capacity(length as usize);
bytes.resize(length as usize, 0);

debug!("move ReadTcpState::Bytes: {}", bytes.len());
Some(ReadTcpState::Bytes{ pos: 0, bytes: bytes })
}
},
ReadTcpState::Bytes { ref mut pos, ref mut bytes } => {
debug!("in ReadTcpState::Bytes: {}", bytes.len());
let read = try_nb!(self.socket.read(&mut bytes[*pos..]));
*pos += read;

if *pos < bytes.len() {
debug!("remain ReadTcpState::Bytes: {}", bytes.len());
None
} else {
debug!("reset ReadTcpState::LenBytes: {}", 0);
Some(ReadTcpState::LenBytes{ pos: 0, bytes: [0u8; 2] })
}
},
};

// this will move to the next state,
// if it was a completed receipt of bytes, then it will move out the bytes
if let Some(state) = new_state {
match mem::replace(&mut self.read_state, state) {
ReadTcpState::Bytes{ pos, bytes } => {
debug!("returning bytes");
assert_eq!(pos, bytes.len());
ret_buf = Some(bytes);
},
_ => (),
}
}
}

// if the buffer is ready, return it, if not we're NotReady
if let Some(buffer) = ret_buf {
debug!("returning buffer");
return Ok(Async::Ready(Some(buffer)))
} else {
debug!("bottomed out");
// at a minimum the outbound_messages should have been polled,
// which will wake this future up later...
return Ok(Async::NotReady)
None => Ok(Async::Ready(None)),
}
}
}

#[cfg(test)] use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};


#[cfg(test)] use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

#[test]
// this fails on linux for some reason. It appears that a buffer somewhere is dirty
// and subsequent reads of a mesage buffer reads the wrong length. It works for 2 iterations
Expand Down

0 comments on commit 16cb3c3

Please sign in to comment.