Skip to content
Browse files

removed boost::xpressive along with some ~5mb of needless binary blur…

…b, reimplemented with simple line parsing (STOMP stands for Simple after all)
  • Loading branch information...
1 parent 9c9fb05 commit 0389cbd92cdd474eed947e36bdd3dda8f58d6c9b @ekarak committed Apr 25, 2012
Showing with 261 additions and 181 deletions.
  1. +202 −55 BoostStomp.cpp
  2. +26 −11 BoostStomp.hpp
  3. +11 −8 Main.cpp
  4. +11 −71 StompFrame.cpp
  5. +11 −36 StompFrame.hpp
View
257 BoostStomp.cpp
@@ -25,8 +25,6 @@ for more information on the LGPL, see:
// based on the ASIO async TCP client example found on Boost documentation:
// http://www.boost.org/doc/libs/1_46_1/doc/html/boost_asio/example/timeouts/async_tcp_client.cpp
-// #include <cstdlib>
-// #include <deque>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
@@ -44,6 +42,7 @@ namespace STOMP {
using namespace boost::asio;
using boost::asio::ip::tcp;
+
// ----------------------------
// constructor
// ----------------------------
@@ -59,9 +58,15 @@ namespace STOMP {
// ----------------------------
{
m_io_service = boost::shared_ptr< io_service > ( new io_service );
+ // Heartbeat setup
m_heartbeat_timer = boost::shared_ptr< deadline_timer> ( new deadline_timer( *m_io_service ));
std::ostream os( &m_heartbeat);
os << "\n";
+ // Server command map
+ stomp_server_command_map["CONNECTED"] = &BoostStomp::process_CONNECTED;
+ stomp_server_command_map["MESSAGE"] = &BoostStomp::process_MESSAGE;
+ stomp_server_command_map["RECEIPT"] = &BoostStomp::process_RECEIPT;
+ stomp_server_command_map["ERROR"] = &BoostStomp::process_ERROR;
}
@@ -138,7 +143,6 @@ namespace STOMP {
// ---------- TCP CONNECTION SETUP ------------------
// --------------------------------------------------
-
// --------------------------------------------------
void BoostStomp::start_connect(tcp::resolver::iterator endpoint_iter)
// --------------------------------------------------
@@ -217,7 +221,7 @@ namespace STOMP {
if (!ec)
{
debug_print(boost::format("received server response (%1% bytes)") % stomp_response.size() );
- vector<Frame*> received_frames = parse_all_frames(stomp_response); // in StompFrame.cpp
+ vector<Frame*> received_frames = parse_all_frames();
while ((!received_frames.empty()) && (frame = received_frames.back())) {
consume_frame(*frame);
// dispose frame, its not needed anymore
@@ -235,6 +239,114 @@ namespace STOMP {
}
}
+ // --------------------------------------------------
+ Frame* BoostStomp::parse_next()
+ // --------------------------------------------------
+ {
+ string _str;
+ istream _input(&stomp_response);
+ size_t bytes_to_consume = 0, content_length = 0;
+ Frame* frame = NULL;
+ vector< string > header_parts;
+
+ try {
+
+ // STEP 1: find the next STOMP command line in stomp_response, skipping non-matching lines
+ //debug_print("parse_next phase 1");
+ while (std::getline(_input, _str)) {
+ //hexdump(_str.c_str(), _str.length());
+ if ((_str.size() > 0) && (stomp_server_command_map.find(_str) != stomp_server_command_map.end())) {
+ //debug_print(boost::format("parse_next phase 1: COMMAND==%1%") % _str);
+ frame = new Frame(_str);
+ bytes_to_consume += _str.size()+1;
+ break;
+ }
+ }
+ // STEP 2: parse all headers
+ if (frame != NULL) {
+ //debug_print("parse_next phase 2");
+ while (std::getline(_input, _str)) {
+ //hexdump(_str.c_str(), _str.length());
+ boost::algorithm::split(header_parts, _str, is_any_of(":"));
+ if (header_parts.size() > 1) {
+ string* key = decode_header_token(header_parts[0].c_str());
+ string* val = decode_header_token(header_parts[1].c_str());
+ //debug_print(boost::format("parse_next phase 2: HEADER[%1%]==%2%") % *key % *val);
+ frame->m_headers[*key] = *val;
+ bytes_to_consume += _str.size()+1;
+ // special case: content-length
+ if (*key == "content-length") {
+ content_length = lexical_cast<int>(*val);
+ }
+ delete key;
+ delete val;
+ } else {
+ break;
+ }
+ }
+ bytes_to_consume += 1;
+ // STEP 3: parse the body
+ //debug_print("parse_next phase 3");
+ if (content_length > 0) {
+ char* buffer = new char[content_length];
+ // read until the specified content length
+ _input.read(buffer, content_length);
+ bytes_to_consume += content_length;
+ /*
+ for (int i=0; i<content_length; i++) {
+ frame->m_body << buffer[i];
+ } */
+ _str = string(buffer); // TODO: check if NULLs inside the body
+ //debug_print(boost::format("parse_next phase 3: BODY(%1% bytes)==%2%") % _str.size() % _str);
+ frame->m_body.assign(_str.begin(), _str.end());
+ delete buffer;
+ } else {
+ // read all bytes until the first NULL
+ std::getline(_input, _str, '\0');
+ //debug_print(boost::format("parse_next phase 3: BODY(%1% bytes)==%2%") % _str.size() % _str);
+ if (_str.length() > 0) {
+ bytes_to_consume += _str.size() + 1;
+ frame->m_body.assign(_str.begin(), _str.end());
+ };
+ }
+ } else {
+ throw("shit happens");
+ }
+ }
+ catch (...) {
+
+ }
+ stomp_response.consume(bytes_to_consume);
+ //debug_print(boost::format("-- parse_frame, consumed %1% bytes from stomp_response") % bytes_to_consume);
+ return(frame);
+ };
+
+ // ----------------------------
+ vector<Frame*> BoostStomp::parse_all_frames()
+ // ----------------------------
+ {
+ vector<Frame*> results;
+ istream response_stream(&stomp_response);
+ string str;
+ //
+ // get all the responses in response stream
+ //debug_print(boost::format("parse_all_frames before: (%1% bytes in stomp_response)") % stomp_response.size() );
+ try {
+ //
+ // iterate over all frame matches
+ //
+ while (Frame* next_frame = parse_next()) {
+ debug_print(boost::format("parse_all_frames in loop: (%1% bytes in str, %2% bytes still in stomp_response)") % str.size() % stomp_response.size());
+ results.push_back(next_frame);
+ }
+ } catch(...) {
+ debug_print("parse_response in loop: exception in Frame constructor");
+// TODO
+ }
+ //cout << "exiting, " << stomp_response.size() << " bytes still in stomp_response" << endl;
+ return(results);
+ };
+
// ------------------------------------------------
// ---------- OUTPUT ACTOR SETUP ------------------
@@ -247,7 +359,7 @@ namespace STOMP {
if ((m_stopped) || (!m_connected))
return;
- debug_print("start_stomp_write");
+ //debug_print("start_stomp_write");
// send all STOMP frames in queue
m_sendqueue_mutex.lock();
@@ -322,71 +434,98 @@ namespace STOMP {
}
// ------------------------------------------
- bool BoostStomp::acknowledge(Frame& frame)
+ bool BoostStomp::acknowledge(Frame& frame, bool acked = true)
// ------------------------------------------
{
hdrmap hm;
hm["message-id"] = frame.headers()["message-id"];
hm["subscription"] = frame.headers()["subscription"];
- Frame _ackframe( "ACK", hm );
+ string _ack_cmd = (acked ? "ACK" : "NACK");
+ Frame _ackframe( _ack_cmd, hm );
return(send_frame(_ackframe));
}
// ------------------------------------------
void BoostStomp::consume_frame(Frame& _rcvd_frame)
// ------------------------------------------
{
- debug_print(boost::format("-- consume_frame: received %1%") % _rcvd_frame.command());
- if (_rcvd_frame.command() == "CONNECTED") {
- m_connected = true;
- // try to get supported protocol version from headers
- if (_rcvd_frame.headers().find("version") != _rcvd_frame.headers().end()) {
- m_protocol_version = _rcvd_frame.headers()["version"];
- debug_print(boost::format("server supports STOMP version %1%") % m_protocol_version);
- }
- if (m_protocol_version == "1.1") {
- // we are connected to a version 1.1 STOMP server, we can start the heartbeat actor
- start_stomp_heartbeat();
- }
- // in case of reconnection, we need to re-subscribe to all subscriptions
- for (subscription_map::iterator it = m_subscriptions.begin(); it != m_subscriptions.end(); it++) {
- string topic = (*it).first;
- do_subscribe(topic);
- };
+ //debug_print(boost::format("-- consume_frame: received %1%") % _rcvd_frame.command());
+ pfnStompCommandHandler_t handler = stomp_server_command_map[_rcvd_frame.command()];
+ (this->*handler)(_rcvd_frame);
+
+
+ /*
+ if (_rcvd_frame.command() == "CONNECTED") process_CONNECTED(_rcvd_frame);
+ if (_rcvd_frame.command() == "MESSAGE") process_MESSAGE(_rcvd_frame);
+ if (_rcvd_frame.command() == "RECEIPT") process_RECEIPT(_rcvd_frame);
+ if (_rcvd_frame.command() == "ERROR") process_ERROR(_rcvd_frame);
+ */
+ };
+
+ //-----------------------------------------
+ void BoostStomp::process_CONNECTED(Frame& _rcvd_frame)
+ //-----------------------------------------
+ {
+ m_connected = true;
+ // try to get supported protocol version from headers
+ if (_rcvd_frame.headers().find("version") != _rcvd_frame.headers().end()) {
+ m_protocol_version = _rcvd_frame.headers()["version"];
+ debug_print(boost::format("server supports STOMP version %1%") % m_protocol_version);
+ }
+ if (m_protocol_version == "1.1") {
+ // we are connected to a version 1.1 STOMP server, we can start the heartbeat actor
+ start_stomp_heartbeat();
}
- if (_rcvd_frame.command() == "MESSAGE") {
- string* dest = new string(_rcvd_frame.headers()["destination"]);
+ // in case of reconnection, we need to re-subscribe to all subscriptions
+ for (subscription_map::iterator it = m_subscriptions.begin(); it != m_subscriptions.end(); it++) {
+ string topic = (*it).first;
+ do_subscribe(topic);
+ };
+ }
+
+ //-----------------------------------------
+ void BoostStomp::process_MESSAGE(Frame& _rcvd_frame)
+ //-----------------------------------------
+ {
+ bool acked = true;
+ string* dest = new string(_rcvd_frame.headers()["destination"]);
+ //
+ if (pfnOnStompMessage_t callback_function = m_subscriptions[*dest]) {
+ debug_print("-- consume_frame: firing callback");
//
- if (pfnOnStompMessage_t callback_function = m_subscriptions[*dest]) {
- debug_print("-- consume_frame: firing callback");
- //
- callback_function(&_rcvd_frame);
- };
+ acked = callback_function(_rcvd_frame);
};
- if (_rcvd_frame.command() == "RECEIPT") {
- string* receipt_id = new string(_rcvd_frame.headers()["receipt_id"]);
- // do something with receipt...
- debug_print(boost::format("receipt-id == %1%") % receipt_id);
+ // acknowledge frame, if in "Client" or "Client-Individual" ack mode
+ if ((m_ackmode == ACK_CLIENT) || (m_ackmode == ACK_CLIENT_INDIVIDUAL)) {
+ acknowledge(_rcvd_frame, acked);
}
- if (_rcvd_frame.command() == "ERROR") {
- string errormessage = (_rcvd_frame.headers().find("message") != _rcvd_frame.headers().end()) ?
- _rcvd_frame.headers()["message"] :
- "(unknown error!)";
+ }
- throw(errormessage);
- }
- };
+ //-----------------------------------------
+ void BoostStomp::process_RECEIPT(Frame& _rcvd_frame)
+ //-----------------------------------------
+ {
+ string* receipt_id = new string(_rcvd_frame.headers()["receipt_id"]);
+ // do something with receipt...
+ debug_print(boost::format("receipt-id == %1%") % receipt_id);
+ }
//-----------------------------------------
- bool BoostStomp::send_frame( Frame& frame )
+ void BoostStomp::process_ERROR(Frame& _rcvd_frame)
//-----------------------------------------
{
- xpressive::smatch tmp;
- if (!regex_match(frame.command(), tmp, re_stomp_client_command)) {
- debug_print(boost::format("send_frame: Invalid frame command (%1%)") % frame.command() );
- exit(1);
- }
+ string errormessage = (_rcvd_frame.headers().find("message") != _rcvd_frame.headers().end()) ?
+ _rcvd_frame.headers()["message"] :
+ "(unknown error!)";
+ errormessage += _rcvd_frame.body().data();
+ throw(errormessage);
+ }
+
+ //-----------------------------------------
+ bool BoostStomp::send_frame( Frame& frame )
+ //-----------------------------------------
+ {
debug_print(boost::format("send_frame: Adding %1% frame to send queue...") % frame.command() );
m_sendqueue_mutex.lock();
m_sendqueue.push(frame);
@@ -396,9 +535,11 @@ namespace STOMP {
return(true);
}
- // ------------------------------------------
- // ------------ PUBLIC INTERFACE ------------
- // ------------------------------------------
+ // ---------------------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------
+ // ------------------------ PUBLIC INTERFACE ------------------------
+ // ---------------------------------------------------------------------------------------
+ // ---------------------------------------------------------------------------------------
// ------------------------------------------
bool BoostStomp::send( string& topic, hdrmap _headers, std::string& body )
@@ -455,30 +596,36 @@ namespace STOMP {
// returns a new transaction id
{
hdrmap hm;
+ // create a new transaction id
hm["transaction"] = lexical_cast<string>(m_transaction_id++);
Frame frame( "BEGIN", hm );
send_frame(frame);
return(m_transaction_id);
};
// ------------------------------------------
- bool BoostStomp::commit(string& transaction_id)
+ bool BoostStomp::commit(int transaction_id)
// ------------------------------------------
{
hdrmap hm;
- hm["transaction"] = transaction_id;
+ // add required header
+ hm["transaction"] = lexical_cast<string>(transaction_id);
Frame frame( "COMMIT", hm );
return(send_frame(frame));
};
// ------------------------------------------
- bool BoostStomp::abort(string& transaction_id)
+ bool BoostStomp::abort(int transaction_id)
// ------------------------------------------
{
hdrmap hm;
- hm["transaction"] = transaction_id;
+ // add required header
+ hm["transaction"] = lexical_cast<string>(transaction_id);
Frame frame( "ABORT", hm );
return(send_frame(frame));
};
+
+
} // end namespace STOMP
+
View
37 BoostStomp.hpp
@@ -41,7 +41,6 @@ for more information on the LGPL, see:
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
-#include <boost/xpressive/xpressive.hpp>
#include "StompFrame.hpp"
#include "helpers.h"
@@ -61,7 +60,6 @@ namespace STOMP {
using namespace boost;
using namespace boost::asio;
using namespace boost::asio::ip;
- using namespace boost::xpressive;
// ACK mode
typedef enum {
@@ -71,11 +69,22 @@ namespace STOMP {
} AckMode;
// Stomp message callback function prototype
- typedef void (*pfnOnStompMessage_t)( Frame* _frame );
+ typedef bool (*pfnOnStompMessage_t)( Frame& );
// Stomp subscription map (topic => callback)
typedef std::map<std::string, pfnOnStompMessage_t> subscription_map;
+
+ // STOMP server command handler methods
+ //typedef const boost::_bi::bind_t<void (&)(STOMP::Frame&), boost::_mfi::dm<void(STOMP::Frame&), STOMP::BoostStomp>, boost::_bi::list1<boost::arg<1> > > pfnStompCommandHandler_t;
+
+ typedef void (BoostStomp::*pfnStompCommandHandler_t)( Frame& );
+
+ //typedef void (*pfnStompCommandHandler_t)(BoostStomp::*, STOMP::Frame&);
+ ///typedef boost::function<void(Frame&)> pfnStompCommandHandler_t;
+ typedef std::map<string, pfnStompCommandHandler_t> stomp_server_command_map_t;
+ static stomp_server_command_map_t stomp_server_command_map;
+
// here we go
// -------------
class BoostStomp
@@ -116,14 +125,15 @@ namespace STOMP {
bool send_frame( Frame& _frame );
bool do_subscribe (string& topic);
//
-
void consume_frame(Frame& _rcvd_frame);
+ void process_CONNECTED(Frame& _rcvd_frame);
+ void process_MESSAGE(Frame& _rcvd_frame);
+ void process_RECEIPT(Frame& _rcvd_frame);
+ void process_ERROR(Frame& _rcvd_frame);
void start_connect(tcp::resolver::iterator endpoint_iter);
void handle_connect(const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iter);
- void start_stomp_connect(tcp::resolver::iterator endpoint_iter);
-
//TODO: void setup_stomp_heartbeat(int cx, int cy);
void start_stomp_heartbeat();
@@ -136,6 +146,13 @@ namespace STOMP {
void handle_stomp_write(const boost::system::error_code& ec);
void worker( boost::shared_ptr< boost::asio::io_service > io_service );
+ //
+ // Frame constructor from the first available STOMP frame in stomp_response streambuf
+ Frame* parse_next();
+
+ // return a vector of all Frame's in a streambuf
+ vector<Frame*> parse_all_frames();
+
//----------------
public:
//----------------
@@ -152,18 +169,16 @@ namespace STOMP {
bool send ( std::string& topic, hdrmap _headers, std::string& body, pfnOnStompMessage_t callback );
bool subscribe ( std::string& topic, pfnOnStompMessage_t callback );
bool unsubscribe ( std::string& topic );
- bool acknowledge ( Frame& _frame );
+ bool acknowledge ( Frame& _frame, bool acked );
// STOMP transactions
int begin(); // returns a new transaction id
- bool commit(string& transaction_id);
- bool abort(string& transaction_id);
+ bool commit(int transaction_id);
+ bool abort(int transaction_id);
//
AckMode get_ackmode() { return m_ackmode; };
//
}; //class
-
}
-
#endif
View
19 Main.cpp
@@ -43,16 +43,16 @@ static string notifications_topic = "/queue/zwave/monitor";
// -------------------------------------------------------
// a callback for any STOMP Frames in subscribed channels
// -------------------------------------------------------
-STOMP::pfnOnStompMessage_t subscription_callback(STOMP::Frame* _frame) {
+bool subscription_callback(STOMP::Frame& _frame) {
cout << "--Incoming STOMP Frame--" << endl;
cout << " Headers:" << endl;
STOMP::hdrmap::iterator it;
- for ( it = _frame->headers().begin() ; it != _frame->headers().end(); it++ )
+ for ( it = _frame.headers().begin() ; it != _frame.headers().end(); it++ )
cout << "\t" << (*it).first << "\t=>\t" << (*it).second << endl;
//
- cout << " Body: (size: " << _frame->body().size() << " chars):" << endl;
- cout << _frame->body() << endl;
- return 0;
+ cout << " Body: (size: " << _frame.body().size() << " chars):" << endl;
+ cout << _frame.body().data() << endl;
+ return(true); // return false if we want to disacknowledge the frame (send NACK instead of ACK)
}
// -----------------------------------------
@@ -81,12 +81,15 @@ int main(int argc, char *argv[]) {
// add an outgoing message to the queue
stomp_client->send(notifications_topic, headers, body);
sleep(1);
- string body2 = string("this is the SECOND message.");
+ string body2 = string("this is the SECOND message.\0with a NULL in it");
stomp_client->send(notifications_topic, headers, body2);
sleep(1);
- string body3 = string("this is the THIRD message.");
+ string body3 = string("this is the THIRD message.\0\0with two NULLs in it");
+ vector<char> binbody;
+ binbody.push_back(body3.c_str());
stomp_client->send(notifications_topic, headers, body3);
- while (1) sleep(1);
+ sleep(1);
+ stomp_client->stop();
}
catch (std::exception& e)
{
View
82 StompFrame.cpp
@@ -22,14 +22,16 @@ for more information on the LGPL, see:
http://en.wikipedia.org/wiki/GNU_Lesser_General_Public_License
*/
-#include "StompFrame.hpp"
-
#include <boost/format.hpp>
+#include <boost/lexical_cast.hpp>
+#include "BoostStomp.hpp"
+//#include "StompFrame.hpp"
+#include "helpers.h"
namespace STOMP {
using namespace boost;
- using namespace boost::xpressive;
+ using namespace boost::assign;
/*
* Escaping is needed to allow header keys and values to contain those frame header
@@ -80,81 +82,19 @@ namespace STOMP {
}
}
// special header: content-length
- if( m_body.length() > 0 ) {
- os << "content-length:" << m_body.length() << "\n";
+ if( m_body.size() > 0 ) {
+ os << "content-length:" << m_body.size() << "\n";
}
// write newline signifying end of headers
os << "\n";
// step 3. Write the body
- if( m_body.length() > 0 ) {
- os << m_body;
+ if( m_body.size() > 0 ) {
+ request.sputn(m_body.data(), m_body.size());
+ //os << m_body.data();
+ // TODO: check bodies with NULL in them (data() returns char*)
}
// write terminating NULL char
request.sputc('\0');
};
- // --------------------------------------------------
- Frame::Frame(xpressive::smatch& frame_match, boost::asio::streambuf& stomp_response)
- // --------------------------------------------------
- {
- size_t framesize = frame_match.length(0);
- size_t bytes_to_consume = framesize + 2; // plus one for the NULL
- debug_print(boost::format("-- parse_frame, frame: %1% bytes, content: \n%2%") % framesize % frame_match[0]);
- hexdump(frame_match.str(0).c_str(), framesize);
- stomp_response.consume(bytes_to_consume);
- debug_print(boost::format("-- parse_frame, consumed %1% bytes from stomp_response") % bytes_to_consume);
- //std::cout << "Command:" << frame_match[command] << std::endl;
- // break down headers
- std::string h = std::string(frame_match[tag_headers]);
-
- xpressive::sregex_iterator cur( h.begin(), h.end(), re_header );
- xpressive::sregex_iterator end;
- for( ; cur != end; ++cur ) {
- xpressive::smatch const &header = *cur;
- m_headers[*decode_header_token(header[tag_key].str().c_str())] = *decode_header_token(header[tag_value].str().c_str());
- }
- //
- m_command = string(frame_match[tag_command]);
- m_body = (frame_match[tag_body]) ? string(frame_match[tag_body]) : "";
- };
-
- // ----------------------------
- vector<Frame*> parse_all_frames(boost::asio::streambuf& stomp_response)
- // ----------------------------
- {
- vector<Frame*> results;
- istream response_stream(&stomp_response);
- xpressive::smatch frame_match;
- string str;
- char lala[1024];
- //
- // get all the responses in response stream
- debug_print(boost::format("parse_all_frames before: (%1% bytes in stomp_response)") % stomp_response.size() );
- //
- // iterate over all frame matches
- //
- //while ( std::getline( response_stream, str, '\0' ) ) {
- while ( response_stream.get(lala, 1023, '\0') ) {
- str = string(lala);
-hexdump(str.c_str(), str.size());
- debug_print(boost::format("parse_all_frames in loop: (%1% bytes in str, %2% bytes still in stomp_response)") % str.size() % stomp_response.size());
-
- if ( regex_match(str, frame_match, re_stomp_server_frame ) ) {
- try {
- Frame* next_frame = new Frame(frame_match, stomp_response);
- results.push_back(next_frame);
- } catch(...) {
- debug_print("parse_response in loop: exception in Frame constructor");
-// TODO
- }
- } else {
- debug_print("parse_response error: mismatched frame:");
- hexdump(str.c_str(), str.length());
- }
-
- }
- //cout << "exiting, " << stomp_response.size() << " bytes still in stomp_response" << endl;
- return(results);
- };
-
}
View
47 StompFrame.hpp
@@ -7,39 +7,19 @@
#include <sstream>
#include <boost/asio.hpp>
#include <boost/algorithm/string/replace.hpp>
-#include <boost/xpressive/xpressive.hpp>
-
-#include "helpers.h"
+#include <boost/algorithm/string/split.hpp>
+#include <boost/algorithm/string/predicate.hpp>
+#include <boost/algorithm/string/classification.hpp>
+#include <boost/assign/list_of.hpp>
+#include <boost/assign/list_inserter.hpp>
namespace STOMP {
using namespace std;
using namespace boost;
using namespace boost::asio;
- using namespace boost::xpressive;
-
- static xpressive::sregex re_stomp_client_command = as_xpr("CONNECT") | "DISCONNECT"
- | "SEND" | "SUBSCRIBE" | "UNSUBSCRIBE"
- | "BEGIN" | "COMMIT" | "ABORT"
- | "ACK" | "NACK" ;
-
-
- // regular expressions to parse a STOMP server frame
-
- static xpressive::sregex re_stomp_server_command = as_xpr("CONNECTED")
- | "MESSAGE" | "RECEIPT" | "ERROR"
- | "ACK" | "NACK" ;
- //
- static xpressive::mark_tag tag_command(1), tag_headers(2), tag_body(3);
- static xpressive::mark_tag tag_key(1), tag_value(2);
- //
- //static sregex re_header = (key= -+alnum) >> ':' >> (value= -+_) >> _n;
- static xpressive::sregex re_header = (tag_key= -+~as_xpr(':')) >> ':' >> (tag_value= -+_) >> _n;
- // FIXME: use content-length header to read not up to the end of the stream (which could contain another frame too!)
- // but up to the content-length byte.
- static xpressive::sregex re_stomp_server_frame = bos >> (tag_command= re_stomp_server_command ) >> _n // command and newline
- >> (tag_headers= -+(re_header)) >> _n // headers and terminating newline
- >> (tag_body= *_) >> eos; //body till end of stream (\0)
+ using namespace boost::assign;
+ using namespace boost::algorithm;
/* STOMP Frame header map */
typedef map<string, string> hdrmap;
@@ -48,11 +28,12 @@ namespace STOMP {
class Frame {
friend class BoostStomp;
+ friend Frame* parse_next(boost::asio::streambuf&);
protected:
string m_command;
hdrmap m_headers;
- string m_body;
+ vector<char> m_body;
boost::asio::streambuf request;
@@ -71,7 +52,7 @@ namespace STOMP {
Frame(string cmd, hdrmap h, string b):
m_command(cmd),
m_headers(h),
- m_body(b)
+ m_body(b.begin(), b.end())
{};
// copy constructor
@@ -82,22 +63,16 @@ namespace STOMP {
m_body = other.m_body;
};
- // constructor from boost::xpressive regex match object
- Frame(xpressive::smatch& framestr, boost::asio::streambuf& stomp_response);
-
//
string command() { return m_command; };
hdrmap headers() { return m_headers; };
- string body() { return m_body; };
+ vector<char>& body() { return m_body; };
// encode a STOMP Frame into a streambuf
void encode();
}; // class Frame
- // return a vector of all Frame's in a streambuf
- vector<Frame*> parse_all_frames(boost::asio::streambuf& stomp_response);
-
string* encode_header_token(const char* str);
string* decode_header_token(const char* str);

0 comments on commit 0389cbd

Please sign in to comment.
Something went wrong with that request. Please try again.