From 5f407822a2b16dff987b640f9bdceedd74840892 Mon Sep 17 00:00:00 2001 From: Jaime Frey Date: Thu, 11 Jul 2019 15:54:30 -0500 Subject: [PATCH] Read multiple UDP packets per DaemonCore event loop pass. #7149 Add ability to read multiple UDP packets in the main Driver() loop. Keep receiving while there's data and at most MAX_UDP_MSGS_PER_CYCLE messages have been processed. The default is 1, the old behavior. The command handlers are dispatched directly from the main thread. --- .../condor_daemon_core.h | 1 + src/condor_daemon_core.V6/daemon_core.cpp | 46 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/src/condor_daemon_core.V6/condor_daemon_core.h b/src/condor_daemon_core.V6/condor_daemon_core.h index 6592e12db7d..0eda69171ba 100644 --- a/src/condor_daemon_core.V6/condor_daemon_core.h +++ b/src/condor_daemon_core.V6/condor_daemon_core.h @@ -1635,6 +1635,7 @@ class DaemonCore : public Service int m_iMaxAcceptsPerCycle; ///< maximum number of inbound connections to accept per loop int m_iMaxReapsPerCycle; // maximum number reapers to invoke per event loop int m_MaxTimeSkip; + int m_iMaxUdpMsgsPerCycle; // max number of udp messages read per loop void Inherit( void ); // called in main() void InitDCCommandSocket( int command_port ); // called in main() diff --git a/src/condor_daemon_core.V6/daemon_core.cpp b/src/condor_daemon_core.V6/daemon_core.cpp index 41311967ff0..ec0b50f185a 100644 --- a/src/condor_daemon_core.V6/daemon_core.cpp +++ b/src/condor_daemon_core.V6/daemon_core.cpp @@ -3008,6 +3008,12 @@ DaemonCore::reconfig(void) { if( m_iMaxAcceptsPerCycle != 1 ) { dprintf(D_FULLDEBUG,"Setting maximum accepts per cycle %d.\n", m_iMaxAcceptsPerCycle); } + + m_iMaxUdpMsgsPerCycle = param_integer("MAX_UDP_MSGS_PER_CYCLE", 1); + if( m_iMaxUdpMsgsPerCycle != 1 ) { + dprintf(D_FULLDEBUG,"Setting maximum UDP messages per cycle %d.\n", m_iMaxUdpMsgsPerCycle); + } + /* Default value of MAX_REAPS_PER_CYCLE is 0 - a value of 0 means call as many reapers as are waiting at the time we exit select. @@ -3963,6 +3969,46 @@ DaemonCore::CallSocketHandler( int &i, bool default_to_HandleCommand ) { unsigned int iAcceptCnt = ( m_iMaxAcceptsPerCycle > 0 ) ? m_iMaxAcceptsPerCycle: -1; + // Dispatch UDP commands directly + if ( (*sockTable)[i].handler==NULL && (*sockTable)[i].handlercpp==NULL && + default_to_HandleCommand && + (*sockTable)[i].iosock->type() == Stream::safe_sock ) { + + unsigned msg_cnt = ( m_iMaxUdpMsgsPerCycle > 0 ) ? m_iMaxUdpMsgsPerCycle : -1; + + // We don't care about the return value for UDP command sockets + HandleReq(i); + msg_cnt--; + + // Make sure we didn't leak our priv state + CheckPrivState(); + + while ( msg_cnt ) { + Selector selector; + selector.set_timeout( 0, 0 ); + selector.add_fd( (*sockTable)[i].iosock->get_file_desc(), Selector::IO_READ ); + selector.execute(); + + if ( !selector.has_ready() ) { + // No more data, we're done + break; + } + + if ( !(*sockTable)[i].iosock->handle_incoming_packet() ) + { + // Looks like we got a fragment, try reading some more + continue; + } + // We don't care about the return value for UDP command sockets + HandleReq(i); + msg_cnt--; + + // Make sure we didn't leak our priv state + CheckPrivState(); + } + return; + } + // if it is an accepting socket it will try for the connect // up (n) elements while ( iAcceptCnt )