
[FLINK-13184] Starting a TaskExecutor blocks the YarnResourceManager's main thread #10143

Closed
wants to merge 2 commits

Conversation

@wangyang0918 (Contributor) commented Nov 10, 2019

What is the purpose of the change

Currently, YarnResourceManager starts all task executors in its main thread. This can cause the RM to become unresponsive when launching a large number of TEs (e.g. > 1000), because the start path involves blocking I/O operations (writing files to HDFS and communicating with the node manager using a synchronous NMClient).

We could do the following two optimizations:

  • Use NMClientAsync for the communication with YARN's NodeManager (a minimal sketch follows this list)
  • Don't upload per-container files (avoiding blocking file system operations); use dynamic properties instead
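
For illustration, here is a minimal sketch of how the async node-manager client from the Hadoop YARN API can be wired up so that container starts no longer block the caller. The wrapper class name and the callback bodies are illustrative assumptions, not the PR's actual code:

    import java.nio.ByteBuffer;
    import java.util.Map;

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    // Illustrative wrapper: startContainerAsync returns immediately; results arrive on the callbacks.
    public class AsyncContainerStarter implements NMClientAsync.CallbackHandler {

        private final NMClientAsync nodeManagerClient;

        public AsyncContainerStarter(YarnConfiguration yarnConfig) {
            // The async client maintains its own thread pool for the NodeManager RPCs.
            this.nodeManagerClient = NMClientAsync.createNMClientAsync(this);
            this.nodeManagerClient.init(yarnConfig);
            this.nodeManagerClient.start();
        }

        public void startTaskExecutor(Container container, ContainerLaunchContext launchContext) {
            // Non-blocking: the caller's thread is not held up by the NodeManager RPC.
            nodeManagerClient.startContainerAsync(container, launchContext);
        }

        @Override
        public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> allServiceResponse) {
            // Container launch succeeded.
        }

        @Override
        public void onStartContainerError(ContainerId containerId, Throwable t) {
            // Container launch failed; release the container and request a replacement here.
        }

        @Override
        public void onContainerStatusReceived(ContainerId containerId, ContainerStatus status) {}

        @Override
        public void onGetContainerStatusError(ContainerId containerId, Throwable t) {}

        @Override
        public void onContainerStopped(ContainerId containerId) {}

        @Override
        public void onStopContainerError(ContainerId containerId, Throwable t) {}
    }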

Brief change log

  • Use NMClientAsync instead of NMClient so that starting a TaskExecutor is no longer a blocking call.
  • Use dynamic properties instead of uploading a per-container taskmanager-conf.yaml to HDFS.

Verifying this change

  • The changes are covered by the existing tests. All unit tests, integration tests and e2e tests should pass.
  • Manually verified at large scale with more than 1,000 containers:
    ./bin/flink run -d -m yarn-cluster -p 1000 -yD jobmanager.heap.mb=4096 -yD taskmanager.heap.size=1500m -yD taskmanager.network.memory.fraction=0.4 examples/streaming/WindowJoin.jar

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot (Collaborator) commented Nov 10, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit f158d75 (Wed Dec 04 15:56:35 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot (Collaborator) commented Nov 10, 2019

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis to re-run the last Travis build

@wangyang0918 (Contributor Author)

@tillrohrmann is on vacation. @tisonkun Could you take a look at this PR? I think it is very important for large-scale deployments on YARN clusters.

@wangyang0918 force-pushed the FLINK-13184 branch 2 times, most recently from 624a1e2 to 1adc1cb on November 10, 2019 10:07
@walterddr (Contributor) left a comment


Thanks for the contribution @wangyang0918. I am pretty new to the cluster management side, but we've faced similar issues when launching large numbers of TM containers as well.

I have some questions regarding the detailed approach (please bear with me if they appear dumb :-P). Please kindly take a look. Thanks --Rong.

@@ -491,41 +492,9 @@ static ContainerLaunchContext createTaskExecutorContext(
flinkJar = registerLocalResource(fs, remoteJarPath);
}

// register conf with local fs
final LocalResource flinkConf;
Contributor:

Q: This seems to be a relatively small amount of content (especially considering it can be constructed as a stringified argument using the getDynamicProperties code). Can I assume that the main time saver here is the synchronous interaction with the DFS during setupLocalResource?

If the above assumption is true, would changing the NMClient / AMRMClient to use async resolve the issue already?

Member:

I second @walterddr here; I am also wondering how flink-conf.yaml becomes a bottleneck of TM deployment.

Contributor Author:

Uploading a taskmanager-conf.yaml to HDFS takes about 50 ms on many production HDFS clusters. If we receive more than 1,000 containers, that is roughly 50 seconds spent on blocking I/O in the YarnResourceManager main thread. In addition, all the {uuid}-taskmanager-conf.yaml files are identical and differ only slightly from the Flink config uploaded by the client.

So I suggest removing the I/O operation from the TaskExecutor start path. As for NMClientAsync, it is a separate optimization that eliminates the blocking RPC call in the YarnResourceManager main thread.

Contributor:

In this case, I think the real bottleneck is the synchronous HDFS upload.

I poked around and it seems there is really no way to make it async; please correct me if I'm wrong:

If there is no guarantee that all 100 files have been uploaded to HDFS, issuing the request to the YARN RM will result in a container localization failure (because when the RM asks the NM to localize the container, the files may not be available in HDFS yet).

Follow-up question: is it possible to upload just one file (using setupLocalResource) once and reuse it across different TMs? That would keep the argument length of the TM startup script much shorter.
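
For reference, a hedged sketch of the single-shared-file alternative raised above: register one file that is already on HDFS as a YARN LocalResource and reuse the same entry in every TaskManager's launch context. The class and method names here are illustrative assumptions, not code from this PR:

    import java.io.IOException;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    final class SharedConfResourceSketch {

        // Build one LocalResource for a file that is already uploaded to HDFS,
        // so the same entry can be put into every TaskManager's ContainerLaunchContext.
        static LocalResource registerSharedFile(FileSystem fs, Path remotePath) throws IOException {
            final FileStatus status = fs.getFileStatus(remotePath);
            return LocalResource.newInstance(
                    ConverterUtils.getYarnUrlFromPath(remotePath),
                    LocalResourceType.FILE,
                    LocalResourceVisibility.APPLICATION,
                    status.getLen(),
                    status.getModificationTime());
        }
    }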

* @param targetConfig The target configuration.
* @return Dynamic properties as string, separated by space.
*/
static String getDynamicProperties(
Contributor:

Q: Are these stringified arguments stored anywhere during the container launch? If yes, are they stored/managed by the NM thread?

If that's a yes, and users configure a rather large flink-conf.yaml, would this be a problem at all? (Consider that we have one stringified representation per container.)

Contributor Author:

No, it is not stored anywhere. It is passed to the launch command of the TaskExecutor. We only take the differences from the flink-conf.yaml uploaded by the client and pass them to the TaskExecutor as dynamic properties.
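
For illustration, a minimal sketch of the idea behind such a utility: keep only the entries that differ from the client-uploaded configuration and encode them as space-separated -D pairs for the launch command. The real getDynamicProperties operates on Flink Configuration objects; the plain maps and the class name here are assumptions:

    import java.util.Map;
    import java.util.stream.Collectors;

    final class DynamicPropertiesSketch {

        // Encode the entries that are new or changed relative to the base configuration
        // (the flink-conf.yaml uploaded by the client) as "-Dkey=value" pairs.
        static String getDynamicProperties(Map<String, String> baseConfig, Map<String, String> targetConfig) {
            return targetConfig.entrySet().stream()
                    .filter(entry -> !entry.getValue().equals(baseConfig.get(entry.getKey())))
                    .map(entry -> "-D" + entry.getKey() + "=" + entry.getValue())
                    .collect(Collectors.joining(" "));
        }
    }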

Contributor:

I asked around; it seems the startup script is cached by the NM thread in some secure path (probably in nmPrivate, or somewhere private to the container itself).

In this case, is the following claim true?

If, say, one worker container dies, YARN has to re-ship the entire payload of the launch command to another host and restart it, whereas if the file is on HDFS, the YARN RM can simply ask the NM to re-localize the file from HDFS.

So the follow-up question is: if the launch command is huge (say the properties we attach to the launch command as dynamic properties exceed 1 MB), would that cause any problem?

Disclaimer: I haven't checked the YARN code to verify this yet.

@tisonkun self-requested a review on November 11, 2019 02:46
fs,
appId,
new Path(tmpConfigurationFile.getAbsolutePath()),
localResources,
homeDir,
"");
envShipFileList.append(flinkConfigKey).append("=").append(remotePathConf).append(",");
Member:

I'm curious about this line. envShipFileList configures the files shipped to the TM. With this line we actually ship flink-conf.yaml to the TM even though we remove it from the code diff above. What is your decision on whether or not to ship flink-conf.yaml?

Contributor Author:

Yes, we always ship the flink-conf.yaml to all TaskExecutors. The TaskExecutor loads its Flink configuration from the flink-conf.yaml uploaded by the client plus the dynamic properties set by the YarnResourceManager.

@tisonkun What do you mean by "we remove it from the code diff above"?

Contributor:

Hmm, I second @tisonkun here: if we eventually have to put the flink-conf.yaml into the envShipFileList, wouldn't we still end up uploading the file to DFS? (Based on this comment, I thought this was the problem we were trying to avoid in the first place.)

(@tisonkun please correct me if I understood you wrong.)

@tisonkun (Member) commented Nov 14, 2019

I see the difference now, and I think the confusion comes from Utils#L493 calling the TM conf file flinkConf. IIUC we still ship the original flink-conf.yaml, and what @wangyang0918 is trying to resolve here is that Flink currently ships a modified flink-conf.yaml per TM launched... My concern on this issue is shared below.

@wangyang0918 force-pushed the FLINK-13184 branch 2 times, most recently from a944459 to 6885e12 on November 11, 2019 12:47
@wangyang0918 (Contributor Author)

@tisonkun @walterddr
Thanks a lot for your detailed review. I have addressed all your comments. Please take a look again.

@hequn8128 (Contributor)

@wangyang0918 Thanks a lot for the fix and thanks for the review @tisonkun @walterddr

@walterddr (Contributor) left a comment


Thanks for the update and for answering the questions @wangyang0918 @tisonkun. I have some follow-up questions (quoted in the inline threads above). Much appreciated if you can provide some follow-up answers - Rong


@wangyang0918 (Contributor Author)

@walterddr
Thanks for your comments. I want to clarify again how the optimization works.
First, all the -yD arguments and the flink-conf.yaml under flink lib are merged into a new file application_xxxx-flink-conf.yamlxxxx.tmp on the Flink client. That file is then uploaded to HDFS and added to the local resources of the jobmanager. It is also added to envShipFileList so that the taskmanager gets the same flink-conf.yaml as the jobmanager through the YARN distributed cache.

Since we update some config options on the jobmanager side (such as taskmanager.memory.size), I suggest passing these differences to the taskmanager through dynamic properties. The config options updated on the jobmanager side are limited, so there will not be too many dynamic properties and the launch command will not be huge.

@walterddr @tisonkun If you both insist that we should upload one taskmanager config to HDFS instead of using dynamic properties, I will update the PR accordingly. However, I still think using dynamic properties is the better solution.

In addition, launch_container.sh is not localized from HDFS. It is generated by the YARN NodeManager from the ContainerLaunchContext through NMClient#startContainer.
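
For illustration, a minimal sketch of the resulting flow on the TaskExecutor side: load the flink-conf.yaml shipped through the YARN distributed cache, then let the dynamic properties from the launch command override individual keys. The class and method names are assumptions, not the actual TaskManager startup code:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.GlobalConfiguration;

    final class TaskManagerConfigurationSketch {

        static Configuration loadEffectiveConfiguration(String shippedConfDir, Configuration dynamicProperties) {
            // Base configuration: the flink-conf.yaml shipped by the client via the distributed cache.
            final Configuration configuration = GlobalConfiguration.loadConfiguration(shippedConfDir);
            // The jm/tm differences passed as dynamic properties win over the shipped file.
            configuration.addAll(dynamicProperties);
            return configuration;
        }
    }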

@tisonkun (Member) commented Nov 14, 2019

I get your point now @wangyang0918. One question: I see that the only difference between the jm conf and the tm conf is configured by

    public static Configuration cloneConfiguration(Configuration configuration) {
        final Configuration clonedConfiguration = new Configuration(configuration);

        if (clonedConfiguration.getBoolean(USE_LOCAL_DEFAULT_TMP_DIRS)) {
            clonedConfiguration.removeConfig(CoreOptions.TMP_DIRS);
            clonedConfiguration.removeConfig(USE_LOCAL_DEFAULT_TMP_DIRS);
        }

        return clonedConfiguration;
    }

All other configurations are the same. Is that correct?

I'm thinking of a way to neither use dynamic properties nor ship another conf file at all, because as @walterddr mentioned, we would still encode the jm config into dynamic properties, which could be a huge string.

@tisonkun (Member)

Besides, IMO the first part of this thread is good to go. If we cannot reach a consensus in time, a workaround is to separate the commits into different PRs/JIRAs.

@wangyang0918 (Contributor Author)

@tisonkun
The differences between the jm and tm config include at least the following:

  • cloneConfiguration, just as you said
  • resourceManagerConfig.setString(TaskManagerOptions.LEGACY_MANAGED_MEMORY_SIZE, managedMemoryBytes + "b"); in ActiveResourceManagerFactory
  • YarnEntrypointUtils#loadConfiguration
  • ClusterEntrypoint#initializeServices
  • ...

We just set the differences as dynamic properties. If your concern is that we will get a huge start command, I think that will not happen since we do not update too many config options on the jm side.
@tisonkun @walterddr If you both still have this concern, maybe we need to split the PR into two optimizations.

@tisonkun (Member)

@wangyang0918 after some consideration, my opinion is that the second part of this pull request doesn't fit the title "Starting a TaskExecutor blocks the YarnResourceManager's main thread". We don't mix changes with different purposes in one pass IMO, so I'd like to split the latter into a dedicated JIRA ticket so that we can track it clearly. @walterddr What do you think?

@wangyang0918 (Contributor Author)

@tisonkun I opened a dedicated JIRA, FLINK-14582, last week. After a discussion with @tillrohrmann, he suggested marking that JIRA as a duplicate and using FLINK-13184 instead, because this is also a blocking operation in the main thread.

If you have concerns about removing the taskmanager-conf.yaml upload, how about uploading only one file for all taskmanagers, since the config files are all the same now? @tisonkun @walterddr Does that make sense?

@tisonkun (Member) left a comment


Thanks for your explanation. Both commits make sense to me. I still prefer to split the commits into different tickets, but I respect that this is an important fix for the ongoing 1.8.3 release, so this patch is good to go in this form.

@walterddr (Contributor) commented Nov 18, 2019

Sorry for the late follow-up @tisonkun @wangyang0918. I think both approaches (splitting into two, or uploading only one taskmanager-config.yaml) should work.

@tisonkun (Member)

@walterddr I'd like to double-check: your concerns are all addressed above, right?

…arting TaskExecutor blocking call. The start container requests will be executed in a thread pool of NMClientAsync.
…anager-conf.yaml to hdfs. This will reduce the time cost of launching a TaskExecutor so that YarnResourceManager could start a large number of TaskExecutors timely.
@wangyang0918 (Contributor Author)

I have updated the commit messages so that they satisfy the general rules. @tisonkun I also suggest not squashing the two commits when you merge, so that the two different changes remain visible.

@tisonkun (Member)

Yes of course. I'm waiting for a final check from @walterddr

@walterddr (Contributor) left a comment


Hi @wangyang0918, yes, I don't have any additional concerns. Thanks for addressing them so quickly and thoroughly. And yes, I also agree with @tisonkun that we should not merge the two commits and should rather keep them as two separate ones.

@tisonkun closed this in a90520c on Nov 20, 2019
@tisonkun (Member)

Thanks for your contribution @wangyang0918! Could you also open PRs against 1.8 & 1.9 for the backport?

@wangyang0918 (Contributor Author)

@tisonkun Of course. I have opened the corresponding PRs for release-1.9 and release-1.8.

@tillrohrmann (Contributor) left a comment


Thanks for creating this fix @wangyang0918. I think it contains a problematic race condition which needs to be fixed as a follow-up. Could you fix it?

@@ -407,15 +404,9 @@ private void startTaskExecutorInContainer(Container container) {
containerIdStr,
container.getNodeId().getHost());

- nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
+ nodeManagerClient.startContainerAsync(container, taskExecutorLaunchContext);
Contributor:

I think this call no longer needs to be in the try catch block.

Comment on lines +483 to +490
log.error("Could not start TaskManager in container {}.", containerId, throwable);

final ResourceID resourceId = new ResourceID(containerId.toString());
// release the failed container
workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(containerId);
// and ask for a new one
requestYarnContainerIfRequired();
Contributor:

This section is problematic because it is not being executed in the actor's main thread. This can lead to data races/corruption.
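
One possible way to address this, sketched here as an assumption rather than the actual fix adopted in the follow-up ticket: hand the callback body back to the resource manager's main thread (e.g. via the RpcEndpoint's runAsync) so that the shared state is only mutated there. The fragment reuses the fields shown in the diff above:

    @Override
    public void onStartContainerError(ContainerId containerId, Throwable throwable) {
        // Illustrative only: hop back onto the main thread before touching shared state.
        runAsync(() -> {
            log.error("Could not start TaskManager in container {}.", containerId, throwable);

            final ResourceID resourceId = new ResourceID(containerId.toString());
            // release the failed container
            workerNodeMap.remove(resourceId);
            resourceManagerClient.releaseAssignedContainer(containerId);
            // and ask for a new one
            requestYarnContainerIfRequired();
        });
    }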

// ------------------------------------------------------------------------
@Override
public void onContainerStarted(ContainerId containerId, Map<String, ByteBuffer> map) {
log.debug("Succeed to call YARN Node Manager to start container", containerId);
Contributor:

log.debug("Succeeded to call YARN Node Manager to start container with id {}.", containerId);


@Override
public void onContainerStopped(ContainerId containerId) {
log.debug("Succeed to call YARN Node Manager to stop container", containerId);
Contributor:

log.debug("Succeeded to call YARN Node Manager to stop container with id {}.", containerId);

@tillrohrmann (Contributor)

Here is the JIRA issue for the problem: https://issues.apache.org/jira/browse/FLINK-15036
