From 0705a5defa284e289004daf61ea390338719d5fb Mon Sep 17 00:00:00 2001 From: Adam Compton Date: Mon, 25 Feb 2013 19:40:10 +0000 Subject: [PATCH] Improve metric summarization performance under heavy load When computing summary metrics, gmetad's current behavior is for each cluster thread to grab the summarization lock, peform all of its summarization, and then release the lock. This can block the global aggregation thread if a single cluster is taking a long time to summarize its RRDs for whatever reason; this issue manifests are dropouts in the cluster and grid summary RRDs. This also comes up if you have a large number of large clusters which each take a long time to summarize. The change here is to have each cluster thread perform its summarization into a "pending" hash table independently, and then once the summarization is complete each thread 1) grabs the lock, 2) swaps in the finished pending hash table with the fully- aggregated one, and then 3) releases the lock immediately. This significantly reduces the amount of time that the summarization lock is held by each cluster thread, and also eliminates the possibility of a single blocked cluster thread preventing all summary metrics from being written. --- gmetad/cleanup.c | 8 ++++--- gmetad/gmetad.h | 1 + gmetad/process_xml.c | 57 +++++++++++++++++++++++++++----------------- 3 files changed, 41 insertions(+), 25 deletions(-) diff --git a/gmetad/cleanup.c b/gmetad/cleanup.c index 22411720f..6faee9160 100644 --- a/gmetad/cleanup.c +++ b/gmetad/cleanup.c @@ -129,6 +129,7 @@ cleanup_source ( datum_t *key, datum_t *val, void *arg ) #if 0 unsigned int born; #endif + hash_t *metric_summary; /* Currently we never delete a source. */ @@ -136,12 +137,13 @@ cleanup_source ( datum_t *key, datum_t *val, void *arg ) cleanup.tv = tv; cleanup.key = 0; cleanup.hashval = 0; - while (hash_walkfrom(source->metric_summary, cleanup.hashval, + metric_summary = source->metric_summary; /* Just look at the current one */ + while (hash_walkfrom(metric_summary, cleanup.hashval, cleanup_metric, (void*) &cleanup)) { if (cleanup.key) { - cleanup.hashval = hashval(cleanup.key, source->metric_summary); - rv=hash_delete(cleanup.key, source->metric_summary); + cleanup.hashval = hashval(cleanup.key, metric_summary); + rv=hash_delete(cleanup.key, metric_summary); if (rv) datum_free(rv); cleanup.key=0; } diff --git a/gmetad/gmetad.h b/gmetad/gmetad.h index 0be347483..2d22150ac 100644 --- a/gmetad/gmetad.h +++ b/gmetad/gmetad.h @@ -168,6 +168,7 @@ typedef struct hash_t *authority; /* Null for a grid. */ short int authority_ptr; /* An authority URL. */ hash_t *metric_summary; + hash_t *metric_summary_pending; pthread_mutex_t *sum_finished; /* A lock held during summarization. */ data_source_list_t *ds; uint32_t hosts_up; diff --git a/gmetad/process_xml.c b/gmetad/process_xml.c index e40d4a9bd..3d7b823d5 100644 --- a/gmetad/process_xml.c +++ b/gmetad/process_xml.c @@ -203,30 +203,31 @@ startElement_GRID(void *data, const char *el, const char **attr) name); return 1; } + + source->metric_summary_pending = hash_create(DEFAULT_METRICSIZE); + if (!source->metric_summary_pending) + { + err_msg("Could not create pending summary hash for cluster %s", + name); + return 1; + } + source->ds = xmldata->ds; /* Initialize the partial sum lock */ source->sum_finished = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(source->sum_finished, NULL); - - /* Grab the "partial sum" mutex until we are finished - * summarizing. */ - pthread_mutex_lock(source->sum_finished); } else { /* Found Cluster. Put into our Source buffer in xmldata. */ memcpy(source, hash_datum->data, hash_datum->size); datum_free(hash_datum); - /* Grab the "partial sum" mutex until we are finished - * summarizing. Needs to be done asap.*/ - pthread_mutex_lock(source->sum_finished); - source->hosts_up = 0; source->hosts_down = 0; - hash_foreach(source->metric_summary, zero_out_summary, NULL); + hash_foreach(source->metric_summary_pending, zero_out_summary, NULL); } /* Edge has the same invariant as in fillmetric(). */ @@ -322,28 +323,30 @@ startElement_CLUSTER(void *data, const char *el, const char **attr) err_msg("Could not create summary hash for cluster %s", name); return 1; } + + source->metric_summary_pending = hash_create(DEFAULT_METRICSIZE); + if (!source->metric_summary_pending) + { + err_msg("Could not create pending summary hash for cluster %s", name); + return 1; + } + source->ds = xmldata->ds; /* Initialize the partial sum lock */ source->sum_finished = (pthread_mutex_t *) malloc(sizeof(pthread_mutex_t)); pthread_mutex_init(source->sum_finished, NULL); - - /* Grab the "partial sum" mutex until we are finished summarizing. */ - pthread_mutex_lock(source->sum_finished); } else { memcpy(source, hash_datum->data, hash_datum->size); datum_free(hash_datum); - /* We need this lock before zeroing metric sums. */ - pthread_mutex_lock(source->sum_finished); - source->hosts_up = 0; source->hosts_down = 0; - hash_foreach(source->metric_summary, zero_out_summary, NULL); + hash_foreach(source->metric_summary_pending, zero_out_summary, NULL); } /* Edge has the same invariant as in fillmetric(). */ @@ -680,7 +683,7 @@ startElement_METRIC(void *data, const char *el, const char **attr) /* Always update summary for numeric metrics. */ if (do_summary) { - summary = xmldata->source.metric_summary; + summary = xmldata->source.metric_summary_pending; hash_datum = hash_lookup(&hashkey, summary); if (!hash_datum) { @@ -920,7 +923,7 @@ startElement_METRICS(void *data, const char *el, const char **attr) } } - summary = xmldata->source.metric_summary; + summary = xmldata->source.metric_summary_pending; hash_datum = hash_lookup(&hashkey, summary); if (!hash_datum) { @@ -1137,9 +1140,14 @@ endElement_GRID(void *data, const char *el) if (authority_mode(xmldata)) { source = &xmldata->source; - summary = xmldata->source.metric_summary; - /* Release the partial sum mutex */ + /* Swap the metric_summary and metric_summary_pending over */ + pthread_mutex_lock(source->sum_finished); + { + summary = xmldata->source.metric_summary_pending; + xmldata->source.metric_summary_pending = xmldata->source.metric_summary; + xmldata->source.metric_summary = summary; + } pthread_mutex_unlock(source->sum_finished); hashkey.data = (void*) xmldata->sourcename; @@ -1175,9 +1183,14 @@ endElement_CLUSTER(void *data, const char *el) if (authority_mode(xmldata)) { source = &xmldata->source; - summary = xmldata->source.metric_summary; - /* Release the partial sum mutex */ + /* Swap the metric_summary and metric_summary_pending over */ + pthread_mutex_lock(source->sum_finished); + { + summary = xmldata->source.metric_summary_pending; + xmldata->source.metric_summary_pending = xmldata->source.metric_summary; + xmldata->source.metric_summary = summary; + } pthread_mutex_unlock(source->sum_finished); hashkey.data = (void*) xmldata->sourcename;