Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions src/io/futures/connecting_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,14 +68,13 @@ impl Future for ConnectingStream {

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match try_ready!(self.either_poll()) {
Out::WaitForStream((stream, _)) => {
Ok(Ready(Stream {
closed: false,
next_packet: Some(PacketParser::empty().parse()),
buf: Some(Vec::new()),
endpoint: Some(stream.into()),
}))
}
Out::WaitForStream((stream, _)) => Ok(Ready(Stream {
closed: false,
parser: Some(PacketParser::empty()),
packets: Default::default(),
buf: Vec::new(),
endpoint: Some(stream.into()),
})),
Out::Fail(_) => unreachable!(),
}
}
Expand Down
125 changes: 58 additions & 67 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use io::futures::WritePacket;
use native_tls::{Certificate, Pkcs12, TlsConnector};
use myc::packets::{PacketParser, ParseResult, RawPacket};
use opts::SslOpts;
use std::cmp;
use std::io;
use std::io::Read;
use std::net::ToSocketAddrs;
use std::time::Duration;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -182,8 +180,9 @@ impl AsyncWrite for Endpoint {
pub struct Stream {
endpoint: Option<Endpoint>,
closed: bool,
next_packet: Option<ParseResult>,
buf: Option<Vec<u8>>,
parser: Option<PacketParser>,
packets: ::std::collections::VecDeque<(RawPacket, u8)>,
buf: Vec<u8>,
}

impl fmt::Debug for Stream {
Expand Down Expand Up @@ -287,75 +286,67 @@ impl stream::Stream for Stream {
type Error = Error;

fn poll(&mut self) -> Poll<Option<(RawPacket, u8)>, Error> {
loop {
// should read everything from self.endpoint
let mut would_block = false;
if !self.closed {
let mut buf = [0u8; 4096];
loop {
match self.read(&mut buf[..]) {
Ok(0) => {
break;
}
Ok(size) => {
let buf_handle = self.buf.as_mut().unwrap();
buf_handle.extend_from_slice(&buf[..size]);
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
would_block = true;
break;
}
Err(error) => {
self.closed = true;
return Err(Error::from(error));
}
};
}
} else {
return Ok(Ready(None));
}
// emit pending packets
if let Some(packet) = self.packets.pop_front() {
return Ok(Ready(Some(packet)));
}

// need to call again if there is a data in self.buf
// or data was written to packet parser
let mut should_poll = false;

let next_packet = self.next_packet.take().expect(
"Stream.next_packet should not be None",
);
let next_packet = match next_packet {
ParseResult::Done(packet, seq_id) => {
self.next_packet = Some(PacketParser::empty().parse());
return Ok(Ready(Some((packet, seq_id))));
// should read everything from self.endpoint
let mut buf = [0u8; 4096];
while !self.closed {
match self.poll_read(&mut buf[..]) {
Err(error) => {
self.closed = true;
bail!(error)
}
ParseResult::Incomplete(mut new_packet, needed) => {
let buf_handle = self.buf.as_mut().unwrap();
let buf_len = buf_handle.len();

let to = cmp::min(needed, buf_len);
new_packet.extend_from_slice(&buf_handle[..to]);
{
let src = unsafe { buf_handle.as_ptr().offset(to as isize) };
let dst = buf_handle.as_mut_ptr();
let len = buf_handle.len() - to;
unsafe {
::std::ptr::copy(src, dst, len);
buf_handle.set_len(len);
}
}

if buf_len != 0 {
should_poll = true;
}

new_packet
Ok(Ready(0)) => {
self.closed = true;
}
Ok(Ready(size)) => {
self.buf.extend_from_slice(&buf[..size]);
}
Ok(NotReady) => break,
};
}

self.next_packet = Some(next_packet.parse());
// parse buffer into packets
let (packets, parser, buf) = parse_packet(self.parser.take().unwrap(), &self.buf);
self.packets = packets.into();
self.parser = Some(parser);
self.buf = buf;

if !should_poll && would_block {
return Ok(NotReady)
}
if let Some(packet) = self.packets.pop_front() {
return Ok(Ready(Some(packet)));
}

if self.closed {
bail!(ErrorKind::ConnectionClosed);
}
return Ok(NotReady);
}
}

fn parse_packet(
mut parser: PacketParser,
mut buf: &[u8],
) -> (Vec<(RawPacket, u8)>, PacketParser, Vec<u8>) {
let mut packets = Vec::new();

loop {
parser = match parser.parse() {
ParseResult::Done(packet, seq_id) => {
packets.push((packet, seq_id));
PacketParser::empty()
}
ParseResult::Incomplete(mut parser, needed) => {
if buf.len() < needed {
return (packets, parser, Vec::from(buf));
}

parser.extend_from_slice(&buf[..needed]);
buf = &buf[needed..];
parser
}
};
}
}