Skip to content

Commit

Permalink
bridge: Put metrics interframe compression in CockpitMetrics
Browse files Browse the repository at this point in the history
That way it can be used by multiple metrics channel implementations

Closes #1677
Reviewed-by: Marius Vollmer <marius.vollmer@redhat.com>
  • Loading branch information
stefwalter authored and mvollmer committed Jan 21, 2015
1 parent 28c3871 commit a89447f
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 96 deletions.
7 changes: 7 additions & 0 deletions src/bridge/Makefile.am
Expand Up @@ -106,6 +106,7 @@ BRIDGE_CHECKS = \
test-package \
test-resource \
test-fs \
test-metrics \
test-pcp \
$(NULL)

Expand Down Expand Up @@ -162,6 +163,12 @@ test_fs_SOURCES = \
test_fs_CFLAGS = $(libcockpit_bridge_a_CFLAGS)
test_fs_LDADD = $(libcockpit_bridge_LIBS)

test_metrics_SOURCES = \
src/bridge/test-metrics.c \
src/bridge/mock-transport.c src/bridge/mock-transport.h
test_metrics_CFLAGS = $(libcockpit_bridge_a_CFLAGS)
test_metrics_LDADD = $(libcockpit_bridge_LIBS)

test_pcp_SOURCES = \
src/bridge/test-pcp.c \
src/bridge/mock-transport.c src/bridge/mock-transport.h
Expand Down
143 changes: 107 additions & 36 deletions src/bridge/cockpitmetrics.c
Expand Up @@ -28,6 +28,7 @@ struct _CockpitMetricsPrivate {
guint timeout;
gint64 next;
gint64 interval;
JsonArray *last;
};

G_DEFINE_ABSTRACT_TYPE (CockpitMetrics, cockpit_metrics, COCKPIT_TYPE_CHANNEL);
Expand Down Expand Up @@ -73,6 +74,12 @@ cockpit_metrics_dispose (GObject *object)
self->priv->timeout = 0;
}

if (self->priv->last)
{
json_array_unref (self->priv->last);
self->priv->last = NULL;
}

G_OBJECT_CLASS (cockpit_metrics_parent_class)->dispose (object);
}

Expand Down Expand Up @@ -172,10 +179,23 @@ send_object (CockpitMetrics *self,
g_bytes_unref (bytes);
}

/*
* cockpit_metrics_send_meta:
* @self: The CockpitMetrics
* @meta: An object containing metric meta data
*
* Send metrics meta data down the channel. If you use cockpit_metrics_send_data()
* then you must use this function instead of sending stuff on the channel directly.
*/
void
cockpit_metrics_send_meta (CockpitMetrics *self,
JsonObject *meta)
{
/* Cannot compress across meta message */
if (self->priv->last)
json_array_unref (self->priv->last);
self->priv->last = NULL;

send_object (self, meta);
}

Expand All @@ -199,54 +219,105 @@ send_array (CockpitMetrics *self,
g_bytes_unref (bytes);
}

void
cockpit_metrics_send_data (CockpitMetrics *self,
JsonArray *data)
static JsonArray *
push_array_at (JsonArray *array,
guint index,
JsonNode *node)
{
send_array (self, data);
}
if (array == NULL)
array = json_array_new ();

/* ---- */
g_assert (index >= json_array_get_length (array));

void
cockpit_compressed_array_builder_init (CockpitCompressedArrayBuilder *compr)
{
compr->array = NULL;
compr->n_skip = 0;
while (index > json_array_get_length (array))
json_array_add_null_element (array);

if (node)
json_array_add_element (array, node);

return array;
}

