Skip to content
Browse files

Merge branch 'master' of https://github.com/dardok/Equalizer

  • Loading branch information...
2 parents 5899666 + 2c9705c commit a117da6d5874fe5a246f77eae97765606472b74e @eile eile committed Apr 17, 2012
Showing with 145 additions and 83 deletions.
  1. +128 −72 libs/co/rdmaConnection.cpp
  2. +13 −8 libs/co/rdmaConnection.h
  3. +4 −3 libs/co/udtConnection.cpp
View
200 libs/co/rdmaConnection.cpp
@@ -41,9 +41,9 @@
#define IPV6_DEFAULT 0
#define RDMA_PROTOCOL_MAGIC 0xC0
-#define RDMA_PROTOCOL_VERSION 0x02
+#define RDMA_PROTOCOL_VERSION 0x03
-#define RDMA_CONNECT_ATTEMPTS 3
+#define RDMA_CONNECT_ATTEMPTS 1
#define RDMA_CONNECT_RETRY_SLEEP 500 // ms
namespace co
@@ -74,7 +74,8 @@ struct RDMASetupPayload
*/
struct RDMAFCPayload
{
- uint32_t bytes_read;
+ uint32_t bytes_received;
+ uint32_t writes_received;
};
/**
@@ -94,7 +95,11 @@ struct RDMAMessage
/**
* "IMM" data sent with RDMA write, tells sink about send progress
*/
-typedef uint32_t RDMAFCImm;
+struct RDMAFCImm
+{
+ uint32_t bytes_sent:28;
+ uint32_t acks_received:4;
+};
/**
* An RDMA connection implementation.
@@ -154,10 +159,6 @@ typedef uint32_t RDMAFCImm;
* Send perf: 3240.72MB/s (3240.72pps)
* Send perf: 3240.72MB/s (3240.72pps)
* Send perf: 3240.95MB/s (3240.95pps)
- *
- * TODO : Consider using WR_SEND_WITH_IMM and doing away with the message buffer
- * entirely. We'd still need to register memory for the setup message, but
- * after that we'd just send acks via imm_data.
*/
RDMAConnection::RDMAConnection( )
: _notifier( -1 )
@@ -168,6 +169,8 @@ RDMAConnection::RDMAConnection( )
, _pd( NULL )
, _established( false )
, _depth( 0L )
+ , _writes( 0L )
+ , _acks( 0L )
, _credits( 0L )
, _completions( 0U )
, _msgbuf( sizeof(RDMAMessage) )
@@ -211,6 +214,8 @@ bool RDMAConnection::connect( )
retry:
bool retry = false;
+ _cleanup( );
+
setState( STATE_CONNECTING );
if( !_lookupAddress( false ) || ( NULL == _rai ))
@@ -253,6 +258,8 @@ bool RDMAConnection::connect( )
_credits = _depth =
Global::getIAttribute( Global::IATTR_RDMA_SEND_QUEUE_DEPTH );
+ _writes = 0L;
+ _acks = 0L;
if( _depth <= 0L )
{
LBERROR << "Invalid queue depth." << std::endl;
@@ -355,6 +362,8 @@ bool RDMAConnection::listen( )
if( STATE_CLOSED != _state )
return false;
+ _cleanup( );
+
setState( STATE_CONNECTING );
if( !_lookupAddress( true ))
@@ -431,20 +440,25 @@ void RDMAConnection::close( )
{
LBVERB << (void *)this << ".close( )" << std::endl;
- lunchbox::ScopedMutex<> close_mutex( _close_lock );
+ lunchbox::ScopedMutex<> close_mutex( _poll_lock );
if( STATE_CLOSED != _state )
{
LBASSERT( STATE_CLOSING != _state );
setState( STATE_CLOSING );
-#if 0
+ // TODO : verify this method of determining if we can call disconnect
+ // without getting a error (and spitting out a unnecessary warning).
+ if( _cm_id && _cm_id->qp && ( _cm_id->qp->state > IBV_QPS_INIT ) &&
+ ::rdma_disconnect( _cm_id ))
+ LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
+
lunchbox::Clock clock;
const int64_t start = clock.getTime64( );
const uint32_t timeout = Global::getTimeout( );
- // Wait for outstanding acks.
- while( !_rptr.isEmpty( ) && _established && _pollCQ( ))
+ // Wait for outstanding acks or disconnect.
+ while( _established && !_rptr.isEmpty( ) && _pollCQ( ))
{
if( LB_TIMEOUT_INDEFINITE != timeout )
{
@@ -457,20 +471,11 @@ void RDMAConnection::close( )
lunchbox::Thread::yield( );
}
-#endif
_eventThreadUnregister( );
- // TODO : verify this method of determining if we can call disconnect
- // without getting a error (and spitting out a unnecessary warning).
- if( _cm_id && _cm_id->qp && ( _cm_id->qp->state > IBV_QPS_INIT ) &&
- ::rdma_disconnect( _cm_id ))
- LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
-
_established = false;
- _cleanup( );
-
setState( STATE_CLOSED );
}
}
@@ -569,24 +574,14 @@ int64_t RDMAConnection::readSync( void* buffer, const uint64_t bytes,
goto retry2;
}
-
- // TODO : post FC less frequently
- if( _established && !_postFC( bytes_taken ))
+ // TODO : send FC less frequently
+ if( !_postFC( bytes_taken ))
LBWARN << "Error while posting flow control message." << std::endl;
+ if( !_rearmCQ( ))
{
- // We only want to clear the "readability" of the notifier when we know
- // we no longer have any data in the buffer and need to be notified
- // when we receive more. Take the poll mutex so another thread in
- // write() can't take events off the CQ (or modify _sinkptr) between
- // check and rearm.
- lunchbox::ScopedWrite poll_mutex( _poll_lock );
-
- if( _sinkptr.isEmpty( ) && !_rearmCQ( ))
- {
- LBERROR << "Error while rearming receive channel." << std::endl;
- goto err;
- }
+ LBERROR << "Error while rearming receive channel." << std::endl;
+ goto err;
}
// LBWARN << (void *)this << std::dec << ".read(" << bytes << ")"
@@ -616,9 +611,8 @@ int64_t RDMAConnection::write( const void* buffer, const uint64_t bytes )
_stats.writes++;
- // Can only send sizeof(struct ibv_wc.imm_data) per shot.
- const uint32_t can_put = static_cast< uint32_t >( std::min( bytes,
- static_cast< uint64_t >( std::numeric_limits< uint32_t >::max( ))));
+ // Can only send sizeof(struct RDMAFCImm.bytes_sent) per rdma write.
+ const uint32_t can_put = std::min( bytes, (uint64_t)0xFFFFFFF );
uint32_t bytes_put;
retry:
@@ -708,6 +702,8 @@ RDMAConnection::~RDMAConnection( )
LBVERB << (void *)this << ".delete" << std::endl;
close( );
+
+ _cleanup( );
}
void RDMAConnection::setState( const State state )
@@ -723,7 +719,7 @@ void RDMAConnection::setState( const State state )
void RDMAConnection::_cleanup( )
{
- LBASSERT( STATE_CLOSING == _state );
+ LBASSERT( STATE_CLOSED == _state );
_sourcebuf.clear( );
_sinkbuf.clear( );
@@ -851,6 +847,8 @@ bool RDMAConnection::_finishAccept( struct rdma_event_channel *listen_channel )
}
_credits = _depth = _cpd.depth;
+ _writes = 0L;
+ _acks = 0L;
if( _depth <= 0L )
{
LBERROR << "Invalid (unsent?) queue depth." << std::endl;
@@ -1041,7 +1039,9 @@ bool RDMAConnection::_createId( )
bool RDMAConnection::_createQP( )
{
struct ibv_qp_init_attr init_attr;
+#if 0
int flags;
+#endif
_pd = ::ibv_alloc_pd( _cm_id->verbs );
if( NULL == _pd )
@@ -1057,14 +1057,14 @@ bool RDMAConnection::_createQP( )
init_attr.cap.max_send_sge = 1;
init_attr.sq_sig_all = 1; // aka always IBV_SEND_SIGNALED
init_attr.qp_type = IBV_QPT_RC;
- //init_attr.cap.max_inline_data = sizeof(RDMAMessage);
if( ::rdma_create_qp( _cm_id, _pd, &init_attr ))
{
LBERROR << "rdma_create_qp : " << lunchbox::sysError << std::endl;
goto err;
}
+#if 0
flags = ::fcntl( _cm_id->recv_cq_channel->fd, F_GETFL );
if( -1 == flags )
{
@@ -1078,6 +1078,7 @@ bool RDMAConnection::_createQP( )
LBERROR << "fcntl : " << lunchbox::sysError << std::endl;
goto err;
}
+#endif
// Request only solicited events (i.e. don't wake up Collage on ACKs).
if( ::rdma_seterrno( ::ibv_req_notify_cq( _cm_id->recv_cq, 1 )))
@@ -1204,9 +1205,11 @@ bool RDMAConnection::_connect( )
conn_param.private_data_len = sizeof(struct RDMAConnParamData);
conn_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
conn_param.responder_resources = RDMA_MAX_RESP_RES;
+#if 0
// Magic 3-bit values.
conn_param.retry_count = 7;
conn_param.rnr_retry_count = 7;
+#endif
LBINFO << "Connect on source lid : " << std::showbase
<< std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
@@ -1308,8 +1311,10 @@ bool RDMAConnection::_accept( )
accept_param.private_data_len = sizeof(struct RDMAConnParamData);
accept_param.initiator_depth = RDMA_MAX_INIT_DEPTH;
accept_param.responder_resources = RDMA_MAX_RESP_RES;
+#if 0
// Magic 3-bit value.
accept_param.rnr_retry_count = 7;
+#endif
LBINFO << "Accept on source lid : "<< std::showbase
<< std::hex << ntohs( _cm_id->route.path_rec->slid ) << " ("
@@ -1382,13 +1387,44 @@ bool RDMAConnection::_postReceives( const uint32_t count )
/* inline */
void RDMAConnection::_recvRDMAWrite( const uint32_t imm_data )
{
+ union
+ {
+ uint32_t val;
+ RDMAFCImm fc;
+ } x;
+ x.val = ntohl( imm_data );
+
// Analysis:
//
// Since the ring pointers are circular, a malicious (presumably overflow)
// value here would at worst only result in us reading arbitrary regions
// from our sink buffer, not segfaulting. If the other side wanted us to
// reread a previous message it should just resend it!
- _sinkptr.incrHead( ntohl( imm_data ));
+ _sinkptr.incrHead( x.fc.bytes_sent );
+
+ _credits += x.fc.acks_received;
+ LBASSERTINFO( _credits <= _depth, _credits << " > " << _depth );
+
+ _writes++;
+}
+
+/* inline */
+uint32_t RDMAConnection::_makeImm( const uint32_t b )
+{
+ union
+ {
+ uint32_t val;
+ RDMAFCImm fc;
+ } x;
+
+ // Can ack only one byte's worth of acks per rdma write.
+ x.fc.acks_received = std::min( (uint16_t)0xF, (uint16_t)_acks );
+ _acks -= x.fc.acks_received;
+ LBASSERT( _acks >= 0 );
+
+ LBASSERT( b <= (uint32_t)0xFFFFFFF );
+ x.fc.bytes_sent = b;
+ return htonl( x.val );
}
bool RDMAConnection::_postRDMAWrite( )
@@ -1409,14 +1445,13 @@ bool RDMAConnection::_postRDMAWrite( )
wr.num_sge = 1;
wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
wr.send_flags = IBV_SEND_SOLICITED; // Important!
- wr.imm_data = htonl( (uint32_t)sge.length );
+ wr.imm_data = _makeImm( (uint32_t)sge.length );
wr.wr.rdma.rkey = _rkey;
wr.wr.rdma.remote_addr = (uint64_t)( (uintptr_t)_rbase +
_rptr.ptr( _rptr.HEAD ));
_rptr.incrHead( (uint32_t)sge.length );
_credits--;
-
LBASSERT( _credits >= 0L );
struct ibv_send_wr *bad_wr;
@@ -1432,6 +1467,25 @@ bool RDMAConnection::_postRDMAWrite( )
return false;
}
+bool RDMAConnection::_postMessage( const RDMAMessage &message )
+{
+ _credits--;
+ LBASSERT( _credits >= 0L );
+
+ if( ::rdma_post_send( _cm_id, (void *)&message, (void *)&message,
+ offsetof( RDMAMessage, payload ) + message.length, _msgbuf.getMR( ),
+ 0 ))
+ {
+ LBERROR << "rdma_post_send : " << lunchbox::sysError << std::endl;
+ goto err;
+ }
+
+ return true;
+
+err:
+ return false;
+}
+
void RDMAConnection::_recvMessage( const RDMAMessage &message )
{
switch( message.opcode )
@@ -1465,27 +1519,12 @@ void RDMAConnection::_recvFC( const RDMAFCPayload &fc )
// control over those ring pointers. Worst case, we'd and up writing to
// arbitrary regions of the remote buffer, since this ring pointer is
// circular as well.
- _rptr.incrTail( fc.bytes_read );
-}
-
-bool RDMAConnection::_postMessage( const RDMAMessage &message )
-{
- _credits--;
-
- LBASSERT( _credits >= 0L );
+ _rptr.incrTail( fc.bytes_received );
- if( ::rdma_post_send( _cm_id, (void *)&message, (void *)&message,
- offsetof( RDMAMessage, payload ) + message.length, _msgbuf.getMR( ),
- 0 /*IBV_SEND_INLINE*/ ))
- {
- LBERROR << "rdma_post_send : " << lunchbox::sysError << std::endl;
- goto err;
- }
+ _credits += fc.writes_received;
+ LBASSERTINFO( _credits <= _depth, _credits << " > " << _depth );
- return true;
-
-err:
- return false;
+ _acks++;
}
bool RDMAConnection::_postFC( const uint32_t bytes_taken )
@@ -1496,7 +1535,10 @@ bool RDMAConnection::_postFC( const uint32_t bytes_taken )
message.opcode = FC;
message.length = (uint8_t)sizeof(struct RDMAFCPayload);
- message.payload.fc.bytes_read = bytes_taken;
+ message.payload.fc.bytes_received = bytes_taken;
+ message.payload.fc.writes_received = _writes;
+ _writes -= message.payload.fc.writes_received;
+ LBASSERT( _writes >= 0 );
return _postMessage( message );
}
@@ -1583,16 +1625,20 @@ bool RDMAConnection::_doCMEvent( struct rdma_event_channel *channel,
#ifndef NDEBUG
if( ok )
+ {
LBVERB << (void *)this
<< " (" << _addr << ":" << _serv << ")"
<< " event : " << ::rdma_event_str( event->event )
<< std::endl;
+ }
else
+ {
LBINFO << (void *)this
<< " (" << _addr << ":" << _serv << ")"
<< " event : " << ::rdma_event_str( event->event )
<< " expected: " << ::rdma_event_str( expected )
<< std::endl;
+ }
#endif
if( ok && ( RDMA_CM_EVENT_DISCONNECTED == event->event ))
@@ -1734,11 +1780,6 @@ bool RDMAConnection::_pollCQ( )
_sourceptr.incrTail( (uint32_t)wc.wr_id );
else
LBUNREACHABLE;
-
- // All send completions replenish credit.
- _credits++;
-
- LBASSERTINFO( _credits <= _depth, _credits << " > " << _depth );
}
return true;
@@ -1752,12 +1793,24 @@ bool RDMAConnection::_rearmCQ( )
struct ibv_cq *ev_cq;
void *ev_ctx;
+ // We only want to clear the "readability" of the notifier when we know
+ // we no longer have any data in the buffer and need to be notified
+ // when we receive more. Take the poll mutex so another thread in
+ // write() can't take events off the CQ (or modify _sinkptr) between
+ // check and rearm.
+ lunchbox::ScopedWrite poll_mutex( _poll_lock );
+
+ if( !_sinkptr.isEmpty( ))
+ goto out;
+
if( ::ibv_get_cq_event( _cm_id->recv_cq_channel, &ev_cq, &ev_ctx ))
{
+#if 0
// We may attempt to rearm without an active event waiting, the
// receive channel is non-blocking so we just skip ack.
if( EAGAIN == errno )
goto rearm;
+#endif
LBERROR << "ibv_get_cq_event : " << lunchbox::sysError << std::endl;
goto err;
@@ -1771,14 +1824,17 @@ bool RDMAConnection::_rearmCQ( )
_completions = 0U;
}
+#if 0
rearm:
+#endif
// Solicited only!
if( ::rdma_seterrno( ::ibv_req_notify_cq( _cm_id->recv_cq, 1 )))
{
LBERROR << "ibv_req_notify_cq : " << lunchbox::sysError << std::endl;
goto err;
}
+out:
return true;
err:
@@ -1936,13 +1992,13 @@ void RDMAConnection::ChannelEventThread::run( )
RDMAConnection *to_add = _to_add;
_to_add = NULL;
- evctl.events = EPOLLIN /*| EPOLLONESHOT*/;
+ evctl.events = EPOLLIN | EPOLLONESHOT;
evctl.data.ptr =
reinterpret_cast< void * >( &to_add->_context );
if( ::epoll_ctl( _epoll_fd, EPOLL_CTL_ADD, to_add->_cm->fd,
&evctl ))
{
- LBERROR << "epoll_ctl : " << lunchbox::sysError << std::endl;
+ LBERROR << "epoll_ctl : " <<lunchbox::sysError << std::endl;
to_add->_cmd_block.set( CMD_FAIL );
}
else
@@ -1989,7 +2045,7 @@ void RDMAConnection::ChannelEventThread::run( )
LBWARN << "Unexpected event on connection." << std::endl;
// TODO : should we rdma_disconnect on *any* event?
else if( ::rdma_disconnect( conn->_cm_id ))
- LBWARN << "rdma_disconnect : " << lunchbox::sysError << std::endl;
+ LBWARN << "rdma_disconnect : " <<lunchbox::sysError<< std::endl;
}
else
LBUNREACHABLE;
View
21 libs/co/rdmaConnection.h
@@ -198,14 +198,18 @@ class RDMAConnection : public Connection
/* Protocol */
bool _postReceives( const uint32_t count );
+
inline void _recvRDMAWrite( const uint32_t imm_data );
+ inline uint32_t _makeImm( const uint32_t b );
bool _postRDMAWrite( );
- void _recvMessage( const RDMAMessage &message );
+
bool _postMessage( const RDMAMessage &message );
+ void _recvMessage( const RDMAMessage &message );
inline void _recvFC( const RDMAFCPayload &fc );
bool _postFC( const uint32_t bytes_taken );
void _recvSetup( const RDMASetupPayload &setup );
bool _postSetup( );
+
bool _waitRecvSetup( );
private:
@@ -221,10 +225,7 @@ class RDMAConnection : public Connection
/* _cm->fd (listener) or _cm_id->recv_cq_channel->fd (connection) */
Notifier _notifier;
- /* Protect close( ) from multiple threads */
- lunchbox::Lock _close_lock;
-
- /* Protect _pollCQ( ) from multiple threads */
+ /* Protect RDMA/Verbs vars from multiple threads */
lunchbox::Lock _poll_lock;
/* Timeout for resolving RDMA address & route */
@@ -242,11 +243,15 @@ class RDMAConnection : public Connection
struct RDMAConnParamData _cpd;
bool _established;
- int32_t _depth;
- lunchbox::a_int32_t _credits;
+
+ int32_t _depth; // Maximum sends in flight (writes & acks)
+ lunchbox::a_int32_t _writes; // Number of RDMA writes received
+ lunchbox::a_int32_t _acks; // Number of FC messages received
+ lunchbox::a_int32_t _credits; // Number of send credits available
+
unsigned int _completions;
- /* MR for setup and FC messages */
+ /* MR for setup and ack messages */
BufferPool _msgbuf;
/* source RDMA MR */
View
7 libs/co/udtConnection.cpp
@@ -19,9 +19,10 @@
#include "connectionDescription.h"
#include "global.h"
-#include "base/thread.h"
-#include "base/scopedMutex.h"
-#include "base/clock.h"
+
+#include <lunchbox/thread.h>
+#include <lunchbox/scopedMutex.h>
+#include <lunchbox/clock.h>
#ifdef _WIN32
# include <winsock2.h>

0 comments on commit a117da6

Please sign in to comment.
Something went wrong with that request. Please try again.