Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,7 @@ private static FirehoseAsyncClient createFirehoseClient(
RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
getSdkClientMisconfiguredExceptionClassifier());

// deprecated, use numRecordsSendErrorsCounter instead.
@Deprecated private final Counter numRecordsOutErrorsCounter;

/* A counter for the total number of records that have encountered an error during put */
private final Counter numRecordsSendErrorsCounter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove it? numRecordsOutErrorsCounter is deprecated but still contains the errors counter.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

numRecordsSend and numRecordsSendErrors are registered in InternalSinkWriterMetricGroup using the same counter of numRecordsOut and numRecordsOutErrors, in order to make these metrics having the same value. Thus we only need to keep one counter in the sink writer implementation.

Copy link
Contributor

@JingGe JingGe Oct 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are using different counters:

numRecordsSendErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS);

BTW, removing it breaks the backward compatibility with 1.15.x too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've also modified InternalSinkWriterMetricGroup to make these two metrics share the same counter:

https://github.com/apache/flink/pull/21065/files#diff-6ecdbb02f8667509442f88d7d251838f6388b0919df9390867f987bcc6423678R46-R48

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what I tried to explain. The consensus we have in the discussion is to modify numXXXOutCounter to have the same count number as numXXXSendErrors instead of reverting the change we did with numXXXSendErrors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if I understand your argument. Please correct me if I'm wrong.

Firstly, we need to keep only one counter variable (no matter sendCounter or outCounter) in sink writer implementations, as these two variables are pointing to the same underlying counter actually, and InternalSinkWriterMetricGroup registers the underlying counter twice in the registry with two different metric names out and send. Keeping two variables and increasing both of them will finally count twice towards the same underlying counter.

Then about which variable to remove. I think what we are arguing with is just the name of the variable in sink writer's implementation. From the user's perspective, the result is the same no matter whether the variable name is sendCounter or outCounter or fooCounter: the sink will register two metrics numXXXSend and numXXXOut having the same value. I think it's better to respect the original definition in FLIP-33 to use out to name variables for consistency.

private final Counter numRecordsOutErrorsCounter;

/* Name of the delivery stream in Kinesis Data Firehose */
private final String deliveryStreamName;
Expand Down Expand Up @@ -173,7 +169,6 @@ private static FirehoseAsyncClient createFirehoseClient(
this.deliveryStreamName = deliveryStreamName;
this.metrics = context.metricGroup();
this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
this.httpClient = createHttpClient(firehoseClientProperties);
this.firehoseClient = createFirehoseClient(firehoseClientProperties, httpClient);
}
Expand Down Expand Up @@ -221,7 +216,6 @@ private void handleFullyFailedRequest(
requestEntries.get(0).toString(),
err);
numRecordsOutErrorsCounter.inc(requestEntries.size());
numRecordsSendErrorsCounter.inc(requestEntries.size());

if (isRetryable(err)) {
requestResult.accept(requestEntries);
Expand All @@ -237,7 +231,6 @@ private void handlePartiallyFailedRequest(
requestEntries.size(),
requestEntries.get(0).toString());
numRecordsOutErrorsCounter.inc(response.failedPutCount());
numRecordsSendErrorsCounter.inc(response.failedPutCount());

if (failOnError) {
getFatalExceptionCons()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,7 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER,
getSdkClientMisconfiguredExceptionClassifier());

// deprecated, use numRecordsSendErrorsCounter instead.
@Deprecated private final Counter numRecordsOutErrorsCounter;

/* A counter for the total number of records that have encountered an error during put */
private final Counter numRecordsSendErrorsCounter;
private final Counter numRecordsOutErrorsCounter;

/* Name of the stream in Kinesis Data Streams */
private final String streamName;
Expand Down Expand Up @@ -154,7 +150,6 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord
this.streamName = streamName;
this.metrics = context.metricGroup();
this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter();
this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();
this.httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties);
this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient);
}
Expand Down Expand Up @@ -207,7 +202,6 @@ private void handleFullyFailedRequest(
requestEntries.size(),
err);
numRecordsOutErrorsCounter.inc(requestEntries.size());
numRecordsSendErrorsCounter.inc(requestEntries.size());

