diff --git a/src/network/network.cc b/src/network/network.cc index 48ebab8f0..adead099c 100644 --- a/src/network/network.cc +++ b/src/network/network.cc @@ -73,6 +73,7 @@ const uint64_t SEQUENCE_MASK = 0x0000FFFFFFFFFFFF; #define GET_DIRECTION(nonce) ( ((nonce) & DIRECTION_MASK) ? TO_CLIENT : TO_SERVER ) #define GET_FLOWID(nonce) ( uint16_t( ( (nonce) & FLOWID_MASK ) >> 48 ) ) const uint16_t PROBE_FLAG = 1 << 0; +const uint16_t ADDR_FLAG = 1 << 1; /* Read in packet from coded string */ Packet::Packet( string coded_packet, Session *session ) @@ -103,6 +104,11 @@ bool Packet::is_probe( void ) return flags & PROBE_FLAG; } +bool Packet::is_addr_msg( void ) +{ + return flags & ADDR_FLAG; +} + /* Output coded string from packet */ string Packet::tostring( Session *session ) { @@ -142,8 +148,7 @@ void Connection::hop_port( void ) assert( !server ); setup(); - - assert( remote_addr.addrlen != 0 ); + assert( flows.size() != 0 ); socks.push_back( Socket( PF_INET, 0, 0 ) ); socks6.push_back( Socket( PF_INET6, 0, 0 ) ); @@ -178,6 +183,15 @@ void Connection::prune_sockets( std::deque< Socket > &socks_queue ) } } +void Connection::check_remote_addr( void ) { + uint64_t now = timestamp(); + if ( now - last_addr_request > MAX_ADDR_REQUEST_INTERVAL ) { + last_addr_request = now; + log_dbg( LOG_DEBUG_COMMON, "Asking server addresses.\n" ); + send( ADDR_FLAG, string( "" ) ); + } +} + uint16_t Connection::Flow::next_flow_id = 0; const Connection::Flow Connection::Flow::defaults; @@ -331,6 +345,7 @@ Connection::Connection( const char *desired_ip, const char *desired_port ) /* se socks6(), has_remote_addr( false ), remote_addr(), + received_remote_addr(), flows(), last_flow( NULL ), host_addresses(), @@ -340,6 +355,7 @@ Connection::Connection( const char *desired_ip, const char *desired_port ) /* se direction( TO_CLIENT ), last_heard( -1 ), last_port_choice( -1 ), + last_addr_request( -1 ), last_roundtrip_success( -1 ), have_send_exception( false ), send_exception() @@ -429,6 +445,7 @@ Connection::Connection( const char *key_str, const char *ip, const char *port ) socks6(), has_remote_addr( false ), remote_addr(), + received_remote_addr(), flows(), last_flow( NULL ), host_addresses(), @@ -438,6 +455,7 @@ Connection::Connection( const char *key_str, const char *ip, const char *port ) direction( TO_SERVER ), last_heard( -1 ), last_port_choice( -1 ), + last_addr_request( -1 ), last_roundtrip_success( -1 ), have_send_exception( false ), send_exception() @@ -460,7 +478,7 @@ Connection::Connection( const char *key_str, const char *ip, const char *port ) hints.ai_socktype = SOCK_DGRAM; hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; AddrInfo ai( ip, port, &hints ); - fatal_assert( ai.res->ai_addrlen <= sizeof( remote_addr ) ); + fatal_assert( ai.res->ai_addrlen <= sizeof( struct Addr ) ); has_remote_addr = true; @@ -478,6 +496,9 @@ Connection::Connection( const char *key_str, const char *ip, const char *port ) socks.push_back( Socket( PF_INET, 0, 0 ) ); socks6.push_back( Socket( PF_INET6, 0, 0 ) ); + /* Ask the server what are its addresses. */ + check_remote_addr(); + last_addr_request = timestamp(); send_probes(); /* This should check all flows. */ } @@ -515,6 +536,44 @@ bool Connection::send_probe( Flow *flow ) return ( bytes_sent != static_cast( p.size() ) ); } +void Connection::send_addresses( void ) +{ + assert( server ); + string payload; + std::vector< Addr > addresses = host_addresses.get_host_addresses( NULL ); + for ( std::vector< Addr >::const_iterator la_it = addresses.begin(); + la_it != addresses.end(); + la_it++ ) { + if ( la_it->is_loopback() ) { + continue; + } + uint8_t len; + uint8_t family; + uint16_t port = htons( socks.back().port ); + string addr; + int addrlen; + /* Set our listening port. */ + if ( la_it->sa.sa_family == AF_INET ) { + addrlen = 4; + family = 4; /* AF_INET6 is not standard. */ + addr = string( (char *) &la_it->sin.sin_addr, 4 ); + } else if ( la_it->sa.sa_family == AF_INET6 ) { + addrlen = 16; + family = 6; + addr = string( (char *) &la_it->sin6.sin6_addr, 16 ); + } else { + continue; + } + len = 1 + 2 + addrlen; /* "len" is not considered */ + log_dbg( LOG_DEBUG_COMMON, "Sending my address: %s.\n", la_it->tostring().c_str() ); + payload += string( (char *) &len, 1 ) + + string( (char *) &family, 1 ) + + string( (char *) &port, 2 ) + + addr; + } + send( ADDR_FLAG, payload ); +} + ssize_t Connection::sendfromto( int sock, const char *buffer, size_t size, int flags, Addr from, Addr to ) { struct msghdr msghdr; @@ -583,6 +642,11 @@ ssize_t Connection::sendfromto( int sock, const char *buffer, size_t size, int f } void Connection::send( string s ) +{ + send( 0, s); +} + +void Connection::send( uint16_t flags, string s ) { if ( !has_remote_addr ) { return; @@ -593,7 +657,7 @@ void Connection::send( string s ) log_dbg( LOG_DEBUG_COMMON, "Sending data" ); ssize_t bytes_sent = -1; if ( server ) { - Packet px = new_packet( last_flow, 0, s ); + Packet px = new_packet( last_flow, flags, s ); string p = px.tostring( &session ); @@ -610,7 +674,7 @@ void Connection::send( string s ) for ( std::map< uint16_t, Flow* >::iterator it = flows.begin(); it != flows.end(); it++ ) { - Packet px = new_packet( it->second, 0, s ); + Packet px = new_packet( it->second, flags, s ); string p = px.tostring( &session ); log_dbg( LOG_DEBUG_COMMON, " %s -> %s ", it->second->src.tostring().c_str(), it->second->dst.tostring().c_str() ); @@ -635,7 +699,7 @@ void Connection::send( string s ) for ( std::vector< std::pair< uint16_t, Flow* > >::const_iterator it = flows_vect.begin(); it != flows_vect.end(); it ++ ) { - Packet px = new_packet( it->second, 0, s ); + Packet px = new_packet( it->second, flags, s ); string p = px.tostring( &session ); bytes_sent = sendfromto( it->second->dst.sa.sa_family == AF_INET ? sock() : sock6(), p.data(), p.size(), MSG_DONTWAIT, it->second->src, it->second->dst ); @@ -680,6 +744,7 @@ void Connection::send( string s ) && ( now - last_roundtrip_success > PORT_HOP_INTERVAL ) ) { hop_port(); } + check_remote_addr(); } } @@ -865,6 +930,7 @@ string Connection::recv_one( int sock_to_recv ) last_heard = timestamp(); if ( server ) { /* only client can roam */ + bool has_roam = last_flow != flow_info; if ( p.is_probe() ) { if ( UNLIKELY( !last_flow ) ) { /* This should only occurs once. */ last_flow = flow_info; @@ -874,11 +940,9 @@ string Connection::recv_one( int sock_to_recv ) } last_flow = flow_info; - if ( (socklen_t)remote_addr.addrlen != header.msg_namelen || - memcmp( &remote_addr, &packet_remote_addr, remote_addr.addrlen ) != 0 ) { - remote_addr = packet_remote_addr; + if ( has_roam ) { char host[ NI_MAXHOST ], serv[ NI_MAXSERV ]; - int errcode = getnameinfo( &remote_addr.sa, remote_addr.addrlen, + int errcode = getnameinfo( &last_flow->dst.sa, last_flow->dst.addrlen, host, sizeof( host ), serv, sizeof( serv ), NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV ); if ( errcode != 0 ) { @@ -890,9 +954,59 @@ string Connection::recv_one( int sock_to_recv ) } } + if ( p.is_addr_msg() ) { + if ( server ) { + send_addresses(); + assert( p.payload.empty() ); + } else { + parse_received_addresses( p.payload ); + p.payload = string(""); + } + } + return p.payload; /* we do return out-of-order or duplicated packets to caller */ } +void Connection::parse_received_addresses( string payload ) +{ + int size = payload.size(); + const unsigned char *data = (const unsigned char*) payload.data(); + std::vector< Addr > addr; + while( size > 0 ) { + int len = (int)data[0]; + if ( size < 1 + len ) { + log_dbg( LOG_DEBUG_COMMON, "Truncated message received.\n" ); + break; + } + Addr tmp; + uint8_t family = data[1]; + if ( family == 4 ) { + tmp = Addr( AF_INET ); + memcpy(&tmp.sin.sin_port, data + 2, 2); + memcpy(&tmp.sin.sin_addr, data + 4, 4); + } else if ( family == 6 ) { + tmp = Addr( AF_INET6 ); + memcpy(&tmp.sin6.sin6_port, data + 2, 2); + memcpy(&tmp.sin6.sin6_addr, data + 4, 16); + } + addr.push_back( tmp ); + log_dbg( LOG_DEBUG_COMMON, "Remote address received: %s.\n", tmp.tostring().c_str() ); + data += 1 + len; + size -= 1 + len; + } + assert( size == 0 ); + + /* don't retain addresses already registered in remote_addr */ + received_remote_addr.resize( addr.size() ); + std::sort( addr.begin(), addr.end() ); + std::sort( remote_addr.begin(), remote_addr.end() ); + std::vector< Addr >::const_iterator it; + it = std::set_difference( addr.begin(), addr.end(), + remote_addr.begin(), remote_addr.end(), + received_remote_addr.begin() ); + received_remote_addr.resize( it - received_remote_addr.begin() ); +} + std::string Connection::port( void ) const { Addr local_addr; diff --git a/src/network/network.h b/src/network/network.h index 38cd2bb2e..c7fd67486 100644 --- a/src/network/network.h +++ b/src/network/network.h @@ -88,6 +88,7 @@ namespace Network { Packet( string coded_packet, Session *session ); bool is_probe( void ); + bool is_addr_msg( void ); string tostring( Session *session ); }; @@ -102,6 +103,7 @@ namespace Network { static const unsigned int SERVER_ASSOCIATION_TIMEOUT = 40000; static const unsigned int PORT_HOP_INTERVAL = 10000; + static const unsigned int MAX_ADDR_REQUEST_INTERVAL = 10000; static const unsigned int MAX_PORTS_OPEN = 10; static const unsigned int MAX_OLD_SOCKET_AGE = 60000; @@ -160,7 +162,8 @@ namespace Network { std::deque< Socket > socks; std::deque< Socket > socks6; bool has_remote_addr; - Addr remote_addr; + std::vector< Addr > remote_addr; + std::vector< Addr > received_remote_addr; std::map< uint16_t, Flow* > flows; /* do NEVER remove flows when server, for security reason. */ Flow *last_flow; Addresses host_addresses; @@ -176,6 +179,7 @@ namespace Network { uint64_t last_heard; uint64_t last_port_choice; + uint64_t last_addr_request; uint64_t last_roundtrip_success; /* transport layer needs to tell us this */ /* Exception from send(), to be delivered if the frontend asks for it, @@ -186,6 +190,7 @@ namespace Network { Packet new_packet( Flow *flow, uint16_t flags, string &s_payload ); void hop_port( void ); + void check_remote_addr( void ); int sock( void ) const { assert( !socks.empty() ); return socks.back().fd(); } int sock6( void ) const { assert( !socks6.empty() ); return socks6.back().fd(); } @@ -193,10 +198,13 @@ namespace Network { void prune_sockets( void ); void prune_sockets( std::deque< Socket > &socks_vect ); + void send( uint16_t flags, string s ); void send_probes( void ); bool send_probe( Flow *flow ); + void send_addresses( void ); ssize_t sendfromto( int sock, const char *buffer, size_t size, int flags, Addr from, Addr to ); string recv_one( int sock_to_recv ); + void parse_received_addresses( string payload ); public: Connection( const char *desired_ip, const char *desired_port ); /* server */ @@ -214,8 +222,8 @@ namespace Network { uint64_t timeout( void ) const; double get_SRTT( void ) const { return last_flow ? last_flow->SRTT : 1000; } - const Addr &get_remote_addr( void ) const { return remote_addr; } - socklen_t get_remote_addr_len( void ) const { return remote_addr.addrlen; } + const Addr &get_remote_addr( void ) const { return last_flow ? last_flow->dst : remote_addr.back(); } + socklen_t get_remote_addr_len( void ) const { return last_flow ? last_flow->dst.addrlen : 0; } const NetworkException *get_send_exception( void ) const {