Skip to content

Commit

Permalink
chained sending for connections, ping/pong, namespaced
Browse files Browse the repository at this point in the history
  • Loading branch information
RJ committed May 28, 2009
1 parent 6b90016 commit 74ed16b
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 77 deletions.
7 changes: 7 additions & 0 deletions app/main.cpp
Expand Up @@ -10,6 +10,7 @@
#include "libf2f/protocol.h" #include "libf2f/protocol.h"


using namespace std; using namespace std;
using namespace libf2f;


void iorun( boost::asio::io_service * ios ) void iorun( boost::asio::io_service * ios )
{ {
Expand Down Expand Up @@ -62,6 +63,12 @@ int main(int argc, char **argv)
boost::asio::ip::tcp::endpoint ep(ipaddr, rp); boost::asio::ip::tcp::endpoint ep(ipaddr, rp);
r.connect_to_remote( ep ); r.connect_to_remote( ep );
} }

if(parts[0] == "pingall")
{
message_ptr ping = PingMessage::factory();
r.foreach_conns( boost::bind(&Connection::async_write, _1, ping) );
}
} }


ios.stop(); ios.stop();
Expand Down
15 changes: 11 additions & 4 deletions include/libf2f/connection.h
Expand Up @@ -15,6 +15,8 @@


#include "libf2f/message.h" #include "libf2f/message.h"


namespace libf2f {

class Connection; class Connection;
typedef boost::shared_ptr<Connection> connection_ptr; typedef boost::shared_ptr<Connection> connection_ptr;


Expand All @@ -27,7 +29,8 @@ class Connection
public: public:


Connection( boost::asio::io_service& io_service, Connection( boost::asio::io_service& io_service,
boost::function< void(message_ptr, connection_ptr) > cb ); boost::function< void(message_ptr, connection_ptr) > msg_cb,
boost::function< void(connection_ptr) > fin_cb );


~Connection(); ~Connection();


Expand All @@ -39,9 +42,10 @@ class Connection
boost::asio::ip::tcp::socket& socket(); boost::asio::ip::tcp::socket& socket();


/// Asynchronously write a data structure to the socket. /// Asynchronously write a data structure to the socket.
/// This just enqueues the data, the router's flow-control algo /// This just enqueues the data and returns immediately
/// decides when/if to send it.
void async_write(message_ptr msg); void async_write(message_ptr msg);
/// this is called internally to do actual sending:
void do_async_write(const boost::system::error_code& e, message_ptr finished_msg);


/// Setup a call to read the next msg_header /// Setup a call to read the next msg_header
void async_read(); void async_read();
Expand Down Expand Up @@ -71,11 +75,14 @@ class Connection
/// Stateful stuff the protocol handler/servent will set: /// Stateful stuff the protocol handler/servent will set:
std::string m_username; // username of user at end of Connection std::string m_username; // username of user at end of Connection
bool m_authed; bool m_authed;
bool m_sending;



//Router * m_router;
boost::function< void(message_ptr, connection_ptr) > m_message_received_cb; boost::function< void(message_ptr, connection_ptr) > m_message_received_cb;
boost::function< void(connection_ptr) > m_fin_cb; // call when we die.


}; };


} //ns


#endif #endif
18 changes: 12 additions & 6 deletions include/libf2f/message.h
Expand Up @@ -20,6 +20,8 @@
#define SIDCANCEL 8 #define SIDCANCEL 8
#define BYE 9 #define BYE 9


