Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

now the demo app works properly again

  • Loading branch information...
commit 71ae0bb5a33d39c776f19ca5735f3be36d716b4a 1 parent 19955c6
@RJ authored
View
1  CMakeLists.txt
@@ -14,6 +14,7 @@ SET(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${F2F_PATH}/lib")
# alternatively, recompile boost with -fPIC like so:
# bjam --v2 cxxflags=-fPIC stage
SET(Boost_USE_STATIC_LIBS ON)
+#SET(Boost_USE_STATIC_LIBS OFF)
SET(Boost_USE_MULTITHREADED ON)
FIND_PACKAGE(Boost 1.35 REQUIRED COMPONENTS filesystem system regex thread program_options date_time)
View
8 app/demo_messages.h
@@ -12,10 +12,10 @@ using namespace libf2f;
class PongMessage : public Message
{
public:
- PongMessage()
+ PongMessage( std::string uid )
{
message_header h;
- memcpy( &h.guid, std::string(GENUUID).data(), 36 );
+ memcpy( &h.guid, uid.c_str(), 36 );
h.type = PONG;
h.ttl = 1;
h.hops = 0;
@@ -28,10 +28,10 @@ class PongMessage : public Message
class PingMessage : public Message
{
public:
- PingMessage()
+ PingMessage( std::string uid )
{
message_header h;
- memcpy( &h.guid, std::string(GENUUID).data(), 36 );
+ memcpy( &h.guid, uid.c_str(), 36 );
h.type = PING;
h.ttl = 1;
h.hops = 0;
View
3  app/demo_protocol.h
@@ -2,6 +2,7 @@
#define __F2FDEMO_PROTOCOL_H__
#include "libf2f/protocol.h"
+#include "demo_messages.h"
using namespace libf2f;
@@ -47,7 +48,7 @@ class DemoProtocol : public Protocol
{
case PING:
std::cout << "Got a ping, replying with a pong." << std::endl;
- conn->async_write( message_ptr(new PongMessage()) );
+ conn->async_write( message_ptr(new PongMessage( m_router->gen_uuid() )) );
break;
case PONG:
std::cout << "Got a pong, yay!" << std::endl;
View
14 app/main.cpp
@@ -21,6 +21,12 @@ void iorun( boost::asio::io_service * ios )
cout << "io ended" << endl;
}
+std::string
+lame_uuid_gen()
+{
+ return "266695BF-AC15-4991-A01D-21DC180FD4B1";
+}
+
int main(int argc, char **argv)
{
if( argc != 2 )
@@ -41,7 +47,7 @@ int main(int argc, char **argv)
port)
)
);
- Router r(accp, &p);
+ Router r(accp, &p, boost::bind(&lame_uuid_gen));
boost::thread t( boost::bind(&iorun, &ios) );
@@ -70,16 +76,16 @@ int main(int argc, char **argv)
if(parts[0] == "pingall")
{
- message_ptr ping = message_ptr(new PingMessage());
+ message_ptr ping = message_ptr(new PingMessage( r.gen_uuid() ));
r.send_all(ping);
}
-
+ /*
if(parts[0] == "query" && parts.size() == 2)
{
message_ptr search = message_ptr(new GeneralMessage(QUERY, parts[1]) );
r.send_all(search);
}
-
+ */
if(parts[0] == "quit") break;
}
View
20 include/libf2f/connection.h
@@ -3,6 +3,7 @@
#include <boost/asio.hpp>
#include <boost/bind.hpp>
+#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
@@ -18,10 +19,6 @@
namespace libf2f {
-class Connection;
-typedef boost::shared_ptr<Connection> connection_ptr;
-typedef boost::weak_ptr<Connection> connection_ptr_weak;
-
/// This class represents a Connection to one other libf2f user.
/// it knows how to marshal objects to and from the wire protocol
/// It keeps some state related to the Connection, eg are they authenticated.
@@ -30,9 +27,7 @@ class Connection
{
public:
- Connection( boost::asio::io_service& io_service,
- boost::function< void(message_ptr, connection_ptr) > msg_cb,
- boost::function< void(connection_ptr) > fin_cb );
+ Connection( boost::asio::io_service& io_service, Router * r );
~Connection();
@@ -59,9 +54,7 @@ class Connection
void handle_read_data(const boost::system::error_code& e, message_ptr msgp);
size_t writeq_size() const { return m_writeq.size(); }
-
- size_t drain_writeq( std::deque< message_ptr > & out );
-
+
void push_message_received_cb( boost::function< void(message_ptr, connection_ptr) > cb );
void pop_message_received_cb();
@@ -86,13 +79,12 @@ class Connection
bool m_ready; // ready for normal messages (ie, we authed etc)
bool m_sending; // currently sending something?
+ bool m_shuttingdown;
std::vector< boost::function<void(message_ptr, connection_ptr)> > m_message_received_cbs;
- boost::mutex m_message_received_cb_mutex; // protects the above
-
- boost::function< void(connection_ptr) > m_fin_cb; // call when we die.
+ ///boost::mutex m_message_received_cb_mutex; // protects the above
- bool m_shuttingdown;
+ Router * m_router;
};
View
13 include/libf2f/message.h
@@ -5,14 +5,14 @@
#include <boost/asio/buffer.hpp>
#include <iostream>
-#ifndef GENUUID
-#define GENUUID "266695BF-AC15-4991-A01D-21DC180FD4B1"
-#endif
-
namespace libf2f {
class Message;
typedef boost::shared_ptr<Message> message_ptr;
+class Connection;
+class Router;
+typedef boost::shared_ptr<Connection> connection_ptr;
+typedef boost::weak_ptr<Connection> connection_ptr_weak;
/*
Bytes Description
@@ -142,11 +142,10 @@ class Message
class GeneralMessage : public Message
{
public:
- GeneralMessage(const char msgtype, const std::string& body)
+ GeneralMessage(const char msgtype, const std::string& body, const std::string& uuid)
{
message_header h;
- memcpy( &h.guid, std::string(GENUUID).data(), 36 );
- //h.guid = GENUUID;
+ memcpy( &h.guid, uuid.data(), 36 );
h.type = msgtype;
h.ttl = 1;
h.hops = 0;
View
16 include/libf2f/router.h
@@ -4,6 +4,8 @@
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <boost/thread.hpp>
#include <boost/lexical_cast.hpp>
#include "boost/lambda/lambda.hpp"
#include <boost/lambda/if.hpp>
@@ -11,11 +13,11 @@
#include <vector>
#include "libf2f/message.h"
-#include "libf2f/connection.h"
namespace libf2f {
class Protocol;
+class Connection;
/// aka servent - responsible for managing connections
@@ -28,7 +30,11 @@ class Router
/// acceptor(io_service,
/// boost::asio::ip::tcp::endpoint(
/// boost::asio::ip::tcp::v4(), port) )
- Router( boost::shared_ptr<boost::asio::ip::tcp::acceptor> accp, Protocol * p );
+ Router( boost::shared_ptr<boost::asio::ip::tcp::acceptor> accp, Protocol * p, boost::function<std::string()> uuidf );
+
+ /// lamest uuid generator ever, please supply your own.
+ std::string lame_uuid_gen();
+ std::string gen_uuid();
/// calls io_service::stop on acceptor.
void stop();
@@ -60,6 +66,10 @@ class Router
void send_all( message_ptr msgp );
std::string connections_str();
+
+ connection_ptr get_connection_by_name( const std::string &name );
+
+
private:
/// Router keeps track of connections:
void register_connection( connection_ptr conn );
@@ -80,6 +90,8 @@ class Router
/// misc stats:
unsigned int seen_connections; // num incoming connections accepted
+
+ boost::function<std::string()> m_uuidgen;
};
} //ns
View
61 src/connection.cpp
@@ -1,21 +1,19 @@
#include "libf2f/connection.h"
+#include "libf2f/router.h"
#include <boost/foreach.hpp>
namespace libf2f {
using namespace std;
-Connection::Connection( boost::asio::io_service& io_service,
- boost::function< void(message_ptr, connection_ptr) > msg_cb,
- boost::function< void(connection_ptr) > fin_cb )
+Connection::Connection( boost::asio::io_service& io_service, Router * r )
: m_socket(io_service),
m_ready(false),
m_sending(false),
- m_fin_cb(fin_cb),
- m_shuttingdown(false)
+ m_shuttingdown(false),
+ m_router(r)
{
std::cout << "CTOR connection" << std::endl;
- push_message_received_cb( msg_cb );
max_writeq_size = 20*1024; // 20kb
}
@@ -35,7 +33,7 @@ Connection::fin()
{
m_shuttingdown = true;
std::cout << "FIN connection " << str() << std::endl;
- m_fin_cb( shared_from_this() );
+ m_router->connection_terminated( shared_from_this() );
close();
}
@@ -120,45 +118,16 @@ Connection::handle_read_data(const boost::system::error_code& e, message_ptr msg
fin();
return;
}
+ //cout << "connection::rcvd_msg: " << msgp->str() << endl;
// report that we received a new message
- {
- boost::mutex::scoped_lock lk(m_message_received_cb_mutex);
+ if( m_message_received_cbs.empty() )
+ m_router->message_received( msgp, shared_from_this() );
+ else
m_message_received_cbs.back()( msgp, shared_from_this() );
- }
// setup recv for next message:
async_read();
}
-
-void
-Connection::push_message_received_cb( boost::function< void(message_ptr, connection_ptr) > cb )
-{
- boost::mutex::scoped_lock lk(m_message_received_cb_mutex);
- m_message_received_cbs.push_back(cb);
-}
-
-void
-Connection::pop_message_received_cb()
-{
- boost::mutex::scoped_lock lk(m_message_received_cb_mutex);
- m_message_received_cbs.pop_back();
-}
-
-/// unused atm.. todo flow control?
-size_t
-Connection::drain_writeq( std::deque< message_ptr > & out )
-{
- size_t ret;
- boost::mutex::scoped_lock lk(m_mutex);
- BOOST_FOREACH( message_ptr mp, m_writeq )
- {
- ret++;
- out.push_back( mp );
- }
- m_writeq.clear();
- return ret;
-}
-
std::string
Connection::str() const
{
@@ -206,5 +175,17 @@ Connection::do_async_write(const boost::system::error_code& e, message_ptr finis
msgp ) );
}
+void
+Connection::push_message_received_cb( boost::function< void(message_ptr, connection_ptr) > cb )
+{
+ m_message_received_cbs.push_back(cb);
+}
+
+void
+Connection::pop_message_received_cb()
+{
+ m_message_received_cbs.pop_back();
+}
+
} //ns
View
52 src/router.cpp
@@ -7,18 +7,28 @@
#include <boost/foreach.hpp>
// How we typically prep a new connection object:
-#define NEWCONN new Connection(m_acceptor->io_service(), \
- boost::bind( &Router::message_received, this, _1, _2 ), \
- boost::bind( &Router::connection_terminated, this, _1) )
+#define NEWCONN new Connection( m_acceptor->io_service(), this )
namespace libf2f {
using namespace std;
+
Router::Router( boost::shared_ptr<boost::asio::ip::tcp::acceptor> accp,
- Protocol * p )
- : m_acceptor( accp ), m_protocol( p ), seen_connections(0)
+ Protocol * p, boost::function<std::string()> uuidf )
+ : m_acceptor( accp ),
+ m_protocol( p ),
+ seen_connections(0),
+ m_uuidgen( uuidf )
{
+ cout << "Testing uuid generator... " << flush;
+ string uuid = m_uuidgen();
+ if( uuid.length() != 36 )
+ {
+ cout << "ERROR length must be 36." << endl;
+ throw;
+ }
+ cout << "OK" << endl;
p->set_router( this );
// Start an accept operation for a new connection.
connection_ptr new_conn(NEWCONN);
@@ -29,6 +39,12 @@ Router::Router( boost::shared_ptr<boost::asio::ip::tcp::acceptor> accp,
}
+std::string
+Router::gen_uuid()
+{
+ return m_uuidgen();
+}
+
void
Router::stop()
{
@@ -108,6 +124,21 @@ Router::unregister_connection( connection_ptr conn )
cout << connections_str() << endl;
}
+connection_ptr
+Router::get_connection_by_name( const std::string &name )
+{
+ boost::mutex::scoped_lock lk(m_connections_mutex);
+ vector<connection_ptr>::iterator it;
+ for( it=m_connections.begin() ; it < m_connections.end() ; ++it )
+ {
+ if( (*it)->name() == name )
+ {
+ return *it;
+ }
+ }
+ return connection_ptr();
+}
+
string
Router::connections_str()
{
@@ -132,7 +163,7 @@ Router::message_received( message_ptr msgp, connection_ptr conn )
cout << "Dropping, hop count: " << msgp->hops() << endl;
return;
}
- if( msgp->length() > 10240 ) // 10k hard limit
+ if( msgp->length() > 16384 ) // hard limit
{
cout << "Dropping, msg length: " << msgp->length() << endl;
return;
@@ -195,7 +226,6 @@ Router::foreach_conns( boost::function<void(connection_ptr)> fun )
boost::mutex::scoped_lock lk(m_connections_mutex);
BOOST_FOREACH( connection_ptr conn, m_connections )
{
- cout << "Sending to " << conn->str() << endl;
fun( conn );
}
}
@@ -216,7 +246,13 @@ Router::foreach_conns_except( boost::function<void(connection_ptr)> fun, connect
void
Router::send_all( message_ptr msgp )
{
- foreach_conns( boost::bind(&Connection::async_write, _1, msgp) );
+ //foreach_conns( boost::bind(&Connection::async_write, _1, msgp) );
+ boost::mutex::scoped_lock lk(m_connections_mutex);
+ BOOST_FOREACH( connection_ptr conn, m_connections )
+ {
+ cout << "Sending " << msgp->str() << " to " << conn->str() << endl;
+ conn->async_write( msgp );
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.