if (isRetryable(err)) {
requestResult.accept(requestEntries);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable
private final SinkWriterMetricGroup metrics;

/* Counter for number of bytes this sink has attempted to send to the destination. */
private final Counter numBytesSendCounter;
private final Counter numBytesOutCounter;

/* Counter for number of records this sink has attempted to send to the destination. */
private final Counter numRecordsSendCounter;
Copy link
Contributor

@JingGe JingGe Oct 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The decision was to point both numXXXSendCounter and numXXXOutCounter to the same number. Removing numXXXSendCounter breaks the backward compatibility with 1.15.x

private final Counter numRecordsOutCounter;

private final RateLimitingStrategy rateLimitingStrategy;

Expand Down Expand Up @@ -292,8 +292,8 @@ public AsyncSinkWriter(

this.metrics = context.metricGroup();
this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp);
this.numBytesSendCounter = this.metrics.getNumBytesSendCounter();
this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter();
this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter();
this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();

this.fatalExceptionCons =
exception ->
Expand Down Expand Up @@ -410,8 +410,8 @@ private List<RequestEntryT> createNextAvailableBatch(RequestInfo requestInfo) {
batchSizeBytes += requestEntrySize;
}

numRecordsSendCounter.inc(batch.size());
numBytesSendCounter.inc(batchSizeBytes);
numRecordsOutCounter.inc(batch.size());
numBytesOutCounter.inc(batchSizeBytes);

return batch;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,10 @@ public Optional<Gauge<Long>> getCurrentSendTimeGauge() {
}

public Counter getNumRecordsOutCounter() {
return metricGroup.getNumRecordsSendCounter();
return metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
}

public Counter getNumBytesOutCounter() {
return metricGroup.getNumBytesSendCounter();
return metricGroup.getIOMetricGroup().getNumBytesOutCounter();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public class FileWriter<IN>

private final OutputFileConfig outputFileConfig;

private final Counter numRecordsSendCounter;
private final Counter numRecordsOutCounter;

private boolean endOfInput;

Expand Down Expand Up @@ -128,7 +128,8 @@ public FileWriter(
this.activeBuckets = new HashMap<>();
this.bucketerContext = new BucketerContext();

this.numRecordsSendCounter = checkNotNull(metricGroup).getNumRecordsSendCounter();
this.numRecordsOutCounter =
checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter();
this.processingTimeService = checkNotNull(processingTimeService);
checkArgument(
bucketCheckInterval > 0,
Expand Down Expand Up @@ -195,7 +196,7 @@ public void write(IN element, Context context) throws IOException, InterruptedEx
final String bucketId = bucketAssigner.getBucketId(element, bucketerContext);
final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId);
bucket.write(element, processingTimeService.getCurrentProcessingTime());
numRecordsSendCounter.inc();
numRecordsOutCounter.inc();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ void testNumberRecordsOutCounter(@TempDir java.nio.file.Path tempDir)
InternalSinkWriterMetricGroup.mock(
metricListener.getMetricGroup(), operatorIOMetricGroup);

Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter();
Counter recordsCounter = sinkWriterMetricGroup.getIOMetricGroup().getNumRecordsOutCounter();
SinkWriter.Context context = new ContextImpl();
FileWriter<String> fileWriter =
createWriter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,9 @@ class KafkaWriter<IN>
private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>();
private final SinkWriterMetricGroup metricGroup;
private final boolean disabledMetrics;
private final Counter numRecordsSendCounter;
private final Counter numBytesSendCounter;
// deprecated, use numRecordsSendErrorsCounter instead.
@Deprecated private final Counter numRecordsOutErrorsCounter;
private final Counter numRecordsSendErrorsCounter;
private final Counter numRecordsOutCounter;
private final Counter numBytesOutCounter;
private final Counter numRecordsOutErrorsCounter;
private final ProcessingTimeService timeService;

// Number of outgoing bytes at the latest metric sync
Expand Down Expand Up @@ -155,10 +153,9 @@ class KafkaWriter<IN>
checkNotNull(sinkInitContext, "sinkInitContext");
this.timeService = sinkInitContext.getProcessingTimeService();
this.metricGroup = sinkInitContext.metricGroup();
this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter();
this.numRecordsSendErrorsCounter = metricGroup.getNumRecordsSendErrorsCounter();
this.kafkaSinkContext =
new DefaultKafkaSinkContext(
sinkInitContext.getSubtaskId(),
Expand Down Expand Up @@ -198,7 +195,7 @@ public void write(IN element, Context context) throws IOException {
final ProducerRecord<byte[], byte[]> record =
recordSerializer.serialize(element, kafkaSinkContext, context.timestamp());
currentProducer.send(record, deliveryCallback);
numRecordsSendCounter.inc();
numRecordsOutCounter.inc();
}

@Override
Expand Down Expand Up @@ -391,7 +388,7 @@ private void registerMetricSync() {
long outgoingBytesUntilNow = ((Number) byteOutMetric.metricValue()).longValue();
long outgoingBytesSinceLastUpdate =
outgoingBytesUntilNow - latestOutgoingByteTotal;
numBytesSendCounter.inc(outgoingBytesSinceLastUpdate);
numBytesOutCounter.inc(outgoingBytesSinceLastUpdate);
latestOutgoingByteTotal = outgoingBytesUntilNow;
lastSync = time;
registerMetricSync();
Expand All @@ -417,7 +414,6 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
mailboxExecutor.execute(
() -> {
numRecordsOutErrorsCounter.inc();
numRecordsSendErrorsCounter.inc();
throwException(metadata, exception, producer);
},
"Failed to send data to Kafka");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,21 @@ public void testIncreasingRecordBasedCounters() throws Exception {
try (final KafkaWriter<Integer> writer =
createWriterWithConfiguration(
getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
final Counter numBytesSend = metricGroup.getNumBytesSendCounter();
final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter();
final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter();
final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter();
final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter();
final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
assertThat(numBytesSend.getCount()).isEqualTo(0L);
assertThat(numRecordsSend.getCount()).isEqualTo(0);
assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
assertThat(numBytesOut.getCount()).isEqualTo(0L);
assertThat(numRecordsOut.getCount()).isEqualTo(0);
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);

writer.write(1, SINK_WRITER_CONTEXT);
timeService.trigger();
assertThat(numRecordsSend.getCount()).isEqualTo(1);
assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0);
assertThat(numRecordsOut.getCount()).isEqualTo(1);
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0);
assertThat(numBytesSend.getCount()).isGreaterThan(0L);
assertThat(numBytesOut.getCount()).isGreaterThan(0L);
}
}

Expand Down Expand Up @@ -197,13 +197,10 @@ void testNumRecordsOutErrorsCounterMetric() throws Exception {
createWriterWithConfiguration(
properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) {
final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter();
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0L);

writer.write(1, SINK_WRITER_CONTEXT);
assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(0L);

final String transactionalId = writer.getCurrentProducer().getTransactionalId();

Expand All @@ -220,7 +217,6 @@ void testNumRecordsOutErrorsCounterMetric() throws Exception {
writer.flush(false);
writer.prepareCommit();
assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L);
assertThat(numRecordsSendErrors.getCount()).isEqualTo(1L);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@
@PublicEvolving
public interface SinkWriterMetricGroup extends OperatorMetricGroup {

/** @deprecated use {@link #getNumRecordsSendErrorsCounter()} instead. */
@Deprecated
/** The total number of records failed to send. */
Counter getNumRecordsOutErrorsCounter();

/** The total number of records failed to send. */
/**
* The total number of records failed to send.
*
* <p>This metric has the same value as {@code numRecordsOutError}.
*/
Counter getNumRecordsSendErrorsCounter();

/**
Expand All @@ -44,10 +47,16 @@ public interface SinkWriterMetricGroup extends OperatorMetricGroup {
* may have issue to perform the persistence action within its scope. Therefore, this count may
* include the number of records that are failed to write by the downstream system, which should
* be counted by {@link #getNumRecordsSendErrorsCounter()}.
*
* <p>This metric has the same value as {@code numRecordsOut} of the operator.
*/
Counter getNumRecordsSendCounter();

/** The total number of output send bytes since the task started. */
/**
* The total number of output send bytes since the task started.
*
* <p>This metric has the same value as {@code numBytesOut} of the operator
*/
Counter getNumBytesSendCounter();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public static String currentInputWatermarkName(int index) {
public static final String DEBLOATED_BUFFER_SIZE = "debloatedBufferSize";

// FLIP-33 sink
// deprecated use NUM_RECORDS_SEND_ERRORS instead.
@Deprecated public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors";
public static final String NUM_RECORDS_SEND_ERRORS = "numRecordsSendErrors";
public static final String CURRENT_SEND_TIME = "currentSendTime";
public static final String NUM_RECORDS_SEND = "numRecordsSend";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,10 @@ public InternalOperatorIOMetricGroup(InternalOperatorMetricGroup parentMetricGro
numRecordsOutRate =
parentMetricGroup.meter(
MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut));
numBytesIn = parentMetricGroup.getTaskIOMetricGroup().getNumBytesInCounter();
numBytesOut = parentMetricGroup.getTaskIOMetricGroup().getNumBytesOutCounter();
numBytesIn = parentMetricGroup.counter(MetricNames.IO_NUM_BYTES_IN);
numBytesOut = parentMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT);
parentMetricGroup.meter(MetricNames.IO_NUM_BYTES_IN_RATE, new MeterView(numBytesIn));
parentMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@
public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup>
implements SinkWriterMetricGroup {

// deprecated, use numRecordsSendErrors instead.
@Deprecated private final Counter numRecordsOutErrors;
private final Counter numRecordsOutErrors;
private final Counter numRecordsSendErrors;
private final Counter numRecordsWritten;
private final Counter numBytesWritten;
Expand All @@ -45,9 +44,15 @@ private InternalSinkWriterMetricGroup(
MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) {
super(parentMetricGroup);
numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS);
numRecordsSendErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS);
numRecordsWritten = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND);
numBytesWritten = parentMetricGroup.counter(MetricNames.NUM_BYTES_SEND);
numRecordsSendErrors =
parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, numRecordsOutErrors);
numRecordsWritten =
parentMetricGroup.counter(
MetricNames.NUM_RECORDS_SEND,
operatorIOMetricGroup.getNumRecordsOutCounter());
numBytesWritten =
parentMetricGroup.counter(
MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter());
this.operatorIOMetricGroup = operatorIOMetricGroup;
}

Expand All @@ -73,7 +78,6 @@ public OperatorIOMetricGroup getIOMetricGroup() {
return operatorIOMetricGroup;
}

@Deprecated
@Override
public Counter getNumRecordsOutErrorsCounter() {
return numRecordsOutErrors;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,7 @@ public void setup(
environment
.getMetricGroup()
.getOrAddOperator(config.getOperatorID(), config.getOperatorName());
this.output =
new CountingOutput<>(
output,
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
this.output = registerCounterOnOutput(output, operatorMetricGroup);
if (config.isChainEnd()) {
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
}
Expand Down Expand Up @@ -649,4 +646,10 @@ public OperatorID getOperatorID() {
protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() {
return Optional.ofNullable(timeServiceManager);
}

protected Output<StreamRecord<OUT>> registerCounterOnOutput(
Output<StreamRecord<OUT>> output, OperatorMetricGroup operatorMetricGroup) {
return new CountingOutput<>(
output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
}
}
Loading