Skip to content

Commit

Permalink
Ask the server for its addresses.
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthieu Boutier committed Nov 23, 2014
1 parent 078e679 commit 92d2cb7
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 13 deletions.
134 changes: 124 additions & 10 deletions src/network/network.cc
Expand Up @@ -73,6 +73,7 @@ const uint64_t SEQUENCE_MASK = 0x0000FFFFFFFFFFFF;
#define GET_DIRECTION(nonce) ( ((nonce) & DIRECTION_MASK) ? TO_CLIENT : TO_SERVER )
#define GET_FLOWID(nonce) ( uint16_t( ( (nonce) & FLOWID_MASK ) >> 48 ) )
const uint16_t PROBE_FLAG = 1 << 0;
const uint16_t ADDR_FLAG = 1 << 1;

/* Read in packet from coded string */
Packet::Packet( string coded_packet, Session *session )
Expand Down Expand Up @@ -103,6 +104,11 @@ bool Packet::is_probe( void )
return flags & PROBE_FLAG;
}

bool Packet::is_addr_msg( void )
{
return flags & ADDR_FLAG;
}

/* Output coded string from packet */
string Packet::tostring( Session *session )
{
Expand Down Expand Up @@ -142,8 +148,7 @@ void Connection::hop_port( void )
assert( !server );

setup();

assert( remote_addr.addrlen != 0 );
assert( flows.size() != 0 );
socks.push_back( Socket( PF_INET, 0, 0 ) );
socks6.push_back( Socket( PF_INET6, 0, 0 ) );

Expand Down Expand Up @@ -178,6 +183,15 @@ void Connection::prune_sockets( std::deque< Socket > &socks_queue )
}
}

void Connection::check_remote_addr( void ) {
uint64_t now = timestamp();
if ( now - last_addr_request > MAX_ADDR_REQUEST_INTERVAL ) {
last_addr_request = now;
log_dbg( LOG_DEBUG_COMMON, "Asking server addresses.\n" );
send( ADDR_FLAG, string( "" ) );
}
}

uint16_t Connection::Flow::next_flow_id = 0;
const Connection::Flow Connection::Flow::defaults;

Expand Down Expand Up @@ -331,6 +345,7 @@ Connection::Connection( const char *desired_ip, const char *desired_port ) /* se
socks6(),
has_remote_addr( false ),
remote_addr(),
received_remote_addr(),
flows(),
last_flow( NULL ),
host_addresses(),
Expand All @@ -340,6 +355,7 @@ Connection::Connection( const char *desired_ip, const char *desired_port ) /* se
direction( TO_CLIENT ),
last_heard( -1 ),
last_port_choice( -1 ),
last_addr_request( -1 ),
last_roundtrip_success( -1 ),
have_send_exception( false ),
send_exception()
Expand Down Expand Up @@ -429,6 +445,7 @@ Connection::Connection( const char *key_str, const char *ip, const char *port )
socks6(),
has_remote_addr( false ),
remote_addr(),
received_remote_addr(),
flows(),
last_flow( NULL ),
host_addresses(),
Expand All @@ -438,6 +455,7 @@ Connection::Connection( const char *key_str, const char *ip, const char *port )
direction( TO_SERVER ),
last_heard( -1 ),
last_port_choice( -1 ),
last_addr_request( -1 ),
last_roundtrip_success( -1 ),
have_send_exception( false ),
send_exception()
Expand All @@ -460,7 +478,7 @@ Connection::Connection( const char *key_str, const char *ip, const char *port )
hints.ai_socktype = SOCK_DGRAM;
hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV;
AddrInfo ai( ip, port, &hints );
fatal_assert( ai.res->ai_addrlen <= sizeof( remote_addr ) );
fatal_assert( ai.res->ai_addrlen <= sizeof( struct Addr ) );

has_remote_addr = true;

Expand All @@ -478,6 +496,9 @@ Connection::Connection( const char *key_str, const char *ip, const char *port )
socks.push_back( Socket( PF_INET, 0, 0 ) );
socks6.push_back( Socket( PF_INET6, 0, 0 ) );

/* Ask the server what are its addresses. */
check_remote_addr();
last_addr_request = timestamp();
send_probes(); /* This should check all flows. */
}

Expand Down Expand Up @@ -515,6 +536,44 @@ bool Connection::send_probe( Flow *flow )
return ( bytes_sent != static_cast<ssize_t>( p.size() ) );
}

