Skip to content
Permalink
Browse files
[FLINK-25470][changelog] Expose more metrics of ChangelogStateBackend…
… and materialization
  • Loading branch information
masteryhx authored and rkhachatryan committed May 12, 2022
1 parent 94e8f54 commit 7d2ca7b146180148fbc5dbc4e8de40a0651be22d
Showing 22 changed files with 881 additions and 220 deletions.
@@ -1277,7 +1277,7 @@ Note that the metrics are only available via reporters.
</thead>
<tbody>
<tr>
<th rowspan="20"><strong>Job (only available on TaskManager)</strong></th>
<th rowspan="8"><strong>Job (only available on TaskManager)</strong></th>
<td>numberOfUploadRequests</td>
<td>Total number of upload requests made</td>
<td>Counter</td>
@@ -1317,6 +1317,42 @@ Note that the metrics are only available via reporters.
<td>Current size of upload queue. Queue items can be packed together and form a single upload.</td>
<td>Gauge</td>
</tr>
<tr>
<th rowspan="7"><strong>Task/Operator</strong></th>
<td>startedMaterialization</td>
<td>The number of started materializations.</td>
<td>Counter</td>
</tr>
<tr>
<td>completedMaterialization</td>
<td>The number of successfully completed materializations.</td>
<td>Counter</td>
</tr>
<tr>
<td>failedMaterialization</td>
<td>The number of failed materializations.</td>
<td>Counter</td>
</tr>
<tr>
<td>lastFullSizeOfMaterialization</td>
<td>The full size of the materialization part of the last reported checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastIncSizeOfMaterialization</td>
<td>The incremental size of the materialization part of the last reported checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastFullSizeOfNonMaterialization</td>
<td>The full size of the non-materialization part of the last reported checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastIncSizeOfNonMaterialization</td>
<td>The incremental size of the non-materialization part of the last reported checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
</tbody>
</table>

@@ -1270,7 +1270,7 @@ Note that the metrics are only available via reporters.
</thead>
<tbody>
<tr>
<th rowspan="20"><strong>Job (only available on TaskManager)</strong></th>
<th rowspan="8"><strong>Job (only available on TaskManager)</strong></th>
<td>numberOfUploadRequests</td>
<td>Total number of upload requests made</td>
<td>Counter</td>
@@ -1310,6 +1310,42 @@ Note that the metrics are only available via reporters.
<td>Current size of upload queue. Queue items can be packed together and form a single upload.</td>
<td>Gauge</td>
</tr>
<tr>
<th rowspan="7"><strong>Task/Operator</strong></th>
<td>startedMaterialization</td>
<td>The number of started materializations.</td>
<td>Counter</td>
</tr>
<tr>
<td>completedMaterialization</td>
<td>The number of successfully completed materializations.</td>
<td>Counter</td>
</tr>
<tr>
<td>failedMaterialization</td>
<td>The number of failed materializations.</td>
<td>Counter</td>
</tr>
<tr>
<td>lastFullSizeOfMaterialization</td>
<td>The full size of the materialization part of the last reported checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastIncSizeOfMaterialization</td>
<td>The incremental size of the materialization part of the last reported checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastFullSizeOfNonMaterialization</td>
<td>The full size of the non-materialization part of the last reported checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
<tr>
<td>lastIncSizeOfNonMaterialization</td>
<td>The incremental size of the non-materialization part of the last reported checkpoint (in bytes).</td>
<td>Gauge</td>
</tr>
</tbody>
</table>

@@ -21,13 +21,12 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.ThreadSafeSimpleCounter;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;

import javax.annotation.concurrent.ThreadSafe;

import java.util.concurrent.atomic.LongAdder;

/**
* Metrics related to the Changelog Storage used by the Changelog State Backend. Thread-safety is
* required because it is used by multiple uploader threads.
@@ -47,7 +46,7 @@ public class ChangelogStorageMetricGroup extends ProxyMetricGroup<MetricGroup> {
public ChangelogStorageMetricGroup(MetricGroup parent) {
super(parent);
this.uploadsCounter =
counter(CHANGELOG_STORAGE_NUM_UPLOAD_REQUESTS, new ThreadSafeCounter());
counter(CHANGELOG_STORAGE_NUM_UPLOAD_REQUESTS, new ThreadSafeSimpleCounter());
this.uploadBatchSizes =
histogram(
CHANGELOG_STORAGE_UPLOAD_BATCH_SIZES,
@@ -69,7 +68,7 @@ public ChangelogStorageMetricGroup(MetricGroup parent) {
CHANGELOG_STORAGE_UPLOAD_LATENCIES_NANOS,
new DescriptiveStatisticsHistogram(WINDOW_SIZE));
this.uploadFailuresCounter =
counter(CHANGELOG_STORAGE_NUM_UPLOAD_FAILURES, new ThreadSafeCounter());
counter(CHANGELOG_STORAGE_NUM_UPLOAD_FAILURES, new ThreadSafeSimpleCounter());
}

public Counter getUploadsCounter() {
@@ -108,35 +107,6 @@ public void registerUploadQueueSizeGauge(Gauge<Integer> gauge) {
gauge(CHANGELOG_STORAGE_UPLOAD_QUEUE_SIZE, gauge);
}

private static class ThreadSafeCounter implements Counter {
private final LongAdder longAdder = new LongAdder();

@Override
public void inc() {
longAdder.increment();
}

@Override
public void inc(long n) {
longAdder.add(n);
}

@Override
public void dec() {
longAdder.decrement();
}

@Override
public void dec(long n) {
longAdder.add(-n);
}

@Override
public long getCount() {
return longAdder.longValue();
}
}

private static final String PREFIX = "ChangelogStorage";
public static final String CHANGELOG_STORAGE_NUM_UPLOAD_REQUESTS =
PREFIX + ".numberOfUploadRequests";
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.metrics;

import org.apache.flink.annotation.Internal;

import java.util.concurrent.atomic.LongAdder;

/** A simple low-overhead {@link org.apache.flink.metrics.Counter} that is thread-safe. */
@Internal
public class ThreadSafeSimpleCounter implements Counter {

/** the current count. */
private final LongAdder longAdder = new LongAdder();

/** Increment the current count by 1. */
@Override
public void inc() {
longAdder.increment();
}

/**
* Increment the current count by the given value.
*
* @param n value to increment the current count by
*/
@Override
public void inc(long n) {
longAdder.add(n);
}

/** Decrement the current count by 1. */
@Override
public void dec() {
longAdder.decrement();
}

/**
* Decrement the current count by the given value.
*
* @param n value to decrement the current count by
*/
@Override
public void dec(long n) {
longAdder.add(-n);
}

/**
* Returns the current count.
*
* @return current count
*/
@Override
public long getCount() {
return longAdder.longValue();
}
}

0 comments on commit 7d2ca7b

Please sign in to comment.