Skip to content

Commit

Permalink
Merge pull request #125 from Cinimex-Informatica/feature/issue95_add_…
Browse files Browse the repository at this point in the history
…metric_postprocessing

Add new queue-specific metrics, that reflects average size of read messages
  • Loading branch information
echerniak committed Apr 17, 2019
2 parents 81dff84 + f220795 commit fdfc215
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 16 deletions.
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -950,6 +950,24 @@ This section provides a description of metrics related to MQGET, MQCB and MQCTL
<td>Shows the number of calls to MQCTL.</td>
<td>MQCTL count</td>
</tr>
<tr>
<td>mqobject_get_average_destructive_mqget_persistent_message_size_bytes</td>
<td>gauge</td>
<td>Shows an average amount of bytes per persistent message that are returned by MQGET.</td>
<td>destructive MQGET persistent average message byte count</td>
</tr>
<tr>
<td>mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes</td>
<td>gauge</td>
<td>Shows an average amount of bytes per non-persistent message that are returned by MQGET.</td>
<td>destructive MQGET non-persistent average message byte count</td>
</tr>
<tr>
<td>mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes</td>
<td>gauge</td>
<td>Shows an average amount of bytes per persistent and non-persistent messages that are returned by MQGET.</td>
<td>destructive MQGET persistent and non-persistent average message byte count</td>
</tr>
</tbody>
</table>

Expand Down
7 changes: 6 additions & 1 deletion src/main/java/ru/cinimex/exporter/prometheus/HTTPServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.GZIPOutputStream;

