Skip to content

Commit

Permalink
Canal metrics group mode support.
Browse files Browse the repository at this point in the history
  • Loading branch information
lcybo committed Sep 15, 2018
1 parent 5394613 commit 6682c56
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser {
protected boolean filterRows = false;
protected boolean filterTableError = false;
protected boolean useDruidDdlFilter = true;
// instance received binlog bytes
protected final AtomicLong receivedBinlogBytes = new AtomicLong(0L);
private final AtomicLong eventsPublishBlockingTime = new AtomicLong(0L);

protected BinlogParser buildParser() {
Expand Down Expand Up @@ -204,4 +206,8 @@ public AtomicLong getEventsPublishBlockingTime() {
return this.eventsPublishBlockingTime;
}

public AtomicLong getReceivedBinlogBytes() {
return this.receivedBinlogBytes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ public class MysqlEventParser extends AbstractMysqlEventParser implements CanalE
private int dumpErrorCount = 0; // binlogDump失败异常计数
private int dumpErrorCountThreshold = 2; // binlogDump失败异常计数阀值

// instance received binlog bytes
private final AtomicLong receivedBinlogBytes = new AtomicLong(0L);

protected ErosaConnection buildErosaConnection() {
return buildMysqlConnection(this.runningInfo);
}
Expand Down Expand Up @@ -907,8 +904,4 @@ public void setDumpErrorCountThreshold(int dumpErrorCountThreshold) {
this.dumpErrorCountThreshold = dumpErrorCountThreshold;
}

public AtomicLong getReceivedBinlogBytes() {
return this.receivedBinlogBytes;
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package com.alibaba.otter.canal.prometheus.impl;

import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST;
import static com.alibaba.otter.canal.prometheus.CanalInstanceExports.DEST_LABELS_LIST;

import com.alibaba.otter.canal.parse.inbound.group.GroupEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser;
import io.prometheus.client.Collector;
import io.prometheus.client.CounterMetricFamily;
import io.prometheus.client.GaugeMetricFamily;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -19,7 +20,6 @@

import com.alibaba.otter.canal.instance.core.CanalInstance;
import com.alibaba.otter.canal.parse.CanalEventParser;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser;
import com.alibaba.otter.canal.prometheus.InstanceRegistry;
import com.google.common.base.Preconditions;

Expand All @@ -34,11 +34,13 @@ public class ParserCollector extends Collector implements InstanceRegistry {
private static final String RECEIVED_BINLOG = "canal_instance_received_binlog_bytes";
private static final String PARSER_MODE = "canal_instance_parser_mode";
private static final String MODE_LABEL = "parallel";
private static final String PARSER_LABEL = "parser";
private static final String PUBLISH_BLOCKING_HELP = "Publish blocking time of dump thread in milliseconds";
private static final String RECEIVED_BINLOG_HELP = "Received binlog bytes";
private static final String MODE_HELP = "Parser mode(parallel/serial) of instance";
private final List<String> modeLabels = Arrays.asList(DEST, MODE_LABEL);
private final ConcurrentMap<String, ParserMetricsHolder> instances = new ConcurrentHashMap<String, ParserMetricsHolder>();
private final List<String> parserLabels = Arrays.asList(DEST, PARSER_LABEL);
private final ConcurrentMap<String, ParserMetricsHolder> instances = new ConcurrentHashMap<>();

private ParserCollector() {}

Expand All @@ -52,65 +54,105 @@ public static ParserCollector instance() {

@Override
public List<MetricFamilySamples> collect() {
List<MetricFamilySamples> mfs = new ArrayList<MetricFamilySamples>();
boolean hasParallel = false;
List<MetricFamilySamples> mfs = new ArrayList<>();
CounterMetricFamily bytesCounter = new CounterMetricFamily(RECEIVED_BINLOG,
RECEIVED_BINLOG_HELP, DEST_LABELS_LIST);
RECEIVED_BINLOG_HELP, parserLabels);
GaugeMetricFamily modeGauge = new GaugeMetricFamily(PARSER_MODE,
MODE_HELP, modeLabels);
CounterMetricFamily blockingCounter = new CounterMetricFamily(PUBLISH_BLOCKING,
PUBLISH_BLOCKING_HELP, DEST_LABELS_LIST);
PUBLISH_BLOCKING_HELP, parserLabels);
for (ParserMetricsHolder emh : instances.values()) {
if (emh.isParallel) {
blockingCounter.addMetric(emh.destLabelValues, (emh.eventsPublishBlockingTime.doubleValue() / NANO_PER_MILLI));
hasParallel = true;
if (emh instanceof GroupParserMetricsHolder) {
GroupParserMetricsHolder group = (GroupParserMetricsHolder) emh;
for (ParserMetricsHolder semh : group.holders) {
singleCollect(bytesCounter, blockingCounter, modeGauge, semh);
}
}
else {
singleCollect(bytesCounter, blockingCounter, modeGauge, emh);
}
modeGauge.addMetric(emh.modeLabelValues, 1);
bytesCounter.addMetric(emh.destLabelValues, emh.receivedBinlogBytes.doubleValue());

}
mfs.add(bytesCounter);
mfs.add(modeGauge);
if (hasParallel) {
if (!blockingCounter.samples.isEmpty()) {
mfs.add(blockingCounter);
}
return mfs;
}

private void singleCollect(CounterMetricFamily bytesCounter, CounterMetricFamily blockingCounter, GaugeMetricFamily modeGauge, ParserMetricsHolder holder) {
if (holder.isParallel) {
blockingCounter.addMetric(holder.parserLabelValues, (holder.eventsPublishBlockingTime.doubleValue() / NANO_PER_MILLI));
}
modeGauge.addMetric(holder.modeLabelValues, 1);
bytesCounter.addMetric(holder.parserLabelValues, holder.receivedBinlogBytes.doubleValue());
}

@Override
public void register(CanalInstance instance) {
final String destination = instance.getDestination();
ParserMetricsHolder holder = new ParserMetricsHolder();
ParserMetricsHolder holder;
CanalEventParser parser = instance.getEventParser();
if (!(parser instanceof MysqlEventParser)) {
throw new IllegalArgumentException("CanalEventParser must be MysqlEventParser");
if (parser instanceof AbstractMysqlEventParser) {
holder = singleHolder(destination, (AbstractMysqlEventParser)parser, "0");
} else if (parser instanceof GroupEventParser) {
holder = groupHolder(destination, (GroupEventParser)parser);
} else {
throw new IllegalArgumentException("CanalEventParser must be either AbstractMysqlEventParser or GroupEventParser.");
}
MysqlEventParser mysqlParser = (MysqlEventParser) parser;
holder.destLabelValues = Collections.singletonList(destination);
holder.modeLabelValues = Arrays.asList(destination, Boolean.toString(mysqlParser.isParallel()));
holder.eventsPublishBlockingTime = mysqlParser.getEventsPublishBlockingTime();
holder.receivedBinlogBytes = mysqlParser.getReceivedBinlogBytes();
holder.isParallel = mysqlParser.isParallel();
Preconditions.checkNotNull(holder.eventsPublishBlockingTime);
Preconditions.checkNotNull(holder.receivedBinlogBytes);
Preconditions.checkNotNull(holder);
ParserMetricsHolder old = instances.put(destination, holder);
if (old != null) {
logger.warn("Remove stale ParserCollector for instance {}.", destination);
}
}

private ParserMetricsHolder singleHolder(String destination, AbstractMysqlEventParser parser, String id) {
ParserMetricsHolder holder = new ParserMetricsHolder();
holder.parserLabelValues = Arrays.asList(destination, id);
holder.modeLabelValues = Arrays.asList(destination, Boolean.toString(parser.isParallel()));
holder.eventsPublishBlockingTime = parser.getEventsPublishBlockingTime();
holder.receivedBinlogBytes = parser.getReceivedBinlogBytes();
holder.isParallel = parser.isParallel();
Preconditions.checkNotNull(holder.eventsPublishBlockingTime);
Preconditions.checkNotNull(holder.receivedBinlogBytes);
return holder;
}

private GroupParserMetricsHolder groupHolder(String destination, GroupEventParser group) {
List<CanalEventParser> parsers = group.getEventParsers();
GroupParserMetricsHolder groupHolder = new GroupParserMetricsHolder();
int num = parsers.size();
for (int i = 0; i < num; i ++) {
CanalEventParser parser = parsers.get(i);
if (parser instanceof AbstractMysqlEventParser) {
ParserMetricsHolder single = singleHolder(destination, (AbstractMysqlEventParser)parser, Integer.toString(i + 1));
groupHolder.holders.add(single);
} else {
logger.warn("Null or non AbstractMysqlEventParser, ignore.");
}
}
return groupHolder;
}

@Override
public void unregister(CanalInstance instance) {
final String destination = instance.getDestination();
instances.remove(destination);
}

private class ParserMetricsHolder {
private List<String> destLabelValues;
private List<String> parserLabelValues;
private List<String> modeLabelValues;
// metrics for single parser
private AtomicLong receivedBinlogBytes;
private AtomicLong eventsPublishBlockingTime;
// parser mode
private boolean isParallel;
}

private class GroupParserMetricsHolder extends ParserMetricsHolder {
private final List<ParserMetricsHolder> holders = new ArrayList<>();
}

}

0 comments on commit 6682c56

Please sign in to comment.