Skip to content

Commit

Permalink
Hop new ports, but keep the old [for a minute, and up to 10 at a time]
Browse files Browse the repository at this point in the history
(One is silver and the other gold...)
  • Loading branch information
keithw committed Oct 5, 2012
1 parent d17fb78 commit c0092a6
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 45 deletions.
37 changes: 32 additions & 5 deletions src/examples/ntester.cc
Expand Up @@ -74,18 +74,22 @@ int main( int argc, char *argv[] )

if ( server ) {
Select &sel = Select::get_instance();
sel.add_fd( n->fd() );
uint64_t last_num = n->get_remote_state_num();
while ( true ) {
try {
sel.clear_fds();
std::vector< int > fd_list( n->fds() );
assert( fd_list.size() == 1 ); /* servers don't hop */
int network_fd = fd_list.back();
sel.add_fd( network_fd );
if ( sel.select( n->wait_time() ) < 0 ) {
perror( "select" );
exit( 1 );
}

n->tick();

if ( sel.read( n->fd() ) ) {
if ( sel.read( network_fd ) ) {
n->recv();

if ( n->get_remote_state_num() != last_num ) {
Expand Down Expand Up @@ -116,10 +120,18 @@ int main( int argc, char *argv[] )
}

Select &sel = Select::get_instance();
sel.add_fd( STDIN_FILENO );
sel.add_fd( n->fd() );

while( true ) {
sel.clear_fds();
sel.add_fd( STDIN_FILENO );

std::vector< int > fd_list( n->fds() );
for ( std::vector< int >::const_iterator it = fd_list.begin();
it != fd_list.end();
it++ ) {
sel.add_fd( *it );
}

try {
if ( sel.select( n->wait_time() ) < 0 ) {
perror( "select" );
Expand All @@ -133,7 +145,22 @@ int main( int argc, char *argv[] )
n->get_current_state().push_back( Parser::UserByte( x ) );
}

if ( sel.read( n->fd() ) ) {
bool network_ready_to_read = false;
for ( std::vector< int >::const_iterator it = fd_list.begin();
it != fd_list.end();
it++ ) {
if ( sel.read( *it ) ) {
/* packet received from the network */
/* we only read one socket each run */
network_ready_to_read = true;
}

if ( sel.error( *it ) ) {
break;
}
}

if ( network_ready_to_read ) {
n->recv();
}
} catch ( NetworkException e ) {
Expand Down
9 changes: 6 additions & 3 deletions src/frontend/mosh-server.cc
Expand Up @@ -534,7 +534,10 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network

/* poll for events */
sel.clear_fds();
sel.add_fd( network.fd() );
std::vector< int > fd_list( network.fds() );
assert( fd_list.size() == 1 ); /* servers don't hop */
int network_fd = fd_list.back();
sel.add_fd( network_fd );
sel.add_fd( host_fd );

int active_fds = sel.select( timeout );
Expand All @@ -546,7 +549,7 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
now = Network::timestamp();
uint64_t time_since_remote_state = now - network.get_latest_remote_state().timestamp;

if ( sel.read( network.fd() ) ) {
if ( sel.read( network_fd ) ) {
/* packet received from the network */
network.recv();

Expand Down Expand Up @@ -652,7 +655,7 @@ void serve( int host_fd, Terminal::Complete &terminal, ServerConnection &network
}
}

if ( sel.error( network.fd() ) ) {
if ( sel.error( network_fd ) ) {
/* network problem */
break;
}
Expand Down
32 changes: 24 additions & 8 deletions src/frontend/stmclient.cc
Expand Up @@ -324,7 +324,12 @@ void STMClient::main( void )
/* poll for events */
/* network->fd() can in theory change over time */
sel.clear_fds();
sel.add_fd( network->fd() );
std::vector< int > fd_list( network->fds() );
for ( std::vector< int >::const_iterator it = fd_list.begin();
it != fd_list.end();
it++ ) {
sel.add_fd( *it );
}
sel.add_fd( STDIN_FILENO );

int active_fds = sel.select( wait_time );
Expand All @@ -333,8 +338,24 @@ void STMClient::main( void )
break;
}

if ( sel.read( network->fd() ) ) {
/* packet received from the network */
bool network_ready_to_read = false;

for ( std::vector< int >::const_iterator it = fd_list.begin();
it != fd_list.end();
it++ ) {
if ( sel.read( *it ) ) {
/* packet received from the network */
/* we only read one socket each run */
network_ready_to_read = true;
}

if ( sel.error( *it ) ) {
/* network problem */
break;
}
}

if ( network_ready_to_read ) {
if ( !process_network_input() ) { return; }
}

Expand Down Expand Up @@ -370,11 +391,6 @@ void STMClient::main( void )
}
}

if ( sel.error( network->fd() ) ) {
/* network problem */
break;
}

if ( sel.error( STDIN_FILENO ) ) {
/* user problem */
if ( !network->has_remote_addr() ) {
Expand Down
132 changes: 109 additions & 23 deletions src/network/network.cc
Expand Up @@ -111,50 +111,90 @@ void Connection::hop_port( void )
{
assert( !server );

if ( close( sock ) < 0 ) {
throw NetworkException( "close", errno );
setup();

prune_sockets();
}

void Connection::prune_sockets( void )
{
/* don't keep old sockets if the new socket has been working for long enough */
if ( socks.size() > 1 ) {
if ( timestamp() - last_port_choice > MAX_OLD_SOCKET_AGE ) {
int num_to_kill = socks.size() - 1;
for ( int i = 0; i < num_to_kill; i++ ) {
socks.pop_front();
}
}
} else {
return;
}

setup();
/* make sure we don't have too many receive sockets open */
if ( socks.size() > MAX_PORTS_OPEN ) {
int num_to_kill = socks.size() - MAX_PORTS_OPEN;
for ( int i = 0; i < num_to_kill; i++ ) {
socks.pop_front();
}
}
}

void Connection::setup( void )
Connection::Socket::Socket()
: _fd( socket( AF_INET, SOCK_DGRAM, 0 ) ),
_moved( false )
{
/* create socket */
sock = socket( AF_INET, SOCK_DGRAM, 0 );
if ( sock < 0 ) {
if ( _fd < 0 ) {
throw NetworkException( "socket", errno );
}

last_port_choice = timestamp();

/* Disable path MTU discovery */
#ifdef HAVE_IP_MTU_DISCOVER
char flag = IP_PMTUDISC_DONT;
socklen_t optlen = sizeof( flag );
if ( setsockopt( sock, IPPROTO_IP, IP_MTU_DISCOVER, &flag, optlen ) < 0 ) {
if ( setsockopt( _fd, IPPROTO_IP, IP_MTU_DISCOVER, &flag, optlen ) < 0 ) {
throw NetworkException( "setsockopt", errno );
}
#endif

/* set diffserv values to AF42 + ECT */
uint8_t dscp = 0x92;
if ( setsockopt( sock, IPPROTO_IP, IP_TOS, &dscp, 1) < 0 ) {
if ( setsockopt( _fd, IPPROTO_IP, IP_TOS, &dscp, 1) < 0 ) {
// perror( "setsockopt( IP_TOS )" );
}

/* request explicit congestion notification on received datagrams */
#ifdef HAVE_IP_RECVTOS
char tosflag = true;
socklen_t tosoptlen = sizeof( tosflag );
if ( setsockopt( sock, IPPROTO_IP, IP_RECVTOS, &tosflag, tosoptlen ) < 0 ) {
if ( setsockopt( _fd, IPPROTO_IP, IP_RECVTOS, &tosflag, tosoptlen ) < 0 ) {
perror( "setsockopt( IP_RECVTOS )" );
}
#endif
}

void Connection::setup( void )
{
/* create socket */
socks.push_back( Socket() );

last_port_choice = timestamp();
}

const std::vector< int > Connection::fds( void ) const
{
std::vector< int > ret;

for ( std::deque< Socket >::const_iterator it = socks.begin();
it != socks.end();
it++ ) {
ret.push_back( it->fd() );
}

return ret;
}

Connection::Connection( const char *desired_ip, const char *desired_port ) /* server */
: sock( -1 ),
: socks(),
has_remote_addr( false ),
remote_addr(),
server( true ),
Expand Down Expand Up @@ -213,7 +253,7 @@ Connection::Connection( const char *desired_ip, const char *desired_port ) /* se
/* try to bind to desired IP first */
if ( desired_ip_addr != INADDR_ANY ) {
try {
if ( try_bind( sock, desired_ip_addr, desired_port_no ) ) { return; }
if ( try_bind( sock(), desired_ip_addr, desired_port_no ) ) { return; }
} catch ( const NetworkException& e ) {
struct in_addr sin_addr;
sin_addr.s_addr = desired_ip_addr;
Expand All @@ -225,7 +265,7 @@ Connection::Connection( const char *desired_ip, const char *desired_port ) /* se

/* now try any local interface */
try {
if ( try_bind( sock, INADDR_ANY, desired_port_no ) ) { return; }
if ( try_bind( sock(), INADDR_ANY, desired_port_no ) ) { return; }
} catch ( const NetworkException& e ) {
fprintf( stderr, "Error binding to any interface: %s: %s\n",
e.function.c_str(), strerror( e.the_errno ) );
Expand Down Expand Up @@ -266,7 +306,7 @@ bool Connection::try_bind( int socket, uint32_t addr, int port )
}

Connection::Connection( const char *key_str, const char *ip, int port ) /* client */
: sock( -1 ),
: socks(),
has_remote_addr( false ),
remote_addr(),
server( false ),
Expand Down Expand Up @@ -312,7 +352,7 @@ void Connection::send( string s )

string p = px.tostring( &session );

ssize_t bytes_sent = sendto( sock, p.data(), p.size(), 0,
ssize_t bytes_sent = sendto( sock(), p.data(), p.size(), 0,
(sockaddr *)&remote_addr, sizeof( remote_addr ) );

if ( bytes_sent == static_cast<ssize_t>( p.size() ) ) {
Expand Down Expand Up @@ -340,6 +380,34 @@ void Connection::send( string s )
}

string Connection::recv( void )
{
assert( !socks.empty() );
for ( std::deque< Socket >::const_iterator it = socks.begin();
it != socks.end();
it++ ) {
bool islast = (it + 1) == socks.end();
string payload;
try {
payload = recv_one( it->fd(), !islast );
} catch ( NetworkException & e ) {
if ( (e.the_errno == EAGAIN)
|| (e.the_errno == EWOULDBLOCK) ) {
assert( !islast );
continue;
} else {
throw e;
}
}

/* succeeded */
prune_sockets();
return payload;
}
assert( false );
return "";
}

string Connection::recv_one( int sock_to_recv, bool nonblocking )
{
/* receive source address, ECN, and payload in msghdr structure */
struct sockaddr_in packet_remote_addr;
Expand All @@ -366,10 +434,10 @@ string Connection::recv( void )
/* receive flags */
header.msg_flags = 0;

ssize_t received_len = recvmsg( sock, &header, 0 );
ssize_t received_len = recvmsg( sock_to_recv, &header, nonblocking ? MSG_DONTWAIT : 0 );

if ( received_len < 0 ) {
throw NetworkException( "recvfrom", errno );
throw NetworkException( "recvmsg", errno );
}

if ( header.msg_flags & MSG_TRUNC ) {
Expand Down Expand Up @@ -456,7 +524,7 @@ int Connection::port( void ) const
struct sockaddr_in local_addr;
socklen_t addrlen = sizeof( local_addr );

if ( getsockname( sock, (sockaddr *)&local_addr, &addrlen ) < 0 ) {
if ( getsockname( sock(), (sockaddr *)&local_addr, &addrlen ) < 0 ) {
throw NetworkException( "getsockname", errno );
}

Expand Down Expand Up @@ -501,9 +569,27 @@ uint64_t Connection::timeout( void ) const
return RTO;
}

Connection::~Connection()
Connection::Socket::~Socket()
{
if ( close( sock ) < 0 ) {
throw NetworkException( "close", errno );
if ( !_moved ) {
if ( close( _fd ) < 0 ) {
throw NetworkException( "close", errno );
}
}
}

Connection::Socket::Socket( const Socket & other )
: _fd( other._fd ),
_moved( false )
{
other.move();
}

const Connection::Socket & Connection::Socket::operator=( const Socket & other )
{
_fd = other._fd;

other.move();

return *this;
}

0 comments on commit c0092a6

Please sign in to comment.