Expand Down Expand Up @@ -125,6 +129,7 @@ public void handle(HttpExchange t) throws IOException {
ByteArrayOutputStream streamResponse = this.response.get();
streamResponse.reset();
OutputStreamWriter osw = new OutputStreamWriter(streamResponse);
MetricsManager.updateAdditionalMetrics(parseQuery(query));
TextFormat.write004(osw, registry.filteredMetricFamilySamples(parseQuery(query)));
osw.flush();
osw.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package ru.cinimex.exporter.prometheus.metrics;

import io.prometheus.client.CollectorRegistry;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.stream.Collectors.toMap;

/**
* Util class is used to update metrics.
*/
public class MetricManagerUtils {
private MetricManagerUtils() {
}


/**
* @return - list of metric names, than will be updated
*/
public static List<String> getUpdatedMetricNames() {
List<String> updatedMetricNames = new ArrayList<>();
updatedMetricNames.add("mqobject_get_average_destructive_mqget_persistent_message_size_bytes");
updatedMetricNames.add("mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes");
updatedMetricNames.add("mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes");
return updatedMetricNames;
}


/**
* @param updatedMetricName - metric name, than will be updated
* @return list of metric names, than will be used to update metric with name updatedMetricName
*/
public static List<String> getMetricsNamesUsedToUpdate(String updatedMetricName) {
List<String> listWithNames = new ArrayList<>();
switch (updatedMetricName) {
case "mqobject_get_average_destructive_mqget_persistent_message_size_bytes":
listWithNames.add("mqobject_get_destructive_mqget_persistent_byte_count_totalbytes");
listWithNames.add("mqobject_get_destructive_mqget_persistent_message_count_totalmessages");
break;
case "mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes":
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_byte_count_totalbytes");
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_message_count_totalmessages");
break;
case "mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes":
listWithNames.add("mqobject_get_destructive_mqget_persistent_byte_count_totalbytes");
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_byte_count_totalbytes");
listWithNames.add("mqobject_get_destructive_mqget_persistent_message_count_totalmessages");
listWithNames.add("mqobject_get_destructive_mqget_non_persistent_message_count_totalmessages");
break;
default:
break;
}
return listWithNames;
}

/**
* @param updatedMetricName - metric name, than will be updated
* @return function, that will be used for updating metric value
*/
public static Function<List<Double>, Double> getConversionFunction(String updatedMetricName) {
switch (updatedMetricName) {
case "mqobject_get_average_destructive_mqget_persistent_message_size_bytes":
case "mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes":
return MetricManagerUtils::division;
case "mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes":
return MetricManagerUtils::averageSum;
default:
return MetricManagerUtils::defaultConversion;
}
}

private static Double averageSum(List<Double> params) {
Objects.requireNonNull(params);
if (params.size() != 4) throw new IllegalArgumentException();
return (params.get(2) + params.get(3)) == 0.0 ? 0.0 : (params.get(0) + params.get(1)) / (params.get(2) + params.get(3));
}

private static Double division(List<Double> params) {
Objects.requireNonNull(params);
if (params.size() != 2) throw new IllegalArgumentException();
return params.get(1) == 0.0 ? 0.0 : params.get(0) / params.get(1);
}

private static Double defaultConversion(List<Double> params) {
return Objects.requireNonNull(params).get(0);
}

/**
* @param parsedQuery - parameter, needed for getting metric in special family metrics
* @param updatedMetricName - metric name, than will be updated
* @return map with label list as key and double parameter list used for conversion function as value
*/
public static Map<List<String>, List<Double>> getMetricsUsedToUpdate(Set<String> parsedQuery, String updatedMetricName) {
List<String> metricsNamesUsedToUpdate = getMetricsNamesUsedToUpdate(updatedMetricName);
Map<String, Map<List<String>, Double>> mapWithValues = new HashMap<>();
forEachRemaining(CollectorRegistry.defaultRegistry.filteredMetricFamilySamples(parsedQuery), metricFamilySamples -> {
Map<List<String>, Double> arrayListDoubleMap =
metricFamilySamples.samples.stream()
.filter(sample -> metricsNamesUsedToUpdate.contains(sample.name))
.collect(toMap(sample -> sample.labelValues, sample -> sample.value, (a, b) -> b));
if (!arrayListDoubleMap.isEmpty()) mapWithValues.put(metricFamilySamples.name, arrayListDoubleMap);
});

List<Map<List<String>, Double>> listWithValues = metricsNamesUsedToUpdate.stream()
.map(mapWithValues::get)
.collect(Collectors.toList());
Map<List<String>, List<Double>> params = new HashMap<>();
listWithValues.forEach(l -> l.forEach((k, v) -> {
if (!params.containsKey(k)) params.put(k, new ArrayList<>(Collections.singletonList(v)));
else {
List<Double> paramList = params.get(k);
paramList.add(v);
params.replace(k, paramList);
}
}
));
return params;
}

private static <T> void forEachRemaining(Enumeration<T> e, Consumer<? super T> c) {
while (e.hasMoreElements()) c.accept(e.nextElement());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
import ru.cinimex.exporter.mq.pcf.PCFElementRow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static ru.cinimex.exporter.prometheus.metrics.MetricsReference.getAdditionalMqObjectMetricsReference;
import static ru.cinimex.exporter.prometheus.metrics.MetricManagerUtils.*;

/**
* Class is used to manage work of all metrics.
Expand All @@ -25,6 +31,7 @@ public class MetricsManager {
*/
public static void initMetrics(List<PCFElement> elements, List<MQObject.MQType> types) {
logger.debug("Preparing to initialize metrics. {} metrics will be received from MQ topics and {} metrics will be received via direct PCF commands.", elements.size(), types.size());
String logString = " created! Name: {}, description: {}, labels: {}.";
metrics = new HashMap<>();
for (PCFElement element : elements) {
for (PCFElementRow row : element.getRows()) {
Expand All @@ -33,32 +40,32 @@ public static void initMetrics(List<PCFElement> elements, List<MQObject.MQType>
ArrayList<String> labels = new ArrayList<>();
labels.add(Labels.QMGR_NAME.name());
MetricInterface metric;
if (element.requiresMQObject()) {
labels.add(Labels.MQ_OBJECT_NAME.name());
}
if (element.requiresMQObject()) labels.add(Labels.MQ_OBJECT_NAME.name());
switch (metricType) {
case SIMPLE_GAUGE:
metric = new SimpleGauge(metricName, row.getRowDesc(), labels.stream().toArray(String[]::new));
metric = new SimpleGauge(metricName, row.getRowDesc(), labels.toArray(new String[0]));
metrics.put(metricName, metric);
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
logger.trace("New " + "gauge" + logString, metricName, row.getRowDesc(), labels);
break;
case SIMPLE_COUNTER:
metric = new SimpleCounter(metricName, row.getRowDesc(), labels.stream().toArray(String[]::new));
metric = new SimpleCounter(metricName, row.getRowDesc(), labels.toArray(new String[0]));
metrics.put(metricName, metric);
logger.trace("New counter created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
logger.trace("New counter " + logString, metricName, row.getRowDesc(), labels);
break;
case EXTREME_GAUGE_MAX:
metric = new ExtremeGauge(metricName, row.getRowDesc(), true, labels.stream().toArray(String[]::new));
metric = new ExtremeGauge(metricName, row.getRowDesc(), true, labels.toArray(new String[0]));
metrics.put(metricName, metric);
logger.trace("New extreme gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
logger.trace("New extreme gauge" + logString, metricName, row.getRowDesc(), labels);
break;
case EXTREME_GAUGE_MIN:
metric = new ExtremeGauge(metricName, row.getRowDesc(), false, labels.stream().toArray(String[]::new));
metric = new ExtremeGauge(metricName, row.getRowDesc(), false, labels.toArray(new String[0]));
metrics.put(metricName, metric);
logger.trace("New extreme gauge created! Name: {}, description: {}, labels: {}.", metricName, row.getRowDesc(), labels);
logger.trace("New extreme gauge" + logString, metricName, row.getRowDesc(), labels);
break;
default:
logger.error("Error during metrics initialization: Unknown metric type! Make sure it is one " + "of: {}", MetricsReference.Metric.Type.values());
logger.error(
"Error during metrics initialization: Unknown metric type! Make sure it is one " + "of: {}",
(Object[]) MetricsReference.Metric.Type.values());
}
}
}
Expand All @@ -67,9 +74,19 @@ public static void initMetrics(List<PCFElement> elements, List<MQObject.MQType>
metrics.put(metricName, new SimpleGauge(metricName, MetricsReference.getMetricHelp(type), Labels.QMGR_NAME.name(), Labels.MQ_OBJECT_NAME.name()));
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metricName, MetricsReference.getMetricHelp(type), Labels.MQ_OBJECT_NAME.name());
}
initAdditionalMetrics();

logger.info("Successfully initialized {} metrics!", metrics.size());
}

private static void initAdditionalMetrics() {
getAdditionalMqObjectMetricsReference().forEach((metricInfo, metric) -> {
List<String> labels = Arrays.asList(Labels.QMGR_NAME.name(), Labels.MQ_OBJECT_NAME.name());
metrics.put(metric.name, new SimpleGauge(metric.name, metricInfo, labels.toArray(new String[0])));
logger.trace("New gauge created! Name: {}, description: {}, labels: {}.", metric.name, metricInfo, labels);
});
}

/**
* Updates specific metric
*
Expand All @@ -81,13 +98,33 @@ public static void updateMetric(String metricName, Double value, String... label
metrics.get(metricName).update(value, labels);
}

/**
* Updates additional metrics, for which need special conversion
*
* @param parsedQuery - parameter, needed for getting metric in special family metrics
*/
public static void updateAdditionalMetrics(Set<String> parsedQuery) {
getUpdatedMetricNames().forEach(updatedMetricName -> complexUpdateMetrics(
getMetricsUsedToUpdate(
parsedQuery,
updatedMetricName),
updatedMetricName));
logger.trace("Additional metrics was updated");
}

private static void complexUpdateMetrics(
Map<List<String>, List<Double>> metricsUsedToUpdate,
String updatedMetricName) {
metricsUsedToUpdate.forEach((k, l) -> updateMetric(
updatedMetricName, getConversionFunction(updatedMetricName).apply(l), k.toArray(new String[0])));
logger.trace("Additional metrics {} was updated", updatedMetricName);
}

/**
* Notifies all metrics after each Prometheus scrape.
*/
public static void notifyMetricsWereScraped() {
for (MetricInterface metric : metrics.values()) {
metric.notifyWasScraped();
}
metrics.values().forEach(MetricInterface::notifyWasScraped);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;

public class MetricsReference {
private static final Logger logger = LogManager.getLogger(MetricsReference.class);
Expand Down Expand Up @@ -168,6 +169,14 @@ private static HashMap<String, Metric> getMqObjectMetricsReference() {
return metrics;
}

public static Map<String, Metric> getAdditionalMqObjectMetricsReference() {
Map<String, Metric> metrics = new HashMap<>();
metrics.put("destructive MQGET persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
metrics.put("destructive MQGET non-persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_non_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
metrics.put("destructive MQGET persistent and non-persistent average message byte count", new Metric("mqobject_get_average_destructive_mqget_persistent_and_non_persistent_message_size_bytes", Metric.Type.SIMPLE_GAUGE));
return metrics;
}

/**
* Method is used to initialize ChannelStatuses.
*
Expand Down

0 comments on commit fdfc215

Please sign in to comment.