Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ build = "build.rs"
[dependencies]
log = "^0.4"
nom = "^3.0"
cookie-factory = "^0.2"
cookie-factory = "^0.2.4"
amq-protocol = "^0.19"

[dependencies.sasl]
Expand Down
31 changes: 8 additions & 23 deletions async/src/format/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,29 +110,14 @@ pub fn frame(input: &[u8]) -> IResult<&[u8], Frame> {
}

pub fn gen_method_frame<'a>(input:(&'a mut [u8],usize), channel: u16, class: &Class) -> Result<(&'a mut [u8],usize),GenError> {
//FIXME: this does not take into account the BufferTooSmall errors
let r = gen_be_u8!(input, constants::FRAME_METHOD);
//println!("r: {:?}", r);
if let Ok(input1) = r {
if let Ok((sl2, index2)) = gen_be_u16!(input1, channel) {
if let Ok((sl3, index3)) = gen_class((sl2, index2 + 4), class) {
if let Ok((sl4, _)) = gen_be_u32!((sl3, index2), index3 - index2 - 4) {
gen_be_u8!((sl4, index3), constants::FRAME_END)
//if let Ok((sl5, index5)) = gen_be_u8!((sl4, index3), constants::FRAME_END)
//{
//}
} else {
Err(GenError::CustomError(1))
}
} else {
Err(GenError::CustomError(2))
}
} else {
Err(GenError::CustomError(3))
}
} else {
Err(GenError::CustomError(4))
}
do_gen!(input,
gen_be_u8!(constants::FRAME_METHOD) >>
gen_be_u16!(channel) >>
len: gen_skip!(4) >>
start: gen_class(class) >>
end: gen_at_offset!(len, gen_be_u32!(end - start)) >>
gen_be_u8!(constants::FRAME_END)
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, I was pretty sure I did something similar already... found it: https://github.com/sozu-proxy/amq-protocol/blob/master/protocol/src/frame/generation.rs#L19

I thought I synced that a while ago but apparently I didn't.
Will sync all that stuff soonish after some testing

}

pub fn gen_heartbeat_frame<'a>(input:(&'a mut [u8],usize)) -> Result<(&'a mut [u8],usize),GenError> {
Expand Down
2 changes: 1 addition & 1 deletion futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@

extern crate cookie_factory;
extern crate bytes;
#[macro_use] extern crate futures;
extern crate futures;
extern crate lapin_async;
#[macro_use] extern crate log;
extern crate nom;
Expand Down
172 changes: 132 additions & 40 deletions futures/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use lapin_async::format::frame::*;

use nom::{IResult,Offset};
use cookie_factory::GenError;
use bytes::BytesMut;
use bytes::{BufMut, BytesMut};
use std::cmp;
use std::iter::repeat;
use std::io::{self,Error,ErrorKind};
use futures::{Async,AsyncSink,Poll,Sink,Stream,Future,future};
use futures::{Async,AsyncSink,Poll,Sink,StartSend,Stream,Future,future};
use tokio_io::{AsyncRead,AsyncWrite};
use tokio_io::codec::{Decoder,Encoder,Framed};
use channel::BasicProperties;
Expand Down Expand Up @@ -49,52 +49,51 @@ impl Encoder for AMQPCodec {
type Error = io::Error;

fn encode(&mut self, frame: Frame, buf: &mut BytesMut) -> Result<(), Self::Error> {
let length = buf.len();
// Ensure we at least allocate 8192 so that the buffer is big enough for the frame_max
// negociation. Afterwards, use frame_max if > 8192.
let frame_max = cmp::max(self.frame_max, 8192) as usize;
if length < frame_max {
//reserve more capacity and intialize it
buf.extend(repeat(0).take(frame_max - length));
}
trace!("amqp encoder; frame={:?}", frame);

trace!("encoder; frame={:?}", frame);
let offset = buf.len();
loop {
// If the buffer starts running out of capacity (the threshold is 1/4 of a frame), we
// reserve more bytes upfront to avoid putting too much strain on the allocator.
if buf.remaining_mut() < frame_max / 4 {
trace!("encoder; reserve={}", frame_max * 2);
buf.reserve(frame_max * 2);
}

let gen_res = match &frame {
&Frame::ProtocolHeader => {
gen_protocol_header((buf, 0)).map(|tup| tup.1)
gen_protocol_header((buf, offset)).map(|tup| tup.1)
},
&Frame::Heartbeat(_) => {
gen_heartbeat_frame((buf, 0)).map(|tup| tup.1)
gen_heartbeat_frame((buf, offset)).map(|tup| tup.1)
},
&Frame::Method(channel, ref method) => {
gen_method_frame((buf, 0), channel, method).map(|tup| tup.1)
gen_method_frame((buf, offset), channel, method).map(|tup| tup.1)
},
&Frame::Header(channel_id, class_id, ref header) => {
gen_content_header_frame((buf, 0), channel_id, class_id, header.body_size, &header.properties).map(|tup| tup.1)
gen_content_header_frame((buf, offset), channel_id, class_id, header.body_size, &header.properties).map(|tup| tup.1)
},
&Frame::Body(channel_id, ref data) => {
gen_content_body_frame((buf, 0), channel_id, data).map(|tup| tup.1)
gen_content_body_frame((buf, offset), channel_id, data).map(|tup| tup.1)
}
};

match gen_res {
Ok(sz) => {
buf.truncate(sz);
trace!("amqp serializer; frame_size={}", sz);
trace!("encoder; frame_size={}", sz - offset);
return Ok(());
},
Err(GenError::BufferTooSmall(sz)) => {
// BufferTooSmall error variant returns the index the next write would have
// occured if there was enough space in the buffer. Thus we subtract the
// buffer's length to know how much bytes we sould make available.
let length = buf.len();
trace!("encoder; sz={} length={} extend={}", sz, length, sz - length);
buf.extend(repeat(0).take(sz - length));
},
Err(e) => {
error!("error generating frame: {:?}", e);
match e {
GenError::BufferTooSmall(sz) => {
buf.extend(repeat(0).take(sz - length));
//return Err(Error::new(ErrorKind::InvalidData, "send buffer too small"));
},
GenError::InvalidOffset | GenError::CustomError(_) | GenError::NotYetImplemented => {
return Err(Error::new(ErrorKind::InvalidData, "could not generate"));
}
}
return Err(Error::new(ErrorKind::InvalidData, "could not generate"));
}
}
}
Expand All @@ -104,7 +103,6 @@ impl Encoder for AMQPCodec {
/// Wrappers over a `Framed` stream using `AMQPCodec` and lapin-async's `Connection`
pub struct AMQPTransport<T> {
upstream: Framed<T,AMQPCodec>,
flush_needed: bool,
pub conn: Connection,
}

Expand Down Expand Up @@ -132,7 +130,6 @@ impl<T> AMQPTransport<T>
};
let t = AMQPTransport {
upstream: stream.framed(codec),
flush_needed: false,
conn: conn,
};
let connector = AMQPTransportConnector {
Expand Down Expand Up @@ -197,21 +194,11 @@ impl<T> AMQPTransport<T>

/// Poll the network to send outcoming frames.
pub fn poll_send(&mut self) -> Poll<(), io::Error> {
// Flush any pending frame.
if self.flush_needed == true {
try_ready!(self.upstream.poll_complete());
self.flush_needed = false;
}
while let Some(frame) = self.conn.next_frame() {
trace!("transport poll_send; frame={:?}", frame);
match self.upstream.start_send(frame)? {
AsyncSink::Ready => {
trace!("transport poll_send; status=Ready");
// The current `Framed` codec implementation requires us to flush after each send.
if let Async::NotReady = self.upstream.poll_complete()? {
self.flush_needed = true;
return Ok(Async::NotReady);
}
},
AsyncSink::NotReady(frame) => {
trace!("transport poll_send; status=NotReady");
Expand All @@ -220,7 +207,7 @@ impl<T> AMQPTransport<T>
}
}
}
Ok(Async::Ready(()))
self.upstream.poll_complete()
}
}

Expand All @@ -241,6 +228,24 @@ impl<T> Future for AMQPTransport<T>
}
}

impl <T> Sink for AMQPTransport<T>
where T: AsyncWrite,
T: Send,
T: 'static {
type SinkItem = Frame;
type SinkError = io::Error;

fn start_send(&mut self, frame: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
trace!("transport start_send; frame={:?}", frame);
self.upstream.start_send(frame)
}

fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
trace!("transport poll_complete");
self.upstream.poll_complete()
}
}