namespace libf2f {

class Message; class Message;
typedef boost::shared_ptr<Message> message_ptr; typedef boost::shared_ptr<Message> message_ptr;


Expand Down Expand Up @@ -70,23 +72,27 @@ class Message
{} {}


Message(const message_header& header) Message(const message_header& header)
: m_payload(0)
{ {
m_header = header; m_header = header;
m_guid = std::string(header.guid, 36); m_guid = std::string(header.guid, 36);
std::cout << "CTOR Msg(" << m_guid << ")" << std::endl; //std::cout << "CTOR Msg(" << m_guid << ")" << std::endl;
} }


virtual ~Message() virtual ~Message()
{ {
std::cout << "DTOR Msg(" << m_guid << ")" << std::endl; //std::cout << "DTOR Msg(" << m_guid << ")" << std::endl;
free(m_payload); if(m_payload) free(m_payload);
} }


virtual const std::string str() const virtual const std::string str() const
{ {
std::ostringstream os; std::ostringstream os;
os << "[Msg type:" << type() << " ttl:" << ttl() << " hops:" << hops() os << "[Msg type:" << (int)type()
<< " length:" << length() << " guid:" << guid() << "]"; << " ttl:" << (int)ttl()
<< " hops:" << (int)hops()
<< " length:" << (int)length()
<< " guid:" << guid() << "]";
return os.str(); return os.str();
} }


Expand Down Expand Up @@ -190,5 +196,5 @@ class DataMessage : public Message
mutable std::string m_sid; mutable std::string m_sid;
}; };



}
#endif #endif
12 changes: 11 additions & 1 deletion include/libf2f/protocol.h
Expand Up @@ -4,9 +4,15 @@
#include "libf2f/router.h" #include "libf2f/router.h"
#include "libf2f/connection.h" #include "libf2f/connection.h"


namespace libf2f {

class Protocol class Protocol
{ {
public: public:
Protocol();

virtual void set_router(Router * r) { m_router = r; }

/// called when a client connects to us /// called when a client connects to us
virtual bool new_incoming_connection( connection_ptr conn ); virtual bool new_incoming_connection( connection_ptr conn );


Expand All @@ -16,6 +22,10 @@ class Protocol
/// we received a msg from this connection /// we received a msg from this connection
virtual void message_received( message_ptr msgp, connection_ptr conn ); virtual void message_received( message_ptr msgp, connection_ptr conn );



private:
Router * m_router;
}; };

} //ns

#endif #endif
15 changes: 11 additions & 4 deletions include/libf2f/router.h
Expand Up @@ -11,6 +11,8 @@
#include "libf2f/message.h" #include "libf2f/message.h"
#include "libf2f/connection.h" #include "libf2f/connection.h"


namespace libf2f {

class Protocol; class Protocol;




Expand Down Expand Up @@ -43,13 +45,18 @@ class Router
/// Default message recvd callback /// Default message recvd callback
void message_received( message_ptr msgp, connection_ptr conn ); void message_received( message_ptr msgp, connection_ptr conn );


/// Run in a thread, does the flow-control and sends messages /// apply function to all registered connections
void message_dispatch_runner(); void foreach_conns( boost::function<void(connection_ptr)> );



private: private:
/// Router keeps track of connections:
void register_connection( connection_ptr conn );
void unregister_connection( connection_ptr conn );
void connections_str();

/// all connections: /// all connections:
std::vector< connection_ptr > m_connections; std::vector< connection_ptr > m_connections;
boost::mutex m_connections_mutex; // protects connections


/// The acceptor object used to accept incoming socket connections. /// The acceptor object used to accept incoming socket connections.
boost::asio::ip::tcp::acceptor & m_acceptor; boost::asio::ip::tcp::acceptor & m_acceptor;
Expand All @@ -62,6 +69,6 @@ class Router
unsigned int seen_connections; // num incoming connections accepted unsigned int seen_connections; // num incoming connections accepted
}; };



} //ns


#endif #endif
60 changes: 54 additions & 6 deletions src/connection.cpp
@@ -1,10 +1,16 @@
#include "libf2f/connection.h" #include "libf2f/connection.h"
#include <boost/foreach.hpp> #include <boost/foreach.hpp>


