Skip to content

Commit

Permalink
Read multiple UDP packets per DaemonCore event loop pass. #7149
Browse files Browse the repository at this point in the history
Add ability to read multiple UDP packets in the main Driver() loop. Keep
receiving while there's data and at most MAX_UDP_MSGS_PER_CYCLE messages
have been processed. The default is 1, the old behavior. The command
handlers are dispatched directly from the main thread.
  • Loading branch information
JaimeFrey committed Jul 11, 2019
1 parent 5e044fb commit 5f40782
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/condor_daemon_core.V6/condor_daemon_core.h
Expand Up @@ -1635,6 +1635,7 @@ class DaemonCore : public Service
int m_iMaxAcceptsPerCycle; ///< maximum number of inbound connections to accept per loop
int m_iMaxReapsPerCycle; // maximum number reapers to invoke per event loop
int m_MaxTimeSkip;
int m_iMaxUdpMsgsPerCycle; // max number of udp messages read per loop

void Inherit( void ); // called in main()
void InitDCCommandSocket( int command_port ); // called in main()
Expand Down
46 changes: 46 additions & 0 deletions src/condor_daemon_core.V6/daemon_core.cpp
Expand Up @@ -3008,6 +3008,12 @@ DaemonCore::reconfig(void) {
if( m_iMaxAcceptsPerCycle != 1 ) {
dprintf(D_FULLDEBUG,"Setting maximum accepts per cycle %d.\n", m_iMaxAcceptsPerCycle);
}

m_iMaxUdpMsgsPerCycle = param_integer("MAX_UDP_MSGS_PER_CYCLE", 1);
if( m_iMaxUdpMsgsPerCycle != 1 ) {
dprintf(D_FULLDEBUG,"Setting maximum UDP messages per cycle %d.\n", m_iMaxUdpMsgsPerCycle);
}

/*
Default value of MAX_REAPS_PER_CYCLE is 0 - a value of 0 means
call as many reapers as are waiting at the time we exit select.
Expand Down Expand Up @@ -3963,6 +3969,46 @@ DaemonCore::CallSocketHandler( int &i, bool default_to_HandleCommand )
{
unsigned int iAcceptCnt = ( m_iMaxAcceptsPerCycle > 0 ) ? m_iMaxAcceptsPerCycle: -1;

// Dispatch UDP commands directly
if ( (*sockTable)[i].handler==NULL && (*sockTable)[i].handlercpp==NULL &&
default_to_HandleCommand &&
(*sockTable)[i].iosock->type() == Stream::safe_sock ) {

unsigned msg_cnt = ( m_iMaxUdpMsgsPerCycle > 0 ) ? m_iMaxUdpMsgsPerCycle : -1;

// We don't care about the return value for UDP command sockets
HandleReq(i);
msg_cnt--;

// Make sure we didn't leak our priv state
CheckPrivState();

while ( msg_cnt ) {
Selector selector;
selector.set_timeout( 0, 0 );
selector.add_fd( (*sockTable)[i].iosock->get_file_desc(), Selector::IO_READ );
selector.execute();

if ( !selector.has_ready() ) {
// No more data, we're done
break;
}

if ( !(*sockTable)[i].iosock->handle_incoming_packet() )
{
// Looks like we got a fragment, try reading some more
continue;
}
// We don't care about the return value for UDP command sockets
HandleReq(i);
msg_cnt--;

// Make sure we didn't leak our priv state
CheckPrivState();
}
return;
}

// if it is an accepting socket it will try for the connect
// up (n) elements
while ( iAcceptCnt )
Expand Down

0 comments on commit 5f40782

Please sign in to comment.