[SPARK-29993][SS]Avoid continuous warn log in get record and offset from Kafka #26632

Closed
wants to merge 1 commit into from

Conversation

wenxuanguan
Contributor

What changes were proposed in this pull request?

Log the warning once, when the Kafka consumer is created, to avoid continuous warning logs in Structured Streaming applications.

Why are the changes needed?

In a streaming application, Kafka consumer calls are wrapped in a check for UninterruptibleThread. When the current thread is not an UninterruptibleThread, a warning is logged every time the consumer gets a record or an offset, which produces a continuous stream of warning logs.
Since the warning is about the Kafka consumer itself, there is no need to log it on every record or offset fetch; we can log it once when the consumer is created.

Does this PR introduce any user-facing change?

No

How was this patch tested?

N/A

@AmplabJenkins

Can one of the admins verify this patch?

@@ -587,8 +587,11 @@ private[kafka010] class KafkaDataConsumer(
case ut: UninterruptibleThread =>
ut.runUninterruptibly(body)
case _ =>
logWarning("KafkaDataConsumer is not running in UninterruptibleThread. " +
"It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894")
val warned = _consumer.isDefined
Contributor

This looks less intuitive to me. I'm not sure how much this really bothers us (the message is quite a serious one, so I'd even be OK if it's a bit bothersome), but if it's really a nuisance, then I'd add an explicit flag to check it.
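
For illustration, the explicit flag suggested here could look roughly like the minimal sketch below. It is not the code from this PR or from Spark: `WarnOnceRunner`, its `log` parameter, and the `onSafeThread` argument are hypothetical names used only to keep the example self-contained.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the wrapper discussed in this PR; it is NOT
// Spark's KafkaDataConsumer. `log` replaces logWarning so the sketch has
// no Spark dependency.
class WarnOnceRunner(log: String => Unit) {
  // Explicit flag: flips to true the first time the warning is emitted.
  private val warned = new AtomicBoolean(false)

  // `onSafeThread` is an assumed input standing in for the
  // "is this an UninterruptibleThread?" check in the diff.
  def run[T](onSafeThread: Boolean)(body: => T): T = {
    // compareAndSet returns true only for the first caller that flips the
    // flag, so the warning is logged at most once per instance.
    if (!onSafeThread && warned.compareAndSet(false, true)) {
      log("Not running in UninterruptibleThread; calls may hang if interrupted (KAFKA-1894)")
    }
    body
  }
}
```

Compared with inferring the state from `_consumer.isDefined`, a dedicated flag states the intent directly, at the cost of one extra field.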

Contributor Author

@HeartSaVioR In a micro-batch application, every micro-batch job generates two log lines per task in the executor log file. Since a micro-batch application is long-running, this adds up to a massive number of warning logs.

Contributor

The thing is, the thread should have been an UninterruptibleThread; if we find a case where it is not, we should file an issue and try to fix it.

@gaborgsomogyi
Contributor

I think the issue is more than just logging something here and there. KAFKA-1894 is a serious issue which was fixed in Kafka 2.0.0. Since the change only affects the consumer and Spark already has this version, maybe Spark doesn't need to run consumers on UninterruptibleThread.

  • Micro-batch processing still has this feature, which can be removed once it has been double-checked that the mentioned Kafka JIRA fixed all the APIs used by Spark.
  • Continuous processing, I guess, did not consider KAFKA-1894.

So my point is that we must first understand which APIs KAFKA-1894 provided a solution for, and then I see 2 options:

  • All the APIs used by Spark are fixed => UninterruptibleThread can be removed.
  • NOT all the APIs used by Spark are fixed => Continuous processing must use it.

@HeartSaVioR
Contributor

HeartSaVioR commented Nov 27, 2019

KAFKA-1894 will not be resolved with just a version upgrade, as it was closed with KIP-266 as the solution. That requires changing our Kafka API calls to the non-blocking versions (which take a Duration parameter), and we haven't migrated to them yet. We still call poll() with a long parameter, and we are also waiting on KIP-396 for a replacement of poll(0).

When we migrate, we should take into account the side effect that the semantics of the timeout change: it will include the time spent fetching metadata as well. End users will need to set a larger timeout value, or we should add an artificial value to the given timeout.
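
As a rough sketch of the "artificial value" option: the helper below pads a user-configured timeout before it would be handed to the Duration-based `poll`. `PollTimeout`, `padded`, and `metadataAllowanceMs` are hypothetical names, and the size of the allowance is an assumption, not something decided in this thread.

```scala
import java.time.Duration

// Hypothetical helper, not part of Spark or Kafka.
object PollTimeout {
  // Adds an allowance for metadata fetching on top of the user's timeout,
  // because the Duration-based poll counts that time against the timeout.
  def padded(userTimeoutMs: Long, metadataAllowanceMs: Long): Duration = {
    require(userTimeoutMs >= 0 && metadataAllowanceMs >= 0,
      "timeouts must be non-negative")
    Duration.ofMillis(userTimeoutMs).plusMillis(metadataAllowanceMs)
  }
}
```

The resulting `Duration` would be passed to `KafkaConsumer.poll(java.time.Duration)` in place of the deprecated `poll(long)` call.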

@gaborgsomogyi
Contributor

> KAFKA-1894 will not be resolved with just a version upgrade

+1

> It requires to change the calls of Kafka API as non-blocking version

That's what we can't do everywhere at the moment, since the required API is still missing and will only be available in Kafka 2.5.

@wenxuanguan
Contributor Author

@HeartSaVioR @gaborgsomogyi Since the warning log is important and needs to be logged every time, I will close this PR.
Thanks for your reply.

@wenxuanguan wenxuanguan closed this Dec 2, 2019
5 participants