void
cockpit_compressed_array_builder_add (CockpitCompressedArrayBuilder *compr,
JsonNode *element)
static JsonArray *
interframe_compress_samples (JsonArray *last,
JsonArray *samples)
{
if (element == NULL)
compr->n_skip++;
else
JsonArray *output = NULL;
JsonArray *res = NULL;
JsonNode *a, *b;
JsonNode *node;
guint alen;
guint blen;
guint i;

if (last)
{
if (!compr->array)
compr->array = json_array_new ();
for (int i = 0; i < compr->n_skip; i++)
json_array_add_null_element (compr->array);
compr->n_skip = 0;
json_array_add_element (compr->array, element);
alen = json_array_get_length (last);
blen = json_array_get_length (samples);

for (i = 0; i < blen; i++)
{
a = NULL;
if (i < alen)
a = json_array_get_element (last, i);

b = json_array_get_element (samples, i);

if (a == NULL)
{
output = push_array_at (output, i, json_node_copy (b));
}
else if (json_node_get_node_type (a) == JSON_NODE_ARRAY &&
json_node_get_node_type (b) == JSON_NODE_ARRAY)
{
res = interframe_compress_samples (json_node_get_array (a),
json_node_get_array (b));
node = json_node_new (JSON_NODE_ARRAY);
json_node_take_array (node, res ? res : json_array_new ());
output = push_array_at (output, i, node);
}
else if (!cockpit_json_equal (a, b))
{
output = push_array_at (output, i, json_node_copy (b));
}
}
if (blen < alen)
{
output = push_array_at (output, blen, NULL);
}
}

return output;
}

/*
* cockpit_metrics_send_data:
* @self: The CockpitMetrics
* @data: An array of JSON arrays
*
* Send metrics data down the channel, possibly doing interframe
* compression between what was sent last. @data should no longer
* be modified by the caller.
*/
void
cockpit_compressed_array_builder_take_and_add_array (CockpitCompressedArrayBuilder *compr,
JsonArray *array)
cockpit_metrics_send_data (CockpitMetrics *self,
JsonArray *data)
{
JsonNode *node = json_node_alloc ();
json_node_init_array (node, array);
cockpit_compressed_array_builder_add (compr, node);
json_array_unref (array);
}
JsonArray *res;

JsonArray *
cockpit_compressed_array_builder_finish (CockpitCompressedArrayBuilder *compr)
{
if (compr->array)
return compr->array;
res = interframe_compress_samples (self->priv->last, data);

if (self->priv->last)
json_array_unref (self->priv->last);
self->priv->last = json_array_ref (data);

if (res)
{
send_array (self, res);
json_array_unref (res);
}
else
return json_array_new ();
{
send_array (self, data);
}
}
25 changes: 6 additions & 19 deletions src/bridge/cockpitmetrics.h
Expand Up @@ -30,20 +30,22 @@ G_BEGIN_DECLS
#define COCKPIT_METRICS_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), COCKPIT_TYPE_METRICS, CockpitMetricsClass))
#define COCKPIT_METRICS_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), COCKPIT_TYPE_METRICS, CockpitMetricsClass))

typedef struct _CockpitMetrics CockpitMetrics;
typedef struct _CockpitMetricsClass CockpitMetricsClass;
typedef struct _CockpitMetricsPrivate CockpitMetricsPrivate;

typedef struct {
struct _CockpitMetrics {
CockpitChannel parent;
CockpitMetricsPrivate *priv;
} CockpitMetrics;
};

typedef struct {
struct _CockpitMetricsClass {
CockpitChannelClass parent_class;

void (* tick) (CockpitMetrics *metrics,
gint64 current_monotic_time);

} CockpitMetricsClass;
};

GType cockpit_metrics_get_type (void) G_GNUC_CONST;

