-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-14070] [runtime] Use TimeUtils to parse duration configs in flink-runtime #9745
[FLINK-14070] [runtime] Use TimeUtils to parse duration configs in flink-runtime #9745
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 8097abd (Wed Oct 16 08:28:53 UTC 2019) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
@@ -49,8 +51,6 @@ import scala.language.postfixOps | |||
object AkkaUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The parsing of akka configs are not replaced by TimeUtils.parseDuration(..), since they are directly used for ActorSystem creation.
The only exception is "akka.ask.timeout" which is also widely used as Flink RPC timeout config. So I parsed it with TimeUtils only when it is used by Flink components (in AkkaUtils#getTimeout). It is still parsed by scala Duration when used by akka (in AkkaUtils#getRemoteAkkaConfig).
The configs "akka.lookup.timeout" and "akka.client.timeout" are actually not akka configs. They are only used by Flink components so the parsing logics for them are updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it hurt to replace https://github.com/apache/flink/blob/master/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala#L400 with val akkaAskTimeout = Duration(getTimeout(configuration).toMillis, TimeUnit.MILLISECONDS)
? The benefit would be that we don't use two mechanisms for parsing. Consequently, these two mechanism don't need to stay in sync in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gave it a thought again and I think we can replace them.
The concern was that scala Duration supports infinite values like Inf, +Inf, etc. Replacing the parsing may break jobs specifying infinite values. However, since we never mentioned infinite durations in the config descriptors, I think we can treat them as finite durations and throw errors for infinite values.
Besides that, do you think we should parse the other duration configs (e.g. "akka.tcp.timeout") which were treated as string and directly set to Akka configs? Parsing them can help to do an early check, rather than expecting Akka to fail due to bad configs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A hotfix is added to convert akka duration configs to akka recognizable duration format.
Given that "these two mechanism don't need to stay in sync in the future" and TimeUtils currently supports non-labeled config, I think the hotfix is necessary.
cdc9812
to
b0cd94b
Compare
b0cd94b
to
156dcfe
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for reducing flink-runtime's
Scala dependency @zhuzhurk. The change looks good to me. I had a couple of minor comments and a question whether we should not also use AkkaUtils#getTimeout
in #getRemoteAkkaConfig
.
|
||
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) | ||
def getTimeout(config: Configuration): time.Duration = { | ||
return TimeUtils.parseDuration(config.getString(AkkaOptions.ASK_TIMEOUT)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return
is not needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
|
||
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) | ||
def getLookupTimeout(config: Configuration): time.Duration = { | ||
return TimeUtils.parseDuration(config.getString(AkkaOptions.LOOKUP_TIMEOUT)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here with return
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
|
||
new FiniteDuration(duration.toMillis, TimeUnit.MILLISECONDS) | ||
def getClientTimeout(config: Configuration): time.Duration = { | ||
return TimeUtils.parseDuration(config.getString(AkkaOptions.CLIENT_TIMEOUT)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
@@ -49,8 +51,6 @@ import scala.language.postfixOps | |||
object AkkaUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it hurt to replace https://github.com/apache/flink/blob/master/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala#L400 with val akkaAskTimeout = Duration(getTimeout(configuration).toMillis, TimeUnit.MILLISECONDS)
? The benefit would be that we don't use two mechanisms for parsing. Consequently, these two mechanism don't need to stay in sync in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fixups @zhuzhurk. LGTM. Merging now.
What is the purpose of the change
TimeUtils now can parse all duration configs supported by scala FiniteDuration.
And we'd like to use it to replace scala Duration for duration config parsing.
This is one step to make flink-runtime scala free.
Brief change log
Verifying this change
This change is already covered by existing tests of changed components.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation