diff --git a/src/io/futures/connecting_stream.rs b/src/io/futures/connecting_stream.rs index 09eb7f29..997f628c 100644 --- a/src/io/futures/connecting_stream.rs +++ b/src/io/futures/connecting_stream.rs @@ -68,14 +68,13 @@ impl Future for ConnectingStream { fn poll(&mut self) -> Poll { match try_ready!(self.either_poll()) { - Out::WaitForStream((stream, _)) => { - Ok(Ready(Stream { - closed: false, - next_packet: Some(PacketParser::empty().parse()), - buf: Some(Vec::new()), - endpoint: Some(stream.into()), - })) - } + Out::WaitForStream((stream, _)) => Ok(Ready(Stream { + closed: false, + parser: Some(PacketParser::empty()), + packets: Default::default(), + buf: Vec::new(), + endpoint: Some(stream.into()), + })), Out::Fail(_) => unreachable!(), } } diff --git a/src/io/mod.rs b/src/io/mod.rs index 52386cf1..44d300f3 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -25,9 +25,7 @@ use io::futures::WritePacket; use native_tls::{Certificate, Pkcs12, TlsConnector}; use myc::packets::{PacketParser, ParseResult, RawPacket}; use opts::SslOpts; -use std::cmp; use std::io; -use std::io::Read; use std::net::ToSocketAddrs; use std::time::Duration; use tokio::net::TcpStream; @@ -182,8 +180,9 @@ impl AsyncWrite for Endpoint { pub struct Stream { endpoint: Option, closed: bool, - next_packet: Option, - buf: Option>, + parser: Option, + packets: ::std::collections::VecDeque<(RawPacket, u8)>, + buf: Vec, } impl fmt::Debug for Stream { @@ -287,75 +286,67 @@ impl stream::Stream for Stream { type Error = Error; fn poll(&mut self) -> Poll, Error> { - loop { - // should read everything from self.endpoint - let mut would_block = false; - if !self.closed { - let mut buf = [0u8; 4096]; - loop { - match self.read(&mut buf[..]) { - Ok(0) => { - break; - } - Ok(size) => { - let buf_handle = self.buf.as_mut().unwrap(); - buf_handle.extend_from_slice(&buf[..size]); - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - would_block = true; - break; - } - Err(error) => { - self.closed = true; - return Err(Error::from(error)); - } - }; - } - } else { - return Ok(Ready(None)); - } + // emit pending packets + if let Some(packet) = self.packets.pop_front() { + return Ok(Ready(Some(packet))); + } - // need to call again if there is a data in self.buf - // or data was written to packet parser - let mut should_poll = false; - - let next_packet = self.next_packet.take().expect( - "Stream.next_packet should not be None", - ); - let next_packet = match next_packet { - ParseResult::Done(packet, seq_id) => { - self.next_packet = Some(PacketParser::empty().parse()); - return Ok(Ready(Some((packet, seq_id)))); + // should read everything from self.endpoint + let mut buf = [0u8; 4096]; + while !self.closed { + match self.poll_read(&mut buf[..]) { + Err(error) => { + self.closed = true; + bail!(error) } - ParseResult::Incomplete(mut new_packet, needed) => { - let buf_handle = self.buf.as_mut().unwrap(); - let buf_len = buf_handle.len(); - - let to = cmp::min(needed, buf_len); - new_packet.extend_from_slice(&buf_handle[..to]); - { - let src = unsafe { buf_handle.as_ptr().offset(to as isize) }; - let dst = buf_handle.as_mut_ptr(); - let len = buf_handle.len() - to; - unsafe { - ::std::ptr::copy(src, dst, len); - buf_handle.set_len(len); - } - } - - if buf_len != 0 { - should_poll = true; - } - - new_packet + Ok(Ready(0)) => { + self.closed = true; } + Ok(Ready(size)) => { + self.buf.extend_from_slice(&buf[..size]); + } + Ok(NotReady) => break, }; + } - self.next_packet = Some(next_packet.parse()); + // parse buffer into packets + let (packets, parser, buf) = parse_packet(self.parser.take().unwrap(), &self.buf); + self.packets = packets.into(); + self.parser = Some(parser); + self.buf = buf; - if !should_poll && would_block { - return Ok(NotReady) - } + if let Some(packet) = self.packets.pop_front() { + return Ok(Ready(Some(packet))); + } + + if self.closed { + bail!(ErrorKind::ConnectionClosed); } + return Ok(NotReady); + } +} + +fn parse_packet( + mut parser: PacketParser, + mut buf: &[u8], +) -> (Vec<(RawPacket, u8)>, PacketParser, Vec) { + let mut packets = Vec::new(); + + loop { + parser = match parser.parse() { + ParseResult::Done(packet, seq_id) => { + packets.push((packet, seq_id)); + PacketParser::empty() + } + ParseResult::Incomplete(mut parser, needed) => { + if buf.len() < needed { + return (packets, parser, Vec::from(buf)); + } + + parser.extend_from_slice(&buf[..needed]); + buf = &buf[needed..]; + parser + } + }; } }