void Connection::send_addresses( void )
{
assert( server );
string payload;
std::vector< Addr > addresses = host_addresses.get_host_addresses( NULL );
for ( std::vector< Addr >::const_iterator la_it = addresses.begin();
la_it != addresses.end();
la_it++ ) {
if ( la_it->is_loopback() ) {
continue;
}
uint8_t len;
uint8_t family;
uint16_t port = htons( socks.back().port );
string addr;
int addrlen;
/* Set our listening port. */
if ( la_it->sa.sa_family == AF_INET ) {
addrlen = 4;
family = 4; /* AF_INET6 is not standard. */
addr = string( (char *) &la_it->sin.sin_addr, 4 );
} else if ( la_it->sa.sa_family == AF_INET6 ) {
addrlen = 16;
family = 6;
addr = string( (char *) &la_it->sin6.sin6_addr, 16 );
} else {
continue;
}
len = 1 + 2 + addrlen; /* "len" is not considered */
log_dbg( LOG_DEBUG_COMMON, "Sending my address: %s.\n", la_it->tostring().c_str() );
payload += string( (char *) &len, 1 ) +
string( (char *) &family, 1 ) +
string( (char *) &port, 2 ) +
addr;
}
send( ADDR_FLAG, payload );
}

ssize_t Connection::sendfromto( int sock, const char *buffer, size_t size, int flags, Addr from, Addr to )
{
struct msghdr msghdr;
Expand Down Expand Up @@ -583,6 +642,11 @@ ssize_t Connection::sendfromto( int sock, const char *buffer, size_t size, int f
}

void Connection::send( string s )
{
send( 0, s);
}

void Connection::send( uint16_t flags, string s )
{
if ( !has_remote_addr ) {
return;
Expand All @@ -593,7 +657,7 @@ void Connection::send( string s )
log_dbg( LOG_DEBUG_COMMON, "Sending data" );
ssize_t bytes_sent = -1;
if ( server ) {
Packet px = new_packet( last_flow, 0, s );
Packet px = new_packet( last_flow, flags, s );

string p = px.tostring( &session );

Expand All @@ -610,7 +674,7 @@ void Connection::send( string s )
for ( std::map< uint16_t, Flow* >::iterator it = flows.begin();
it != flows.end();
it++ ) {
Packet px = new_packet( it->second, 0, s );
Packet px = new_packet( it->second, flags, s );
string p = px.tostring( &session );
log_dbg( LOG_DEBUG_COMMON, " %s -> %s ",
it->second->src.tostring().c_str(), it->second->dst.tostring().c_str() );
Expand All @@ -635,7 +699,7 @@ void Connection::send( string s )
for ( std::vector< std::pair< uint16_t, Flow* > >::const_iterator it = flows_vect.begin();
it != flows_vect.end();
it ++ ) {
Packet px = new_packet( it->second, 0, s );
Packet px = new_packet( it->second, flags, s );
string p = px.tostring( &session );
bytes_sent = sendfromto( it->second->dst.sa.sa_family == AF_INET ? sock() : sock6(),
p.data(), p.size(), MSG_DONTWAIT, it->second->src, it->second->dst );
Expand Down Expand Up @@ -680,6 +744,7 @@ void Connection::send( string s )
&& ( now - last_roundtrip_success > PORT_HOP_INTERVAL ) ) {
hop_port();
}
check_remote_addr();
}
}

Expand Down Expand Up @@ -865,6 +930,7 @@ string Connection::recv_one( int sock_to_recv )
last_heard = timestamp();

if ( server ) { /* only client can roam */
bool has_roam = last_flow != flow_info;
if ( p.is_probe() ) {
if ( UNLIKELY( !last_flow ) ) { /* This should only occurs once. */
last_flow = flow_info;
Expand All @@ -874,11 +940,9 @@ string Connection::recv_one( int sock_to_recv )
}
last_flow = flow_info;

if ( (socklen_t)remote_addr.addrlen != header.msg_namelen ||
memcmp( &remote_addr, &packet_remote_addr, remote_addr.addrlen ) != 0 ) {
remote_addr = packet_remote_addr;
if ( has_roam ) {
char host[ NI_MAXHOST ], serv[ NI_MAXSERV ];
int errcode = getnameinfo( &remote_addr.sa, remote_addr.addrlen,
int errcode = getnameinfo( &last_flow->dst.sa, last_flow->dst.addrlen,
host, sizeof( host ), serv, sizeof( serv ),
NI_DGRAM | NI_NUMERICHOST | NI_NUMERICSERV );
if ( errcode != 0 ) {
Expand All @@ -890,9 +954,59 @@ string Connection::recv_one( int sock_to_recv )
}
}

if ( p.is_addr_msg() ) {
if ( server ) {
send_addresses();
assert( p.payload.empty() );
} else {
parse_received_addresses( p.payload );
p.payload = string("");
}
}

return p.payload; /* we do return out-of-order or duplicated packets to caller */
}

void Connection::parse_received_addresses( string payload )
{
int size = payload.size();
const unsigned char *data = (const unsigned char*) payload.data();
std::vector< Addr > addr;
while( size > 0 ) {
int len = (int)data[0];
if ( size < 1 + len ) {
log_dbg( LOG_DEBUG_COMMON, "Truncated message received.\n" );
break;
}
Addr tmp;
uint8_t family = data[1];
if ( family == 4 ) {
tmp = Addr( AF_INET );
memcpy(&tmp.sin.sin_port, data + 2, 2);
memcpy(&tmp.sin.sin_addr, data + 4, 4);
} else if ( family == 6 ) {
tmp = Addr( AF_INET6 );
memcpy(&tmp.sin6.sin6_port, data + 2, 2);
memcpy(&tmp.sin6.sin6_addr, data + 4, 16);
}
addr.push_back( tmp );
log_dbg( LOG_DEBUG_COMMON, "Remote address received: %s.\n", tmp.tostring().c_str() );
data += 1 + len;
size -= 1 + len;
}
assert( size == 0 );

/* don't retain addresses already registered in remote_addr */
received_remote_addr.resize( addr.size() );
std::sort( addr.begin(), addr.end() );
std::sort( remote_addr.begin(), remote_addr.end() );
std::vector< Addr >::const_iterator it;
it = std::set_difference( addr.begin(), addr.end(),
remote_addr.begin(), remote_addr.end(),
received_remote_addr.begin() );
received_remote_addr.resize( it - received_remote_addr.begin() );
}

std::string Connection::port( void ) const
{
Addr local_addr;
Expand Down
14 changes: 11 additions & 3 deletions src/network/network.h
Expand Up @@ -88,6 +88,7 @@ namespace Network {
Packet( string coded_packet, Session *session );

bool is_probe( void );
bool is_addr_msg( void );
string tostring( Session *session );
};

Expand All @@ -102,6 +103,7 @@ namespace Network {

static const unsigned int SERVER_ASSOCIATION_TIMEOUT = 40000;
static const unsigned int PORT_HOP_INTERVAL = 10000;
static const unsigned int MAX_ADDR_REQUEST_INTERVAL = 10000;

static const unsigned int MAX_PORTS_OPEN = 10;
static const unsigned int MAX_OLD_SOCKET_AGE = 60000;
Expand Down Expand Up @@ -160,7 +162,8 @@ namespace Network {
std::deque< Socket > socks;
std::deque< Socket > socks6;
bool has_remote_addr;
Addr remote_addr;
std::vector< Addr > remote_addr;
std::vector< Addr > received_remote_addr;
std::map< uint16_t, Flow* > flows; /* do NEVER remove flows when server, for security reason. */
Flow *last_flow;
Addresses host_addresses;
Expand All @@ -176,6 +179,7 @@ namespace Network {

uint64_t last_heard;
uint64_t last_port_choice;
uint64_t last_addr_request;
uint64_t last_roundtrip_success; /* transport layer needs to tell us this */

/* Exception from send(), to be delivered if the frontend asks for it,
Expand All @@ -186,17 +190,21 @@ namespace Network {
Packet new_packet( Flow *flow, uint16_t flags, string &s_payload );

void hop_port( void );
void check_remote_addr( void );

int sock( void ) const { assert( !socks.empty() ); return socks.back().fd(); }
int sock6( void ) const { assert( !socks6.empty() ); return socks6.back().fd(); }

void prune_sockets( void );
void prune_sockets( std::deque< Socket > &socks_vect );

void send( uint16_t flags, string s );
void send_probes( void );
bool send_probe( Flow *flow );
void send_addresses( void );
ssize_t sendfromto( int sock, const char *buffer, size_t size, int flags, Addr from, Addr to );
string recv_one( int sock_to_recv );
void parse_received_addresses( string payload );

public:
Connection( const char *desired_ip, const char *desired_port ); /* server */
Expand All @@ -214,8 +222,8 @@ namespace Network {
uint64_t timeout( void ) const;
double get_SRTT( void ) const { return last_flow ? last_flow->SRTT : 1000; }

const Addr &get_remote_addr( void ) const { return remote_addr; }
socklen_t get_remote_addr_len( void ) const { return remote_addr.addrlen; }
const Addr &get_remote_addr( void ) const { return last_flow ? last_flow->dst : remote_addr.back(); }
socklen_t get_remote_addr_len( void ) const { return last_flow ? last_flow->dst.addrlen : 0; }

const NetworkException *get_send_exception( void ) const
{
Expand Down

0 comments on commit 92d2cb7

Please sign in to comment.