Skip to content

Commit

Permalink
Assemble messages as frames are queued
Browse files Browse the repository at this point in the history
Attempt to assemble envelopes frames are added to the internal frame queue. This
will allow memory to be more readily freed, and prevent unbounded growth of
memory where QoS > 1.

This should solve issue #56 completely.
  • Loading branch information
alanxz committed Sep 3, 2013
1 parent c65fa63 commit e2402b6
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 6 deletions.
63 changes: 63 additions & 0 deletions src/ChannelImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,69 @@ std::vector<amqp_channel_t> ChannelImpl::GetAllConsumerChannels() const
return ret;
}

bool ChannelImpl::CheckForQueuedMessageOnChannel(amqp_channel_t channel) const
{
frame_queue_t::const_iterator it = std::find_if(m_frame_queue.begin(),
m_frame_queue.end(),
boost::bind(&ChannelImpl::is_method_on_channel,
_1, AMQP_BASIC_DELIVER_METHOD, channel));

if (it == m_frame_queue.end())
{
return false;
}

it = std::find_if(it+1, m_frame_queue.end(), boost::bind(&ChannelImpl::is_on_channel,
_1, channel));

if (it == m_frame_queue.end())
{
return false;
}
if (it->frame_type != AMQP_FRAME_HEADER)
{
throw std::runtime_error("Protocol error");
}

uint64_t body_length = it->payload.properties.body_size;
uint64_t body_received = 0;

while (body_received < body_length)
{
it = std::find_if(it+1, m_frame_queue.end(),
boost::bind(&ChannelImpl::is_on_channel, _1, channel));

if (it == m_frame_queue.end())
{
return false;
}
if (it->frame_type != AMQP_FRAME_BODY)
{
throw std::runtime_error("Protocol error");
}
body_received += it->payload.body_fragment.len;
}

return true;
}

void ChannelImpl::AddToFrameQueue(const amqp_frame_t &frame)
{
m_frame_queue.push_back(frame);

if (CheckForQueuedMessageOnChannel(frame.channel))
{
boost::array<amqp_channel_t, 1> channel = {{frame.channel}};
Envelope::ptr_t envelope;
if (!ConsumeMessageOnChannelInner(channel, envelope, -1))
{
throw std::logic_error("ConsumeMessageOnChannelInner returned false unexpectedly");
}

m_delivered_messages.push_back(envelope);
}
}

bool ChannelImpl::GetNextFrameFromBroker(amqp_frame_t &frame, boost::chrono::microseconds timeout)
{
struct timeval *tvp = NULL;
Expand Down
55 changes: 49 additions & 6 deletions src/SimpleAmqpClient/ChannelImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class ChannelImpl : boost::noncopyable

bool GetNextFrameFromBroker(amqp_frame_t &frame, boost::chrono::microseconds timeout);

bool CheckForQueuedMessageOnChannel(amqp_channel_t message_on_channel) const;
void AddToFrameQueue(const amqp_frame_t &frame);

template <class ChannelListType>
bool GetNextFrameFromBrokerOnChannel(const ChannelListType channels, amqp_frame_t &frame_out,
boost::chrono::microseconds timeout = boost::chrono::microseconds::max())
Expand Down Expand Up @@ -103,7 +106,7 @@ class ChannelImpl : boost::noncopyable
}
else
{
m_frame_queue.push_back(frame);
AddToFrameQueue(frame);
}

if (timeout != boost::chrono::microseconds::max())
Expand All @@ -122,14 +125,27 @@ class ChannelImpl : boost::noncopyable
bool GetNextFrameOnChannel(amqp_channel_t channel, amqp_frame_t &frame,
boost::chrono::microseconds timeout = boost::chrono::microseconds::max());

static bool is_on_channel(const amqp_frame_t &frame, amqp_channel_t channel)
static bool is_on_channel(const amqp_frame_t frame, amqp_channel_t channel)
{
return channel == frame.channel;
}

static bool is_frame_type_on_channel(const amqp_frame_t frame, uint8_t frame_type, amqp_channel_t channel)
{
return frame.frame_type == frame_type &&
frame.channel == channel;
}

static bool is_method_on_channel(const amqp_frame_t frame, amqp_method_number_t method, amqp_channel_t channel)
{
return frame.channel == channel &&
frame.frame_type == AMQP_FRAME_METHOD &&
frame.payload.method.id == method;
}

template <class ChannelListType, class ResponseListType>
static bool is_expected_method(const amqp_frame_t &frame, const ChannelListType channels,
const ResponseListType &expected_responses)
static bool is_expected_method_on_channel(const amqp_frame_t &frame, const ChannelListType channels,
const ResponseListType &expected_responses)
{
return channels.end() != std::find(channels.begin(), channels.end(), frame.channel) &&
AMQP_FRAME_METHOD == frame.frame_type &&
Expand All @@ -143,7 +159,7 @@ class ChannelImpl : boost::noncopyable
{
frame_queue_t::iterator desired_frame =
std::find_if(m_frame_queue.begin(), m_frame_queue.end(),
boost::bind(&ChannelImpl::is_expected_method<ChannelListType, ResponseListType>, _1,
boost::bind(&ChannelImpl::is_expected_method_on_channel<ChannelListType, ResponseListType>, _1,
channels, expected_responses));

if (m_frame_queue.end() != desired_frame)
Expand All @@ -163,7 +179,7 @@ class ChannelImpl : boost::noncopyable
amqp_frame_t incoming_frame;
while (GetNextFrameFromBrokerOnChannel(channels, incoming_frame, timeout_left))
{
if (is_expected_method(incoming_frame, channels, expected_responses))
if (is_expected_method_on_channel(incoming_frame, channels, expected_responses))
{
frame = incoming_frame;
return true;
Expand Down Expand Up @@ -218,8 +234,32 @@ class ChannelImpl : boost::noncopyable
return ret;
}

template <class ChannelListType>
static bool envelope_on_channel(const Envelope::ptr_t &envelope, const ChannelListType channels)
{
return channels.end() != std::find(channels.begin(), channels.end(), envelope->DeliveryTag());
}

template <class ChannelListType>
bool ConsumeMessageOnChannel(const ChannelListType channels, Envelope::ptr_t &message, int timeout)
{
envelope_list_t::iterator it = std::find_if(m_delivered_messages.begin(),
m_delivered_messages.end(),
boost::bind(ChannelImpl::envelope_on_channel<ChannelListType>,
_1, channels));

if (it != m_delivered_messages.end())
{
message = *it;
m_delivered_messages.erase(it);
return true;
}

return ConsumeMessageOnChannelInner(channels, message, timeout);
}

template <class ChannelListType>
bool ConsumeMessageOnChannelInner(const ChannelListType channels, Envelope::ptr_t &message, int timeout)
{
const boost::array<boost::uint32_t, 2> DELIVER_OR_CANCEL = { { AMQP_BASIC_DELIVER_METHOD,
AMQP_BASIC_CANCEL_METHOD } };
Expand Down Expand Up @@ -293,6 +333,9 @@ class ChannelImpl : boost::noncopyable
private:
frame_queue_t m_frame_queue;

typedef std::vector<Envelope::ptr_t> envelope_list_t;
envelope_list_t m_delivered_messages;

typedef std::map<std::string, amqp_channel_t> consumer_map_t;
consumer_map_t m_consumer_channel_map;

Expand Down

0 comments on commit e2402b6

Please sign in to comment.