Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Adding gzipped-XML-tree support to gmond, gmetad, and gmetad-python #80

Merged
merged 1 commit into from

2 participants

@comptonqc

This patch does the following:

  • Adds a command-line parameter for gmond, "-z", which causes it to emit its output XML trees as a gzip stream instead of as plaintext
  • Changes gmetad and gmetad-python to automatically detect a gzipped data stream from a gmond and handle it appropriately
@comptonqc comptonqc Adding gzipped-XML-tree support to gmond, gmetad, and gmetad-python
This patch does the following:
- Adds a command-line parameter for gmond, "-z", which causes it to
  emit its output XML trees as a gzip stream instead of as plaintext
- Changes gmetad and gmetad-python to automatically detect a
  gzipped data stream from a gmond and handle it appropriately
23b6f57
@jbuchbinder jbuchbinder merged commit f2d7998 into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 28, 2013
  1. @comptonqc

    Adding gzipped-XML-tree support to gmond, gmetad, and gmetad-python

    comptonqc authored
    This patch does the following:
    - Adds a command-line parameter for gmond, "-z", which causes it to
      emit its output XML trees as a gzip stream instead of as plaintext
    - Changes gmetad and gmetad-python to automatically detect a
      gzipped data stream from a gmond and handle it appropriately
This page is out of date. Refresh to see the latest.
View
18 configure.ac
@@ -595,6 +595,24 @@ else
fi
AM_CONDITIONAL(HAVE_SYSTEMD, [test -n "$with_systemdsystemunitdir" -a "x$with_systemdsystemunitdir" != xno ])
+echo
+echo Checking for zlib
+AC_ARG_WITH([zlib],
+ AS_HELP_STRING([--with-zlib=DIR], [Specify location for zlib]),
+ [if test x"$withval" != xno; then libzlib="yes"; libzlibpath="$withval"; fi])
+if test x"$libzlibpath" != x && test x"$libzlibpath" != xyes; then
+ CFLAGS="$CFLAGS -I$libzlibpath/include"
+ CPPFLAGS="$CPPFLAGS -I$libzlibpath/include"
+ LDFLAGS="$LDFLAGS -L$libzlibpath/${LIB_SUFFIX}"
+ echo "Added -I$libzlibpath/include to CFLAGS and CPPFLAGS"
+ echo "Added -L$libzlibpath/${LIB_SUFFIX} to LDFLAGS"
+fi
+AC_CHECK_HEADERS([zlib.h])
+AC_CHECK_LIB(z, deflate)
+if test x"$ac_cv_lib_z_deflate" != xyes; then
+ echo "zlib library not configured properly"; exit 1;
+fi
+echo "Found a suitable zlib"
echo
View
7 gmetad-python/Gmetad/gmetad_gmondReader.py
@@ -36,6 +36,7 @@
import socket
import time
import logging
+import zlib
from gmetad_config import GmetadConfig, getConfig
from gmetad_random import getRandomInterval
@@ -135,6 +136,12 @@ def run(self):
break
xmlbuf += buf
sock.close()
+
+ # These are the gzip header magic numbers, per RFC 1952 section 2.3.1
+ if xmlbuf[0:2] == '\x1f\x8b':
+ # 32 is a magic number in zlib.h for autodetecting the zlib or gzip header
+ xmlbuf = zlib.decompress(xmlbuf, zlib.MAX_WBITS + 32)
+
# Create an XML parser and parse the buffer
gch = GmondContentHandler()
xml.sax.parseString(xmlbuf, gch)
View
92 gmetad/data_thread.c
@@ -6,6 +6,7 @@
#include <sys/time.h>
#include <gmetad.h>
#include <string.h>
+#include <zlib.h>
#include <apr_time.h>
@@ -190,6 +191,97 @@ data_thread ( void *arg )
}
}
+ /* These are the gzip header magic numbers, per RFC 1952 section 2.3.1 */
+ if(read_index > 2 && (unsigned char)buf[0] == 0x1f && (unsigned char)buf[1] == 0x8b)
+ {
+ /* Uncompress the buffer */
+ int ret;
+ z_stream strm;
+ char * uncompressed;
+ unsigned int write_index = 0;
+
+ if( get_debug_msg_level() > 1 )
+ {
+ err_msg("GZIP compressed data for [%s] data source, %d bytes", d->name, read_index);
+ }
+
+ uncompressed = malloc(buf_size);
+ if( !uncompressed )
+ {
+ err_quit("data_thread() unable to malloc enough room for [%s] GZIP", d->name);
+ }
+
+ strm.zalloc = NULL;
+ strm.zfree = NULL;
+ strm.opaque = NULL;
+ strm.next_in = (Bytef *)buf;
+ strm.avail_in = read_index;
+
+ /* Initialize the stream, 15 and 16 are magic numbers (gzip and max window size) */
+ ret = inflateInit2(&strm, 15 + 16);
+ if( ret != Z_OK )
+ {
+ err_msg("InflateInitError! for [%s] data source, failed to call inflateInit", d->name);
+ d->dead = 1;
+
+ free(buf);
+ buf = uncompressed;
+ goto take_a_break;
+ }
+
+ while (1)
+ {
+ /* Create more buffer space if needed */
+ if ( (write_index + 2048) > buf_size)
+ {
+ buf_size += 2048;
+ uncompressed = realloc(uncompressed, buf_size);
+ if(!uncompressed)
+ {
+ err_quit("data_thread() unable to realloc enough room for [%s] GZIP", d->name) ;
+ }
+ }
+
+ /* Do the inflate */
+ strm.next_out = (Bytef *)(uncompressed + write_index);
+ strm.avail_out = buf_size - write_index - 1;
+
+ ret = inflate(&strm, Z_FINISH);
+ write_index = strm.total_out;
+
+ if (ret == Z_OK || ret == Z_BUF_ERROR)
+ {
+ /* These are normal - just continue on */
+ continue;
+ }
+ else if( ret == Z_STREAM_END )
+ {
+ /* We have finished, set things up for the XML parser */
+ free (buf);
+ buf = uncompressed;
+ read_index = write_index;
+ if(get_debug_msg_level() > 1)
+ {
+ err_msg("Uncompressed to %d bytes", read_index);
+ }
+ break;
+ }
+ else
+ {
+ /* Oh dear, something bad */
+ inflateEnd(&strm);
+
+ err_msg("InflateError! for [%s] data source, failed to call inflate (%s)", d->name, zError(ret));
+ d->dead = 1;
+
+ free(buf);
+ buf = uncompressed;
+ goto take_a_break;
+ }
+ }
+ inflateEnd(&strm);
+ }
+
buf[read_index] = '\0';
/* Parse the buffer */
View
19 gmond/cmdline.c.in
@@ -43,6 +43,7 @@ const char *gengetopt_args_info_help[] = {
" -b, --bandwidth Calculate minimum bandwidth use for configuration \n (default=off)",
" -r, --convert=STRING Convert a 2.5.x configuration file to the new 3.x \n format",
" -p, --pid-file=STRING Write process-id to file",
+ " -z, --gzip-output Compress output with gzip before sending \n (default=off)",
0
};
@@ -79,6 +80,7 @@ void clear_given (struct gengetopt_args_info *args_info)
args_info->bandwidth_given = 0 ;
args_info->convert_given = 0 ;
args_info->pid_file_given = 0 ;
+ args_info->gzip_output_given = 0 ;
}
static
@@ -99,6 +101,7 @@ void clear_args (struct gengetopt_args_info *args_info)
args_info->convert_orig = NULL;
args_info->pid_file_arg = NULL;
args_info->pid_file_orig = NULL;
+ args_info->gzip_output_flag = 0;
}
@@ -118,6 +121,7 @@ void init_args_info(struct gengetopt_args_info *args_info)
args_info->bandwidth_help = gengetopt_args_info_help[8] ;
args_info->convert_help = gengetopt_args_info_help[9] ;
args_info->pid_file_help = gengetopt_args_info_help[10] ;
+ args_info->gzip_output_help = gengetopt_args_info_help[11] ;
}
@@ -259,6 +263,8 @@ cmdline_parser_dump(FILE *outfile, struct gengetopt_args_info *args_info)
write_into_file(outfile, "convert", args_info->convert_orig, 0);
if (args_info->pid_file_given)
write_into_file(outfile, "pid-file", args_info->pid_file_orig, 0);
+ if (args_info->gzip_output_given)
+ write_into_file(outfile, "gzip-output", 0, 0 );
i = EXIT_SUCCESS;
@@ -524,10 +530,11 @@ cmdline_parser_internal (
{ "bandwidth", 0, NULL, 'b' },
{ "convert", 1, NULL, 'r' },
{ "pid-file", 1, NULL, 'p' },
+ { "gzip-output", 0, NULL, 'z' },
{ 0, 0, 0, 0 }
};
- c = getopt_long (argc, argv, "hVc:l:d:ftmbr:p:", long_options, &option_index);
+ c = getopt_long (argc, argv, "hVc:l:d:ftmbr:p:z", long_options, &option_index);
if (c == -1) break; /* Exit from `while (1)' loop. */
@@ -643,6 +650,16 @@ cmdline_parser_internal (
goto failure;
break;
+ case 'z': /* Compress output with gzip before sending. */
+
+
+ if (update_arg((void *)&(args_info->gzip_output_flag), 0, &(args_info->gzip_output_given),
+ &(local_args_info.gzip_output_given), optarg, 0, 0, ARG_FLAG,
+ check_ambiguity, override, 1, 0, "gzip-output", 'z',
+ additional_error))
+ goto failure;
+
+ break;
case 0: /* Long option with no short option */
case '?': /* Invalid option. */
View
3  gmond/cmdline.h
@@ -62,6 +62,8 @@ struct gengetopt_args_info
char * pid_file_arg; /**< @brief Write process-id to file. */
char * pid_file_orig; /**< @brief Write process-id to file original value given at command line. */
const char *pid_file_help; /**< @brief Write process-id to file help description. */
+ unsigned int gzip_output_flag; /**< @brief Compress output with gzip before sending (default=off). */
+ const char *gzip_output_help; /**< @brief Compress output with gzip before sending help description. */
unsigned int help_given ; /**< @brief Whether help was given. */
unsigned int version_given ; /**< @brief Whether version was given. */
@@ -74,6 +76,7 @@ struct gengetopt_args_info
unsigned int bandwidth_given ; /**< @brief Whether bandwidth was given. */
unsigned int convert_given ; /**< @brief Whether convert was given. */
unsigned int pid_file_given ; /**< @brief Whether pid-file was given. */
+ unsigned int gzip_output_given ; /**< @brief Whether gzip-output was given. */
} ;
View
1  gmond/cmdline.sh
@@ -15,6 +15,7 @@ option "metrics" m "Print the list of metrics this gmond supports" flag off
option "bandwidth" b "Calculate minimum bandwidth use for configuration" flag off
option "convert" r "Convert a 2.5.x configuration file to the new 3.x format" string no
option "pid-file" p "Write process-id to file" string no
+option "gzip-output" z "Compress output with gzip before sending" flag off
#Usage (a little tutorial)
#
View
156 gmond/gmond.c
@@ -22,6 +22,7 @@
#ifdef LINUX
#include <sys/utsname.h>
#endif
+#include <zlib.h>
#include <apr.h>
#include <apr_strings.h>
@@ -60,6 +61,9 @@
before retry. Specified in seconds */
#define RETRY_BIND_DELAY 60
+/* The key in the apr_socket_t struct where our gzipped data is stored */
+#define GZIP_KEY "gzip"
+
/* When this gmond was started */
apr_time_t started;
/* My name */
@@ -226,7 +230,7 @@ extern char **environ;
/* apr_socket_send can't assure all characters in buf been sent. */
static apr_status_t
-socket_send(apr_socket_t *sock, const char *buf, apr_size_t *len)
+socket_send_raw(apr_socket_t *sock, const char *buf, apr_size_t *len)
{
apr_size_t total = *len;
apr_size_t thisTime = total;
@@ -248,6 +252,52 @@ socket_send(apr_socket_t *sock, const char *buf, apr_size_t *len)
return ret;
}
+/* wrap socket_send_raw with gzip deflate if enabled. */
+static apr_status_t
+socket_send(apr_socket_t *sock, const char *buf, apr_size_t *len)
+{
+ char outputbuffer[2048];
+ const int outputlen = sizeof(outputbuffer);
+ apr_size_t wlen;
+ apr_status_t ret;
+ z_stream *strm;
+ int z_ret;
+
+ ret = apr_socket_data_get((void**)&strm, GZIP_KEY, sock);
+ if (ret != APR_SUCCESS)
+ {
+ ret = socket_send_raw( sock, buf, len );
+ }
+ else
+ {
+ strm->next_in = (Bytef *)buf;
+ strm->avail_in = *len;
+
+ while( strm->avail_in )
+ {
+ strm->next_out = (Bytef *)outputbuffer;
+ strm->avail_out = outputlen;
+
+ z_ret = deflate( strm, 0 );
+ if (z_ret != Z_OK)
+ {
+ return APR_ENOMEM;
+ }
+
+ wlen = outputlen - strm->avail_out;
+ if( wlen )
+ {
+ ret = socket_send_raw( sock, outputbuffer, &wlen );
+ if(ret != APR_SUCCESS)
+ {
+ return ret;
+ }
+ }
+ }
+ }
+ return ret;
+}
+
/* Reload the Ganglia configuration */
void
reload_ganglia_configuration(void)
@@ -1533,6 +1583,86 @@ process_udp_recv_channel(const apr_pollfd_t *desc, apr_time_t now)
return;
}
+static z_stream *
+zstream_new()
+{
+ int err;
+
+ z_stream *strm = malloc(sizeof(z_stream));
+ if (strm == 0)
+ {
+ return NULL;
+ }
+
+ strm->next_in = 0;
+ strm->avail_in = 0;
+ strm->next_out = 0;
+ strm->avail_out = 0;
+ strm->zalloc = 0;
+ strm->zfree = 0;
+ strm->opaque = 0;
+
+ /* Yes, 15 + 16 are 2 special magic values documented in zlib.h */
+ err = deflateInit2(strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
+ if (err != Z_OK)
+ {
+ free( strm );
+ return NULL;
+ }
+
+ return strm;
+}
+
+static apr_status_t
+socket_flush( apr_socket_t *client )
+{
+ char outputbuffer[2048];
+ const int outputlen = sizeof(outputbuffer);
+ int ret;
+ int status;
+ apr_size_t wlen;
+ z_stream *strm;
+
+ if (!args_info.gzip_output_flag)
+ {
+ return APR_SUCCESS;
+ }
+
+ if (APR_SUCCESS == apr_socket_data_get((void**)&strm, GZIP_KEY, client))
+ {
+ while( 1 )
+ {
+ strm->next_out = (Bytef *)outputbuffer;
+ strm->avail_out = outputlen;
+
+ ret = deflate( strm, Z_FINISH );
+ if (ret != Z_OK && ret != Z_STREAM_END)
+ {
+ return APR_ENOMEM;
+ }
+
+ wlen = outputlen - strm->avail_out;
+ status = socket_send_raw( client, outputbuffer, &wlen );
+ if(status != APR_SUCCESS)
+ return status;
+
+ if(ret == Z_STREAM_END)
+ return APR_SUCCESS;
+ }
+ }
+ return APR_SUCCESS;
+}
+
+static void
+zstream_destroy( z_stream *strm )
+{
+ if (strm)
+ {
+ deflateEnd(strm);
+ free (strm);
+ }
+}
+
static apr_status_t
print_xml_header( apr_socket_t *client )
{
@@ -1842,6 +1972,23 @@ process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)
if(Ganglia_acl_action( channel->acl, remotesa ) != GANGLIA_ACCESS_ALLOW)
goto close_accept_socket;
+ if (args_info.gzip_output_flag)
+ {
+ z_stream *strm = zstream_new();
+ if (strm == NULL)
+ {
+ debug_msg("failed to allocate gzip stream");
+ goto close_accept_socket;
+ }
+ apr_status_t r = apr_socket_data_set(client, strm, GZIP_KEY, &zstream_destroy);
+ if (r != APR_SUCCESS)
+ {
+ debug_msg("failed to set socket user data");
+ zstream_destroy(strm);
+ goto close_accept_socket;
+ }
+ }
+
/* Print the DTD, GANGLIA_XML and CLUSTER tags */
status = print_xml_header(client);
if(status != APR_SUCCESS)
@@ -1906,6 +2053,13 @@ process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)
/* Close the CLUSTER and GANGLIA_XML tags */
print_xml_footer(client);
+ status = socket_flush( client );
+ if (status != APR_SUCCESS)
+ {
+ debug_msg("failed to finish compressing stream; returned '%d'",status);
+ goto close_accept_socket;
+ }
+
/* Close down the accepted socket */
close_accept_socket:
apr_socket_shutdown(client, APR_SHUTDOWN_READ);
Something went wrong with that request. Please try again.