-
Notifications
You must be signed in to change notification settings - Fork 13.7k
[FLINK-5133][core] Support to set resource for operator in DataStream and DataSet #3303
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@StephanEwen , this PR includes the new API that would be visible to user, but it can not work completely because the following codes in runtime have not been submitted. In order not to confuse users, this PR would be fixed to hide the API temporarily before merging into master. What do you think? |
|
The code here looks very good, with a few minor comments. The main problem is as you mentioned: We are adding something to the API that is not yet supported by the runtime. We have done this before and it always confused users. If this pull request is changed to "hide" the public API part (like commenting out the public getters and setters), then most of the changes are hidden. It looks like most of the internal code also depends on public getters and hence needs to be commented out as well. What do you think we should do? |
|
A more general question on the resource matching: If I understand it correctly, then the resource manager will try to get the "max" resources for an operator, but potentially go down to the "min" resources if it cannot get the "max" resources. I am wondering if "max" is confusing here and we should rather call it "preferred". Calling it "max" would make users think that those are the maximum resources that the operator can handle. I think many users would set max very high, thinking that this gives Flink more freedom to find suitable resources. They probably do not expect from the name that it always tries to satisfy the "max" resources. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some minor comments in line.
|
|
||
| private int parallelism = ExecutionConfig.PARALLELISM_DEFAULT; // the number of parallel instances to use | ||
|
|
||
| private ResourceSpec minResource; // the minimum resource of the contract instance. optional |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the newer code changes, we started using annotations like @Nullable (javax.annotation.Nullable) to mark fields that may be null. We found that this helps in code readability and in automatic bug detection in IntelliJ.
I would encourage you to use that as well, if this field can be null.
| * @return The data sink with set minimum and maximum resources. | ||
| */ | ||
| public DataSink<T> setResource(ResourceSpec minResource, ResourceSpec maxResource) { | ||
| Preconditions.checkArgument(minResource != null && maxResource != null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor comment: Most places throw NullPointerException (checkNotNull) for null arguments and use IllegalArgumentException (checkArgument) for non-null but invalid parameters.
| /** | ||
| * The minimum resource for this stream transformation. It defines the lower limit for | ||
| * dynamic resource resize in future plan. | ||
| */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be initialized with UNKNOWN or can this field be null? To understand these kind of assumptions, the @Nullable annotation often helps.
| /** | ||
| * Returns the minimum resource of this operation. | ||
| */ | ||
| def getMinResource: ResourceSpec = stream.getMinResource() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To make getters feel more like proper Scala, I would write this as
def minResource: ResourceSpec = stream.getMinResource()The "get" keyword is something that Java encourages, but Scala discourages.
| /** | ||
| * Sets the resource of this operation. | ||
| */ | ||
| def setResource(resource: ResourceSpec): DataStream[T] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The more Scala-like way of writing this would be
def resource_=(resource: ResourceSpec): Unit = {
...
}That way you can write Scala code like this (and it will translate to the setter call):
val stream: DataStream[MyType] = ...
stream.resource = new Resource(a, b, c, d)(We have missed that in most places in Flink so far, but it may make sense to start picking up the proper Scala style for new changed)
| /** | ||
| * Sets the resource of this operation. | ||
| */ | ||
| def setResource(resource: ResourceSpec) = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May be easier to just call setResource(resource, resource) in this function.
| case op: Operator[_, _] => op.setResource(minResource, maxResource) | ||
| case di: DeltaIterationResultSet[_, _] => | ||
| di.getIterationHead.setResource(minResource, maxResource) | ||
| case _ => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to change the error message in all places to something like "Operator does not support configuring custom resources specs".
|
One thought that @tillrohrmann and me had: It is probably okay to comment out or remove the setters and keep the getters. That should help in keeping the internal code. |
de7c4c7 to
c4aa9c0
Compare
|
Hi @StephanEwen , thanks for detail reviews of this PR and I learnt a lot from your comments. I considered all your suggestions above and submitted the modifications, including:
BTW, after this PR merge, I would submit the blocked codes related with JobGraph generation and runtime stack. |
|
I have merged this to my local repository. In particular I fixed that and also renamed the variables to use "resources" consistently as plural, and harmonized the Java getter style. |
|
@StephanEwen , sorry for my carelessness of checkNotNull, it is a low mistake. And I passed the "clean verify" in my local machine, thank you for merging! |
This introduces the internals, but does not yet make it public in the API. This closes apache#3303
This introduces the internals, but does not yet make it public in the API. This closes apache#3303
This introduces the internals, but does not yet make it public in the API. This closes apache#3303
This is part of the fine-grained resource configuration.
For DataStream, the setResource API will be setted onto SingleOutputStreamOperator similar with other existing properties like parallelism, name, etc.
For DataSet, the setResource API will be setted onto Operator in the similar way.
There are two parameters described with minimum ResourceSpec and maximum ResourceSpec separately in the API for considering dynamic resource resize in future improvements.