Skip to content

Commit

Permalink
Make Gmond TCP accept channel listen in separate thread
Browse files Browse the repository at this point in the history
Gmond can appear to hang when responding to TCP requests for XML
data. This can happen if gmond is busy trying to process a lot
of UDP metric data that has been sent to the agent in one go.
The TCP requests are now serviced by a dedicated thread that
responds almost immediately.
  • Loading branch information
satterly committed Sep 17, 2012
1 parent 719e700 commit 74cee73
Showing 1 changed file with 82 additions and 12 deletions.
94 changes: 82 additions & 12 deletions gmond/gmond.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
#include <apr_poll.h>
#include <apr_network_io.h>
#include <apr_signal.h>
#include <apr_thread_proc.h> /* for apr_proc_detach(). no threads used. */
#include <apr_thread_proc.h>
#include <apr_tables.h>
#include <apr_dso.h>
#include <apr_version.h>
Expand Down Expand Up @@ -146,8 +146,9 @@ struct Ganglia_channel {
};
typedef struct Ganglia_channel Ganglia_channel;

/* This pollset holds the tcp_accept and udp_recv channels */
apr_pollset_t *listen_channels = NULL;
/* Two separate pollsets hold the tcp_accept and udp_recv channels */
apr_pollset_t *udp_listen_channels = NULL;
apr_pollset_t *tcp_listen_channels = NULL;

/* These are the TCP listen channels */
apr_socket_t **tcp_sockets = NULL;
Expand Down Expand Up @@ -253,8 +254,10 @@ reload_ganglia_configuration(void)
int i = 0;
char *gmond_bin = gmond_argv[0];

if(listen_channels != NULL)
apr_pollset_destroy(listen_channels);
if(udp_listen_channels != NULL)
apr_pollset_destroy(udp_listen_channels);
if(tcp_listen_channels != NULL)
apr_pollset_destroy(tcp_listen_channels);
if(tcp_sockets != NULL)
for(i = 0; tcp_sockets[i] != 0; i++)
apr_socket_close(tcp_sockets[i]);
Expand Down Expand Up @@ -604,7 +607,14 @@ setup_listen_channels_pollset( void )
pollset_opts = APR_POLLSET_THREADSAFE;
}
#endif
if((status = apr_pollset_create(&listen_channels, total_listen_channels, global_context, pollset_opts)) != APR_SUCCESS)
if((status = apr_pollset_create(&udp_listen_channels, num_udp_recv_channels, global_context, pollset_opts)) != APR_SUCCESS)
{
char apr_err[512];
apr_strerror(status, apr_err, 511);
err_msg("apr_pollset_create failed: %s", apr_err);
exit(1);
}
if((status = apr_pollset_create(&tcp_listen_channels, num_tcp_accept_channels, global_context, pollset_opts)) != APR_SUCCESS)
{
char apr_err[512];
apr_strerror(status, apr_err, 511);
Expand Down Expand Up @@ -780,7 +790,7 @@ setup_listen_channels_pollset( void )
socket_pollfd.client_data = channel;

/* Add the socket to the pollset */
status = apr_pollset_add(listen_channels, &socket_pollfd);
status = apr_pollset_add(udp_listen_channels, &socket_pollfd);
if(status != APR_SUCCESS)
{
err_msg("Failed to add socket to pollset. Exiting.\n");
Expand Down Expand Up @@ -851,7 +861,7 @@ setup_listen_channels_pollset( void )
socket_pollfd.client_data = channel;

/* Add the socket to the pollset */
status = apr_pollset_add(listen_channels, &socket_pollfd);
status = apr_pollset_add(tcp_listen_channels, &socket_pollfd);
if(status != APR_SUCCESS)
{
err_msg("Failed to add socket to pollset. Exiting.\n");
Expand Down Expand Up @@ -1876,15 +1886,15 @@ process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)


static void
poll_listen_channels( apr_interval_time_t timeout, apr_time_t now)
poll_udp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
{
apr_status_t status;
const apr_pollfd_t *descs = NULL;
apr_int32_t num = 0;
apr_int32_t i;

/* Poll for incoming data */
status = apr_pollset_poll(listen_channels, timeout, &num, &descs);
/* Poll for incoming UDP data */
status = apr_pollset_poll(udp_listen_channels, timeout, &num, &descs);
if (status != APR_SUCCESS && status != APR_TIMEUP) {
char buff[128];
debug_msg("apr_pollset_poll returned unexpected status %d = %s\n",
Expand All @@ -1901,6 +1911,34 @@ poll_listen_channels( apr_interval_time_t timeout, apr_time_t now)
process_udp_recv_channel(descs+i, now);
udp_last_heard = apr_time_now();
break;
default:
continue;
}
}
}

static void
poll_tcp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
{
apr_status_t status;
const apr_pollfd_t *descs = NULL;
apr_int32_t num = 0;
apr_int32_t i;

/* Poll for incoming TCP requests */
status = apr_pollset_poll(tcp_listen_channels, timeout, &num, &descs);
if (status != APR_SUCCESS && status != APR_TIMEUP) {
char buff[128];
debug_msg("apr_pollset_poll returned unexpected status %d = %s\n",
status, apr_strerror(status, buff, 128));
return;
}

for(i = 0; i< num ; i++)
{
Ganglia_channel *channel = descs[i].client_data;
switch( channel->type )
{
case TCP_ACCEPT_CHANNEL:
process_tcp_accept_channel(descs+i, now);
break;
Expand Down Expand Up @@ -2958,6 +2996,31 @@ void sig_handler(int i)
}
}

/*
 * Thread entry point that services the TCP listen channels.
 *
 * Runs until the global `done` flag becomes non-zero. On each pass it
 * either polls the TCP listen channels (when gmond is not `deaf`) or
 * simply sleeps for the same interval, so the loop never spins.
 *
 * thd  - the APR thread handle for this thread; passed to apr_thread_exit().
 * data - not read by this function.
 *
 * Returns NULL; the thread's exit status is reported through
 * apr_thread_exit() as APR_SUCCESS.
 */
static void* APR_THREAD_FUNC tcp_listener(apr_thread_t *thd, void *data)
{
  /* Used both as the poll timeout and as the idle sleep duration
   * (apr_interval_time_t; presumably microseconds per APR convention). */
  const apr_interval_time_t poll_interval = 1000;

  debug_msg("Starting TCP listener thread...");

  while(!done)
    {
      if(deaf)
        {
          /* Not listening: just wait before re-checking the flags */
          apr_sleep( poll_interval );
          continue;
        }

      /* Pull in incoming TCP requests, stamped with the current time */
      poll_tcp_listen_channels(poll_interval, apr_time_now());
    }

  apr_thread_exit(thd, APR_SUCCESS);

  /* Satisfies the function's return type */
  return NULL;
}

int
main ( int argc, char *argv[] )
{
Expand Down Expand Up @@ -3073,6 +3136,13 @@ main ( int argc, char *argv[] )
/* Initialize time variables */
udp_last_heard = last_cleanup = next_collection = now = apr_time_now();

/* Create TCP listener thread */
apr_thread_t *thread;
if (apr_thread_create(&thread, NULL, tcp_listener, NULL, global_context) != APR_SUCCESS)
{
debug_msg("Thread create error");
}

/* Loop */
for(;!done;)
{
Expand All @@ -3081,7 +3151,7 @@ main ( int argc, char *argv[] )
if(!deaf)
{
/* Pull in incoming data */
poll_listen_channels(wait, now);
poll_udp_listen_channels(wait, now);
}
else
{
Expand Down

0 comments on commit 74cee73

Please sign in to comment.