Skip to content

Commit

Permalink
[FLINK-35245][cdc-connector][tidb] Add metrics for flink-connector-ti…
Browse files Browse the repository at this point in the history
  • Loading branch information
xieyi888 committed May 7, 2024
1 parent b6cfbcc commit fa6e7ea
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.cdc.connectors.tidb.metrics.TiDBSourceMetrics;
import org.apache.flink.cdc.connectors.tidb.table.StartupMode;
import org.apache.flink.cdc.connectors.tidb.table.utils.TableKeyRangeUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
Expand All @@ -42,6 +44,7 @@
import org.tikv.common.TiSession;
import org.tikv.common.key.RowKey;
import org.tikv.common.meta.TiTableInfo;
import org.tikv.common.meta.TiTimestamp;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Kvrpcpb;
Expand Down Expand Up @@ -91,6 +94,7 @@ public class TiKVRichParallelSourceFunction<T> extends RichParallelSourceFunctio

private transient boolean running = true;
private transient ExecutorService executorService;
private transient TiDBSourceMetrics sourceMetrics;

/** offset state. */
private transient ListState<Long> offsetState;
Expand Down Expand Up @@ -146,6 +150,9 @@ public void open(final Configuration config) throws Exception {
+ getRuntimeContext().getIndexOfThisSubtask())
.build();
executorService = Executors.newSingleThreadExecutor(threadFactory);
final MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
sourceMetrics = new TiDBSourceMetrics(metricGroup);
sourceMetrics.registerMetrics();
}

@Override
Expand Down Expand Up @@ -210,6 +217,7 @@ protected void readSnapshotEvents() throws Exception {
for (final Kvrpcpb.KvPair pair : segment) {
if (TableKeyRangeUtils.isRecordKey(pair.getKey().toByteArray())) {
snapshotEventDeserializationSchema.deserialize(pair, outputCollector);
reportMetrics(0L, startTs);
}
}

Expand All @@ -231,6 +239,8 @@ protected void readChangeEvents() throws Exception {
Cdcpb.Event.Row committedRow = committedEvents.take();
changeEventDeserializationSchema.deserialize(
committedRow, outputCollector);
// use startTs of row as messageTs, use commitTs of row as fetchTs
reportMetrics(committedRow.getStartTs(), committedRow.getCommitTs());
} catch (Exception e) {
e.printStackTrace();
}
Expand Down Expand Up @@ -390,4 +400,20 @@ public void close() {
// do nothing
}
}

/**
 * Updates the source metrics after a record has been handed to the collector.
 *
 * <p>The current wall-clock time is always recorded as the latest process time. Both
 * arguments are passed through {@link TiTimestamp#extractPhysical(long)}; the results are
 * compared against {@link System#currentTimeMillis()}, i.e. they are treated as epoch
 * milliseconds here. When the extracted message time is not positive (snapshot reads pass
 * {@code 0L}), no delay is reported.
 *
 * @param messageTs TiDB timestamp used as the event time of the record
 * @param fetchTs TiDB timestamp used as the time the record was fetched
 */
