Skip to content
Browse files

- valgrind has revealed subtle thread sync bugs. Fixed.

- boost::asio::io_service::strand for serialized I/O access to the worker thread
- more verbose debug logging
- frame parser split in two stages (old one could choke when multiple NULLs in frame body)
  • Loading branch information...
1 parent f05a636 commit 7b0a3cd2bd9a472b587abec2b9c68b84918ea4f7 @ekarak committed May 5, 2012
Showing with 315 additions and 245 deletions.
  1. +146 −181 BoostStomp.cpp
  2. +26 −38 BoostStomp.hpp
  3. +10 −2 Main.cpp
  4. +4 −3 Makefile
  5. +106 −7 StompFrame.cpp
  6. +21 −10 StompFrame.hpp
  7. +2 −4 helpers.cpp
View
327 BoostStomp.cpp
@@ -56,21 +56,19 @@ namespace STOMP {
m_connected(false),
m_io_service (new io_service()),
m_io_service_work (new io_service::work(*m_io_service)),
+ m_strand (new io_service::strand(*m_io_service)),
m_socket (new tcp::socket(*m_io_service)),
// private members
m_protocol_version("1.0"),
m_transaction_id(0)
// ----------------------------
{
- // Heartbeat setup
- m_heartbeat_timer = boost::shared_ptr< deadline_timer> ( new deadline_timer( *m_io_service ));
- std::ostream os( &m_heartbeat);
- os << "\n";
- // Server command map
- stomp_server_command_map["CONNECTED"] = &BoostStomp::process_CONNECTED;
- stomp_server_command_map["MESSAGE"] = &BoostStomp::process_MESSAGE;
- stomp_server_command_map["RECEIPT"] = &BoostStomp::process_RECEIPT;
- stomp_server_command_map["ERROR"] = &BoostStomp::process_ERROR;
+ debug_print("Initializing BoostStomp command map");
+ // map STOMP server commands to handler methods
+ cmd_map["CONNECTED"] = &BoostStomp::process_CONNECTED;
+ cmd_map["MESSAGE"] = &BoostStomp::process_MESSAGE;
+ cmd_map["RECEIPT"] = &BoostStomp::process_RECEIPT;
+ cmd_map["ERROR"] = &BoostStomp::process_ERROR;
}
@@ -91,12 +89,13 @@ namespace STOMP {
// ----------------------------
// worker thread
// ----------------------------
- void BoostStomp::worker( boost::shared_ptr< boost::asio::io_service > io_service )
+ void BoostStomp::worker( boost::shared_ptr< boost::asio::io_service > _io_service )
{
- debug_print("Worker thread starting...");
+ debug_print("Worker thread: starting...");
while(!m_stopped) {
- io_service->run();
- io_service->reset();
+ _io_service->run();
+ debug_print("Worker thread: io_service is stopped...");
+ _io_service->reset();
sleep(1);
}
debug_print("Worker thread finished.");
@@ -132,12 +131,15 @@ namespace STOMP {
debug_print("stopping...");
if (m_connected && m_socket->is_open()) {
Frame frame( "DISCONNECT");
+ frame.encode(stomp_request);
debug_print("Sending DISCONNECT frame...");
- boost::asio::write(*m_socket, frame.encode());
+ boost::asio::write(*m_socket, stomp_request);
}
m_connected = false;
m_stopped = true;
- m_heartbeat_timer->cancel();
+ if (m_heartbeat_timer != NULL) {
+ m_heartbeat_timer->cancel();
+ }
//
m_socket->close();
//
@@ -169,11 +171,12 @@ namespace STOMP {
headers["accept-version"] = "1.1";
headers["host"] = m_hostname;
Frame frame( "CONNECT", headers );
+ frame.encode(stomp_request);
//debug_print("Sending CONNECT frame...");
- boost::asio::write(*m_socket, frame.encode());
-
+ boost::asio::write(*m_socket, stomp_request);
+ // FIXME: what about stomp_request.commit() ???
// start the read actor so as to receive the CONNECTED frame
- start_stomp_read();
+ start_stomp_read_headers();
// start worker thread (m_io_service.run())
worker_thread = new boost::thread( boost::bind( &BoostStomp::worker, this, m_io_service ) );
@@ -202,39 +205,40 @@ namespace STOMP {
// -----------------------------------------------
// -----------------------------------------------
- void BoostStomp::start_stomp_read()
+ void BoostStomp::start_stomp_read_headers()
// -----------------------------------------------
{
debug_print("start_stomp_read");
- // Start an asynchronous operation to read a null-delimited message.
- boost::asio::async_read_until(
+ // Start an asynchronous operation to read at least the STOMP frame command & headers (till the double newline delimiter)
+ boost::asio::async_read_until(
*m_socket,
stomp_response,
- '\0',
- boost::bind(&BoostStomp::handle_stomp_read, this, _1));
+ "\n\n",
+ boost::bind(&BoostStomp::handle_stomp_read_headers, this, placeholders::error()));
}
// -----------------------------------------------
- void BoostStomp::handle_stomp_read(const boost::system::error_code& ec)
+ void BoostStomp::handle_stomp_read_headers(const boost::system::error_code& ec)
// -----------------------------------------------
{
- Frame* frame;
-
if (m_stopped)
return;
if (!ec)
{
- debug_print(boost::format("received server response (%1% bytes)") % stomp_response.size() );
- vector<Frame*> received_frames = parse_all_frames();
- while ((!received_frames.empty()) && (frame = received_frames.back())) {
- consume_frame(*frame);
- // dispose frame, its not needed anymore
- delete frame;
- received_frames.pop_back();
- } //while
- // wait for any incoming frames from the server...
- start_stomp_read();
+ std::size_t bodysize = 0;
+ debug_print(boost::format("received response (command+headers: %1% bytes)") % stomp_response.size() );
+ start_stomp_read_body(bodysize);
+
+ try {
+ m_rcvd_frame = new Frame(stomp_response, cmd_map);
+ } catch(NoMoreFrames&) {
+// break;
+ } catch(std::exception& e) {
+ debug_print(boost::format("handle_stomp_read in loop: unknown exception in Frame constructor:\n%1%") % e.what());
+ exit(10);
+ }
+
}
else
{
@@ -244,104 +248,52 @@ namespace STOMP {
}
}
- // --------------------------------------------------
- Frame* BoostStomp::parse_next()
- // --------------------------------------------------
+ // -----------------------------------------------
+ void BoostStomp::start_stomp_read_body(std::size_t bodysize)
+ // -----------------------------------------------
{
- string _str;
- istream _input(&stomp_response);
- size_t bytes_to_consume = 0, content_length = 0;
- Frame* frame = NULL;
-
- try {
-
- // STEP 1: find the next STOMP command line in stomp_response, skipping non-matching lines
- //debug_print("parse_next phase 1");
- while (std::getline(_input, _str)) {
- //hexdump(_str.c_str(), _str.length());
- if ((_str.size() > 0) && (stomp_server_command_map.find(_str) != stomp_server_command_map.end())) {
- //debug_print(boost::format("parse_next phase 1: COMMAND==%1%") % _str);
- frame = new Frame(_str);
- bytes_to_consume += _str.size()+1;
- break;
- }
- }
- // STEP 2: parse all headers
- if (frame != NULL) {
- //debug_print("parse_next phase 2");
- vector< string > header_parts;
- while (std::getline(_input, _str)) {
- //hexdump(_str.c_str(), _str.length());
- boost::algorithm::split(header_parts, _str, is_any_of(":"));
- if (header_parts.size() > 1) {
- string& key = decode_header_token(header_parts[0]);
- string& val = decode_header_token(header_parts[1]);
- //debug_print(boost::format("parse_next phase 2: HEADER[%1%]==%2%") % key % val);
- frame->m_headers[key] = val;
- bytes_to_consume += _str.size()+1;
- // special case: content-length
- if (key == "content-length") {
- content_length = lexical_cast<int>(val);
- //debug_print(boost::format("content-length read back value==%1%") % content_length);
- }
- } else {
- break;
- }
- }
- bytes_to_consume += 1;
- // STEP 3: parse the body
- //debug_print("parse_next phase 3");
- if (content_length > 0) {
- // read back the body byte by byte
- char c;
- for (size_t i=0; i<content_length; i++) {
- _input.get(c);
- frame->m_body << c;
- }
- } else {
- // read all bytes until the first NULL
- std::getline(_input, _str, '\0');
- //debug_print(boost::format("parse_next phase 3: BODY(%1% bytes)==%2%") % _str.size() % _str);
- if (_str.length() > 0) {
- bytes_to_consume += _str.size() + 1;
- frame->m_body << _str;
- };
- }
- } else {
- throw("shit happens");
- }
- }
- catch (...) {
-
- }
- stomp_response.consume(bytes_to_consume);
- debug_print(boost::format("-- parse_frame, consumed %1% bytes from stomp_response") % bytes_to_consume);
- return(frame);
- };
+ debug_print("start_stomp_read");
+ // Start an asynchronous operation to read at least the STOMP frame body
+ if (bodysize == 0) {
+ boost::asio::async_read_until(
+ *m_socket, stomp_response,
+ '\0', // NULL signifies the end of the body
+ boost::bind(&BoostStomp::handle_stomp_read_body, this, placeholders::error(), placeholders::bytes_transferred()));
+ } else {
+ boost::asio::async_read(
+ *m_socket, stomp_response,
+ boost::asio::transfer_at_least(bodysize),
+ boost::bind(&BoostStomp::handle_stomp_read_body, this, placeholders::error(), placeholders::bytes_transferred()));
+ }
+ }
- // ----------------------------
- vector<Frame*> BoostStomp::parse_all_frames()
- // ----------------------------
+ // -----------------------------------------------
+ void BoostStomp::handle_stomp_read_body(const boost::system::error_code& ec, std::size_t bytes_transferred = 0)
+ // -----------------------------------------------
{
- vector<Frame*> results;
- //
- // get all the responses in response stream
- debug_print(boost::format("parse_all_frames before: (%1% bytes in stomp_response)") % stomp_response.size() );
- try {
- //
- // iterate over all frame matches
- //
- while (Frame* next_frame = parse_next()) {
- debug_print(boost::format("parse_all_frames in loop: (%1% bytes still in stomp_response)") % stomp_response.size());
- results.push_back(next_frame);
- }
- } catch(...) {
- debug_print("parse_response in loop: exception in Frame constructor");
- }
- //cout << "exiting, " << stomp_response.size() << " bytes still in stomp_response" << endl;
- return(results);
- };
+ if (m_stopped)
+ return;
+ if (!ec)
+ {
+ debug_print(boost::format("received response (%1% bytes) (buffer: %2% bytes)") % bytes_transferred % stomp_response.size() );
+ if (m_rcvd_frame != NULL) {
+ m_rcvd_frame->parse_body(stomp_response);
+ consume_received_frame();
+ }
+ //
+ debug_print("stomp_response contents after Frame scanning:");
+ hexdump(stomp_response);
+ // wait for the next incoming frame from the server...
+ start_stomp_read_headers();
+ }
+ else
+ {
+ std::cerr << "BoostStomp: Error on receive: " << ec.message() << "\n";
+ stop();
+ start();
+ }
+ }
// ------------------------------------------------
// ---------- OUTPUT ACTOR SETUP ------------------
@@ -355,17 +307,21 @@ namespace STOMP {
return;
debug_print("start_stomp_write");
-
+ Frame* frame;
// send all STOMP frames in queue
m_sendqueue_mutex.lock();
if (m_sendqueue.size() > 0) {
- Frame& frame = m_sendqueue.front();
- debug_print(boost::format("Sending %1% frame...") % frame.command() );
- boost::asio::async_write(
+ frame = m_sendqueue.front();
+ if (frame != NULL) {
+ debug_print(boost::format("Sending %1% frame...") % frame->command() );
+ frame->encode(stomp_request);
+
+ boost::asio::async_write(
*m_socket,
- frame.encode(),
+ stomp_request,
boost::bind(&BoostStomp::handle_stomp_write, this, _1)
- );
+ );
+ };
}
m_sendqueue_mutex.unlock();
@@ -379,6 +335,7 @@ namespace STOMP {
return;
if (!ec) {
+ debug_print("Sent!");
// call pop() to delete the last frame in queue
m_sendqueue_mutex.lock();
m_sendqueue.pop();
@@ -387,7 +344,7 @@ namespace STOMP {
start_stomp_write();
} else {
m_connected = false;
- std::cout << "Error writing to STOMP server: " << ec.message() << "\n";
+ debug_print(boost::format("Error writing to STOMP server: %1%") % ec.message());
stop();
}
}
@@ -431,103 +388,113 @@ namespace STOMP {
}
// ------------------------------------------
- bool BoostStomp::acknowledge(Frame& frame, bool acked = true)
+ bool BoostStomp::acknowledge(Frame* frame, bool acked = true)
// ------------------------------------------
{
hdrmap hm;
- hm["message-id"] = frame.headers()["message-id"];
- hm["subscription"] = frame.headers()["subscription"];
+ hm["message-id"] = frame->headers()["message-id"];
+ hm["subscription"] = frame->headers()["subscription"];
string _ack_cmd = (acked ? "ACK" : "NACK");
- Frame _ackframe( _ack_cmd, hm );
+ Frame* _ackframe = new Frame( _ack_cmd, hm );
return(send_frame(_ackframe));
}
// ------------------------------------------
- void BoostStomp::consume_frame(Frame& _rcvd_frame)
+ void BoostStomp::consume_received_frame()
// ------------------------------------------
{
- debug_print(boost::format("-- consume_frame: calling %1% command handler") % _rcvd_frame.command());
- pfnStompCommandHandler_t handler = stomp_server_command_map[_rcvd_frame.command()];
- (this->*handler)(_rcvd_frame);
-
-
- /*
- if (_rcvd_frame.command() == "CONNECTED") process_CONNECTED(_rcvd_frame);
- if (_rcvd_frame.command() == "MESSAGE") process_MESSAGE(_rcvd_frame);
- if (_rcvd_frame.command() == "RECEIPT") process_RECEIPT(_rcvd_frame);
- if (_rcvd_frame.command() == "ERROR") process_ERROR(_rcvd_frame);
- */
+ if (m_rcvd_frame != NULL) {
+ pfnStompCommandHandler_t handler = cmd_map[m_rcvd_frame->command()];
+ if (handler != NULL) {
+ debug_print(boost::format("-- consume_frame: calling %1% command handler") % m_rcvd_frame->command());
+ // call STOMP command handler
+ (this->*handler)();
+ }
+ delete m_rcvd_frame;
+ }
+ m_rcvd_frame = NULL;
};
//-----------------------------------------
- void BoostStomp::process_CONNECTED(Frame& _rcvd_frame)
+ void BoostStomp::process_CONNECTED()
//-----------------------------------------
{
m_connected = true;
// try to get supported protocol version from headers
- if (_rcvd_frame.headers().find("version") != _rcvd_frame.headers().end()) {
- m_protocol_version = _rcvd_frame.headers()["version"];
+ hdrmap headers = m_rcvd_frame->headers();
+ if (headers.find("version") != headers.end()) {
+ m_protocol_version = headers["version"];
debug_print(boost::format("server supports STOMP version %1%") % m_protocol_version);
}
if (m_protocol_version == "1.1") {
- // we are connected to a version 1.1 STOMP server, we can start the heartbeat actor
+ // we are connected to a version 1.1 STOMP server, setup heartbeat
+ m_heartbeat_timer = boost::shared_ptr< deadline_timer> ( new deadline_timer( *m_io_service ));
+ std::ostream os( &m_heartbeat);
+ os << "\n";
+ // we can start the heartbeat actor
start_stomp_heartbeat();
}
// in case of reconnection, we need to re-subscribe to all subscriptions
for (subscription_map::iterator it = m_subscriptions.begin(); it != m_subscriptions.end(); it++) {
- string topic = (*it).first;
- do_subscribe(topic);
+ //string topic = (*it).first;
+ do_subscribe((*it).first);
};
}
//-----------------------------------------
- void BoostStomp::process_MESSAGE(Frame& _rcvd_frame)
+ void BoostStomp::process_MESSAGE()
//-----------------------------------------
{
bool acked = true;
- string dest = string(_rcvd_frame.headers()["destination"]);
+ string dest = string(m_rcvd_frame->headers()["destination"]);
//
if (pfnOnStompMessage_t callback_function = m_subscriptions[dest]) {
debug_print(boost::format("-- consume_frame: firing callback for %1%") % dest);
//
- acked = callback_function(_rcvd_frame);
+ acked = callback_function(m_rcvd_frame);
};
// acknowledge frame, if in "Client" or "Client-Individual" ack mode
if ((m_ackmode == ACK_CLIENT) || (m_ackmode == ACK_CLIENT_INDIVIDUAL)) {
- acknowledge(_rcvd_frame, acked);
+ acknowledge(m_rcvd_frame, acked);
}
}
//-----------------------------------------
- void BoostStomp::process_RECEIPT(Frame& _rcvd_frame)
+ void BoostStomp::process_RECEIPT()
//-----------------------------------------
{
- string receipt_id = string(_rcvd_frame.headers()["receipt_id"]);
// do something with receipt...
- debug_print(boost::format("receipt-id == %1%") % receipt_id);
+ debug_print(boost::format("receipt-id == %1%") % m_rcvd_frame->headers()["receipt_id"]);
}
//-----------------------------------------
- void BoostStomp::process_ERROR(Frame& _rcvd_frame)
+ void BoostStomp::process_ERROR()
//-----------------------------------------
{
- string errormessage = (_rcvd_frame.headers().find("message") != _rcvd_frame.headers().end()) ?
- _rcvd_frame.headers()["message"] :
+ hdrmap headers = m_rcvd_frame->headers();
+ string errormessage = (headers.find("message") != headers.end()) ?
+ headers["message"] :
"(unknown error!)";
- errormessage += _rcvd_frame.body().c_str();
+ errormessage += m_rcvd_frame->body().c_str();
throw(errormessage);
}
//-----------------------------------------
- bool BoostStomp::send_frame( Frame& frame )
+ bool BoostStomp::send_frame( Frame* frame )
//-----------------------------------------
{
- debug_print(boost::format("send_frame: Adding %1% frame to send queue...") % frame.command() );
+ // send_frame is called from the application thread. Do not dereference frame here!!! (shared data)
+ //debug_print(boost::format("send_frame: Adding frame to send queue...") % frame->command() );
+ debug_print("send_frame: Adding frame to send queue...");
m_sendqueue_mutex.lock();
m_sendqueue.push(frame);
m_sendqueue_mutex.unlock();
- start_stomp_write();
+ // tell io_service to send the frame from the worker thread
+ usleep(1000);
+ m_strand->post(
+ boost::bind(&BoostStomp::start_stomp_write, this)
+ );
return(true);
}
@@ -547,13 +514,13 @@ namespace STOMP {
}
// ------------------------------------------
- bool BoostStomp::do_subscribe (string& topic)
+ bool BoostStomp::do_subscribe(const string& topic)
// ------------------------------------------
{
hdrmap hm;
hm["id"] = lexical_cast<string>(boost::this_thread::get_id());
hm["destination"] = topic;
- Frame frame( "SUBSCRIBE", hm );
+ Frame* frame = new Frame( "SUBSCRIBE", hm );
return(send_frame(frame));
}
@@ -564,7 +531,7 @@ namespace STOMP {
{
hdrmap hm;
hm["destination"] = topic;
- Frame frame( "UNSUBSCRIBE", hm );
+ Frame* frame = new Frame( "UNSUBSCRIBE", hm );
m_subscriptions.erase(topic);
return(send_frame(frame));
}
@@ -577,7 +544,7 @@ namespace STOMP {
hdrmap hm;
// create a new transaction id
hm["transaction"] = lexical_cast<string>(m_transaction_id++);
- Frame frame( "BEGIN", hm );
+ Frame* frame = new Frame( "BEGIN", hm );
send_frame(frame);
return(m_transaction_id);
};
@@ -589,7 +556,7 @@ namespace STOMP {
hdrmap hm;
// add required header
hm["transaction"] = lexical_cast<string>(transaction_id);
- Frame frame( "COMMIT", hm );
+ Frame* frame = new Frame( "COMMIT", hm );
return(send_frame(frame));
};
@@ -600,11 +567,9 @@ namespace STOMP {
hdrmap hm;
// add required header
hm["transaction"] = lexical_cast<string>(transaction_id);
- Frame frame( "ABORT", hm );
+ Frame* frame = new Frame( "ABORT", hm );
return(send_frame(frame));
};
-
-
} // end namespace STOMP
View
64 BoostStomp.hpp
@@ -69,22 +69,11 @@ namespace STOMP {
} AckMode;
// Stomp message callback function prototype
- typedef bool (*pfnOnStompMessage_t)( Frame& );
+ typedef bool (*pfnOnStompMessage_t)( Frame* );
// Stomp subscription map (topic => callback)
typedef std::map<std::string, pfnOnStompMessage_t> subscription_map;
-
- // STOMP server command handler methods
- //typedef const boost::_bi::bind_t<void (&)(STOMP::Frame&), boost::_mfi::dm<void(STOMP::Frame&), STOMP::BoostStomp>, boost::_bi::list1<boost::arg<1> > > pfnStompCommandHandler_t;
-
- typedef void (BoostStomp::*pfnStompCommandHandler_t)( Frame& );
-
- //typedef void (*pfnStompCommandHandler_t)(BoostStomp::*, STOMP::Frame&);
- ///typedef boost::function<void(Frame&)> pfnStompCommandHandler_t;
- typedef std::map<string, pfnStompCommandHandler_t> stomp_server_command_map_t;
- static stomp_server_command_map_t stomp_server_command_map;
-
// here we go
// -------------
class BoostStomp
@@ -93,8 +82,8 @@ namespace STOMP {
//----------------
protected:
//----------------
-
- std::queue<Frame> m_sendqueue;
+ Frame* m_rcvd_frame;
+ std::queue<Frame*> m_sendqueue;
boost::mutex m_sendqueue_mutex;
subscription_map m_subscriptions;
//
@@ -105,32 +94,33 @@ namespace STOMP {
bool m_stopped;
bool m_connected; // have we completed application-level STOMP connection?
- boost::shared_ptr< io_service > m_io_service;
- boost::shared_ptr< io_service::work > m_io_service_work;
+ boost::shared_ptr< io_service > m_io_service;
+ boost::shared_ptr< io_service::work > m_io_service_work;
+ boost::shared_ptr< io_service::strand> m_strand;
tcp::socket* m_socket;
- //io_service* m_io_service;
- //io_service::work* m_io_service_work;
+
+
+ boost::asio::streambuf stomp_request, stomp_response;
//----------------
private:
//----------------
- boost::asio::streambuf stomp_request;
- boost::asio::streambuf stomp_response;
boost::mutex stream_mutex;
boost::thread* worker_thread;
boost::shared_ptr<deadline_timer> m_heartbeat_timer;
boost::asio::streambuf m_heartbeat;
string m_protocol_version;
int m_transaction_id;
+
//
- bool send_frame( Frame& _frame );
- bool do_subscribe (string& topic);
+ bool send_frame( Frame* _frame );
+ bool do_subscribe (const string& topic);
//
- void consume_frame(Frame& _rcvd_frame);
- void process_CONNECTED(Frame& _rcvd_frame);
- void process_MESSAGE(Frame& _rcvd_frame);
- void process_RECEIPT(Frame& _rcvd_frame);
- void process_ERROR(Frame& _rcvd_frame);
+ void consume_received_frame();
+ void process_CONNECTED();
+ void process_MESSAGE();
+ void process_RECEIPT();
+ void process_ERROR();
void start_connect(tcp::resolver::iterator endpoint_iter);
void handle_connect(const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iter);
@@ -140,19 +130,15 @@ namespace STOMP {
void start_stomp_heartbeat();
void handle_stomp_heartbeat(const boost::system::error_code& ec);
- void start_stomp_read();
- void handle_stomp_read(const boost::system::error_code& ec);
+ void start_stomp_read_headers();
+ void handle_stomp_read_headers(const boost::system::error_code& ec);
+ void start_stomp_read_body(std::size_t);
+ void handle_stomp_read_body(const boost::system::error_code& ec, std::size_t bytes_transferred);
void start_stomp_write();
void handle_stomp_write(const boost::system::error_code& ec);
void worker( boost::shared_ptr< boost::asio::io_service > io_service );
- //
- // Frame constructor from the first available STOMP frame in stomp_response streambuf
- Frame* parse_next();
-
- // return a vector of all Frame's in a streambuf
- vector<Frame*> parse_all_frames();
//----------------
public:
@@ -162,23 +148,25 @@ namespace STOMP {
// destructor
~BoostStomp();
+ stomp_server_command_map_t cmd_map;
+
void start();
void stop();
// thread-safe methods called from outside the thread loop
template <typename BodyType>
bool send ( std::string& _topic, hdrmap _headers, BodyType& _body, pfnOnStompMessage_t callback = NULL) {
_headers["destination"] = _topic;
- Frame frame( "SEND", _headers, _body );
+ Frame* frame = new Frame( "SEND", _headers, _body );
return(send_frame(frame));
}
-
//bool send ( std::string& topic, hdrmap _headers, std::string& body );
//
bool subscribe ( std::string& topic, pfnOnStompMessage_t callback );
bool unsubscribe ( std::string& topic );
- bool acknowledge ( Frame& _frame, bool acked );
+ bool acknowledge ( Frame* _frame, bool acked );
+
// STOMP transactions
int begin(); // returns a new transaction id
bool commit(int transaction_id);
View
12 Main.cpp
@@ -80,8 +80,8 @@ int main(int argc, char *argv[]) {
// add an outgoing message to the queue
stomp_client->send(notifications_topic, headers, body);
- sleep(1);
- // send another one
+
+ // send another one right away
string body2 = string("this is the SECOND message.");
stomp_client->send(notifications_topic, headers, body2);
sleep(1);
@@ -92,6 +92,14 @@ int main(int argc, char *argv[]) {
bb << "with a NULL in it.";
stomp_client->send(notifications_topic, headers, bb);
sleep(1);
+ // now some stress test (100 frames)
+ STOMP::hdrmap headers2;
+ for (int i = 0; i < 3; i++) {
+ cout << "Sending stress frame " << i << endl;
+ headers2["count"] = to_string<int>(i);
+ stomp_client->send(notifications_topic, headers2, "");
+ };
+ sleep(4);
stomp_client->stop();
}
catch (std::exception& e)
View
7 Makefile
@@ -6,16 +6,17 @@
.SUFFIXES: .cpp .o .a .s
-CC := gcc
-CXX := g++
+CC := colorgcc
+# CXX := g++
+CXX := colorgcc
LD := ld
AR := ar rc
RANLIB := ranlib
# Change for DEBUG or RELEASE
TARGET := DEBUG
-DEBUG_CFLAGS := -Wall -Wno-format -g -DDEBUG -Werror -O0 -DDEBUG_STOMP
+DEBUG_CFLAGS := -Wall -Wno-format -g -DDEBUG -Werror -O0 -DDEBUG_STOMP -DBOOST_ASIO_ENABLE_BUFFER_DEBUGGING
RELEASE_CFLAGS := -Wall -Wno-unknown-pragmas -Wno-format -O3 -DNDEBUG
DEBUG_LDFLAGS := -g
View
113 StompFrame.cpp
@@ -24,14 +24,13 @@ for more information on the LGPL, see:
#include <boost/format.hpp>
#include <boost/lexical_cast.hpp>
-
#include "BoostStomp.hpp"
#include "helpers.h"
namespace STOMP {
using namespace boost;
- using namespace boost::assign;
+ using namespace boost::asio;
/*
* Escaping is needed to allow header keys and values to contain those frame header
@@ -59,11 +58,11 @@ namespace STOMP {
return(str);
};
- boost::asio::streambuf& Frame::encode()
+ boost::asio::streambuf& Frame::encode(boost::asio::streambuf& _request)
// -------------------------------------
{
// prepare an output stream
- ostream os(&m_request);
+ ostream os(&_request);
// step 1. write the command
if (m_command.length() > 0) {
os << m_command << "\n";
@@ -89,11 +88,111 @@ namespace STOMP {
os << "\n";
// step 3. Write the body
if( m_body.v.size() > 0 ) {
- m_request.sputn(m_body.v.data(), m_body.v.size());
+ _request.sputn(m_body.v.data(), m_body.v.size());
+ //_request.commit(m_body.v.size());
}
// write terminating NULL char
- m_request.sputc('\0');
- return(m_request);
+ _request.sputc('\0');
+ //_request.commit(1);
+ return(_request);
+ };
+
+ // my own version of getline for an asio streambuf
+inline void mygetline (boost::asio::streambuf& sb, string& _str, char delim = '\n') {
+ const char* line = boost::asio::buffer_cast<const char*>(sb.data());
+ char _c;
+ _str.clear();
+ for( size_t i = 0;
+ ((i < sb.size()) && ((_c = line[i]) != delim));
+ i++
+ ) _str += _c;
+ debug_print( boost::format("mygetline: sb.size==%1%") % sb.size() );
+ hexdump(_str.c_str(), _str.size());
+ //
+ //usleep(100000);
+}
+
+ // construct STOMP frame (command & header) from a streambuf
+ // --------------------------------------------------
+ Frame::Frame(boost::asio::streambuf& stomp_response, const stomp_server_command_map_t& cmd_map)
+ // --------------------------------------------------
+ {
+ string _str;
+
+ try {
+ // STEP 1: find the next STOMP command line in stomp_response.
+ // Chomp unknown lines till the buffer is empty, in which case an exception is raised
+ debug_print("Frame parser phase 1");
+ while (stomp_response.size() > 0) {
+ mygetline(stomp_response, _str);
+ //hexdump(_str.c_str(), _str.length());
+ stomp_response.consume(_str.size() + 1); // plus one for the newline
+ if (_str.size() > 0) {
+ if (cmd_map.find(_str) != cmd_map.end()) {
+ debug_print(boost::format("phase 1: COMMAND==%1%, sb.size==%2%") % _str % stomp_response.size());
+ m_command = _str;
+ break;
+ }
+ } else {
+ throw(NoMoreFrames());
+ }
+ }
+
+ // STEP 2: parse all headers
+ debug_print("Frame parser phase 2");
+ vector< string > header_parts;
+ while (stomp_response.size() > 0) {
+ mygetline(stomp_response, _str);
+ stomp_response.consume(_str.size()+1);
+ boost::algorithm::split(header_parts, _str, is_any_of(":"));
+ if (header_parts.size() > 1) {
+ string key = decode_header_token(header_parts[0]);
+ string val = decode_header_token(header_parts[1]);
+ debug_print(boost::format("phase 2: HEADER[%1%]==%2%") % key % val);
+ m_headers[key] = val;
+ //
+ } else {
+ // no valid header line detected, on to the body scanner
+ break;
+ }
+ }
+ //
+ } catch(NoMoreFrames& e) {
+ debug_print("-- Frame parser ended (no more frames)");
+ throw(e);
+ }
};
+ // STEP 3: parse the body
+ size_t Frame::parse_body(boost::asio::streambuf& _response)
+ {
+ std::size_t _content_length = 0, bytecount = 0;
+ string _str;
+ debug_print("Frame parser phase 3");
+
+ // special case: content-length
+ if (m_headers.find("content-length") != m_headers.end()) {
+ string val = m_headers["content-length"];
+ _content_length = lexical_cast<int>(val);
+ debug_print(boost::format("phase 3: body content-length==%1%") % _content_length);
+ }
+ if (_content_length > 0) {
+ bytecount += _content_length;
+ // read back the body byte by byte
+ const char* rawdata = boost::asio::buffer_cast<const char*>(_response.data());
+ for (size_t i = 0; i < _content_length; i++ ) {
+ m_body << rawdata[i];
+ }
+ } else {
+ // read all bytes until the first NULL
+ mygetline(_response, _str, '\0');
+ bytecount += _str.size();
+ m_body << _str;
+ }
+ bytecount += 1; // for the final frame-terminating NULL
+ debug_print(boost::format("phase 3: consumed %1% bytes, BODY(%2% bytes)==%3%") % bytecount % _str.size() % _str);
+ _response.consume(bytecount);
+ return(bytecount);
+ }
+
}
View
31 StompFrame.hpp
@@ -10,21 +10,24 @@
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/algorithm/string/classification.hpp>
-#include <boost/assign/list_of.hpp>
-#include <boost/assign/list_inserter.hpp>
+
namespace STOMP {
using namespace std;
using namespace boost;
using namespace boost::asio;
- using namespace boost::assign;
using namespace boost::algorithm;
/* STOMP Frame header map */
typedef map<string, string> hdrmap;
class BoostStomp;
+ class Frame;
+
+ // STOMP server command handler methods
+ typedef void (BoostStomp::*pfnStompCommandHandler_t) ( );
+ typedef std::map<string, pfnStompCommandHandler_t> stomp_server_command_map_t;
// an std::vector encapsulation in order to store binary strings
// (STOMP doesn't prohibit NULLs inside the frame body)
@@ -35,6 +38,9 @@ namespace STOMP {
vector<char> v;
// constructors:
binbody() {};
+ binbody(binbody &other) {
+ v = other.v;
+ }
binbody(string b) {
v.assign(b.begin(), b.end());
}
@@ -59,17 +65,18 @@ namespace STOMP {
};
};
+ //
+ class NoMoreFrames: public boost::exception {};
+
+ //
class Frame {
friend class BoostStomp;
- friend Frame* parse_next(boost::asio::streambuf&);
protected:
string m_command;
hdrmap m_headers;
binbody m_body;
- boost::asio::streambuf m_request;
-
public:
// constructors
@@ -97,13 +104,17 @@ namespace STOMP {
m_body = other.m_body;
};
+ // constructor from a raw streambuf and a STOMP command map
+ Frame(boost::asio::streambuf&, const stomp_server_command_map_t&);
+ // parse the body from the streambuf, given its size (when==0, parse up to the next NULL)
+ size_t parse_body(boost::asio::streambuf&);
//
- string command() { return m_command; };
- hdrmap headers() { return m_headers; };
- binbody& body() { return m_body; };
+ string& command() { return m_command; };
+ hdrmap& headers() { return m_headers; };
+ binbody& body() { return m_body; };
// encode a STOMP Frame into m_request and return it
- boost::asio::streambuf& encode();
+ boost::asio::streambuf& encode(boost::asio::streambuf& _request);
}; // class Frame
View
6 helpers.cpp
@@ -27,10 +27,8 @@
}
void hexdump(boost::asio::streambuf& sb) {
- std::istream is(&sb);
- std::string str;
- is >> str;
- hexdump(str.c_str(), str.size());
+ const char* rawdata = boost::asio::buffer_cast<const char*>(sb.data());
+ hexdump(rawdata, sb.size());
}
void debug_print(string& str) {

0 comments on commit 7b0a3cd

Please sign in to comment.
Something went wrong with that request. Please try again.