Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

- initial support for reconnection

- connection is now synchronous (you shouldn't be able to send frames either way)
  • Loading branch information...
commit c473992911b62e0a3eacff0cac7d65fc18bfe805 1 parent 22e1cd6
@ekarak authored
View
409 BoostStomp.cpp
@@ -31,7 +31,10 @@ for more information on the LGPL, see:
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/asio/buffer.hpp>
+#include <boost/lexical_cast.hpp>
#include <boost/format.hpp>
+#include <boost/thread.hpp>
+
#include "BoostStomp.hpp"
namespace STOMP {
@@ -41,25 +44,6 @@ namespace STOMP {
using namespace boost::asio;
using boost::asio::ip::tcp;
- // regular expressions to parse a STOMP server frame
- static xpressive::sregex re_stomp_client_command = as_xpr("CONNECT") | "DISCONNECT"
- | "SEND" | "SUBSCRIBE" | "UNSUBSCRIBE"
- | "BEGIN" | "COMMIT" | "ABORT"
- | "ACK" | "NACK" ;
-
- static xpressive::sregex re_stomp_server_command = as_xpr("CONNECTED")
- | "MESSAGE" | "RECEIPT" | "ERROR"
- | "ACK" | "NACK" ;
- //
- static xpressive::mark_tag command(1), headers(2), body(3);
- static xpressive::mark_tag key(1), value(2);
- //
- //static sregex re_header = (key= -+alnum) >> ':' >> (value= -+_) >> _n;
- static xpressive::sregex re_header = (key= -+~as_xpr(':')) >> ':' >> (value= -+_) >> _n;
- static xpressive::sregex re_stomp_server_frame = bos >> (command= re_stomp_server_command ) >> _n // command and newline
- >> (headers= -+(re_header)) >> _n // headers and terminating newline
- >> (body= *_) >> eos; //body till end of stream (\0)
-
// ----------------------------
// constructor
// ----------------------------
@@ -69,24 +53,15 @@ namespace STOMP {
m_port(port),
m_ackmode(ackmode),
m_stopped(false),
- m_connected(false)
+ m_connected(false),
+ m_protocol_version("1.0"),
+ m_transaction_id(0)
// ----------------------------
{
m_io_service = boost::shared_ptr< io_service > ( new io_service );
m_heartbeat_timer = boost::shared_ptr< deadline_timer> ( new deadline_timer( *m_io_service ));
std::ostream os( &m_heartbeat);
os << "\n";
-
- m_socket = new tcp::socket(*m_io_service);
- tcp::resolver resolver(*m_io_service);
- // start the asynchronous resolver
- start(resolver.resolve(tcp::resolver::query(
- m_hostname,
- to_string<int>(m_port, std::dec),
- boost::asio::ip::resolver_query_base::numeric_service)
- ));
- // engage!
- worker_thread = new boost::thread( boost::bind( &BoostStomp::worker, this, m_io_service ) );
}
@@ -99,110 +74,17 @@ namespace STOMP {
worker_thread->interrupt();
delete m_socket;
}
-
+
// ----------------------------
- vector<Frame*> BoostStomp::parse_response()
+ // worker thread
// ----------------------------
+ void BoostStomp::worker( boost::shared_ptr< boost::asio::io_service > io_service )
{
- vector<Frame*> results;
- istream response_stream(&stomp_response);
- xpressive::smatch frame_match;
- string str;
- //
- // get all the responses in response stream
- //debug_print(boost::format("parse_response before: (%1% bytes in stomp_response)") % stomp_response.size() );
- //
- // iterate over all frame matches
- //
- while ( std::getline( response_stream, str, '\0' ) ) {
- //debug_print(boost::format("parse_response in loop: (%1% bytes in stomp_response)") % stomp_response.size());
-
- if ( regex_match(str, frame_match, re_stomp_server_frame ) ) {
- Frame* next_frame = parse_frame(frame_match);
- if (next_frame) {
- results.push_back(next_frame);
- }
- } else {
- debug_print(boost::format("parse_response error: mismatched frame: \n%1%") % str);
- hexdump(str.c_str(), str.length());
- }
-
- }
- //cout << "exiting, " << stomp_response.size() << " bytes still in stomp_response" << endl;
- return(results);
- };
-
- // --------------------------------------------------
- Frame* BoostStomp::parse_frame(xpressive::smatch& frame_match)
- // --------------------------------------------------
- {
- hdrmap hm;
- size_t framesize = frame_match.length(0);
- debug_print(boost::format("-- parse_frame, frame size: %1% bytes") % framesize);
- //hexdump(frame_match.str(0).c_str(), framesize);
- stomp_response.consume(framesize);
-
- //std::cout << "Command:" << frame_match[command] << std::endl;
- // break down headers
- std::string h = std::string(frame_match[headers]);
-
- xpressive::sregex_iterator cur( h.begin(), h.end(), re_header );
- xpressive::sregex_iterator end;
- for( ; cur != end; ++cur ) {
- xpressive::smatch const &header = *cur;
- //std::cout << "H:" << header[key] << "==" << header[value] << std::endl;
- hm[*decode_header_token(header[key].str().c_str())] = *decode_header_token(header[value].str().c_str());
- }
- //
- string c = string(frame_match[command]);
- string b = (frame_match[body]) ? string(frame_match[body]) : "";
- //
- return(new Frame(c, hm , b));
- };
-
- void BoostStomp::consume_frame(Frame& _rcvd_frame) {
- debug_print(boost::format("-- consume_frame: received %1%") % _rcvd_frame.command());
- if (_rcvd_frame.command() == "CONNECTED") {
- m_connected = true;
- start_stomp_write();
- start_stomp_heartbeat();
- }
- if (_rcvd_frame.command() == "MESSAGE") {
- string* dest = new string(_rcvd_frame.headers()["destination"]);
- //
- if (pfnOnStompMessage_t callback_function = m_subscriptions[*dest]) {
- debug_print("-- consume_frame: firing callback");
- //
- callback_function(&_rcvd_frame);
- };
- };
- };
-
-
- void hexdump(const void *ptr, int buflen) {
- unsigned char *buf = (unsigned char*)ptr;
- int i, j;
- for (i=0; i<buflen; i+=16) {
- printf("%06x: ", i);
- for (j=0; j<16; j++)
- if (i+j < buflen)
- printf("%02x ", buf[i+j]);
- else
- printf(" ");
- printf(" ");
- for (j=0; j<16; j++)
- if (i+j < buflen)
- printf("%c", isprint(buf[i+j]) ? buf[i+j] : '.');
- printf("\n");
- }
+ debug_print("Worker thread starting...");
+ io_service->run();
+ debug_print("Worker thread finished.");
}
- void hexdump(boost::asio::streambuf& sb) {
- std::istream is(&sb);
- std::string str;
- is >> str;
- hexdump(str.c_str(), str.size());
- }
// ----------------------------
// ASIO HANDLERS (protected)
@@ -211,9 +93,15 @@ namespace STOMP {
// Called by the user of the client class to initiate the connection process.
// The endpoint iterator will have been obtained using a tcp::resolver.
- void BoostStomp::start(tcp::resolver::iterator endpoint_iter)
+ void BoostStomp::start()
{
- debug_print("starting...");
+ m_socket = new tcp::socket(*m_io_service);
+ tcp::resolver resolver(*m_io_service);
+ tcp::resolver::iterator endpoint_iter = resolver.resolve(tcp::resolver::query(
+ m_hostname,
+ to_string<int>(m_port, std::dec),
+ boost::asio::ip::resolver_query_base::numeric_service)
+ );
// Start the connect actor.
start_connect(endpoint_iter);
}
@@ -225,8 +113,10 @@ namespace STOMP {
{
debug_print("stopping...");
m_stopped = true;
- m_socket->close();
m_heartbeat_timer->cancel();
+ //
+ m_socket->close();
+ delete m_socket;
}
@@ -235,77 +125,54 @@ namespace STOMP {
// ---------- TCP CONNECTION SETUP ------------------
// --------------------------------------------------
+
// --------------------------------------------------
void BoostStomp::start_connect(tcp::resolver::iterator endpoint_iter)
// --------------------------------------------------
{
if (endpoint_iter != tcp::resolver::iterator())
{
- debug_print(boost::format("Trying %1%...") % endpoint_iter->endpoint() );
-
- // Start the asynchronous connect operation.
- m_socket->async_connect(endpoint_iter->endpoint(),
- boost::bind(&BoostStomp::handle_connect,
- this, _1, endpoint_iter));
+ debug_print(boost::format("TCP: Trying %1%...") % endpoint_iter->endpoint() );
+
+ // Try TCP connection synchronously (the first frame to send is the CONNECT frame)
+ boost::system::error_code ec;
+ m_socket->connect(endpoint_iter->endpoint(), ec);
+ if (!ec) {
+ // now we are connected to STOMP server's TCP port/
+ debug_print(boost::format("TCP connection to %1% is active") % endpoint_iter->endpoint() );
+
+ // Send the CONNECT request synchronously (immediately).
+ hdrmap headers;
+ headers["accept-version"] = "1.1";
+ headers["host"] = m_hostname;
+ Frame frame( "CONNECT", headers );
+ frame.encode();
+ debug_print("Sending CONNECT frame...");
+ boost::asio::write(*m_socket, frame.request);
+
+ // start the read actor so as to receive the CONNECTED frame
+ start_stomp_read();
+
+ // start worker thread (io_service.run())
+ worker_thread = new boost::thread( boost::bind( &BoostStomp::worker, this, m_io_service ) );
+ } else {
+ // We need to close the socket used in the previous connection attempt
+ // before starting a new one.
+ m_socket->close();
+ // Try the next available endpoint.
+ start_connect(++endpoint_iter);
+ }
}
else
{
- // There are no more endpoints to try. Shut down the client.
+ // There are no more endpoints to try.
stop();
+ debug_print("Connection unsuccessful. Sleeping, then retrying...");
+ sleep(3);
+ start();
}
}
- // --------------------------------------------------
- void BoostStomp::handle_connect(const boost::system::error_code& ec,
- tcp::resolver::iterator endpoint_iter)
- // --------------------------------------------------
- {
- if (m_stopped)
- return;
-
- // The async_connect() function automatically opens the socket at the start
- // of the asynchronous operation. If the socket is closed at this time then
- // the timeout handler must have run first.
- if (!m_socket->is_open())
- {
- std::cerr << "Connect timed out\n";
- debug_print(boost::format("TCP Connection to %1% timed out!!!") % endpoint_iter->endpoint() );
-
- // Try the next available endpoint.
- start_connect(++endpoint_iter);
- }
-
- // Check if the connect operation failed before the deadline expired.
- else if (ec)
- {
- // We need to close the socket used in the previous connection attempt
- // before starting a new one.
- m_socket->close();
-
- // Try the next available endpoint.
- start_connect(++endpoint_iter);
- }
-
- // Otherwise we have successfully established a connection.
- else
- {
- debug_print(boost::format("TCP connection to %1% is active") % endpoint_iter->endpoint() );
-
- // now we are connected to STOMP server's TCP port/
- // The protocol negotiation phase requires we send a CONNECT frame
-
- // The connection was successful. Send the CONNECT request immediately.
- Frame frame( "CONNECT" );
- frame.encode();
- debug_print(boost::format("Sending %1% frame...") % frame.command() );
- boost::asio::write(*m_socket, frame.request);
-
- // Start the reading actor so as to receive the CONNECTED frame,
- // The read handler will also start the writing actor, stomp_write()
- start_stomp_read();
- }
- }
-
// -----------------------------------------------
// ---------- INPUT ACTOR SETUP ------------------
// -----------------------------------------------
@@ -335,15 +202,15 @@ namespace STOMP {
if (!ec)
{
debug_print(boost::format("received server response (%1% bytes)") % stomp_response.size() );
- vector<Frame*> received_frames = parse_response();
+ vector<Frame*> received_frames = parse_all_frames(stomp_response); // in StompFrame.cpp
while ((!received_frames.empty()) && (frame = received_frames.back())) {
consume_frame(*frame);
// dispose frame, its not needed anymore
delete frame;
received_frames.pop_back();
} //while
-
- start_stomp_read();
+ // OK, go on to the next outgoing frame in send queue
+ start_stomp_write();
}
else
{
@@ -368,7 +235,7 @@ namespace STOMP {
// send all STOMP frames in queue
m_sendqueue_mutex.lock();
- while (m_sendqueue.size() > 0) {
+ if (m_sendqueue.size() > 0) {
Frame& frame = m_sendqueue.front();
frame.encode();
debug_print(boost::format("Sending %1% frame...") % frame.command() );
@@ -380,6 +247,7 @@ namespace STOMP {
m_sendqueue.pop();
}
m_sendqueue_mutex.unlock();
+
}
// -----------------------------------------------
@@ -389,7 +257,11 @@ namespace STOMP {
if (m_stopped)
return;
- if (ec) {
+ if (!ec) {
+ // read back the server's response
+ start_stomp_read();
+ } else {
+ // TODO: if disconnected, go back to the connection phase
std::cout << "Error writing to STOMP server: " << ec.message() << "\n";
stop();
}
@@ -434,28 +306,52 @@ namespace STOMP {
}
// ------------------------------------------
- // ------------ PUBLIC INTERFACE ------------
- // ------------------------------------------
-
- // ------------------------------------------
- bool BoostStomp::subscribe( string& topic, pfnOnStompMessage_t callback )
+ bool BoostStomp::acknowledge(Frame& frame)
// ------------------------------------------
{
hdrmap hm;
- hm["destination"] = topic;
- Frame frame( "SUBSCRIBE", hm );
- m_subscriptions[topic] = callback;
- return(send_frame(frame));
+ hm["message-id"] = frame.headers()["message-id"];
+ hm["subscription"] = frame.headers()["subscription"];
+ Frame _ackframe( "ACK", hm );
+ return(send_frame(_ackframe));
}
- // ------------------------------------------
- bool BoostStomp::send( string& topic, hdrmap _headers, std::string& body )
- // ------------------------------------------
- {
- _headers["destination"] = topic;
- Frame frame( "SEND", _headers, body );
- return(send_frame(frame));
- }
+ void BoostStomp::consume_frame(Frame& _rcvd_frame) {
+ debug_print(boost::format("-- consume_frame: received %1%") % _rcvd_frame.command());
+ if (_rcvd_frame.command() == "CONNECTED") {
+ m_connected = true;
+ // try to get supported protocol version from headers
+ if (_rcvd_frame.headers().find("version") != _rcvd_frame.headers().end()) {
+ m_protocol_version = _rcvd_frame.headers()["version"];
+ debug_print(boost::format("server supports STOMP version %1%") % m_protocol_version);
+ }
+ if (m_protocol_version == "1.1") {
+ // we are connected to a version 1.1 STOMP server, we can start the heartbeat actor
+ start_stomp_heartbeat();
+ }
+ }
+ if (_rcvd_frame.command() == "MESSAGE") {
+ string* dest = new string(_rcvd_frame.headers()["destination"]);
+ //
+ if (pfnOnStompMessage_t callback_function = m_subscriptions[*dest]) {
+ debug_print("-- consume_frame: firing callback");
+ //
+ callback_function(&_rcvd_frame);
+ };
+ };
+ if (_rcvd_frame.command() == "RECEIPT") {
+ string* receipt_id = new string(_rcvd_frame.headers()["receipt_id"]);
+ // do something with receipt...
+ debug_print(boost::format("receipt-id == %1%") % receipt_id);
+ }
+ if (_rcvd_frame.command() == "ERROR") {
+ string errormessage = (_rcvd_frame.headers().find("message") != _rcvd_frame.headers().end()) ?
+ _rcvd_frame.headers()["message"] :
+ "(unknown error!)";
+
+ throw(errormessage);
+ }
+ };
//-----------------------------------------
bool BoostStomp::send_frame( Frame& frame )
@@ -471,37 +367,86 @@ namespace STOMP {
m_sendqueue_mutex.lock();
m_sendqueue.push(frame);
m_sendqueue_mutex.unlock();
- // start the write actor, if we're connected and not stopped
- if (m_connected && !m_stopped) start_stomp_write();
+ // start the write actor
+ start_stomp_write();
return(true);
}
- void BoostStomp::worker( boost::shared_ptr< boost::asio::io_service > io_service )
+ // ------------------------------------------
+ // ------------ PUBLIC INTERFACE ------------
+ // ------------------------------------------
+
+ // ------------------------------------------
+ bool BoostStomp::send( string& topic, hdrmap _headers, std::string& body )
+ // ------------------------------------------
{
- debug_print("Thread Start");
- io_service->run();
- debug_print("Thread Finish");
+ _headers["destination"] = topic;
+ Frame frame( "SEND", _headers, body );
+ return(send_frame(frame));
}
- boost::mutex global_stream_lock;
- void debug_print(boost::format& fmt) {
-#ifdef DEBUG_STOMP
- global_stream_lock.lock();
- std::cout << "[" << boost::this_thread::get_id() << "] BoostStomp:" << fmt.str() << endl;
- global_stream_lock.unlock();
-#endif
+ // ------------------------------------------
+ bool BoostStomp::send( string& topic, hdrmap _headers, std::string& body, pfnOnStompMessage_t callback )
+ // ------------------------------------------
+ {
+ _headers["destination"] = topic;
+ Frame frame( "SEND", _headers, body );
+ return(send_frame(frame));
}
- void debug_print(string& str) {
- boost::format fmt = boost::format(str.c_str());
- debug_print(fmt);
+ // ------------------------------------------
+ bool BoostStomp::subscribe( string& topic, pfnOnStompMessage_t callback )
+ // ------------------------------------------
+ {
+ hdrmap hm;
+ hm["id"] = lexical_cast<string>(boost::this_thread::get_id());
+ hm["destination"] = topic;
+ Frame frame( "SUBSCRIBE", hm );
+ m_subscriptions[topic] = callback;
+ return(send_frame(frame));
}
- void debug_print(const char* cstr) {
- boost::format fmt = boost::format(cstr);
- debug_print(fmt);
+ // ------------------------------------------
+ bool BoostStomp::unsubscribe( string& topic )
+ // ------------------------------------------
+ {
+ hdrmap hm;
+ hm["destination"] = topic;
+ Frame frame( "UNSUBSCRIBE", hm );
+ m_subscriptions.erase(topic);
+ return(send_frame(frame));
}
+ // ------------------------------------------
+ int BoostStomp::begin()
+ // ------------------------------------------
+ // returns a new transaction id
+ {
+ hdrmap hm;
+ hm["transaction"] = lexical_cast<string>(m_transaction_id++);
+ Frame frame( "BEGIN", hm );
+ send_frame(frame);
+ return(m_transaction_id);
+ };
+
+ // ------------------------------------------
+ bool BoostStomp::commit(string& transaction_id)
+ // ------------------------------------------
+ {
+ hdrmap hm;
+ hm["transaction"] = transaction_id;
+ Frame frame( "COMMIT", hm );
+ return(send_frame(frame));
+ };
+ // ------------------------------------------
+ bool BoostStomp::abort(string& transaction_id)
+ // ------------------------------------------
+ {
+ hdrmap hm;
+ hm["transaction"] = transaction_id;
+ Frame frame( "ABORT", hm );
+ return(send_frame(frame));
+ };
} // end namespace STOMP
View
43 BoostStomp.hpp
@@ -37,13 +37,14 @@ for more information on the LGPL, see:
#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
-#include <boost/format.hpp>
+
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/xpressive/xpressive.hpp>
#include "StompFrame.hpp"
+#include "helpers.h"
// helper template function for pretty-printing just about anything
template <class T>
@@ -65,7 +66,8 @@ namespace STOMP {
// ACK mode
typedef enum {
ACK_AUTO=0, // implicit acknowledgment (no ACK is sent)
- ACK_CLIENT // explicit acknowledgment (must ACK)
+ ACK_CLIENT, // explicit acknowledgment (must ACK)
+ ACK_CLIENT_INDIVIDUAL //
} AckMode;
// Stomp message callback function prototype
@@ -105,21 +107,21 @@ namespace STOMP {
boost::thread* worker_thread;
boost::shared_ptr<deadline_timer> m_heartbeat_timer;
boost::asio::streambuf m_heartbeat;
-
- bool send_frame( Frame& _frame );
- vector<Frame*> parse_response ();
- Frame* parse_frame (xpressive::smatch& framestr);
+ string m_protocol_version;
+ int m_transaction_id;
+ //
+ bool send_frame( Frame& _frame );
//
void consume_frame(Frame& _rcvd_frame);
- void start(tcp::resolver::iterator endpoint_iter);
- void stop();
-
void start_connect(tcp::resolver::iterator endpoint_iter);
void handle_connect(const boost::system::error_code& ec, tcp::resolver::iterator endpoint_iter);
void start_stomp_connect(tcp::resolver::iterator endpoint_iter);
+
+ //TODO: void setup_stomp_heartbeat(int cx, int cy);
+
void start_stomp_heartbeat();
void handle_stomp_heartbeat(const boost::system::error_code& ec);
@@ -133,24 +135,29 @@ namespace STOMP {
//----------------
public:
//----------------
- //BoostStomp(boost::asio::io_service& io_service, const std::string& hostname, const int port);
+ // constructor
BoostStomp(string& hostname, int& port, AckMode ackmode = ACK_AUTO);
+ // destructor
~BoostStomp();
- // thread-safe methods called from outside the thread loop
- bool subscribe ( std::string& topic, pfnOnStompMessage_t callback );
+ void start();
+ void stop();
+
+ // thread-safe methods called from outside the thread loop
bool send ( std::string& topic, hdrmap _headers, std::string& body );
+ bool send ( std::string& topic, hdrmap _headers, std::string& body, pfnOnStompMessage_t callback );
+ bool subscribe ( std::string& topic, pfnOnStompMessage_t callback );
+ bool unsubscribe ( std::string& topic );
+ bool acknowledge ( Frame& _frame );
+ // STOMP transactions
+ int begin(); // returns a new transaction id
+ bool commit(string& transaction_id);
+ bool abort(string& transaction_id);
//
- //BoostStompState& get_state() { return m_fsm.getState(); };
AckMode get_ackmode() { return m_ackmode; };
//
}; //class
- // helper function
- void hexdump(const void *ptr, int buflen);
- void debug_print(boost::format& fmt);
- void debug_print(string& str);
- void debug_print(const char* str);
}
View
21 Main.cpp
@@ -38,7 +38,7 @@ using namespace STOMP;
using namespace std;
static BoostStomp* stomp_client;
-static string* notifications_topic = new string("/queue/zwave/monitor");
+static string notifications_topic = "/queue/zwave/monitor";
// -------------------------------------------------------
// a callback for any STOMP Frames in subscribed channels
@@ -62,24 +62,31 @@ int main(int argc, char *argv[]) {
int stomp_port = 61613;
try {
- // connect to STOMP server
+ // initiate a new BoostStomp client
stomp_client = new BoostStomp(stomp_host, stomp_port);
+ // start the client, (by connecting to the STOMP server)
+ stomp_client->start();
+
// subscribe to a channel
- stomp_client->subscribe(*notifications_topic, (STOMP::pfnOnStompMessage_t) &subscription_callback);
+ stomp_client->subscribe(notifications_topic, (STOMP::pfnOnStompMessage_t) &subscription_callback);
// construct a headermap
STOMP::hdrmap headers;
headers["header1"] = string("value1");
headers["header2:withcolon"] = string("value2");
headers["header3"] = string("value3");
- string body = string("this is the main message body");
+ string body = string("this is the FIRST message body");
// add an outgoing message to the queue
- stomp_client->send(*notifications_topic, headers, body);
+ stomp_client->send(notifications_topic, headers, body);
+ sleep(1);
+ string body2 = string("this is the SECOND message");
+ stomp_client->send(notifications_topic, headers, body2);
+ sleep(1);
+ string body3 = string("this is the THIRD message");
+ stomp_client->send(notifications_topic, headers, body3);
sleep(1);
- stomp_client->send(*notifications_topic, headers, body);
- while (1) sleep(1);
}
catch (std::exception& e)
{
View
7 Makefile
@@ -32,6 +32,9 @@ INCLUDES := -I .
all: main libbooststomp.a libbooststomp.so
+helpers.o: helpers.cpp helpers.h
+ $(CXX) $(CFLAGS) -c helpers.cpp $(INCLUDES)
+
BoostStomp.o: BoostStomp.cpp BoostStomp.hpp
$(CXX) $(CFLAGS) -c BoostStomp.cpp $(INCLUDES)
@@ -41,8 +44,8 @@ StompFrame.o: StompFrame.cpp StompFrame.hpp
Main.o: Main.cpp
$(CXX) $(CFLAGS) -c Main.cpp $(INCLUDES)
-main: Main.o BoostStomp.o StompFrame.o
- $(CXX) -o $@ $(LDFLAGS) Main.o BoostStomp.o StompFrame.o
+main: Main.o BoostStomp.o StompFrame.o helpers.o
+ $(CXX) -o $@ $(LDFLAGS) Main.o BoostStomp.o StompFrame.o helpers.o
# upx main
libbooststomp.a: BoostStomp.o StompFrame.o
View
18 README
@@ -17,16 +17,20 @@ along with the shared & static versions of the booststomp library.
Usage is simple: construct a BoostStomp object passing it the hostname
and port of the STOMP server:
- string stomp_host = "localhost";
- int stomp_port = 61613;
- stomp_client = new BoostStomp(stomp_host, stomp_port);
+ string stomp_host = "localhost";
+ int stomp_port = 61613;
+ stomp_client = new BoostStomp(stomp_host, stomp_port);
-subscribe to a channel, and pass in a callback for the handling of any messages of interest:
+..then, start it to initiate the TCP & STOMP connection:
- string notifications_topic = "/queue/zwave/monitor";
- stomp_client->subscribe(*notifications_topic, (STOMP::pfnOnStompMessage_t) &subscription_callback);
+ stomp_client->start();
+
+..then, subscribe to a channel, and pass in a callback for the handling of any messages of interest:
-send a STOMP frame, adding in any headers you like:
+ string notifications_topic = "/queue/zwave/monitor";
+ stomp_client->subscribe(*notifications_topic, (STOMP::pfnOnStompMessage_t) &subscription_callback);
+
+..then you can construct STOMP frames, adding in any headers you like, and add them to the send queue:
STOMP::hdrmap headers;
headers["header1"] = string("value1");
View
65 StompFrame.cpp
@@ -24,8 +24,13 @@ for more information on the LGPL, see:
#include "StompFrame.hpp"
+#include <boost/format.hpp>
+
namespace STOMP {
-
+
+ using namespace boost;
+ using namespace boost::xpressive;
+
/*
* Escaping is needed to allow header keys and values to contain those frame header
* delimiting octets as values. The CONNECT and CONNECTED frames do not escape the
@@ -88,5 +93,63 @@ namespace STOMP {
request.sputc('\0');
};
+ // --------------------------------------------------
+ Frame::Frame(xpressive::smatch& frame_match, boost::asio::streambuf& stomp_response)
+ // --------------------------------------------------
+ {
+ size_t framesize = frame_match.length(0);
+ debug_print(boost::format("-- parse_frame, frame size: %1% bytes") % framesize);
+ //hexdump(frame_match.str(0).c_str(), framesize);
+ stomp_response.consume(framesize);
+
+ //std::cout << "Command:" << frame_match[command] << std::endl;
+ // break down headers
+ std::string h = std::string(frame_match[tag_headers]);
+
+ xpressive::sregex_iterator cur( h.begin(), h.end(), re_header );
+ xpressive::sregex_iterator end;
+ for( ; cur != end; ++cur ) {
+ xpressive::smatch const &header = *cur;
+ //std::cout << "H:" << header[key] << "==" << header[value] << std::endl;
+ m_headers[*decode_header_token(header[tag_key].str().c_str())] = *decode_header_token(header[tag_value].str().c_str());
+ }
+ //
+ m_command = string(frame_match[tag_command]);
+ m_body = (frame_match[tag_body]) ? string(frame_match[tag_body]) : "";
+ };
+
+ // ----------------------------
+ vector<Frame*> parse_all_frames(boost::asio::streambuf& stomp_response)
+ // ----------------------------
+ {
+ vector<Frame*> results;
+ istream response_stream(&stomp_response);
+ xpressive::smatch frame_match;
+ string str;
+ //
+ // get all the responses in response stream
+ //debug_print(boost::format("parse_response before: (%1% bytes in stomp_response)") % stomp_response.size() );
+ //
+ // iterate over all frame matches
+ //
+ while ( std::getline( response_stream, str, '\0' ) ) {
+ //debug_print(boost::format("parse_response in loop: (%1% bytes in stomp_response)") % stomp_response.size());
+
+ if ( regex_match(str, frame_match, re_stomp_server_frame ) ) {
+ try {
+ Frame* next_frame = new Frame(frame_match, stomp_response);
+ results.push_back(next_frame);
+ } catch(...) {
+// TODO
+ }
+ } else {
+ debug_print(boost::format("parse_response error: mismatched frame: \n%1%") % str);
+ hexdump(str.c_str(), str.length());
+ }
+
+ }
+ //cout << "exiting, " << stomp_response.size() << " bytes still in stomp_response" << endl;
+ return(results);
+ };
}
View
39 StompFrame.hpp
@@ -7,12 +7,40 @@
#include <sstream>
#include <boost/asio.hpp>
#include <boost/algorithm/string/replace.hpp>
+#include <boost/xpressive/xpressive.hpp>
+
+#include "helpers.h"
namespace STOMP {
using namespace std;
+ using namespace boost;
using namespace boost::asio;
+ using namespace boost::xpressive;
+ static xpressive::sregex re_stomp_client_command = as_xpr("CONNECT") | "DISCONNECT"
+ | "SEND" | "SUBSCRIBE" | "UNSUBSCRIBE"
+ | "BEGIN" | "COMMIT" | "ABORT"
+ | "ACK" | "NACK" ;
+
+
+ // regular expressions to parse a STOMP server frame
+
+ static xpressive::sregex re_stomp_server_command = as_xpr("CONNECTED")
+ | "MESSAGE" | "RECEIPT" | "ERROR"
+ | "ACK" | "NACK" ;
+ //
+ static xpressive::mark_tag tag_command(1), tag_headers(2), tag_body(3);
+ static xpressive::mark_tag tag_key(1), tag_value(2);
+ //
+ //static sregex re_header = (key= -+alnum) >> ':' >> (value= -+_) >> _n;
+ static xpressive::sregex re_header = (tag_key= -+~as_xpr(':')) >> ':' >> (tag_value= -+_) >> _n;
+ // FIXME: use content-length header to read not up to the end of the stream (which could contain another frame too!)
+ // but up to the content-length byte.
+ static xpressive::sregex re_stomp_server_frame = bos >> (tag_command= re_stomp_server_command ) >> _n // command and newline
+ >> (tag_headers= -+(re_header)) >> _n // headers and terminating newline
+ >> (tag_body= *_) >> eos; //body till end of stream (\0)
+
/* STOMP Frame header map */
typedef map<string, string> hdrmap;
@@ -29,6 +57,7 @@ namespace STOMP {
boost::asio::streambuf request;
public:
+
// constructors
Frame(string cmd):
m_command(cmd)
@@ -53,14 +82,22 @@ namespace STOMP {
m_body = other.m_body;
};
+ // constructor from boost::xpressive regex match object
+ Frame(xpressive::smatch& framestr, boost::asio::streambuf& stomp_response);
+
+ //
string command() { return m_command; };
hdrmap headers() { return m_headers; };
string body() { return m_body; };
+ // encode a STOMP Frame into a streambuf
void encode();
}; // class Frame
-
+
+ // return a vector of all Frame's in a streambuf
+ vector<Frame*> parse_all_frames(boost::asio::streambuf& stomp_response);
+
string* encode_header_token(const char* str);
string* decode_header_token(const char* str);

0 comments on commit c473992

Please sign in to comment.
Something went wrong with that request. Please try again.