Browse files

foreach_conns_except because i can't make boost lambda behave..

  • Loading branch information...
1 parent 74ed16b commit 162ed96d34d5a635ec70722a7e22aa4a7ae43682 @RJ committed May 29, 2009
Showing with 176 additions and 49 deletions.
  1. +21 −10 app/main.cpp
  2. +9 −3 include/libf2f/connection.h
  3. +44 −8 include/libf2f/message.h
  4. +6 −1 include/libf2f/protocol.h
  5. +18 −5 include/libf2f/router.h
  6. +27 −7 src/connection.cpp
  7. +2 −2 src/protocol.cpp
  8. +49 −13 src/router.cpp
View
31 app/main.cpp
@@ -4,14 +4,16 @@
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/algorithm/string.hpp>
-
+#include "boost/lambda/lambda.hpp"
+#include <boost/lambda/if.hpp>
#include "libf2f/router.h"
#include "libf2f/protocol.h"
using namespace std;
using namespace libf2f;
+
void iorun( boost::asio::io_service * ios )
{
ios->run();
@@ -31,13 +33,14 @@ int main(int argc, char **argv)
Protocol p;
short port = atoi(argv[1]);
cout << "Listening on port " << port << endl;
- boost::asio::ip::tcp::acceptor acc(
- ios,
- boost::asio::ip::tcp::endpoint(
- boost::asio::ip::tcp::v4(),
- port)
+ boost::shared_ptr<boost::asio::ip::tcp::acceptor> accp(
+ new boost::asio::ip::tcp::acceptor
+ (ios, boost::asio::ip::tcp::endpoint(
+ boost::asio::ip::tcp::v4(),
+ port)
+ )
);
- Router r(acc, &p);
+ Router r(accp, &p);
boost::thread t( boost::bind(&iorun, &ios) );
@@ -66,12 +69,20 @@ int main(int argc, char **argv)
if(parts[0] == "pingall")
{
- message_ptr ping = PingMessage::factory();
- r.foreach_conns( boost::bind(&Connection::async_write, _1, ping) );
+ message_ptr ping = message_ptr(new PingMessage());
+ r.send_all(ping);
+ }
+
+ if(parts[0] == "query" && parts.size() == 2)
+ {
+ message_ptr search = message_ptr(new GeneralMessage(QUERY, parts[1]) );
+ r.send_all(search);
}
+
+ if(parts[0] == "quit") break;
}
- ios.stop();
+ r.stop();
t.join();
return 0;
}
View
12 include/libf2f/connection.h
@@ -4,6 +4,7 @@
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/thread/thread.hpp>
@@ -19,6 +20,7 @@ namespace libf2f {
class Connection;
typedef boost::shared_ptr<Connection> connection_ptr;
+typedef boost::weak_ptr<Connection> connection_ptr_weak;
/// This class represents a Connection to one other libf2f user.
/// it knows how to marshal objects to and from the wire protocol
@@ -60,7 +62,8 @@ class Connection
size_t drain_writeq( std::deque< message_ptr > & out );
- void set_message_received_cb( boost::function< void(message_ptr, connection_ptr) > cb );
+ void push_message_received_cb( boost::function< void(message_ptr, connection_ptr) > cb );
+ void pop_message_received_cb();
std::string str() const;
@@ -74,13 +77,16 @@ class Connection
/// Stateful stuff the protocol handler/servent will set:
std::string m_username; // username of user at end of Connection
- bool m_authed;
+ //bool m_authed;
bool m_sending;
+ std::vector< boost::function<void(message_ptr, connection_ptr)> > m_message_received_cbs;
+ boost::mutex m_message_received_cb_mutex; // protects the above
- boost::function< void(message_ptr, connection_ptr) > m_message_received_cb;
boost::function< void(connection_ptr) > m_fin_cb; // call when we die.
+ bool m_shuttingdown;
+
};
} //ns
View
52 include/libf2f/message.h
@@ -75,7 +75,6 @@ class Message
: m_payload(0)
{
m_header = header;
- m_guid = std::string(header.guid, 36);
//std::cout << "CTOR Msg(" << m_guid << ")" << std::endl;
}
@@ -101,10 +100,23 @@ class Message
virtual const short ttl() const { return m_header.ttl; }
virtual const short hops() const { return m_header.hops; }
virtual const boost::uint32_t length() const { return ntohl(m_header.length); }
- virtual const std::string& guid() const { return m_guid; }
+ virtual const std::string& guid() const
+ {
+ if( m_guid.empty() )
+ {
+ m_guid = std::string(m_header.guid, 36);
+ }
+ return m_guid;
+ }
// payload
virtual const char * payload() const { return m_payload; }
virtual char * payload() { return m_payload; }
+ /// sucks to have to do this really, jsonspirit needs a string or stream:
+ virtual std::string payload_str() const
+ {
+ std::string s(m_payload, length());
+ return s;
+ }
virtual size_t malloc_payload()
{
@@ -132,16 +144,18 @@ class Message
return buffers;
}
-private:
+protected:
message_header m_header;
- std::string m_guid;
+ mutable std::string m_guid;
char * m_payload;
};
+
+
class PongMessage : public Message
{
public:
- static boost::shared_ptr<Message> factory()
+ PongMessage()
{
message_header h;
memcpy( &h.guid, std::string(GENUUID).data(), 36 );
@@ -150,26 +164,47 @@ class PongMessage : public Message
h.ttl = 1;
h.hops = 0;
h.length = 0;
- return boost::shared_ptr<Message>(new Message( h ));
+ m_header = h;
+ m_payload = 0;
}
};
class PingMessage : public Message
{
public:
- static boost::shared_ptr<Message> factory()
+ PingMessage()
{
message_header h;
memcpy( &h.guid, std::string(GENUUID).data(), 36 );
+ //h.guid = GENUUID;
h.type = PING;
h.ttl = 1;
h.hops = 0;
h.length = 0;
- return boost::shared_ptr<Message>(new Message( h ));
+ m_header = h;
+ m_payload = 0;
}
};
+class GeneralMessage : public Message
+{
+public:
+ GeneralMessage(const char msgtype, const std::string& body)
+ {
+ message_header h;
+ memcpy( &h.guid, std::string(GENUUID).data(), 36 );
+ //h.guid = GENUUID;
+ h.type = msgtype;
+ h.ttl = 1;
+ h.hops = 0;
+ h.length = htonl( body.length() );
+ m_header = h;
+ malloc_payload();
+ memcpy( m_payload, body.data(), body.length() );
+ }
+};
+/*
/// File transfer/streaming msgs are special in that they have an additional header describing the payload (just the sid)
@@ -195,6 +230,7 @@ class DataMessage : public Message
private:
mutable std::string m_sid;
};
+*/
}
#endif
View
7 include/libf2f/protocol.h
@@ -10,6 +10,8 @@ class Protocol
{
public:
Protocol();
+
+ virtual ~Protocol(){}
virtual void set_router(Router * r) { m_router = r; }
@@ -19,10 +21,13 @@ class Protocol
/// called when we opened a socket to a remote servent
virtual void new_outgoing_connection( connection_ptr conn );
+ /// called on a disconnection, for whatever reason
+ virtual void connection_terminated( connection_ptr conn ){}
+
/// we received a msg from this connection
virtual void message_received( message_ptr msgp, connection_ptr conn );
-private:
+protected:
Router * m_router;
};
View
23 include/libf2f/router.h
@@ -5,6 +5,8 @@
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
+#include "boost/lambda/lambda.hpp"
+#include <boost/lambda/if.hpp>
#include <iostream>
#include <vector>
@@ -17,8 +19,6 @@ class Protocol;
/// aka servent - responsible for managing connections
-/// it will authenticate connections before creating a Connection
-/// object and starting to send/recv data using flow control mechanism.
class Router
{
public:
@@ -28,8 +28,10 @@ class Router
/// acceptor(io_service,
/// boost::asio::ip::tcp::endpoint(
/// boost::asio::ip::tcp::v4(), port) )
- Router( boost::asio::ip::tcp::acceptor & acc, Protocol * p );
+ Router( boost::shared_ptr<boost::asio::ip::tcp::acceptor> accp, Protocol * p );
+ /// calls io_service::stop on acceptor.
+ void stop();
/// Handle completion of a accept operation.
void handle_accept(const boost::system::error_code& e, connection_ptr conn);
@@ -42,24 +44,35 @@ class Router
boost::asio::ip::tcp::endpoint &endpoint,
connection_ptr conn);
+ /// connection terminated for any reason
+ void connection_terminated( connection_ptr conn );
+
/// Default message recvd callback
void message_received( message_ptr msgp, connection_ptr conn );
/// apply function to all registered connections
void foreach_conns( boost::function<void(connection_ptr)> );
+ /// apply function to all registered connections *except* conn
+ void foreach_conns_except( boost::function<void(connection_ptr)> fun, connection_ptr conn );
+
+ /// send msg to all registered connections
+ void send_all( message_ptr msgp );
+
+ std::string connections_str();
private:
/// Router keeps track of connections:
void register_connection( connection_ptr conn );
void unregister_connection( connection_ptr conn );
- void connections_str();
+
/// all connections:
std::vector< connection_ptr > m_connections;
boost::mutex m_connections_mutex; // protects connections
/// The acceptor object used to accept incoming socket connections.
- boost::asio::ip::tcp::acceptor & m_acceptor;
+ boost::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;
+
/// protocol implementation
Protocol * m_protocol;
/// thread that enforces flow-control and sends outgoing msgs
View
34 src/connection.cpp
@@ -8,11 +8,12 @@ using namespace std;
Connection::Connection( boost::asio::io_service& io_service,
boost::function< void(message_ptr, connection_ptr) > msg_cb,
boost::function< void(connection_ptr) > fin_cb )
- : m_socket(io_service), m_authed(false), m_sending(false),
- m_message_received_cb(msg_cb),
- m_fin_cb(fin_cb)
+ : m_socket(io_service), m_sending(false),
+ m_fin_cb(fin_cb),
+ m_shuttingdown(false)
{
std::cout << "CTOR connection" << std::endl;
+ push_message_received_cb( msg_cb );
max_writeq_size = 20*1024; // 20kb
}
@@ -30,6 +31,7 @@ Connection::close()
void
Connection::fin()
{
+ m_shuttingdown = true;
std::cout << "FIN connection " << str() << std::endl;
m_fin_cb( shared_from_this() );
close();
@@ -60,6 +62,7 @@ Connection::async_write(message_ptr msg)
void
Connection::async_read()
{
+ if( m_shuttingdown ) return;
message_ptr msgp(new Message());
// Read exactly the number of bytes in a header
boost::asio::async_read(m_socket,
@@ -76,6 +79,7 @@ void
Connection::handle_read_header(const boost::system::error_code& e, message_ptr msgp)
{
//cout << "handle_read_header" << endl;
+ if( m_shuttingdown ) return;
if (e)
{
std::cerr << "err" << std::endl;
@@ -107,23 +111,37 @@ Connection::handle_read_header(const boost::system::error_code& e, message_ptr m
void
Connection::handle_read_data(const boost::system::error_code& e, message_ptr msgp)
{
+ if( m_shuttingdown ) return;
if (e)
{
std::cerr << "errrrrrr" << std::endl;
fin();
return;
}
- // inform router that we received a new message
- m_message_received_cb( msgp, shared_from_this() );
+ // report that we received a new message
+ {
+ boost::mutex::scoped_lock lk(m_message_received_cb_mutex);
+ m_message_received_cbs.back()( msgp, shared_from_this() );
+ }
// setup recv for next message:
async_read();
}
+
+void
+Connection::push_message_received_cb( boost::function< void(message_ptr, connection_ptr) > cb )
+{
+ boost::mutex::scoped_lock lk(m_message_received_cb_mutex);
+ m_message_received_cbs.push_back(cb);
+}
+
void
-Connection::set_message_received_cb( boost::function< void(message_ptr, connection_ptr) > cb )
+Connection::pop_message_received_cb()
{
- m_message_received_cb = cb;
+ boost::mutex::scoped_lock lk(m_message_received_cb_mutex);
+ m_message_received_cbs.pop_back();
}
+
/// unused atm.. todo flow control?
size_t
Connection::drain_writeq( std::deque< message_ptr > & out )
@@ -145,6 +163,8 @@ Connection::str() const
std::ostringstream os;
os << "[Connection:"
<< m_socket.remote_endpoint().address().to_string()
+ << ":"
+ << m_socket.remote_endpoint().port()
<< "]";
return os.str();
}
View
4 src/protocol.cpp
@@ -13,7 +13,7 @@ bool
Protocol::new_incoming_connection( connection_ptr conn )
{
cout << "Protocol::new_incoming_connection " << conn->str() << endl;
- conn->async_write( PingMessage::factory() );
+ conn->async_write( message_ptr(new PingMessage()) );
// first thing to expect is an ident msg, so set the msg handler to one
// that expects it, and kills the connection otherwise:
@@ -24,7 +24,7 @@ void
Protocol::new_outgoing_connection( connection_ptr conn )
{
cout << "Protocol::new_outgoing_connection " << conn->str() << endl;
- conn->async_write( PingMessage::factory() );
+ conn->async_write( message_ptr(new PingMessage()) );
}
void
View
62 src/router.cpp
@@ -7,27 +7,41 @@
#include <boost/foreach.hpp>
// How we typically prep a new connection object:
-#define NEWCONN new Connection(m_acceptor.io_service(), \
+#define NEWCONN new Connection(m_acceptor->io_service(), \
boost::bind( &Router::message_received, this, _1, _2 ), \
- boost::bind( &Router::unregister_connection, this, _1) )
+ boost::bind( &Router::connection_terminated, this, _1) )
namespace libf2f {
using namespace std;
-Router::Router( boost::asio::ip::tcp::acceptor& acc, Protocol * p )
- : m_acceptor( acc ), m_protocol( p ), seen_connections(0)
+Router::Router( boost::shared_ptr<boost::asio::ip::tcp::acceptor> accp,
+ Protocol * p )
+ : m_acceptor( accp ), m_protocol( p ), seen_connections(0)
{
p->set_router( this );
// Start an accept operation for a new connection.
connection_ptr new_conn(NEWCONN);
- m_acceptor.async_accept(new_conn->socket(),
+ m_acceptor->async_accept(new_conn->socket(),
boost::bind(&Router::handle_accept, this,
boost::asio::placeholders::error, new_conn));
}
+void
+Router::stop()
+{
+ m_acceptor->get_io_service().stop();
+}
+
+void
+Router::connection_terminated( connection_ptr conn )
+{
+ unregister_connection( conn );
+ m_protocol->connection_terminated( conn );
+}
+
/// Handle completion of a accept operation.
void
Router::handle_accept(const boost::system::error_code& e, connection_ptr conn)
@@ -54,7 +68,7 @@ Router::handle_accept(const boost::system::error_code& e, connection_ptr conn)
// Start an accept operation for a new connection.
connection_ptr new_conn(NEWCONN);
- m_acceptor.async_accept(new_conn->socket(),
+ m_acceptor->async_accept(new_conn->socket(),
boost::bind(&Router::handle_accept, this,
boost::asio::placeholders::error, new_conn));
}
@@ -75,7 +89,7 @@ Router::register_connection( connection_ptr conn )
}
}
m_connections.push_back( conn );
- connections_str();
+ cout << connections_str() << endl;
}
void
@@ -91,18 +105,20 @@ Router::unregister_connection( connection_ptr conn )
cout << "Router::unregistered " << conn->str() << endl;
}
}
- connections_str();
+ cout << connections_str() << endl;
}
-void
+string
Router::connections_str()
{
- cout << "<connections>" << endl;
+ ostringstream os;
+ os << "<connections>" << endl;
BOOST_FOREACH( connection_ptr conn, m_connections )
{
- cout << conn->str() << endl;
+ os << conn->str() << endl;
}
- cout << "</connections>" << endl;
+ os << "</connections>" << endl;
+ return os.str();
}
/// this is the default msg recvd callback passed to new connections:
@@ -126,7 +142,7 @@ Router::message_received( message_ptr msgp, connection_ptr conn )
if( msgp->type() == PING )
{
cout << "got PING, responding.." << endl;
- conn->async_write( PongMessage::factory() );
+ conn->async_write( message_ptr(new PongMessage()) );
return;
}
@@ -183,4 +199,24 @@ Router::foreach_conns( boost::function<void(connection_ptr)> fun )
}
+
+void
+Router::foreach_conns_except( boost::function<void(connection_ptr)> fun, connection_ptr conn )
+{
+ boost::mutex::scoped_lock lk(m_connections_mutex);
+ BOOST_FOREACH( connection_ptr c, m_connections )
+ {
+ if( c == conn ) continue;
+ fun( c );
+ }
+}
+
+void
+Router::send_all( message_ptr msgp )
+{
+ foreach_conns( boost::bind(&Connection::async_write, _1, msgp) );
+}
+
+
+
} //ns

0 comments on commit 162ed96

Please sign in to comment.