Expand All @@ -60,19 +62,4 @@ void cockpit_metrics_send_meta (CockpitMetrics *self,
void cockpit_metrics_send_data (CockpitMetrics *self,
JsonArray *data);

typedef struct {
JsonArray *array;
int n_skip;
} CockpitCompressedArrayBuilder;

void cockpit_compressed_array_builder_init (CockpitCompressedArrayBuilder *compr);

void cockpit_compressed_array_builder_add (CockpitCompressedArrayBuilder *compr,
JsonNode *element);

void cockpit_compressed_array_builder_take_and_add_array (CockpitCompressedArrayBuilder *compr,
JsonArray *array);

JsonArray *cockpit_compressed_array_builder_finish (CockpitCompressedArrayBuilder *compr);

#endif /* COCKPIT_METRICS_H__ */
47 changes: 10 additions & 37 deletions src/bridge/cockpitpcpmetrics.c
Expand Up @@ -243,18 +243,6 @@ build_meta_if_necessary (CockpitPcpMetrics *self,
return build_meta (self, result);
}

static gboolean
result_value_equal (int valfmt,
pmValue *val1,
pmValue *val2)
{
if (valfmt == PM_VAL_INSITU)
return val1->value.lval == val2->value.lval;
else
return (val1->value.pval->vlen == val2->value.pval->vlen
&& memcmp (val1->value.pval, val2->value.pval, val1->value.pval->vlen) == 0);
}

static JsonNode *
build_sample (CockpitPcpMetrics *self,
pmResult *result,
Expand All @@ -276,7 +264,7 @@ build_sample (CockpitPcpMetrics *self,
if (info->desc.sem == PM_SEM_COUNTER && info->desc.type != PM_TYPE_STRING)
{
if (!self->last)
return NULL;
return json_node_new (JSON_NODE_NULL);

pmAtomValue old, new;
pmValue *last_value = &self->last->vset[metric]->vlist[instance];
Expand Down Expand Up @@ -308,13 +296,6 @@ build_sample (CockpitPcpMetrics *self,
}
else
{
if (self->last)
{
pmValue *last_value = &self->last->vset[metric]->vlist[instance];
if (result_value_equal (valfmt, value, last_value))
return NULL;
}

if (info->desc.type == PM_TYPE_STRING)
{
if (pmExtractValue (valfmt, value, PM_TYPE_STRING, &sample, PM_TYPE_STRING) < 0)
Expand Down Expand Up @@ -348,33 +329,31 @@ static JsonArray *
build_samples (CockpitPcpMetrics *self,
pmResult *result)
{
CockpitCompressedArrayBuilder samples;
CockpitCompressedArrayBuilder array;
JsonArray *output;
JsonArray *array;
pmValueSet *vs;
int i, j;

cockpit_compressed_array_builder_init (&samples);

output = json_array_new ();
for (i = 0; i < result->numpmid; i++)
{
vs = result->vset[i];

/* When negative numval is an error code ... we don't care */
if (vs->numval < 0)
cockpit_compressed_array_builder_add (&samples, NULL);
json_array_add_null_element (output);
else if (vs->numval == 1 && vs->vlist[0].inst == PM_IN_NULL)
cockpit_compressed_array_builder_add (&samples, build_sample (self, result, i, 0));
json_array_add_element (output, build_sample (self, result, i, 0));
else
{
cockpit_compressed_array_builder_init (&array);
array = json_array_new ();
for (j = 0; j < vs->numval; j++)
cockpit_compressed_array_builder_add (&array, build_sample (self, result, i, j));
cockpit_compressed_array_builder_take_and_add_array (&samples,
cockpit_compressed_array_builder_finish (&array));
json_array_add_element (array, build_sample (self, result, i, j));
json_array_add_array_element (output, array);
}
}

return cockpit_compressed_array_builder_finish (&samples);
return output;
}

static void
Expand Down Expand Up @@ -403,12 +382,6 @@ cockpit_pcp_metrics_tick (CockpitMetrics *metrics,
{
cockpit_metrics_send_meta (metrics, meta);
json_object_unref (meta);

/* We can't compress across a meta message.
*/
if (self->last)
pmFreeResult (self->last);
self->last = NULL;
}

/* Send one set of samples */
Expand Down

0 comments on commit a89447f

Please sign in to comment.