Skip to content

Commit

Permalink
Flink: missed IcebergSourceReader group in PR #5393 for FLIP-27 sourc…
Browse files Browse the repository at this point in the history
…e reader metrics (#5401)
  • Loading branch information
stevenzwu committed Aug 5, 2022
1 parent dafb480 commit b26a1c5
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 10 deletions.
Expand Up @@ -137,7 +137,7 @@ public Boundedness getBoundedness() {

@Override
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
return new IcebergSourceReader<>(readerFunction, readerContext);
return new IcebergSourceReader<>(lazyTable().name(), readerFunction, readerContext);
}

@Override
Expand Down
Expand Up @@ -21,19 +21,22 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

@Internal
public class IcebergSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<
RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {

public IcebergSourceReader(ReaderFunction<T> readerFunction, SourceReaderContext context) {
public IcebergSourceReader(
String fullTableName, ReaderFunction<T> readerFunction, SourceReaderContext context) {
super(
() -> new IcebergSourceSplitReader<>(readerFunction, context),
() -> new IcebergSourceSplitReader<>(fullTableName, readerFunction, context),
new IcebergSourceRecordEmitter<>(),
context.getConfiguration(),
context);
Expand Down
Expand Up @@ -54,12 +54,13 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
private IcebergSourceSplit currentSplit;
private String currentSplitId;

IcebergSourceSplitReader(ReaderFunction<T> openSplitFunction, SourceReaderContext context) {
IcebergSourceSplitReader(
String fullTableName, ReaderFunction<T> openSplitFunction, SourceReaderContext context) {
this.openSplitFunction = openSplitFunction;
this.indexOfSubtask = context.getIndexOfSubtask();
this.splits = new ArrayDeque<>();

MetricGroup metrics = context.metricGroup();
MetricGroup metrics = context.metricGroup().addGroup("IcebergSourceReader", fullTableName);
this.assignedSplits = metrics.counter("assignedSplits");
this.assignedBytes = metrics.counter("assignedBytes");
this.finishedSplits = metrics.counter("finishedSplits");
Expand Down
Expand Up @@ -137,7 +137,7 @@ public Boundedness getBoundedness() {

@Override
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
return new IcebergSourceReader<>(readerFunction, readerContext);
return new IcebergSourceReader<>(lazyTable().name(), readerFunction, readerContext);
}

@Override
Expand Down
Expand Up @@ -21,19 +21,22 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

@Internal
public class IcebergSourceReader<T>
extends SingleThreadMultiplexSourceReaderBase<
RecordAndPosition<T>, T, IcebergSourceSplit, IcebergSourceSplit> {

public IcebergSourceReader(ReaderFunction<T> readerFunction, SourceReaderContext context) {
public IcebergSourceReader(
String fullTableName, ReaderFunction<T> readerFunction, SourceReaderContext context) {
super(
() -> new IcebergSourceSplitReader<>(readerFunction, context),
() -> new IcebergSourceSplitReader<>(fullTableName, readerFunction, context),
new IcebergSourceRecordEmitter<>(),
context.getConfiguration(),
context);
Expand Down
Expand Up @@ -54,12 +54,13 @@ class IcebergSourceSplitReader<T> implements SplitReader<RecordAndPosition<T>, I
private IcebergSourceSplit currentSplit;
private String currentSplitId;

IcebergSourceSplitReader(ReaderFunction<T> openSplitFunction, SourceReaderContext context) {
IcebergSourceSplitReader(
String fullTableName, ReaderFunction<T> openSplitFunction, SourceReaderContext context) {
this.openSplitFunction = openSplitFunction;
this.indexOfSubtask = context.getIndexOfSubtask();
this.splits = new ArrayDeque<>();

MetricGroup metrics = context.metricGroup();
MetricGroup metrics = context.metricGroup().addGroup("IcebergSourceReader", fullTableName);
this.assignedSplits = metrics.counter("assignedSplits");
this.assignedBytes = metrics.counter("assignedBytes");
this.finishedSplits = metrics.counter("finishedSplits");
Expand Down

0 comments on commit b26a1c5

Please sign in to comment.