
[SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client mode #18962

Closed
wants to merge 4 commits

Conversation

jerryshao
Contributor

What changes were proposed in this pull request?

With SPARK-10643, Spark supports downloading remote resources in client deploy mode. But the implementation overrides the variables that represent added resources (like args.jars, args.pyFiles) with local paths, and the YARN client then uses these local paths to re-upload the resources to the distributed cache. This unnecessarily breaks the semantics of putting resources on a shared FS, so this PR proposes to fix it.
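
As a rough illustration of the intended behavior (a minimal sketch, not the actual SparkSubmit code; the helper `downloadToLocal` and the object name are assumptions): keep the original, possibly remote, URIs for what the YARN client will register in the distributed cache, and use locally downloaded copies only where a local path is genuinely needed, such as the driver's classpath in client mode.

    import java.net.URI

    object ResolveSketch {
      // `downloadToLocal` stands in for SparkSubmit's download helper (hypothetical).
      def resolveForClientMode(
          jars: Seq[String],
          downloadToLocal: String => String): (Seq[String], Seq[String]) = {
        val localForDriver = jars.map { jar =>
          val scheme = Option(new URI(jar).getScheme).getOrElse("file")
          if (scheme == "file" || scheme == "local") jar else downloadToLocal(jar)
        }
        // First element: local copies for the driver's classpath; second element: the
        // untouched URIs the YARN client should still see, so resources already on a
        // shared FS (e.g. hdfs://) are not re-uploaded to the distributed cache.
        (localForDriver, jars)
      }
    }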

How was this patch tested?

This was manually verified with jars and pyFiles in local and remote storage, in both client and cluster mode.

Change-Id: I2cb667aedd53b228e6dbfed5725cd8b268a498e9
…ver in yarn mode

Change-Id: I6317a464c4fd526a8057c578a05a60420d975a47
@jerryshao jerryshao changed the title [SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client [SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client mode Aug 16, 2017

// Spark on YARN doesn't support uploading remote resources from an http, https or ftp
// server directly to the distributed cache, so print a warning and exit the process.
if (isNoneFsFileExist(args.jars) ||
Contributor Author

In YARN mode, directly uploading remote resources from http(s) to a Hadoop FS will throw an exception, so this guards against that scenario.

We could also support this by filtering http(s) resources, downloading them to local paths, and re-uploading them to the Hadoop FS, but that makes the code here quite complicated, so I chose to simply guard against the scenario.
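
For illustration, a guard along these lines (a hedged sketch; the diff's real helper is `isNoneFsFileExist`, whose implementation is not shown here, and the object/method names below are made up):

    import java.net.URI

    object SchemeGuardSketch {
      // True if any entry in a comma-separated resource list uses a scheme that the
      // YARN upload path cannot read through a Hadoop FileSystem (http/https/ftp).
      def hasNonFsResource(commaSeparated: String): Boolean = {
        Option(commaSeparated).toSeq
          .flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))
          .exists { res =>
            val scheme = Option(new URI(res).getScheme).getOrElse("file")
            Set("http", "https", "ftp").contains(scheme)
          }
      }
    }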

Contributor

In YARN mode, directly uploading remote resources from http(s) to a Hadoop FS will throw an exception

That sounds like a bug.

Contributor

What is the error you see here with http: files?

https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/filecache/DistributedCache.html

claims it supports http and hdfs

Contributor Author

The code here compares the two FSs and copies src to dest if the FS is different. AFAIK there's no http scheme FileSystem in Hadoop, so `val srcFs = srcPath.getFileSystem(hadoopConf)` will probably throw an exception.
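
Roughly the pattern being referred to (a sketch only, not the exact YARN Client.scala code): resolve the source FileSystem and copy only when it differs from the destination FS. For an http: URI, getFileSystem fails on Hadoop versions that have no registered http FileSystem, which is the exception described above.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

    object CopyIfNeededSketch {
      // Copy `src` into `destDir` only when it lives on a different FileSystem.
      def copyIfNeeded(src: Path, destFs: FileSystem, destDir: Path, conf: Configuration): Path = {
        val srcFs = src.getFileSystem(conf) // throws for schemes with no FS implementation (e.g. http)
        if (srcFs.getUri != destFs.getUri) {
          val dest = new Path(destDir, src.getName)
          FileUtil.copy(srcFs, src, destFs, dest, false, conf)
          dest
        } else {
          src
        }
      }
    }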

Contributor

That kinda looks like a bug. Spark shouldn't be trying to upload files that the distributed cache can handle itself; not sure if there's a programmatic way of figuring out the list of schemes that it supports, though.

At worst, Spark shouldn't do anything for those URLs; executors should be able to download directly from http / https if needed.

Contributor Author

@tgravescs I checked the code in the NM ContainerLocalizer and FSDownload. If I understand the YARN code correctly, it still tries to use a Hadoop FileSystem to download resources; since the "http" scheme is not supported in current versions of Hadoop, I assume it will fail here. Also, how to create a LocalResource from a remote http(s) resource is another open question.

    FileSystem sourceFs = sCopy.getFileSystem(conf);
    Path dCopy = new Path(dstdir, "tmp_"+sCopy.getName());
    FileStatus sStat = sourceFs.getFileStatus(sCopy);
    if (sStat.getModificationTime() != resource.getTimestamp()) {
      throw new IOException("Resource " + sCopy +
          " changed on src filesystem (expected " + resource.getTimestamp() +
          ", was " + sStat.getModificationTime());
    }
    if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
      if (!isPublic(sourceFs, sCopy, sStat, statCache)) {
        throw new IOException("Resource " + sCopy +
            " is not publicly accessable and as such cannot be part of the" +
            " public cache.");
      }
    }

    FileUtil.copy(sourceFs, sStat, FileSystem.getLocal(conf), dCopy, false,
        true, conf);
    return dCopy;

The DistributedCache mentioned in the link above seems to be an MR-only thing.

Contributor Author

From my understanding, there are a couple of ways to solve this issue:

  1. Filter http(s) resources out and add them to spark.jars/spark.files, using Spark's internal file server to distribute them. This should work in yarn client mode, but it cannot work in yarn cluster mode, since the AM doesn't have those resources.
  2. Filter http(s) resources out and download them to local paths in both yarn client and cluster mode (see the sketch below). This will re-upload resources to the Hadoop FS, so resources are downloaded and uploaded unnecessarily, but it solves the whole problem, and it also works when only the gateway nodes have internet access. I assume not many users use resources on http(s), so as a compromise it looks acceptable.
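
A rough sketch of option 2 (an assumption about how it might look, not code from this PR): walk the resource list, download the http(s)/ftp entries into a local directory, and leave everything else untouched.

    import java.io.File
    import java.net.URI
    import java.nio.file.{Files, Paths, StandardCopyOption}

    object DownloadSketch {
      // Download http(s)/ftp entries into `targetDir`; leave other URIs as they are.
      def localizeNonFsResources(resources: Seq[String], targetDir: File): Seq[String] = {
        resources.map { res =>
          val uri = new URI(res)
          Option(uri.getScheme).getOrElse("file") match {
            case "http" | "https" | "ftp" =>
              val dest = Paths.get(targetDir.getAbsolutePath, new File(uri.getPath).getName)
              val in = uri.toURL.openStream()
              try {
                Files.copy(in, dest, StandardCopyOption.REPLACE_EXISTING)
              } finally {
                in.close()
              }
              dest.toUri.toString
            case _ => res
          }
        }
      }
    }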

What do you think? @vanzin @tgravescs .

Contributor

https://issues.apache.org/jira/browse/HADOOP-14383 is adding an http FileSystem; that doesn't help us right now, though.
This is basically something that isn't supported now, and never was in spark 2.x (regardless of this issue), so it feels like it could be a separate issue. I'm ok with leaving this code here as a warning for users and then filing a separate jira. @vanzin what do you think?

Contributor

I also think this is a separate issue. It didn't work before, it's ok if it doesn't work now. Maybe it works in 2.2, but I think it's safe to assume people aren't really depending on that since 2.2 is very recent.

This change should be about avoiding re-uploading files downloaded by the client, and we can look at fixing http-on-YARN in a different patch.

Contributor Author

Thanks @tgravescs @vanzin, I agree with you. I will split fixing http-on-YARN into another JIRA.

@@ -461,6 +488,7 @@ object SparkSubmit extends CommandLineUtils {
OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"),
OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.instances"),
OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"),
Contributor Author

"spark.submit.pyFiles" is used both by Python and by YARN#client; Python requires local files, but YARN#client can support remote files, so it's hard to use one configuration to control both scenarios. In YARN I use a new configuration just for pyFiles to add them to the distributed cache.

@jerryshao
Contributor Author

I will also add some UTs to partially verify the changes.

Ping @tgravescs @vanzin to help review the changes, thanks a lot!

@SparkQA

SparkQA commented Aug 16, 2017

Test build #80734 has finished for PR 18962 at commit 6e30931.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Change-Id: Iccd687d4b531376dec9751ef9238f56be3a9719a
@SparkQA

SparkQA commented Aug 22, 2017

Test build #80951 has finished for PR 18962 at commit 16ce99f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}
// Add the main application jar and any added jars to classpath in case YARN client
// requires these jars.
// This assumes both primaryResource and user jars are local jars, eitherwise it will not be
Contributor

otherwise

test("Avoid re-upload remote resources in yarn client mode") {
val hadoopConf = new Configuration()
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
hadoopConf.set("fs.s3a.impl.disable.cache", "true")
Contributor

This pattern is getting really repetitive, can you put this code in its own method and call that from existing tests?
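
One possible shape for that shared helper (a hedged sketch meant to live inside SparkSubmitSuite; the method name is an assumption, and TestFileSystem is the suite's existing test FileSystem):

    import org.apache.hadoop.conf.Configuration

    // Shared test setup: route s3a:// URIs to the suite's TestFileSystem and disable
    // the FileSystem cache so each test gets a fresh instance.
    private def newConfWithTestFs(): Configuration = {
      val conf = new Configuration()
      conf.set("fs.s3a.impl", classOf[TestFileSystem].getName())
      conf.set("fs.s3a.impl.disable.cache", "true")
      conf
    }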

@@ -859,6 +859,44 @@ class SparkSubmitSuite
}
}

test("Avoid re-upload remote resources in yarn client mode") {
val hadoopConf = new Configuration()
hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem")
Contributor

classOf[TestFileSystem].getName()

@vanzin
Contributor

vanzin commented Aug 22, 2017

Few minor things but looks good.

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments)
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
Contributor

Looks like the conf parameter is only used for testing? Perhaps add a comment about it in the method's description.
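
For instance, something like the following doc comment would make that explicit (a sketch only; the wording is an assumption and the signature is simplified from the real SparkSubmit.prepareSubmitEnvironment):

    import org.apache.hadoop.conf.{Configuration => HadoopConfiguration}

    object DocSketch {
      /**
       * @param conf Hadoop configuration used when resolving and downloading resources.
       *             Only tests pass this in (to inject a custom FileSystem); otherwise
       *             a default Configuration is created internally.
       */
      def prepareSubmitEnvironment(conf: Option[HadoopConfiguration] = None): HadoopConfiguration = {
        conf.getOrElse(new HadoopConfiguration())
      }
    }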

@@ -330,19 +332,21 @@ object SparkSubmit extends CommandLineUtils {
args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull

// In client mode, download remote files.
var localPrimaryResource: String = null
var localJars: String = null
var localPyFiles: String = null
Contributor

I'm curious why we don't download files and archives here as well? I guess for archives we would want to extract them too, to keep the same behavior at least for yarn. I guess this was originally for standalone and mesos mode, but it seems like for consistency we should download archives.

It looks like the original one did download files? Did that get intentionally dropped?
4af3781

Contributor Author

The purpose of downloading is to make sure the resource is correctly added to the Java classpath or PYTHONPATH at startup. Files are not required to start the Spark driver process or the Python process, and Spark's internal file server will handle remote URIs, so in the previous PR I deliberately removed the support for files.

As for archives, as far as I know they are only used when running on yarn, so a remote path should also be fine (YARN#client will figure out whether to upload or not). Previously we could not leverage archives on the driver side in yarn client mode, so here we keep the same behavior. Do we need to change the semantics to download archives?

Contributor

Hmm, so I guess the original intent may have been to add things to the classpath or Python path, or to fetch the actual Python file to run, but to me it seems inconsistent that we download some of these things and not others.

For example, my primary resource program that gets automatically downloaded from hdfs requires file X for the driver to read, but we don't download that automatically. So the user has to manually download X (and possibly archive Y) but doesn't have to do the jars or primary resource. Normally if the user used --files they expect that file to be at ./filename. Of course, given the way this is downloading, it wouldn't be in ./ either. The same goes for archives.

It seems to me this was missed in the design of this feature. Even their example of using docker and marathon to run spark-submit in client mode won't work if they specify --files now and their driver needs that file. To me this is the same as needing the jar. They would have to package it in the docker image, thus defeating the purpose of the feature. I'm sure fewer people use --files in this case, but we shouldn't have inconsistent behavior.

If the user can't rely on spark to download everything, they need to figure out which parts to download themselves, or just do as they do now and set up everything on the gateway, in which case the downloads happening here are just extra.

This to me just sounds like I need to explain things to users even more and they are going to be confused, or I just tell them to run like they do now.

Anyway, I kind of think this feature shouldn't have gone in if we can't make it act just like it was running on the cluster. But I think making it run like on the cluster is really hard without changing the user's running directory, which again could break other things. If we download things to ./ then you could do bad things as well that the user doesn't expect.

Honestly, now I'm leaning towards making this configurable, with the default off. I see the one use case it was added for, but even that is incomplete in my opinion.

Adding the original author and reviewers: @loneknightpy @cloud-fan @gatorsmile

thoughts?

Contributor

Files are, strictly speaking, something that never has been exposed to the driver. They are exposed to the driver in the current working directory when running in YARN cluster mode, but that's the exception, not the rule.

It should be possible to make that work for any driver (e.g. download / copy them to a temp directory and return that path from SparkEnv.get.sparkFilesDir), but I'd count that as a new feature.

Contributor Author

@tgravescs, if we also download files/archives to a local path, then how do we leverage them, since we don't expose the path to the user? Even with the previous code, downloaded files could never be used by the driver. So for semantic completeness, we would still need to change some code to support this feature, as @vanzin mentioned.

I agree with you that the current state of the code is confusing for users (some things are downloaded while others are not). I think we could fix it in a follow-up PR, what do you think?

Contributor

Ok, I guess yarn is special in that case. Actually I see the help text for --files and --archives is wrong for yarn, as it says "on the executors" when it really goes to both the executors and the application master/driver. The original feature was for mesos, which makes more sense there.

And really, I guess most people aren't even going to know about this feature, so unless they stumble into it running yarn client mode, most will just continue to specify things on the local filesystem.

@jerryshao you are right; like I mentioned above, to take advantage of files/archives on yarn you would have to be in ./, which isn't trivial. Unless someone requests it, let's just drop it.

Sorry for the tangent, I'm fine with it as is.

Change-Id: Ic9ca657f54204d85fc2a441f15f39abf8cd0e277
@SparkQA

SparkQA commented Aug 23, 2017

Test build #81014 has finished for PR 18962 at commit cad4c70.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • * (4) the main class for the child

@tgravescs
Contributor

Also, I just realized that for our original discussion of "Spark on YARN doesn't support uploading remote resources from an http, https or ftp server", I think the download feature without this change would actually have made that work, because it would have downloaded from the http/https/ftp server to local and then uploaded it to yarn. The question is whether we want to break that here.

If we don't, then we need to conditionalize this and only do it for HDFS or other Hadoop-supported filesystems.
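
A hedged sketch of what that conditional check could look like: Hadoop 2.x exposes FileSystem.getFileSystemClass, which fails for schemes that have no registered FileSystem implementation, so it can serve as the "is this a Hadoop-supported filesystem" test (the object and method names here are made up).

    import java.net.URI

    import scala.util.Try

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FileSystem

    object FsSupportSketch {
      // True if Hadoop has a FileSystem registered for the URI's scheme, i.e. the
      // resource could be read and uploaded through the Hadoop FS API.
      def hadoopCanHandle(resource: String, conf: Configuration): Boolean = {
        val scheme = Option(new URI(resource).getScheme).getOrElse("file")
        Try(FileSystem.getFileSystemClass(scheme, conf)).isSuccess
      }
    }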

@vanzin
Contributor

vanzin commented Aug 23, 2017

If we don't, then we need to conditionalize this and only do it for HDFS or other Hadoop-supported filesystems.

I thought in the previous discussion we chose to treat that as a separate issue and revert to the previous behavior here?

@tgravescs
Contributor

Sorry, you are right, we talked about both there. I was just thinking of the case where Hadoop doesn't support it. I still agree to split it out into another JIRA.

@jerryshao
Contributor Author

Yes @tgravescs, if we download everything to local and then upload to yarn, http/https/ftp should not be an issue here. But in yarn cluster mode, if we specify remote http jars, the yarn client will still fail to handle them, so the issue remains. I'm going to create a separate JIRA to track it.

@vanzin
Contributor

vanzin commented Aug 25, 2017

Merging to master, will also try 2.2.

@vanzin
Contributor

vanzin commented Aug 25, 2017

@jerryshao there are conflicts in 2.2, will need a separate PR.

@asfgit asfgit closed this in 1813c4a Aug 25, 2017
@tgravescs
Contributor

@jerryshao did you have a chance to put up a 2.2 PR?

@jerryshao
Contributor Author

Sorry I missed the comments, I will file another PR against branch 2.2.

jerryshao added a commit to jerryshao/apache-spark that referenced this pull request Aug 29, 2017
…arn client mode

With SPARK-10643, Spark supports downloading remote resources in client deploy mode. But the implementation overrides the variables that represent added resources (like `args.jars`, `args.pyFiles`) with local paths, and the YARN client then uses these local paths to re-upload the resources to the distributed cache. This unnecessarily breaks the semantics of putting resources on a shared FS, so this proposes to fix it.

This was manually verified with jars and pyFiles in local and remote storage, in both client and cluster mode.

Author: jerryshao <sshao@hortonworks.com>

Closes apache#18962 from jerryshao/SPARK-21714.

(cherry picked from commit 1813c4a)
Signed-off-by: jerryshao <sshao@hortonworks.com>

Change-Id: Ib2e8cb056707b362bc1c496002bac1472dc78ea7
asfgit pushed a commit that referenced this pull request Aug 29, 2017
…ces in yarn client mode

## What changes were proposed in this pull request?

This is a backport PR to fix the issue of re-uploading remote resources in yarn client mode. The original PR is #18962.

## How was this patch tested?

Tested in local UT.

Author: jerryshao <sshao@hortonworks.com>

Closes #19074 from jerryshao/SPARK-21714-2.2-backport.
asfgit pushed a commit that referenced this pull request Aug 30, 2017
…ces in yarn client mode

## What changes were proposed in this pull request?

This is a backport PR to fix the issue of re-uploading remote resources in yarn client mode. The original PR is #18962.

## How was this patch tested?

Tested in local UT.

Author: jerryshao <sshao@hortonworks.com>

Closes #19074 from jerryshao/SPARK-21714-2.2-backport.
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
…ces in yarn client mode

## What changes were proposed in this pull request?

This is a backport PR to fix the issue of re-uploading remote resources in yarn client mode. The original PR is apache#18962.

## How was this patch tested?

Tested in local UT.

Author: jerryshao <sshao@hortonworks.com>

Closes apache#19074 from jerryshao/SPARK-21714-2.2-backport.