Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-6998][Kafka Connector] Add kafka offset commit metrics in cons… #4187

Closed
wants to merge 2 commits into from

Conversation

zhenzhongxu
Copy link
Contributor

add "kafkaCommitsSucceeded" and "kafkaCommitsFailed" metrics in KafkaConsumerThread class.

@zentol
Copy link
Contributor

zentol commented Jun 27, 2017

Please resolve the checkstyle violations:


[ERROR] src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java:[96] (javadoc) JavadocStyle: First sentence should end with a period.
[ERROR] src/main/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThread.java:[99] (javadoc) JavadocStyle: First sentence should end with a period.

@tzulitai
Copy link
Contributor

tzulitai commented Jun 27, 2017

Hi @zhenzhongxu, thanks for the PR. I like the idea of exposing these metrics.

Regarding the metric naming:
I think we also expose metrics that are directly available through the Kafka client.
Apart from this PR, there was also previous discussion about adding other Flink-specific metrics to the connector (e.g., offset lag since last checkpoint).
I wonder whether or not we should regulate the naming scheme for these Flink-specific metrics, to set them apart from Kafka's provided metrics.

Regarding the implementation:
This current solution only exposes the metric for Kafka 09 and 010.
Ideally, this should be added to the FlinkKafkaConsumerBase, so that we don't end up doing the same thing in future versions. It would be a bit more work, such as providing a callback to AbstractFetcher.commitInternalOffsetsToKafka, but then it would be properly implemented.

What do you think?

<tr>
<th rowspan="1">Slot/Consumer</th>
<td>kafkaCommitsSucceeded</td>
<td>Kafka offset commit success count if Kafka commit is turned on.</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make this statement more strict, in that the metric only exists if commit is turned on AND checkpointing is enabled. The added metrics would not appear if only Kafka's periodic offset committing is turned on.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks for the feedback. Will update.a

<tr>
<th rowspan="1">Slot/Consumer</th>
<td>kafkaCommitsFailed</td>
<td>Kafka offset commit failure count if Kafka commit is turned on.</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks for the feedback. Will update.a

