Skip to content
Browse files

Adding gzipped-XML-tree support to gmond, gmetad, and gmetad-python

This patch does the following:
- Adds a command-line parameter for gmond, "-z", which causes it to
  emit its output XML trees as a gzip stream instead of as plaintext
- Changes gmetad and gmetad-python to automatically detect a
  gzipped data stream from a gmond and handle it appropriately
  • Loading branch information...
1 parent 20ac458 commit 23b6f573aa01ebdca2b167cafc7e685cd4d5ca28 @comptonqc comptonqc committed Jan 28, 2013
Showing with 294 additions and 2 deletions.
  1. +18 −0 configure.ac
  2. +7 −0 gmetad-python/Gmetad/gmetad_gmondReader.py
  3. +92 −0 gmetad/data_thread.c
  4. +18 −1 gmond/cmdline.c.in
  5. +3 −0 gmond/cmdline.h
  6. +1 −0 gmond/cmdline.sh
  7. +155 −1 gmond/gmond.c
View
18 configure.ac
@@ -595,6 +595,24 @@ else
fi
AM_CONDITIONAL(HAVE_SYSTEMD, [test -n "$with_systemdsystemunitdir" -a "x$with_systemdsystemunitdir" != xno ])
+echo
+echo Checking for zlib
+AC_ARG_WITH([zlib],
+ AS_HELP_STRING([--with-zlib=DIR], [Specify location for zlib]),
+ [if test x"$withval" != xno; then libzlib="yes"; libzlibpath="$withval"; fi])
+if test x"$libzlibpath" != x && test x"$libzlibpath" != xyes; then
+ CFLAGS="$CFLAGS -I$libzlibpath/include"
+ CPPFLAGS="$CPPFLAGS -I$libzlibpath/include"
+ LDFLAGS="$LDFLAGS -L$libzlibpath/${LIB_SUFFIX}"
+ echo "Added -I$libzlibpath/include to CFLAGS and CPPFLAGS"
+ echo "Added -L$libzlibpath/${LIB_SUFFIX} to LDFLAGS"
+fi
+AC_CHECK_HEADERS([zlib.h])
+AC_CHECK_LIB(z, deflate)
+if test x"$ac_cv_lib_z_deflate" != xyes; then
+ echo "zlib library not configured properly"; exit 1;
+fi
+echo "Found a suitable zlib"
echo
View
7 gmetad-python/Gmetad/gmetad_gmondReader.py
@@ -36,6 +36,7 @@
import socket
import time
import logging
+import zlib
from gmetad_config import GmetadConfig, getConfig
from gmetad_random import getRandomInterval
@@ -135,6 +136,12 @@ def run(self):
break
xmlbuf += buf
sock.close()
+
+ # These are the gzip header magic numbers, per RFC 1952 section 2.3.1
+ if xmlbuf[0:2] == '\x1f\x8b':
+ # 32 is a magic number in zlib.h for autodetecting the zlib or gzip header
+ xmlbuf = zlib.decompress(xmlbuf, zlib.MAX_WBITS + 32)
+
# Create an XML parser and parse the buffer
gch = GmondContentHandler()
xml.sax.parseString(xmlbuf, gch)
View
92 gmetad/data_thread.c
@@ -6,6 +6,7 @@
#include <sys/time.h>
#include <gmetad.h>
#include <string.h>
+#include <zlib.h>
#include <apr_time.h>
@@ -190,6 +191,97 @@ data_thread ( void *arg )
}
}
+ /* These are the gzip header magic numbers, per RFC 1952 section 2.3.1 */
+ if(read_index > 2 && (unsigned char)buf[0] == 0x1f && (unsigned char)buf[1] == 0x8b)
+ {
+ /* Uncompress the buffer */
+ int ret;
+ z_stream strm;
+ char * uncompressed;
+ unsigned int write_index = 0;
+
+ if( get_debug_msg_level() > 1 )
+ {
+ err_msg("GZIP compressed data for [%s] data source, %d bytes", d->name, read_index);
+ }
+
+ uncompressed = malloc(buf_size);
+ if( !uncompressed )
+ {
+ err_quit("data_thread() unable to malloc enough room for [%s] GZIP", d->name);
+ }
+
+ strm.zalloc = NULL;
+ strm.zfree = NULL;
+ strm.opaque = NULL;
+ strm.next_in = (Bytef *)buf;
+ strm.avail_in = read_index;
+
+ /* Initialize the stream, 15 and 16 are magic numbers (gzip and max window size) */
+ ret = inflateInit2(&strm, 15 + 16);
+ if( ret != Z_OK )
+ {
+ err_msg("InflateInitError! for [%s] data source, failed to call inflateInit", d->name);
+ d->dead = 1;
+
+ free(buf);
+ buf = uncompressed;
+ goto take_a_break;
+ }
+
+ while (1)
+ {
+ /* Create more buffer space if needed */
+ if ( (write_index + 2048) > buf_size)
+ {
+ buf_size += 2048;
+ uncompressed = realloc(uncompressed, buf_size);
+ if(!uncompressed)
+ {
+ err_quit("data_thread() unable to realloc enough room for [%s] GZIP", d->name) ;
+ }
+ }
+
+ /* Do the inflate */
+ strm.next_out = (Bytef *)(uncompressed + write_index);
+ strm.avail_out = buf_size - write_index - 1;
+
+ ret = inflate(&strm, Z_FINISH);
+ write_index = strm.total_out;
+
+ if (ret == Z_OK || ret == Z_BUF_ERROR)
+ {
+ /* These are normal - just continue on */
+ continue;
+ }
+ else if( ret == Z_STREAM_END )
+ {
+ /* We have finished, set things up for the XML parser */
+ free (buf);
+ buf = uncompressed;
+ read_index = write_index;
+ if(get_debug_msg_level() > 1)
+ {
+ err_msg("Uncompressed to %d bytes", read_index);
+ }
+ break;
+ }
+ else
+ {
+ /* Oh dear, something bad */
+ inflateEnd(&strm);
+
+ err_msg("InflateError! for [%s] data source, failed to call inflate (%s)", d->name, zError(ret));
+ d->dead = 1;
+
+ free(buf);
+ buf = uncompressed;
+ goto take_a_break;
+ }
+ }
+ inflateEnd(&strm);
+ }
+
buf[read_index] = '\0';
/* Parse the buffer */
View
19 gmond/cmdline.c.in
@@ -43,6 +43,7 @@ const char *gengetopt_args_info_help[] = {
" -b, --bandwidth Calculate minimum bandwidth use for configuration \n (default=off)",
" -r, --convert=STRING Convert a 2.5.x configuration file to the new 3.x \n format",
" -p, --pid-file=STRING Write process-id to file",
+ " -z, --gzip-output Compress output with gzip before sending \n (default=off)",
0
};
@@ -79,6 +80,7 @@ void clear_given (struct gengetopt_args_info *args_info)
args_info->bandwidth_given = 0 ;
args_info->convert_given = 0 ;
args_info->pid_file_given = 0 ;
+ args_info->gzip_output_given = 0 ;
}
static
@@ -99,6 +101,7 @@ void clear_args (struct gengetopt_args_info *args_info)
args_info->convert_orig = NULL;
args_info->pid_file_arg = NULL;
args_info->pid_file_orig = NULL;
+ args_info->gzip_output_flag = 0;
}
@@ -118,6 +121,7 @@ void init_args_info(struct gengetopt_args_info *args_info)
args_info->bandwidth_help = gengetopt_args_info_help[8] ;
args_info->convert_help = gengetopt_args_info_help[9] ;
args_info->pid_file_help = gengetopt_args_info_help[10] ;
+ args_info->gzip_output_help = gengetopt_args_info_help[11] ;
}
@@ -259,6 +263,8 @@ cmdline_parser_dump(FILE *outfile, struct gengetopt_args_info *args_info)
write_into_file(outfile, "convert", args_info->convert_orig, 0);
if (args_info->pid_file_given)
write_into_file(outfile, "pid-file", args_info->pid_file_orig, 0);
+ if (args_info->gzip_output_given)
+ write_into_file(outfile, "gzip-output", 0, 0 );
i = EXIT_SUCCESS;
@@ -524,10 +530,11 @@ cmdline_parser_internal (
{ "bandwidth", 0, NULL, 'b' },
{ "convert", 1, NULL, 'r' },
{ "pid-file", 1, NULL, 'p' },
+ { "gzip-output", 0, NULL, 'z' },
{ 0, 0, 0, 0 }
};
- c = getopt_long (argc, argv, "hVc:l:d:ftmbr:p:", long_options, &option_index);
+ c = getopt_long (argc, argv, "hVc:l:d:ftmbr:p:z", long_options, &option_index);
if (c == -1) break; /* Exit from `while (1)' loop. */
@@ -643,6 +650,16 @@ cmdline_parser_internal (
goto failure;
break;
+ case 'z': /* Compress output with gzip before sending. */
+
+
+ if (update_arg((void *)&(args_info->gzip_output_flag), 0, &(args_info->gzip_output_given),
+ &(local_args_info.gzip_output_given), optarg, 0, 0, ARG_FLAG,
+ check_ambiguity, override, 1, 0, "gzip-output", 'z',
+ additional_error))
+ goto failure;
+
+ break;
case 0: /* Long option with no short option */
case '?': /* Invalid option. */
View
3 gmond/cmdline.h
@@ -62,6 +62,8 @@ struct gengetopt_args_info
char * pid_file_arg; /**< @brief Write process-id to file. */
char * pid_file_orig; /**< @brief Write process-id to file original value given at command line. */
const char *pid_file_help; /**< @brief Write process-id to file help description. */
+ unsigned int gzip_output_flag; /**< @brief Compress output with gzip before sending (default=off). */
+ const char *gzip_output_help; /**< @brief Compress output with gzip before sending help description. */
unsigned int help_given ; /**< @brief Whether help was given. */
unsigned int version_given ; /**< @brief Whether version was given. */
@@ -74,6 +76,7 @@ struct gengetopt_args_info
unsigned int bandwidth_given ; /**< @brief Whether bandwidth was given. */
unsigned int convert_given ; /**< @brief Whether convert was given. */
unsigned int pid_file_given ; /**< @brief Whether pid-file was given. */
+ unsigned int gzip_output_given ; /**< @brief Whether gzip-output was given. */
} ;
View
1 gmond/cmdline.sh
@@ -15,6 +15,7 @@ option "metrics" m "Print the list of metrics this gmond supports" flag off
option "bandwidth" b "Calculate minimum bandwidth use for configuration" flag off
option "convert" r "Convert a 2.5.x configuration file to the new 3.x format" string no
option "pid-file" p "Write process-id to file" string no
+option "gzip-output" z "Compress output with gzip before sending" flag off
#Usage (a little tutorial)
#
View
156 gmond/gmond.c
@@ -22,6 +22,7 @@
#ifdef LINUX
#include <sys/utsname.h>
#endif
+#include <zlib.h>
#include <apr.h>
#include <apr_strings.h>
@@ -60,6 +61,9 @@
before retry. Specified in seconds */
#define RETRY_BIND_DELAY 60
+/* The key in the apr_socket_t struct where our gzipped data is stored */
+#define GZIP_KEY "gzip"
+
/* When this gmond was started */
apr_time_t started;
/* My name */
@@ -226,7 +230,7 @@ extern char **environ;
/* apr_socket_send can't assure all characters in buf been sent. */
static apr_status_t
-socket_send(apr_socket_t *sock, const char *buf, apr_size_t *len)
+socket_send_raw(apr_socket_t *sock, const char *buf, apr_size_t *len)
{
apr_size_t total = *len;
apr_size_t thisTime = total;
@@ -248,6 +252,52 @@ socket_send(apr_socket_t *sock, const char *buf, apr_size_t *len)
return ret;
}
+/* wrap socket_send_raw with gzip deflate if enabled. */
+static apr_status_t
+socket_send(apr_socket_t *sock, const char *buf, apr_size_t *len)
+{
+ char outputbuffer[2048];
+ const int outputlen = sizeof(outputbuffer);
+ apr_size_t wlen;
+ apr_status_t ret;
+ z_stream *strm;
+ int z_ret;
+
+ ret = apr_socket_data_get((void**)&strm, GZIP_KEY, sock);
+ if (ret != APR_SUCCESS)
+ {
+ ret = socket_send_raw( sock, buf, len );
+ }
+ else
+ {
+ strm->next_in = (Bytef *)buf;
+ strm->avail_in = *len;
+
+ while( strm->avail_in )
+ {
+ strm->next_out = (Bytef *)outputbuffer;
+ strm->avail_out = outputlen;
+
+ z_ret = deflate( strm, 0 );
+ if (z_ret != Z_OK)
+ {
+ return APR_ENOMEM;
+ }
+
+ wlen = outputlen - strm->avail_out;
+ if( wlen )
+ {
+ ret = socket_send_raw( sock, outputbuffer, &wlen );
+ if(ret != APR_SUCCESS)
+ {
+ return ret;
+ }
+ }
+ }
+ }
+ return ret;
+}
+
/* Reload the Ganglia configuration */
void
reload_ganglia_configuration(void)
@@ -1533,6 +1583,86 @@ process_udp_recv_channel(const apr_pollfd_t *desc, apr_time_t now)
return;
}
+static z_stream *
+zstream_new()
+{
+ int err;
+
+ z_stream *strm = malloc(sizeof(z_stream));
+ if (strm == 0)
+ {
+ return NULL;
+ }
+
+ strm->next_in = 0;
+ strm->avail_in = 0;
+ strm->next_out = 0;
+ strm->avail_out = 0;
+ strm->zalloc = 0;
+ strm->zfree = 0;
+ strm->opaque = 0;
+
+ /* Yes, 15 + 16 are 2 special magic values documented in zlib.h */
+ err = deflateInit2(strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
+ if (err != Z_OK)
+ {
+ free( strm );
+ return NULL;
+ }
+
+ return strm;
+}
+
+static apr_status_t
+socket_flush( apr_socket_t *client )
+{
+ char outputbuffer[2048];
+ const int outputlen = sizeof(outputbuffer);
+ int ret;
+ int status;
+ apr_size_t wlen;
+ z_stream *strm;
+
+ if (!args_info.gzip_output_flag)
+ {
+ return APR_SUCCESS;
+ }
+
+ if (APR_SUCCESS == apr_socket_data_get((void**)&strm, GZIP_KEY, client))
+ {
+ while( 1 )
+ {
+ strm->next_out = (Bytef *)outputbuffer;
+ strm->avail_out = outputlen;
+
+ ret = deflate( strm, Z_FINISH );
+ if (ret != Z_OK && ret != Z_STREAM_END)
+ {
+ return APR_ENOMEM;
+ }
+
+ wlen = outputlen - strm->avail_out;
+ status = socket_send_raw( client, outputbuffer, &wlen );
+ if(status != APR_SUCCESS)
+ return status;
+
+ if(ret == Z_STREAM_END)
+ return APR_SUCCESS;
+ }
+ }
+ return APR_SUCCESS;
+}
+
+static void
+zstream_destroy( z_stream *strm )
+{
+ if (strm)
+ {
+ deflateEnd(strm);
+ free (strm);
+ }
+}
+
static apr_status_t
print_xml_header( apr_socket_t *client )
{
@@ -1842,6 +1972,23 @@ process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)
if(Ganglia_acl_action( channel->acl, remotesa ) != GANGLIA_ACCESS_ALLOW)
goto close_accept_socket;
+ if (args_info.gzip_output_flag)
+ {
+ z_stream *strm = zstream_new();
+ if (strm == NULL)
+ {
+ debug_msg("failed to allocate gzip stream");
+ goto close_accept_socket;
+ }
+ apr_status_t r = apr_socket_data_set(client, strm, GZIP_KEY, &zstream_destroy);
+ if (r != APR_SUCCESS)
+ {
+ debug_msg("failed to set socket user data");
+ zstream_destroy(strm);
+ goto close_accept_socket;
+ }
+ }
+
/* Print the DTD, GANGLIA_XML and CLUSTER tags */
status = print_xml_header(client);
if(status != APR_SUCCESS)
@@ -1906,6 +2053,13 @@ process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)
/* Close the CLUSTER and GANGLIA_XML tags */
print_xml_footer(client);
+ status = socket_flush( client );
+ if (status != APR_SUCCESS)
+ {
+ debug_msg("failed to finish compressing stream; returned '%d'",status);
+ goto close_accept_socket;
+ }
+
/* Close down the accepted socket */
close_accept_socket:
apr_socket_shutdown(client, APR_SHUTDOWN_READ);

0 comments on commit 23b6f57

Please sign in to comment.
Something went wrong with that request. Please try again.