Permalink
Browse files

fixed subtle synchronisation bug

  • Loading branch information...
1 parent a1714ef commit 9c52818e7ed6e9e39f4f3736ff1f024c8b40967c @ekarak committed May 9, 2012
Showing with 59 additions and 41 deletions.
  1. +39 −23 BoostStomp.cpp
  2. +17 −17 StompFrame.cpp
  3. +3 −1 StompFrame.hpp
View
@@ -224,12 +224,20 @@ namespace STOMP {
if (!ec)
{
std::size_t bodysize = 0;
- //debug_print(boost::format("received response (command+headers: %1% bytes)") % stomp_response.size() );
- start_stomp_read_body(bodysize);
-
try {
+ //debug_print("handle_stomp_read_headers");
m_rcvd_frame = new Frame(stomp_response, cmd_map);
+ hdrmap& _headers = m_rcvd_frame->headers();
+ // if the frame headers contain 'content-length', use that to call the proper async_read overload
+ if (_headers.find("content-length") != _headers.end()) {
+ string& content_length = _headers["content-length"];
+ debug_print(boost::format("received response (command+headers: %1% bytes, content-length: %2%)") % stomp_response.size() % content_length );
+ sleep(1);
+ bodysize = lexical_cast<size_t>(content_length);
+ }
+ start_stomp_read_body(bodysize);
} catch(NoMoreFrames&) {
+ debug_print("No more frames!");
// break;
} catch(std::exception& e) {
debug_print(boost::format("handle_stomp_read in loop: unknown exception in Frame constructor:\n%1%") % e.what());
@@ -249,7 +257,7 @@ namespace STOMP {
void BoostStomp::start_stomp_read_body(std::size_t bodysize)
// -----------------------------------------------
{
- //debug_print("start_stomp_read_body");
+ debug_print("start_stomp_read_body");
// Start an asynchronous operation to read at least the STOMP frame body
if (bodysize == 0) {
boost::asio::async_read_until(
@@ -281,8 +289,7 @@ namespace STOMP {
//
//debug_print("stomp_response contents after Frame scanning:");
//hexdump(stomp_response);
- // sleep a little: stompserver_ng has difficulties with packet flooding
- //usleep(50000);
+
// wait for the next incoming frame from the server...
start_stomp_read_headers();
}
@@ -317,10 +324,13 @@ namespace STOMP {
*m_socket,
stomp_request
);
- //debug_print("Sent!");
+ debug_print("Sent!");
+ delete frame;
} catch (boost::system::system_error& err){
m_connected = false;
debug_print(boost::format("Error writing to STOMP server: error code:%1%, message:%2%") % err.code() % err.what());
+ // put! the kot! down! slowly!
+ m_sendqueue.push(frame);
stop();
}
};
@@ -386,9 +396,9 @@ namespace STOMP {
{
m_connected = true;
// try to get supported protocol version from headers
- hdrmap headers = m_rcvd_frame->headers();
- if (headers.find("version") != headers.end()) {
- m_protocol_version = headers["version"];
+ hdrmap _headers = m_rcvd_frame->headers();
+ if (_headers.find("version") != _headers.end()) {
+ m_protocol_version = _headers["version"];
debug_print(boost::format("server supports STOMP version %1%") % m_protocol_version);
}
if (m_protocol_version == "1.1") {
@@ -411,12 +421,15 @@ namespace STOMP {
//-----------------------------------------
{
bool acked = true;
- string dest = string(m_rcvd_frame->headers()["destination"]);
- //
- if (pfnOnStompMessage_t callback_function = m_subscriptions[dest]) {
- //debug_print(boost::format("-- consume_frame: firing callback for %1%") % dest);
+ hdrmap& _headers = m_rcvd_frame->headers();
+ if (_headers.find("destination") != _headers.end()) {
+ string& dest = _headers["destination"];
//
- acked = callback_function(m_rcvd_frame);
+ if (pfnOnStompMessage_t callback_function = m_subscriptions[dest]) {
+ //debug_print(boost::format("-- consume_frame: firing callback for %1%") % dest);
+ //
+ acked = callback_function(m_rcvd_frame);
+ };
};
// acknowledge frame, if in "Client" or "Client-Individual" ack mode
if ((m_ackmode == ACK_CLIENT) || (m_ackmode == ACK_CLIENT_INDIVIDUAL)) {
@@ -428,17 +441,21 @@ namespace STOMP {
void BoostStomp::process_RECEIPT()
//-----------------------------------------
{
- // do something with receipt...
- debug_print(boost::format("receipt-id == %1%") % m_rcvd_frame->headers()["receipt_id"]);
+ hdrmap& _headers = m_rcvd_frame->headers();
+ if (_headers.find("receipt_id") != _headers.end()) {
+ string& receipt_id = _headers["receipt_id"];
+ // do something with receipt...
+ debug_print(boost::format("receipt-id == %1%") % receipt_id);
+ };
}
//-----------------------------------------
void BoostStomp::process_ERROR()
//-----------------------------------------
{
- hdrmap headers = m_rcvd_frame->headers();
- string errormessage = (headers.find("message") != headers.end()) ?
- headers["message"] :
+ hdrmap& _headers = m_rcvd_frame->headers();
+ string errormessage = (_headers.find("message") != _headers.end()) ?
+ _headers["message"] :
"(unknown error!)";
errormessage += m_rcvd_frame->body().c_str();
//throw(errormessage);
@@ -472,6 +489,7 @@ namespace STOMP {
bool BoostStomp::subscribe( string& topic, pfnOnStompMessage_t callback )
// ------------------------------------------
{
+ //debug_print(boost::format("Setting callback function for %1%") % topic);
m_subscriptions[topic] = callback;
return(do_subscribe(topic));
}
@@ -501,9 +519,7 @@ namespace STOMP {
bool BoostStomp::acknowledge(Frame* frame, bool acked = true)
// ------------------------------------------
{
- hdrmap hm;
- hm["message-id"] = frame->headers()["message-id"];
- hm["subscription"] = frame->headers()["subscription"];
+ hdrmap hm = frame->headers();
string _ack_cmd = (acked ? "ACK" : "NACK");
return(send_frame(new Frame( _ack_cmd, hm )));
}
View
@@ -101,12 +101,13 @@ namespace STOMP {
inline void mygetline (boost::asio::streambuf& sb, string& _str, char delim = '\n') {
const char* line = boost::asio::buffer_cast<const char*>(sb.data());
char _c;
+ size_t i;
_str.clear();
- for( size_t i = 0;
+ for( i = 0;
((i < sb.size()) && ((_c = line[i]) != delim));
i++
) _str += _c;
- //debug_print( boost::format("mygetline: sb.size==%1%") % sb.size() );
+ //debug_print( boost::format("mygetline: i=%1%, sb.size()==%2%") % i % sb.size() );
//hexdump(_str.c_str(), _str.size());
}
@@ -120,21 +121,21 @@ inline void mygetline (boost::asio::streambuf& sb, string& _str, char delim = '\
try {
// STEP 1: find the next STOMP command line in stomp_response.
// Chomp unknown lines till the buffer is empty, in which case an exception is raised
- //debug_print("Frame parser phase 1");
+ //debug_print(boost::format("Frame parser phase 1, stomp_response.size()==%1%") % stomp_response.size());
+ //hexdump(boost::asio::buffer_cast<const char*>(stomp_response.data()), stomp_response.size());
while (stomp_response.size() > 0) {
mygetline(stomp_response, _str);
//hexdump(_str.c_str(), _str.length());
stomp_response.consume(_str.size() + 1); // plus one for the newline
- if (_str.size() > 0) {
- if (cmd_map.find(_str) != cmd_map.end()) {
- //debug_print(boost::format("phase 1: COMMAND==%1%, sb.size==%2%") % _str % stomp_response.size());
- m_command = _str;
- break;
- }
- } else {
- throw(NoMoreFrames());
+ if (cmd_map.find(_str) != cmd_map.end()) {
+ //debug_print(boost::format("phase 1: COMMAND==%1%, sb.size==%2%") % _str % stomp_response.size());
+ m_command = _str;
+ break;
}
}
+ // if after all this trouble m_command is not set, and there's no more data in stomp_response
+ // (which shouldn't happen since we do async_read_until the double newline), then throw an exception
+ if (m_command == "") throw(NoMoreFrames());
// STEP 2: parse all headers
//debug_print("Frame parser phase 2");
@@ -144,8 +145,8 @@ inline void mygetline (boost::asio::streambuf& sb, string& _str, char delim = '\
stomp_response.consume(_str.size()+1);
boost::algorithm::split(header_parts, _str, is_any_of(":"));
if (header_parts.size() > 1) {
- string key = decode_header_token(header_parts[0]);
- string val = decode_header_token(header_parts[1]);
+ string& key = decode_header_token(header_parts[0]);
+ string& val = decode_header_token(header_parts[1]);
//debug_print(boost::format("phase 2: HEADER[%1%]==%2%") % key % val);
m_headers[key] = val;
//
@@ -167,12 +168,11 @@ inline void mygetline (boost::asio::streambuf& sb, string& _str, char delim = '\
std::size_t _content_length = 0, bytecount = 0;
string _str;
//debug_print("Frame parser phase 3");
-
// special case: content-length
if (m_headers.find("content-length") != m_headers.end()) {
- string val = m_headers["content-length"];
- _content_length = lexical_cast<int>(val);
- //debug_print(boost::format("phase 3: body content-length==%1%") % _content_length);
+ string& val = m_headers["content-length"];
+ //debug_print(boost::format("phase 3: body content-length==%1%") % val);
+ _content_length = lexical_cast<size_t>(val);
}
if (_content_length > 0) {
bytecount += _content_length;
View
@@ -112,7 +112,9 @@ namespace STOMP {
string& command() { return m_command; };
hdrmap& headers() { return m_headers; };
binbody& body() { return m_body; };
-
+ //
+ string& operator[](const char* key) { return m_headers[key]; };
+ //
// encode a STOMP Frame into m_request and return it
boost::asio::streambuf& encode(boost::asio::streambuf& _request);

0 comments on commit 9c52818

Please sign in to comment.