namespace libf2f {

using namespace std;

Connection::Connection( boost::asio::io_service& io_service, Connection::Connection( boost::asio::io_service& io_service,
boost::function< void(message_ptr, connection_ptr) > cb ) boost::function< void(message_ptr, connection_ptr) > msg_cb,
: m_socket(io_service), m_authed(false), boost::function< void(connection_ptr) > fin_cb )
m_message_received_cb(cb) : m_socket(io_service), m_authed(false), m_sending(false),
m_message_received_cb(msg_cb),
m_fin_cb(fin_cb)
{ {
std::cout << "CTOR connection" << std::endl; std::cout << "CTOR connection" << std::endl;
max_writeq_size = 20*1024; // 20kb max_writeq_size = 20*1024; // 20kb
Expand All @@ -25,6 +31,7 @@ void
Connection::fin() Connection::fin()
{ {
std::cout << "FIN connection " << str() << std::endl; std::cout << "FIN connection " << str() << std::endl;
m_fin_cb( shared_from_this() );
close(); close();
} }


Expand All @@ -43,8 +50,13 @@ Connection::async_write(message_ptr msg)
m_writeq.push_back(msg); m_writeq.push_back(msg);
m_writeq_size += sizeof(message_header) + msg->length(); m_writeq_size += sizeof(message_header) + msg->length();
} }
// make sure our sending loop is running:
boost::system::error_code e;
do_async_write( e, message_ptr() );
} }


/// Reading incoming messages is a chain of 3 async methods:
/// async_read() -> handle_read_headers() -> handle_read_data()
void void
Connection::async_read() Connection::async_read()
{ {
Expand All @@ -59,7 +71,7 @@ Connection::async_read()
msgp msgp
)); ));
} }

/// called when we've read the header off the wire
void void
Connection::handle_read_header(const boost::system::error_code& e, message_ptr msgp) Connection::handle_read_header(const boost::system::error_code& e, message_ptr msgp)
{ {
Expand Down Expand Up @@ -91,7 +103,7 @@ Connection::handle_read_header(const boost::system::error_code& e, message_ptr m
)); ));


} }

/// called when we've read header and payload off the wire
void void
Connection::handle_read_data(const boost::system::error_code& e, message_ptr msgp) Connection::handle_read_data(const boost::system::error_code& e, message_ptr msgp)
{ {
Expand All @@ -112,7 +124,7 @@ Connection::set_message_received_cb( boost::function< void(message_ptr, connecti
{ {
m_message_received_cb = cb; m_message_received_cb = cb;
} }

/// unused atm.. todo flow control?
size_t size_t
Connection::drain_writeq( std::deque< message_ptr > & out ) Connection::drain_writeq( std::deque< message_ptr > & out )
{ {
Expand All @@ -137,3 +149,39 @@ Connection::str() const
return os.str(); return os.str();
} }


/// Calls to do_async_write are chained - it will call itself when a write
/// completes, to send the next msg in the queue. Bails when none left.
void
Connection::do_async_write(const boost::system::error_code& e, message_ptr finished_msg)
{
message_ptr msgp;
{ // mutex scope
boost::mutex::scoped_lock lk(m_mutex);
if(m_sending && !finished_msg)
{
// this call is telling us to send, but we already are.
//cout << "bailing from do_async_write - already sending (sending=true)" << endl;
return;
}

if(m_writeq.empty())
{
//cout << "bailing from do_async_write, q empty (sending=false)" << endl;
m_sending = false;
return;
}

msgp = m_writeq.front();
m_writeq.pop_front();
m_writeq_size -= (sizeof(message_header) + msgp->length());
m_sending = true;
} // mutex scope

boost::asio::async_write( socket(), msgp->to_buffers(),
boost::bind( &Connection::do_async_write, this,
boost::asio::placeholders::error,
msgp ) );
}

} //ns

11 changes: 10 additions & 1 deletion src/protocol.cpp
@@ -1,7 +1,14 @@
#include "libf2f/protocol.h" #include "libf2f/protocol.h"


namespace libf2f {

using namespace std; using namespace std;


Protocol::Protocol()
{
cout << "CTOR Protocol" << endl;
}

bool bool
Protocol::new_incoming_connection( connection_ptr conn ) Protocol::new_incoming_connection( connection_ptr conn )
{ {
Expand All @@ -25,4 +32,6 @@ Protocol::message_received( message_ptr msgp, connection_ptr conn )
{ {
cout << "Protocol::message_received " << conn->str() << endl cout << "Protocol::message_received " << conn->str() << endl
<< msgp->str() << endl; << msgp->str() << endl;
} }

} //ns

0 comments on commit 74ed16b

Please sign in to comment.