New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-8431] [mesos] Allow to specify # GPUs for TaskManager in Mesos #5307
Conversation
@EronWright @tillrohrmann Review this PR please |
1dc7330
to
b8bfc94
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great. I will review the changelog of Fenzo as I mentioned and reply back.
flink-mesos/pom.xml
Outdated
@@ -88,7 +88,7 @@ under the License. | |||
<dependency> | |||
<groupId>com.netflix.fenzo</groupId> | |||
<artifactId>fenzo-core</artifactId> | |||
<version>0.9.3</version> | |||
<version>0.10.0</version> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My number one concern is the impact of updating the Fenzo dependency. You're taking a conservative approach here, which is probably wise. But I also have an aversion to .0
releases. Can we compromise and use 0.10.1
? Meanwhile I am reviewing the Fenzo changelog.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
0.10.1 works:
- It passes all tests under flink-mesos (do you think it is enough to run tests under flink-mesos for the verification of Fenzo upgrade?)
- I tested the scenario above in my test environment (less than and more than available GPUs)
I'm going to test 1.0.1 after the current running of CI is done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I repeated the processes for Fenzo-1.0.1 and it all passes.
@@ -174,8 +183,9 @@ public AssignedResources getAssignedResources() { | |||
public String toString() { | |||
return "Request{" + | |||
"cpus=" + getCPUs() + | |||
"memory=" + getMemory() + | |||
'}'; | |||
", memory=" + getMemory() + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch on those commas.
@@ -661,6 +661,7 @@ private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID, | |||
// create the specific TM parameters from the resource profile and some defaults | |||
MesosTaskManagerParameters params = new MesosTaskManagerParameters( | |||
resourceProfile.getCpuCores() < 1.0 ? taskManagerParameters.cpus() : resourceProfile.getCpuCores(), | |||
taskManagerParameters.gpus(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need to add GPU at some point to ResourceProfile
. @eastcirclek did you investigate that possibility?
throw new IllegalConfigurationException(MESOS_RM_TASKS_GPUS.key() + | ||
" cannot be negative"); | ||
} | ||
if (gpus % 1 != 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(minor concern) Is there a possibility that the approximate nature of double will bite us? If the number must be whole, we could parse as an integer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'd better be on the safe side as you said.
|
||
@Override | ||
public Map<String, Double> getScalarValues() { | ||
return aggregatedScalarResourceMap; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I notice that we return all scalar resource types (cpus
, gpus
, ...) here, but in LaunchableMesosWorker::getScalarRequests
we return only the generic resource types (gpus
). Would you please double-check that this is expected by Fenzo? I wouldn't want Fenzo to double-count the cpus
or something.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are correct. It can cause confusion to contain an entry for cpus
in Offer::aggregatedScalarResourceMap
. We need to return only generic resource types (other than cpus
, mem
, network
, and disk
) as we do in LaunchableMesosWorker::getScalarRequests
.
e -> e.getKey(), | ||
e -> e.getValue().stream().mapToDouble(r -> r.getScalar().getValue()).sum() | ||
)); | ||
this.aggregatedScalarResourceMap = Collections.unmodifiableMap(aggregatedScalarResourceMap); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this change was motivated by a need to implement the new getScalarValues
. It is good that the aggregation still occurs eagerly.
The logic could probably be simplified using Map::merge
within the for
loop and skipping the creation of scalarResourceMap
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for the tip on Java 8 👍
79a084e
to
409290a
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
@tillrohrmann please merge |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution @eastcirclek. The changes look good to me.
I've got a general question for my understanding. Is the original problem which we want to solve that Flink does not use agents which have GPU resources or that Flink cannot specify the number of GPUs it requires to run? It looks as if the PR solves the latter but I was wondering whether we shouldn't solve the former problem. Because if a user runs Flink on a Mesos cluster with mixed agents (some have GPUs other not), then it can either run on the set of GPU agents or non GPU agents. Wouldn't it also make sense to let Flink run on both sets or is this not in the scope of this PR?
@@ -238,6 +254,12 @@ public static MesosTaskManagerParameters create(Configuration flinkConfig) { | |||
cpus = Math.max(containeredParameters.numSlots(), 1.0); | |||
} | |||
|
|||
double gpus = Math.floor(flinkConfig.getDouble(MESOS_RM_TASKS_GPUS, 0.0)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we rounding down because GPUs, cannot be shared? If this is the case, why don't we restrict the Flink configuration value to be an integer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My feedback was similar, we could simply parse as an integer. The value must be an integer due to a limitation in Mesos. But the present solution seemed OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tillrohrmann I took possible changes in the future into consideration: what if Mesos accepts a float for # GPUs? If both of you @tillrohrmann and @EronWright think that taking an integer seems better, I'm going to follow your opinion 😃
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From the user perspective I think it's clearer to make it an integer because then we don't confuse users who haven't read the code and think that they can configure a fraction of a GPU based on its type. Once Mesos accepts floats, we can change it in Flink as well. I will apply the change while merging the PR.
@tillrohrmann regarding your general question, you are right that Flink could, in concept, deploy to GPU hosts even if Flink doesn't require any GPUs. But we should keep in mind the intent of We could adjust the logic in this PR to the effect that the |
As you pointed out, the discussion we had in the mailing list was about JM not starting TMs on GPU-equipped agents. It turned out that a Mesos framework needs to specify a The reason I create FLINK-8431 to allow to specify # gpus is that TMs are not going to see GPUs if they do not request GPUs explicitly and GPUs are isolated as shown in link. Regarding your question,
Yes, the scope of FLINK-8431 and this PR is confined to the latter.
I don't think we need to take care of the former anymore because For those who are in a situation in which JM does not get offers that contains GPUs, I'd like to suggest to restart the Mesos master with |
Thanks for the clarification @EronWright and @eastcirclek. I'll merge the PR then. Thanks for your work :-) |
[FLINK-8431] Upgrade Fenzo dependency to 0.10.1 [FLINK-8431] Simplify scalar aggregation [FLINK-8431] Offer::getScalarValues need not contain entries for cpus, mem, disk, and network [FLINK-8431] Floor # gpus to make sure whole numbers This closes apache#5307.
What is the purpose of the change
This PR introduces a new configuration property named "mesos.resourcemanager.tasks.gpus" to allow users to specify # of GPUs for each TaskManager process in Mesos. The configuration property is necessary because TaskManagers that do not specify to use GPUs cannot see GPUs at all when Mesos agents are configured to isolate GPUs as shown in [1].
[1] http://mesos.apache.org/documentation/latest/gpu-support/#agent-flags
Brief change log
Verifying this change
I tested it by launching a standalone Flink cluster using ./bin/mesos-appmaster.sh. I tested the following scenarios with Mesos configured with --filter_gpu_resources.
LaunchCoordinator isn't given any offer because MesosFlinkResourceManager does not enable GPU_RESOURCES capability when mesos.resourcemanager.tasks.gpus is not specified or it is set to 0.
Given offers, LaunchCoordinator aggregates offers of different roles from the same node and puts aggregated offers to Fenzo for scheduling resources over nodes. When notified of the success of scheduling from Fenzo, LaunchCoordinator allocates resources of different roles to tasks and then populate Protos.TaskInfo using the allocated resources which is then wired to the Mesos master.
Given offers, LaunchCoordinator aggregates offers of different roles from the same node and puts aggregated offers to Fenzo. However, Fenzo notifies LaunchCoordinator of the failure of scheduling with the following messages:
AssignmentFailure {resource=Other, asking=3.0, used=0.0, available=2.0, message=gpus}.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: noDocumentation