/// implements a future of `AMQPTransport`
///
/// this structure is used to perform the AMQP handshake and provide
Expand Down Expand Up @@ -290,3 +295,90 @@ macro_rules! try_lock_transport (
}
});
);

#[cfg(test)]
mod tests {
extern crate env_logger;

use super::*;

#[test]
fn encode_multiple_frames() {
let _ = env_logger::try_init();

let mut codec = AMQPCodec { frame_max: 8192 };
let mut buffer = BytesMut::with_capacity(8192);
let r = codec.encode(Frame::Heartbeat(0), &mut buffer);
assert_eq!(false, r.is_err());
assert_eq!(8, buffer.len());
let r = codec.encode(Frame::Heartbeat(0), &mut buffer);
assert_eq!(false, r.is_err());
assert_eq!(16, buffer.len());
let r = codec.encode(Frame::Heartbeat(0), &mut buffer);
assert_eq!(false, r.is_err());
assert_eq!(24, buffer.len());
}

#[test]
fn encode_nested_frame() {
use lapin_async::content::ContentHeader;

let _ = env_logger::try_init();

let mut codec = AMQPCodec { frame_max: 8192 };
let mut buffer = BytesMut::with_capacity(8192);
let frame = Frame::Header(0, 10, ContentHeader {
class_id: 10,
weight: 0,
body_size: 64,
properties: BasicProperties::default()
});
let r = codec.encode(frame, &mut buffer);
assert_eq!(false, r.is_err());
assert_eq!(22, buffer.len());
}

#[test]
fn encode_initial_extend_buffer() {
let _ = env_logger::try_init();

let mut codec = AMQPCodec { frame_max: 8192 };
let frame_max = codec.frame_max as usize;
let mut buffer = BytesMut::new();

let r = codec.encode(Frame::Heartbeat(0), &mut buffer);
assert_eq!(false, r.is_err());
assert_eq!(true, buffer.capacity() >= frame_max);
assert_eq!(8, buffer.len());
}

#[test]
fn encode_anticipation_extend_buffer() {
let _ = env_logger::try_init();

let mut codec = AMQPCodec { frame_max: 8192 };
let frame_max = codec.frame_max as usize;
let mut buffer = BytesMut::new();

let r = codec.encode(Frame::Heartbeat(0), &mut buffer);
assert_eq!(false, r.is_err());
assert_eq!(frame_max * 2, buffer.capacity());
assert_eq!(8, buffer.len());

let payload = repeat(0u8)
// Use 80% of the remaining space (it shouldn't trigger buffer capacity expansion)
.take(((buffer.capacity() as f64 - buffer.len() as f64) * 0.8) as usize)
.collect::<Vec<u8>>();
let r = codec.encode(Frame::Body(1, payload), &mut buffer);
assert_eq!(false, r.is_err());
assert_eq!(frame_max * 2, buffer.capacity());

let payload = repeat(0u8)
// Use 80% of the remaining space (it should trigger a buffer capacity expansion)
.take(((buffer.capacity() as f64 - buffer.len() as f64) * 0.8) as usize)
.collect::<Vec<u8>>();
let r = codec.encode(Frame::Body(1, payload), &mut buffer);
assert_eq!(false, r.is_err());
assert_eq!(frame_max * 4, buffer.capacity());
}
}