private void reportMetrics(long messageTs, long fetchTs) {
    final long currentMillis = System.currentTimeMillis();
    // the idle-time gauge is derived from this value
    sourceMetrics.recordProcessTime(currentMillis);

    final long messageMillis = TiTimestamp.extractPhysical(messageTs);
    final long fetchMillis = TiTimestamp.extractPhysical(fetchTs);

    // without a usable event time neither delay can be computed
    if (!(messageMillis > 0L)) {
        return;
    }

    // fetch delay is only reported when it would not be negative
    if (fetchMillis >= messageMillis) {
        final long fetchLag = fetchMillis - messageMillis;
        sourceMetrics.recordFetchDelay(fetchLag);
    }

    // emit delay is measured against the local clock
    final long emitLag = currentMillis - messageMillis;
    sourceMetrics.recordEmitDelay(emitLag);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.tidb.metrics;

import org.apache.flink.cdc.connectors.tidb.TiKVRichParallelSourceFunction;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_EMIT_EVENT_TIME_LAG;
import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_FETCH_EVENT_TIME_LAG;
import static org.apache.flink.runtime.metrics.MetricNames.SOURCE_IDLE_TIME;

/**
 * A collection class for handling metrics in {@link TiKVRichParallelSourceFunction}.
 *
 * <p>Values are stored through the {@code record*} methods and exposed through the gauges
 * registered by {@link #registerMetrics()}. The gauges may be polled from a thread other than
 * the one recording values (for example a metrics reporter), so every recorded value is kept in
 * a {@code volatile} field: this guarantees that the latest write is visible to readers and that
 * a {@code long} is never observed half-written.
 */
public class TiDBSourceMetrics {

    private final MetricGroup metricGroup;

    /**
     * The wall-clock time, in milliseconds, last passed to {@link #recordProcessTime(long)}.
     * It is used to derive sourceIdleTime = System.currentTimeMillis() - processTime. A value of
     * {@code 0} means that nothing has been recorded yet.
     */
    private volatile long processTime = 0L;

    /**
     * currentFetchEventTimeLag = FetchTime - messageTimestamp, where the FetchTime is the time
     * the record fetched into the source operator.
     */
    private volatile long fetchDelay = 0L;

    /**
     * currentEmitEventTimeLag = EmitTime - messageTimestamp, where the EmitTime is the time the
     * record leaves the source operator.
     */
    private volatile long emitDelay = 0L;

    /**
     * Creates the metrics holder; no metric is registered until {@link #registerMetrics()} is
     * called.
     *
     * @param metricGroup the group the gauges will be registered on
     */
    public TiDBSourceMetrics(MetricGroup metricGroup) {
        this.metricGroup = metricGroup;
    }

    /** Registers the fetch-lag, emit-lag and idle-time gauges on the metric group. */
    public void registerMetrics() {
        metricGroup.gauge(CURRENT_FETCH_EVENT_TIME_LAG, (Gauge<Long>) this::getFetchDelay);
        metricGroup.gauge(CURRENT_EMIT_EVENT_TIME_LAG, (Gauge<Long>) this::getEmitDelay);
        metricGroup.gauge(SOURCE_IDLE_TIME, (Gauge<Long>) this::getIdleTime);
    }

    /**
     * Returns the most recently recorded fetch delay.
     *
     * @return the last value passed to {@link #recordFetchDelay(long)}, or {@code 0} if none
     */
    public long getFetchDelay() {
        return fetchDelay;
    }

    /**
     * Returns the most recently recorded emit delay.
     *
     * @return the last value passed to {@link #recordEmitDelay(long)}, or {@code 0} if none
     */
    public long getEmitDelay() {
        return emitDelay;
    }

    /**
     * Returns how long ago the last process time was recorded.
     *
     * @return {@code System.currentTimeMillis()} minus the recorded process time, or {@code 0}
     *     when no process time has been recorded yet
     */
    public long getIdleTime() {
        // read the field once so the zero check and the subtraction use the same value
        final long lastProcessTime = processTime;
        // no previous process time at the beginning, return 0 as idle time
        if (lastProcessTime == 0) {
            return 0;
        }
        return System.currentTimeMillis() - lastProcessTime;
    }

    /**
     * Stores the time at which a record was last processed.
     *
     * @param processTime wall-clock time in milliseconds
     */
    public void recordProcessTime(long processTime) {
        this.processTime = processTime;
    }

    /**
     * Stores the latest fetch delay.
     *
     * @param fetchDelay the delay to expose through the fetch-lag gauge
     */
    public void recordFetchDelay(long fetchDelay) {
        this.fetchDelay = fetchDelay;
    }

    /**
     * Stores the latest emit delay.
     *
     * @param emitDelay the delay to expose through the emit-lag gauge
     */
    public void recordEmitDelay(long emitDelay) {
        this.emitDelay = emitDelay;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.connectors.tidb.metrics;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.testutils.MetricListener;

import org.junit.Before;
import org.junit.Test;

import java.util.Optional;

import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_EMIT_EVENT_TIME_LAG;
import static org.apache.flink.runtime.metrics.MetricNames.CURRENT_FETCH_EVENT_TIME_LAG;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Unit test for {@link TiDBSourceMetrics}.
 *
 * <p>Each test records a value on a freshly registered metrics instance and checks that the
 * corresponding gauge reports the same value.
 */
public class TiDBSourceMetricsTest {
    private MetricListener listener;
    private TiDBSourceMetrics metrics;

    /** Creates a new listener and registers the gauges on its metric group before every test. */
    @Before
    public void setUp() {
        listener = new MetricListener();
        metrics = new TiDBSourceMetrics(listener.getMetricGroup());
        metrics.registerMetrics();
    }

    /** A recorded fetch delay must be visible through the fetch-lag gauge. */
    @Test
    public void testFetchEventTimeLagTracking() {
        final long recordedDelay = 5L;
        metrics.recordFetchDelay(recordedDelay);
        assertGauge(listener, CURRENT_FETCH_EVENT_TIME_LAG, recordedDelay);
    }

    /** A recorded emit delay must be visible through the emit-lag gauge. */
    @Test
    public void testEmitEventTimeLagTracking() {
        final long recordedDelay = 3L;
        metrics.recordEmitDelay(recordedDelay);
        assertGauge(listener, CURRENT_EMIT_EVENT_TIME_LAG, recordedDelay);
    }

    /**
     * Asserts that a gauge with the given identifier is registered and reports the expected
     * value.
     */
    private void assertGauge(MetricListener source, String gaugeName, long expectedValue) {
        final Optional<Gauge<Object>> registered = source.getGauge(gaugeName);
        assertTrue(registered.isPresent());
        final Object reported = registered.get().getValue();
        assertEquals(expectedValue, (long) reported);
    }
}

0 comments on commit fa6e7ea

Please sign in to comment.