@@ -119,6 +126,9 @@ public KafkaConsumerThread(

this.nextOffsetsToCommit = new AtomicReference<>();
this.running = true;

this.successfulCommits = kafkaMetricGroup.counter("kafkaCommitsSucceeded");
this.failedCommits = kafkaMetricGroup.counter("kafkaCommitsFailed");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my comment regarding the naming of the metrics.

@zhenzhongxu
Copy link
Contributor Author

@tzulitai

Regarding the metric naming:
Any suggestions on naming conventions for these flink specific metrics? How do you like 'kafkaconnector-commits-succeeded' (component-metric-name) as an example? I personally like hyphen seperators better than camel case for metric names.
I'll not include the other proposed metric in this PR just for the sake of simplicity. I also have some opinions on "offset lag" metric, I think this particular metric is more useful when some external entity
perform the monitoring (difference of committed offset vs log head), especially in failure situations.

Regarding the implementation:
Thanks for the feedback. I'll explore the more proper implementation suggested, I'll get back to you with a solution or question.

@zentol
Copy link
Contributor

zentol commented Jun 28, 2017

I don't think it matters to much whether a metric is completely measured by Flink or just forwarded from kafka classes, so i would not add "kafka" to the metric name. Having "kafka" in the name also introduces an inherent redundancy, since the scope for, say the KafkaConsumerThread, already contains "KafkaConsumer".

@zhenzhongxu
Copy link
Contributor Author

How about just "commits-succeeded" and "commits-failed" as metric names.

@tzulitai
Copy link
Contributor

I agree. "commits-succeeded" and "commits-failed" seems good!

@tzulitai
Copy link
Contributor

I'll not include the other proposed metric in this PR just for the sake of simplicity. I also have some opinions on "offset lag" metric, I think this particular metric is more useful when some external entity
perform the monitoring (difference of committed offset vs log head), especially in failure situations.

Yes, lets keep that apart from this PR. There is also a JIRA for exactly this feature: https://issues.apache.org/jira/browse/FLINK-6109

@zentol
Copy link
Contributor

zentol commented Jun 29, 2017

Please follow the camel-case pattern that we use for other metrics.

@tzulitai
Copy link
Contributor

tzulitai commented Jul 5, 2017

Hi @zhenzhongxu, could you rebase on to the latest master? Currently the PR contains commits unrelated to the change.

@zhenzhongxu
Copy link
Contributor Author

@tzulitai rebase done.

@@ -104,7 +106,7 @@
// configuration state, set on the client relevant for all subtasks
// ------------------------------------------------------------------------

/** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
/** Descrnterbes whether we are discovering partitions for fixed topics or a topic pattern. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably an unintended change.

@@ -184,6 +186,18 @@
/** Flag indicating whether the consumer is still running. */
private volatile boolean running = true;

// ------------------------------------------------------------------------
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The indentation is off by 1 space.

@zhenzhongxu
Copy link
Contributor Author

@tzulitai seems the last CI pipeline failed because of stability issues, how can I trigger another build without making a commit?

</thead>
<tbody>
<tr>
<th rowspan="1">Slot/Consumer</th>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This documentation is inconsistent with the rest.

Scope should be "Operator", and you should add an additional "Infix" column which contains the names of the metric groups you are creating, concatenated with a period.

@Override
public void onComplete(Exception exception) {
if (exception == null) {
successfulCommits.inc();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the callbacks are executed by another thread the result will be inaccurate, as the default counter implementation is not thread-safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only invoked during checkpoint complete notification. There is no thread race condition.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also like to raise a thread-safety issue here.

Currently, since there's always only one pending offset commit in Kafka 09+, and Kafka08 commits in a blocking call, there will be no race condition in incrementing the counters. However, changing these implementations in subclasses (perhaps in the future) can easily introduce race conditions here.

At the very least, we probably should add a notice about thread-safety contract in the Javadoc of commitInternalOffsetsToKafka.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'll add a detailed javadoc to describe the current unprotected implementation. I am hesitant to add in lock protection because higher level abstraction currently guarantees no concurrent commit at the moment.

package org.apache.flink.streaming.connectors.kafka.internals;

/**
* A callback interface that the user can implement to trigger custom actions when a commit request completes.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this callback actually exposed to users?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the wording, this is only exposed to the kafka source operator (invoked upon checkpoint compete), not directly to users. Hence, there is no thread race condition either.

Copy link
Contributor

@tzulitai tzulitai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @zhenzhongxu.
Overall I like the changes so far! I have some comments inline.

*
* <p>Only one commit operation may be pending at any time. If the committing takes longer than
* the frequency with which this method is called, then some commits may be skipped due to being
* superseded by newer ones.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unnecessary empty space.

/** Counter for failed Kafka offset commits. */
private transient Counter failedCommits;

private transient KafkaCommitCallback offsetCommitCallback;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you include a Javadoc for this too? (for consistency)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fair, I'll include a javadoc as well a notice about the thread-safety contract as you suggested.

@Override
public void onComplete(Exception exception) {
if (exception == null) {
successfulCommits.inc();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would also like to raise a thread-safety issue here.

Currently, since there's always only one pending offset commit in Kafka 09+, and Kafka08 commits in a blocking call, there will be no race condition in incrementing the counters. However, changing these implementations in subclasses (perhaps in the future) can easily introduce race conditions here.

At the very least, we probably should add a notice about thread-safety contract in the Javadoc of commitInternalOffsetsToKafka.

/**
* A callback interface that the user can implement to trigger custom actions when a commit request completes.
*/
public interface KafkaCommitCallback {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not entirely sure of the need of an interface here. Do we allow "version-specific subclasses" to have their own KafkaCommitCallback implementation?

If not, then this makes the usage slightly confusing.

@@ -896,6 +896,28 @@ Thus, in order to infer the metric identifier:
</tbody>
</table>

#### Connector:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's name this heading "Connectors", and add a new sub-heading specifically for kafka.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok make sense

@zhenzhongxu
Copy link
Contributor Author

@tzulitai all tests passing now. let me know if this looks ok now, also let me know if you want me to go ahead squash all commits.

* A callback interface that the source operator can implement to trigger custom actions when a commit request completes,
* which should normally be triggered from checkpoint complete event.
*/
public interface KafkaCommitCallback {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be nice to have two methods here: onSuccess() and onException(...).
Or does this have to be a SAM interface so you can use lambdas?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having the two methods would make the om success case more clear...

*
* @param exception The exception thrown during processing of the request, or null if the commit completed successfully
*/
void onComplete(Exception exception);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most other exception handlers take a Throwable. Would it make sense to do that here as well?

Zhenzhong Xu added 2 commits July 27, 2017 16:21
…umer callback

3CR feedback;make code change backward compatible with test cases;fix an earlier merge issue

add sub-heading for kafka connectors in metrics.md
Copy link
Contributor

@tzulitai tzulitai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is good to merge now, +1.
I have some final cosmetic stuff I would like to be changed, will address them myself when merging!
Thanks for the contribution @zhenzhongxu.

@@ -308,17 +311,33 @@ public void shutdown() {
*
* <p>Only one commit operation may be pending at any time. If the committing takes longer than
* the frequency with which this method is called, then some commits may be skipped due to being
* superseded by newer ones.
* superseded by newer ones.
*
* @param offsetsToCommit The offsets to commit
*/
public void setOffsetsToCommit(Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this variant anymore. The only user of the method is Kafka09Fetcher, anyways.

ZookeeperOffsetHandler zkHandler = this.zookeeperOffsetHandler;
if (zkHandler != null) {
try {
// the ZK handler takes care of incrementing the offsets by 1 before committing
zkHandler.prepareAndCommitOffsets(offsets);
if (commitCallback != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would actually like to remove these null checks, and have the contract that a callback will always be provided with @Nonnull annotation.

AFAIK, the only reason we need these null checks is that the tests, for simplicity, provide a null as the callback. IMO, it isn't a good practice to have logic in the main code just to satisfy testing shortcuts.

}
catch (Exception e) {
if (running) {
if (commitCallback != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above. Would like to remove these null checks.

@@ -482,6 +501,12 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception

if (ex != null) {
log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
if (callerCommitCallback != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above. Would like to remove these null checks.

callerCommitCallback.onException(ex);
}
}
else if (callerCommitCallback != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment above. Would like to remove these null checks.

tzulitai added a commit to tzulitai/flink that referenced this pull request Jul 28, 2017
@tzulitai
Copy link
Contributor

Merging ..

@asfgit asfgit closed this in 1ded2d8 Jul 28, 2017
tzulitai added a commit to tzulitai/flink that referenced this pull request Jul 28, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants