From fa6e7ea51258dcd90f06036196618224156df367 Mon Sep 17 00:00:00 2001 From: Xie Yi <105206850+xieyi888@users.noreply.github.com> Date: Tue, 7 May 2024 09:14:13 +0800 Subject: [PATCH] [FLINK-35245][cdc-connector][tidb] Add metrics for flink-connector-tidb-cdc --- .../tidb/TiKVRichParallelSourceFunction.java | 26 ++++++ .../tidb/metrics/TiDBSourceMetrics.java | 90 +++++++++++++++++++ .../tidb/metrics/TiDBSourceMetricsTest.java | 62 +++++++++++++ 3 files changed, 178 insertions(+) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetricsTest.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java index e93094327c..9570f40ed2 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/TiKVRichParallelSourceFunction.java @@ -23,9 +23,11 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.cdc.connectors.tidb.metrics.TiDBSourceMetrics; import org.apache.flink.cdc.connectors.tidb.table.StartupMode; import org.apache.flink.cdc.connectors.tidb.table.utils.TableKeyRangeUtils; import 
org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; @@ -42,6 +44,7 @@ import org.tikv.common.TiSession; import org.tikv.common.key.RowKey; import org.tikv.common.meta.TiTableInfo; +import org.tikv.common.meta.TiTimestamp; import org.tikv.kvproto.Cdcpb; import org.tikv.kvproto.Coprocessor; import org.tikv.kvproto.Kvrpcpb; @@ -91,6 +94,7 @@ public class TiKVRichParallelSourceFunction extends RichParallelSourceFunctio private transient boolean running = true; private transient ExecutorService executorService; + private transient TiDBSourceMetrics sourceMetrics; /** offset state. */ private transient ListState offsetState; @@ -146,6 +150,9 @@ public void open(final Configuration config) throws Exception { + getRuntimeContext().getIndexOfThisSubtask()) .build(); executorService = Executors.newSingleThreadExecutor(threadFactory); + final MetricGroup metricGroup = getRuntimeContext().getMetricGroup(); + sourceMetrics = new TiDBSourceMetrics(metricGroup); + sourceMetrics.registerMetrics(); } @Override @@ -210,6 +217,7 @@ protected void readSnapshotEvents() throws Exception { for (final Kvrpcpb.KvPair pair : segment) { if (TableKeyRangeUtils.isRecordKey(pair.getKey().toByteArray())) { snapshotEventDeserializationSchema.deserialize(pair, outputCollector); + reportMetrics(0L, startTs); } } @@ -231,6 +239,8 @@ protected void readChangeEvents() throws Exception { Cdcpb.Event.Row committedRow = committedEvents.take(); changeEventDeserializationSchema.deserialize( committedRow, outputCollector); + // use startTs of row as messageTs, use commitTs of row as fetchTs + reportMetrics(committedRow.getStartTs(), committedRow.getCommitTs()); } catch (Exception e) { e.printStackTrace(); } @@ -390,4 +400,20 @@ public void close() { // do nothing } } + 
+    private void reportMetrics(long messageTs, long fetchTs) {
+        // One wall-clock reading serves as both the process time and the emit time.
+        final long emitTime = System.currentTimeMillis();
+        sourceMetrics.recordProcessTime(emitTime);
+        final long eventTime = TiTimestamp.extractPhysical(messageTs);
+        final long fetchTime = TiTimestamp.extractPhysical(fetchTs);
+        if (eventTime <= 0L) {
+            return; // no positive event time, so neither lag is reported
+        }
+        if (fetchTime >= eventTime) {
+            sourceMetrics.recordFetchDelay(fetchTime - eventTime);
+        }
+        // Emit lag is reported even when the fetch lag was skipped above.
+        sourceMetrics.recordEmitDelay(emitTime - eventTime);
+    }
 }
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java
new file mode 100644
index 0000000000..1f32c0f341
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/main/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetrics.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.metrics;
+
+import org.apache.flink.cdc.connectors.tidb.TiKVRichParallelSourceFunction;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+
+import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_EMIT_EVENT_TIME_LAG;
+import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_FETCH_EVENT_TIME_LAG;
+import static org.apache.flink.runtime.metrics.MetricNames.SOURCE_IDLE_TIME;
+
+/** A collection class for handling metrics in {@link TiKVRichParallelSourceFunction}. */
+public class TiDBSourceMetrics {
+
+    private final MetricGroup metricGroup;
+
+    /**
+     * Wall-clock time in ms at which {@link TiKVRichParallelSourceFunction} last reported a
+     * processed record (0 until then). sourceIdleTime = System.currentTimeMillis() - processTime.
+     * Volatile: gauges may be read by a different thread than the one recording values.
+     */
+    private volatile long processTime = 0L;
+
+    /**
+     * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time the
+     * record was fetched into the source operator. Volatile for cross-thread gauge reads.
+     */
+    private volatile long fetchDelay = 0L;
+
+    /**
+     * currentEmitEventTimeLag = EmitTime - messageTimestamp, where the EmitTime is the time the
+     * record leaves the source operator. Volatile for cross-thread gauge reads.
+     */
+    private volatile long emitDelay = 0L;
+
+    public TiDBSourceMetrics(MetricGroup metricGroup) {
+        this.metricGroup = metricGroup;
+    }
+
+    public void registerMetrics() {
+        // Each gauge re-reads the latest recorded value whenever it is polled.
+        metricGroup.gauge(CURRENT_FETCH_EVENT_TIME_LAG, (Gauge<Long>) this::getFetchDelay);
+        metricGroup.gauge(CURRENT_EMIT_EVENT_TIME_LAG, (Gauge<Long>) this::getEmitDelay);
+        metricGroup.gauge(SOURCE_IDLE_TIME, (Gauge<Long>) this::getIdleTime);
+    }
+
+    public long getFetchDelay() {
+        return fetchDelay;
+    }
+
+    public long getEmitDelay() {
+        return emitDelay;
+    }
+
+    public long getIdleTime() {
+        // no previous process time at the beginning, return 0 as idle time
+        if (processTime == 0) {
+            return 0;
+        }
+        return System.currentTimeMillis() - processTime;
+    }
+
+    public void recordProcessTime(long processTime) {
+        this.processTime = processTime;
+    }
+
+    public void recordFetchDelay(long fetchDelay) {
+        this.fetchDelay = fetchDelay;
+    }
+
+    public void recordEmitDelay(long emitDelay) {
+        this.emitDelay = emitDelay;
+    }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetricsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetricsTest.java
new file mode 100644
index 0000000000..320b0a90ac
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-tidb-cdc/src/test/java/org/apache/flink/cdc/connectors/tidb/metrics/TiDBSourceMetricsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tidb.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.testutils.MetricListener;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Optional;
+
+import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_EMIT_EVENT_TIME_LAG;
+import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_FETCH_EVENT_TIME_LAG;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests checking that values recorded in {@link TiDBSourceMetrics} surface via gauges. */
+public class TiDBSourceMetricsTest {
+    private MetricListener metricListener;
+    private TiDBSourceMetrics sourceMetrics;
+
+    @Before
+    public void setUp() {
+        metricListener = new MetricListener();
+        sourceMetrics = new TiDBSourceMetrics(metricListener.getMetricGroup());
+        sourceMetrics.registerMetrics();
+    }
+
+    @Test
+    public void testFetchEventTimeLagTracking() {
+        sourceMetrics.recordFetchDelay(5L);
+        assertGauge(metricListener, CURRENT_FETCH_EVENT_TIME_LAG, 5L);
+    }
+
+    @Test
+    public void testEmitEventTimeLagTracking() {
+        sourceMetrics.recordEmitDelay(3L);
+        assertGauge(metricListener, CURRENT_EMIT_EVENT_TIME_LAG, 3L);
+    }
+
+    private void assertGauge(MetricListener metricListener, String identifier, long expected) {
+        Optional<Gauge<Long>> gauge = metricListener.getGauge(identifier);
+        assertTrue(gauge.isPresent());
+        assertEquals(expected, (long) gauge.get().getValue());
+    }
+}