
[FLINK-7945][Metrics&connector] Fix per-partition lag metric lost in Kafka connector #4935

Closed
Aitozi wants to merge 1 commit

Conversation

@Aitozi (Contributor) commented Nov 1, 2017

What is the purpose of the change

When using the Kafka connector we can't get the per-partition lag metric, even though Kafka has exposed it since 0.10.2 (https://issues.apache.org/jira/browse/KAFKA-4381). After reading the Kafka code, I found that the per-partition lag is only registered after KafkaConsumer#poll has been invoked, so I changed the point at which Flink registers the Kafka metrics. With this change, the 0.10 and 0.11 Kafka connectors report the correct lag metric.
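
To make the fix concrete, here is a minimal sketch of the idea (not the exact patch): the registration loop runs after poll(), and a set of already-registered names keeps each gauge from being added twice. The class and method names are illustrative; only consumer.metrics() and Flink's MetricGroup#gauge come from the actual code.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

/** Sketch: register Kafka's lazily-created metrics after poll(), each at most once. */
class LazyKafkaMetricRegistrar {

	// names already exposed to Flink, so every gauge is registered only once
	private final Set<String> registered = new HashSet<>();

	void registerAfterPoll(KafkaConsumer<?, ?> consumer, MetricGroup kafkaMetricGroup) {
		Map<MetricName, ? extends Metric> metrics = consumer.metrics();
		if (metrics == null) {
			// MapR's Kafka implementation returns null here
			return;
		}
		for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
			String name = metric.getKey().name();
			if (registered.add(name)) {
				final Metric kafkaMetric = metric.getValue();
				// expose the Kafka metric as a Flink gauge
				kafkaMetricGroup.gauge(name, (Gauge<Double>) () -> kafkaMetric.value());
			}
		}
	}
}

Calling registerAfterPoll once per poll keeps newly appearing per-partition gauges visible, at the cost of iterating the metric map on every call; the review discussion below is about exactly that trade-off.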

Brief change log

  • Change when the Kafka metrics are registered in the Flink Kafka connector

Verifying this change

This change has been verified by the existing test cases.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (yes)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

Documentation

  • Does this pull request introduce a new feature? (no)

@Aitozi (Contributor, Author) commented Nov 2, 2017

cc @zentol @tzulitai, please help review the code.

Map<MetricName, ? extends Metric> metrics = consumer.metrics();
if (metrics == null) {
	// MapR's Kafka implementation returns null here.
	log.info("Consumer implementation does not support metrics");
Contributor:

The log will be overloaded with these if the MapR implementation is used and metrics are turned on.

@Aitozi (Contributor, Author):

I changed the level to debug.

	log.info("Consumer implementation does not support metrics");
} else {
	// we have Kafka metrics, register them
	for (Map.Entry<MetricName, ? extends Metric> metric : metrics.entrySet()) {
Contributor:

I'm really not sure about this.
This includes a loop through every consumer metric on every record poll.
AFAIK, the Kafka consumer ships with at least 6~8 metrics. That could be harmful to the performance of the consumer.

Is there any way we can avoid that?

@Aitozi (Contributor, Author):

Yes, I agree this is not the best solution. What do you think about registering the Kafka metrics only during the first several polls of the job, with the number of polls configurable via properties? Once that count is exceeded, we would not run the loop at all. A sketch follows.
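
A sketch of that proposal, building on the illustrative LazyKafkaMetricRegistrar above; the property key flink.kafka.metric.register.times is hypothetical (the PR's constant is referred to below as KEY_REGISTER_TIMES), and the default of 1 is only for illustration:

import java.util.Properties;

import org.apache.flink.metrics.MetricGroup;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/** Sketch: fetch loop that runs the registration pass only for the first N polls. */
class BoundedRegistrationLoop {

	// hypothetical property key standing in for the PR's KEY_REGISTER_TIMES
	static final String REGISTER_TIMES_KEY = "flink.kafka.metric.register.times";

	static void runFetchLoop(
			KafkaConsumer<byte[], byte[]> consumer,
			MetricGroup kafkaMetricGroup,
			Properties props) {
		int maxAttempts = Integer.parseInt(props.getProperty(REGISTER_TIMES_KEY, "1"));
		LazyKafkaMetricRegistrar registrar = new LazyKafkaMetricRegistrar();
		int attempts = 0;

		while (true) { // loops until the source is cancelled
			ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
			if (attempts < maxAttempts) {
				attempts++;
				registrar.registerAfterPoll(consumer, kafkaMetricGroup);
			}
			// ... hand records off to the Flink source context here ...
		}
	}
}

After maxAttempts polls the loop degenerates to a plain poll-and-emit cycle, so the per-poll iteration cost the reviewer worried about is bounded to the start of the job.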

@Aitozi (Contributor, Author):

I have implemented registering for the first several polls and then skipping registration, and the related metrics are now registered successfully. Please let me know if you have any suggestions, thanks~ The commits have been squashed :)

@tzulitai (Contributor) commented Nov 2, 2017

One other side note:
Please squash your commits into a single one, with an appropriate commit message (you can refer to the other commits in the codebase as a starting point).

[FLINK-7945][Metrics&connector] Fix per-partition lag metric lost in Kafka connector is good, but we would like that to be the commit message also.

@Aitozi force-pushed the FLINK-7945 branch 3 times, most recently from d40d0f7 to 9b70e49 on November 4, 2017 04:11
@Aitozi (Contributor, Author) commented Nov 4, 2017

Updated the code according to the comments. Ping @tzulitai.

@Aitozi (Contributor, Author) commented Nov 7, 2017

Hi @tzulitai, could you take a look at this again :-) ?

@Aitozi (Contributor, Author) commented Nov 27, 2017

ping @tzulitai ~

@tzulitai (Contributor):

Hi @Aitozi, sorry for the long delay in getting back to this PR.

I'm still not convinced that this is a sane solution. For example, what would be a "good" setting for the KEY_REGISTER_TIMES property? Isn't 1 enough, since you mentioned that the missing metric is registered in Kafka after the first poll? Making this configurable seems unnecessary to me.

I wonder if we can try the following two approaches:

  1. Manually register the metrics that we know would be missing before the first poll, or
  2. Poll once outside the loop first just to make sure that all Kafka metrics exist, perform the metric registration, and then start the regular fetch loop (see the sketch after this comment).

What do you think?
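
For reference, a sketch of that second approach, reusing the illustrative LazyKafkaMetricRegistrar from the PR description: one priming poll creates Kafka's lazy metrics, a single registration pass follows, and the regular fetch loop then runs with no per-poll metric work.

import org.apache.flink.metrics.MetricGroup;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/** Sketch of approach 2: one priming poll, one registration pass, then the plain loop. */
class PrimingPollLoop {

	private volatile boolean running = true;

	void runFetchLoop(KafkaConsumer<byte[], byte[]> consumer, MetricGroup kafkaMetricGroup) {
		LazyKafkaMetricRegistrar registrar = new LazyKafkaMetricRegistrar();

		// priming poll: after this, Kafka has created its lazily-registered metrics
		ConsumerRecords<byte[], byte[]> firstBatch = consumer.poll(100L);
		registrar.registerAfterPoll(consumer, kafkaMetricGroup); // single registration pass
		// ... emit firstBatch to the source context ...

		while (running) {
			ConsumerRecords<byte[], byte[]> records = consumer.poll(100L);
			// ... emit records to the source context ...
		}
	}
}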

@Aitozi (Contributor, Author) commented Nov 30, 2017

Hi @tzulitai,
After reading the KafkaConsumer code again, I found that the per-partition Kafka lag metric is registered in the method FetchManagerMetrics#recordPartitionLag. But when a single poll already yields max.poll.records records, the client returns the polled records early, leaving some partitions that no fetches have been sent to yet, so their lag metrics do not exist yet. In testing, if we poll just once and then register the Kafka metrics, then with many partitions (around 100) some per-partition lag metrics are lost.
So I think that with a configurable property, users can choose a higher registration count when they have many partitions, and it will do little harm to performance.

Please let me know your ideas, thanks.
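
To illustrate the trade-off being discussed, a hypothetical configuration sketch: max.poll.records is a real Kafka client setting that caps how many records one poll() returns (which is why a single poll may not touch every partition), while the register-times key is the same illustrative stand-in for the PR's KEY_REGISTER_TIMES used in the sketches above.

import java.util.Properties;

class RegisterTimesConfigExample {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "broker:9092");
		props.setProperty("group.id", "my-group");
		// Kafka client setting: one poll() returns at most this many records,
		// so with ~100 partitions several polls may pass before every
		// partition has had a fetch sent to it
		props.setProperty("max.poll.records", "500");
		// hypothetical key standing in for the PR's KEY_REGISTER_TIMES:
		// allow the first 10 polls to run the registration pass
		props.setProperty("flink.kafka.metric.register.times", "10");
		// props would then be passed to the FlinkKafkaConsumer constructor
	}
}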
