
SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak #607

Closed

Conversation

aarondav
Contributor

aarondav commented May 1, 2014

UserGroupInformation objects (UGIs) are used for Hadoop security. A relatively recent PR (#29) makes Spark always use UGIs when executing tasks. Unfortunately, this triggers HDFS-3545: because the UGIs look different to the FileSystem cache (even though they're logically identical), the cache continuously creates new FileSystems. The result is a memory leak, and sometimes a file descriptor leak, for FileSystems (like S3N) that maintain open connections.

The solution is to introduce a config option (enabled by default) that reuses a single Spark user UGI, rather than creating a new one for each task. The downside to this approach is that caching UGIs is not entirely safe (see the notes in HDFS-3545). For example, if a token expires, it will never be cleared from the UGI but may be used anyway (which token a UGI uses is nondeterministic, as tokens are backed by a Set).

This setting is enabled by default because the memory leak can become serious very quickly. In one benchmark, attempting to read 10k files from an S3 directory caused 45k connections to remain open to S3 after the job completed. These file descriptors are never cleaned up, nor is the memory used by the associated FileSystems.
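In outline, the approach looks like the following sketch (Scala). The config key name and the SPARK_USER fallback below are illustrative, not necessarily the exact names used in the patch:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object CachedUgiSketch {
  // Illustrative flag name; the real patch reads a config option.
  private val cacheUgi =
    sys.props.getOrElse("spark.user.cacheUserGroupInformation", "true").toBoolean

  // Created once and reused, so the FileSystem cache sees a single UGI key.
  private lazy val cachedUgi =
    UserGroupInformation.createRemoteUser(sys.env.getOrElse("SPARK_USER", "sparkuser"))

  def runAsSparkUser[T](func: () => T): T = {
    val ugi =
      if (cacheUgi) cachedUgi // one UGI for all tasks, hence one FileSystem cache entry
      else UserGroupInformation.createRemoteUser(sys.env.getOrElse("SPARK_USER", "sparkuser"))
    ugi.doAs(new PrivilegedExceptionAction[T] {
      override def run(): T = func()
    })
  }
}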

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@aarondav
Contributor Author

aarondav commented May 1, 2014

@tgravescs @sryza @pwendell Please take a look if possible. I know next to nothing about Hadoop security, but this problem bit us pretty hard recently. As far as my understanding goes, this solution should preserve the behavior of Spark on YARN as long as tokens do not expire, which I hope is the usual case for people just getting started with YARN.

I have not tested thoroughly on YARN; I can try to do that if this solution seems reasonable.

edit: wrong Tom Graves

@AmplabJenkins

Merged build finished. All automated tests passed.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14599/

<td>true</td>
<td>
Caching UGIs is a workaround for [SPARK-1676](https://issues.apache.org/jira/browse/SPARK-1676)
for users who are not using security in a very serious manner. Caching UGIs can produce
Contributor

This sort of implies that we can actually break the Hadoop security model if this is enabled, but I don't think that's true. I just think that some long-running jobs might have trouble with token renewal.

@tgravescs
Contributor

From reading HDFS-3545, it sounds like we should just call FileSystem.closeAllForUGI after the task runs. I'll talk to Daryn tomorrow about it just to make sure.

Have you tried that?
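For reference, the suggested cleanup would look roughly like this sketch; runTaskAs is a hypothetical wrapper around the per-task doAs, while FileSystem.closeAllForUGI is the real Hadoop API:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.UserGroupInformation

// Hypothetical wrapper: run one task under its UGI, then evict and close
// every FileSystem that was cached under that UGI.
def runTaskAs(ugi: UserGroupInformation)(task: () => Unit): Unit = {
  try {
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = task()
    })
  } finally {
    FileSystem.closeAllForUGI(ugi) // drops this UGI's entries from the cache
  }
}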

@pwendell
Contributor

pwendell commented May 1, 2014

@tgravescs that could be a solution, but we should make sure it doesn't interfere with other tasks that are also using that filesystem, e.g. if task 1 runs and then calls this static close method while task 2 is in the middle of reading data from the FS.

@tgravescs
Contributor

I agree we need to make sure it doesn't do that, but if it's creating FileSystems specific to the UGI (which is supposedly the leak), then closing them after the task is finished (when that UGI is done) shouldn't be a problem. I'll look into it some more tomorrow to confirm.

@aarondav
Contributor Author

aarondav commented May 1, 2014

One downside to closing the FileSystems after task completion is that you lose all benefits of having a FileSystem cache, which could still matter if the FileSystem has expensive setup or teardown.

@aarondav
Contributor Author

aarondav commented May 1, 2014

A second downside, following Patrick's idea, is that all FileSystems created during the execution of the same task will share the same UGI and thus may reuse the cached version. This means that if we close FileSystems, we have to be very careful about potentially sharing FileSystems across tasks (say, in the future, if we use FileSystem for writing DiskStore blocks), and it's pretty hard to reason about this given both the cache and the weird namespacing of the UGI doAs.

@sryza
Contributor

sryza commented May 1, 2014

I would think the right thing would be to enclose the entire executor in a doAs (or save the UGI and keep using it, as this patch is doing, if the former isn't straightforward). Hadoop UserGroupInformations rest on Java LoginContexts, which are propagated to child threads when they start.

@tgravescs correct me if I'm wrong, but Hadoop currently has no mechanism that replaces old delegation tokens with new ones. Even if we do add such a mechanism, I don't think it would make sense to send credentials with each task. So I can't see any situation where constructing a UGI for each task would make sense.

All that said, I remember when dealing with a JT memory leak that there were some tricky ways that UGIs interact with the FileSystem cache. I'm going to review the discussion on MAPREDUCE-5508 to try to understand the specifics.

@tgravescs
Contributor

I had thought about this more last night and I agree with Sandy: unless there is a case where we launch tasks as different users, we should look at moving the doAs up, possibly to cover the entire executor. Doing it per task is less efficient, and when running Spark on YARN with security on it is purely overhead anyway. That should also help with actually sharing the FileSystem more. The downside to sharing is that you have to make sure one task doesn't close it out from under another, whereas the current approach actually provides a bit of isolation.

@tgravescs
Contributor

I talked to Daryn a bit about FileSystem.closeAllForUGI and it is fine to use here if we want to take that route. He also agreed that moving it up makes more sense. So unless there is a reason not to move it up (like tasks running as different users), I would prefer to do that. Note we also need to move the one in ApplicationMaster up if that hasn't been done yet, since I believe Patrick found that issue with localization.

I do understand your concern that if we leave it as it is now and do the FileSystem.closeAllForUGI after the tasks run, then if a FileSystem does get shared between tasks and we close it, we close it for both tasks. The only thing I can say is: don't do that... It's really no different than if we move the doAs up; we have to make sure the task code doesn't close the FileSystem and cause issues for the entire executor. Either way you have to be careful. If we move it up we can actually share it, and that seems a bit less prone to error.

Also, in response to @aarondav's first concern: I don't really follow. You aren't getting any benefit from the cache now. It's specific to that task, so no other task can use that cached version of the FileSystem, and it's causing extra overhead.

@pwendell
Contributor

pwendell commented May 1, 2014

@tgravescs @sryza just wondering - how does ugi.doAs actually store the contextual information of the credentials? For instance, we create thread pools etc inside of the executor. Does this get transferred correctly? I'm not totally sure how this would work... does it use threadlocals?

Since the security model of Spark currently assumes the application is run as a single user, it seems like moving this up makes sense. Would either of you be able to propose a patch that just pulls it up, maybe, e.g. to wrap the creation of the thread pool?

@tgravescs
Contributor

High-level answer: yes, it all gets transferred correctly as long as it's all within the doAs. UGI uses javax.security.auth.Subject.doAs underneath; I'm not sure of the exact mechanism, other than I think there is a JVM security context that this gets put into.
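Concretely, Subject.doAs binds the Subject to the current AccessControlContext, and a thread constructed inside that scope snapshots the context at creation time. A minimal sketch of the mechanism, using the Java security API directly:

import java.security.{AccessController, PrivilegedAction}
import javax.security.auth.Subject

val subject = new Subject()
Subject.doAs(subject, new PrivilegedAction[Unit] {
  override def run(): Unit = {
    // Inside doAs, the Subject is attached to the current AccessControlContext.
    assert(Subject.getSubject(AccessController.getContext()) eq subject)
    val child = new Thread(new Runnable {
      override def run(): Unit = {
        // A child thread snapshots its creator's context when constructed,
        // so it sees the same Subject.
        assert(Subject.getSubject(AccessController.getContext()) eq subject)
      }
    })
    child.start()
    child.join()
  }
})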

@sryza
Contributor

sryza commented May 1, 2014

I just tested this out with a toy program and observed that the child thread gets the same user and the same FileSystem.

import java.security.PrivilegedAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.UserGroupInformation;

final Configuration conf = new Configuration();
final UserGroupInformation ugi = UserGroupInformation.createRemoteUser("arloguthrie");

ugi.doAs(new PrivilegedAction<Integer>() {
  public Integer run() {
    FileSystem fs = null;
    try {
      System.out.println("current user in parent thread: " + UserGroupInformation.getCurrentUser());
      fs = FileSystem.get(conf);
    } catch (Exception ex) {
      ex.printStackTrace();
    }
    final FileSystem finalFs = fs;
    Thread child = new Thread() {
      public void run() {
        try {
          // The child inherits the parent's security context, so it should
          // print the same user and get the same cached FileSystem instance.
          System.out.println("current user in child thread: " + UserGroupInformation.getCurrentUser());
          System.out.println("same fs in child: " + (FileSystem.get(conf) == finalFs));
        } catch (Exception ex) {
          ex.printStackTrace();
        }
      }
    };
    child.start();
    return 0;
  }
});

@tgravescs
Contributor

I think you would want it even higher than just wrapping the thread pool. Looking at CoarseGrainedExecutorBackend, for instance, I think you would have it wrap the run routine. This way your Akka connection gets set up as the correct user. You also don't have to worry about someone changing the executor to do something important outside the wrapping of the thread pool.

I'm not sure how this affects Mesos, though. Does Mesos run things as the user who launched them, as a superuser, or something else? I assume it's been working as it is now, so moving it up into MesosExecutorBackend is OK; I'm just not sure if it should wrap everything (like MesosExecutorDriver).
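Moving the doAs up to the backend's entry point would look roughly like this sketch; startBackend is a hypothetical stand-in for the existing setup code:

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ExecutorBackendSketch {
  def main(args: Array[String]): Unit = {
    val ugi = UserGroupInformation.createRemoteUser(
      sys.env.getOrElse("SPARK_USER", "sparkuser"))
    // Everything (the Akka connection, the task thread pool, the tasks)
    // now runs inside a single doAs, as a single user.
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = startBackend(args)
    })
  }

  // Hypothetical: the existing run routine that sets up Akka and the executor.
  private def startBackend(args: Array[String]): Unit = { /* ... */ }
}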

ugi.doAs(new PrivilegedExceptionAction[Unit] {
  def run: Unit = func()
})
Some(UserGroupInformation.createRemoteUser(user))
Contributor

This is my first time looking at this code, so bear with me a little. :-)

I'm not sure what's the objective of calling createRemoteUser() here. What purpose is it serving? Isn't it better to just rely on getCurrentUser() to define the user? Then you wouldn't need SPARK_USER or SPARK_UNKNOWN_USER.

Unless you want to create a dummy user for the non-Kerberos case that is different from the logged-in user? I'd say that, in that case, it's better to let users do this in their own code (by wrapping their app in a UGI.doAs() call) instead of building it into Spark.

As for the approach, I think this should work. But to address @pwendell's comments about tokens, there should be code somewhere that's renewing the Kerberos ticket (by calling UserGroupInformation.reloginFromKeytab() at appropriate periods). Unfortunately I don't know what the best practices are around this - in our internal code, we just call reloginFromKeytab() periodically as part of our framework for talking to Hadoop services (so individual clients don't need to worry about it), and that seems to work fine.
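In that internal-framework style, the periodic relogin can be a small background loop; the principal and keytab path below are placeholders:

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.hadoop.security.UserGroupInformation

// One-time login from a keytab, then a background check that re-authenticates
// only when the TGT is close to expiring.
UserGroupInformation.loginUserFromKeytab("app/host@EXAMPLE.COM", "/etc/security/app.keytab")
val relogin = Executors.newSingleThreadScheduledExecutor()
relogin.scheduleAtFixedRate(new Runnable {
  override def run(): Unit =
    UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab()
}, 1, 1, TimeUnit.HOURS)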

Contributor

> I'm not sure what's the objective of calling createRemoteUser() here.

In non-secure mode, the process runs as "yarn", but needs to interact with HDFS as the app user.

> But to address @pwendell's comments about tokens, there should be code somewhere that's renewing the Kerberos ticket (by calling UserGroupInformation.reloginFromKeytab() at appropriate periods).

On YARN, neither the driver nor the container will necessarily have keytabs. They authenticate using delegation tokens, which currently don't get replaced.

Contributor

Yeah, I noticed later that this is mainly talking about the executors (d'oh).

@aarondav
Contributor Author

aarondav commented May 2, 2014

I am going to resubmit this as a patch for branch-0.9 (but with default off) and then submit a new patch that wraps the entire executor for 1.0. My goal is to minimize behavioral changes for 0.9.2 but still provide a fix for users who encounter this problem.

@aarondav
Contributor Author

aarondav commented May 2, 2014

Examining the possible patch to the Executor more, it seems less clear that running the Executor in a doAs is the right solution, because the constructor of the Executor is run at one point, but then tasks are launched at the request of an RPC service which is invoked at some later time, after the doAs has already returned. Since the thread pool is not required to create the child threads immediately, it's possible that only some of them get the UGI token.

I think my original solution is thus cleaner, as it has the same semantics as was intended by "only doAs once", but is easily disable-able by a flag and only incurs the overhead of calling doAs once per task. I am inclined to reopen this pull request as the "final" solution for 1.0.
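The pitfall described above can be made concrete: a cached thread pool created inside a doAs spawns worker threads lazily, so a later submission from outside the doAs creates a thread without the UGI's context. A sketch, assuming the submission happens from an RPC thread:

import java.security.PrivilegedAction
import java.util.concurrent.{ExecutorService, Executors}
import org.apache.hadoop.security.UserGroupInformation

val ugi = UserGroupInformation.createRemoteUser("sparkuser")

// The pool object is created inside the doAs, but a cached pool has no
// threads yet; they are created lazily on first submit.
val pool: ExecutorService = ugi.doAs(new PrivilegedAction[ExecutorService] {
  override def run(): ExecutorService = Executors.newCachedThreadPool()
})

// Later, an RPC handler *outside* the doAs submits a task. The worker thread
// is constructed now, inheriting the RPC thread's security context instead
// of the UGI's.
pool.submit(new Runnable {
  override def run(): Unit =
    println(UserGroupInformation.getCurrentUser()) // login user, not "sparkuser"
})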

@tgravescs
Contributor

I disagree. I would rather see this fixed properly. If we go with your patch and I disable this option, then I still leak FileSystems; it seems bad to leave a known leak in the code. If you want to use your patch and also clean up properly, that would be OK.

I would personally still like to see the doAs moved up. Some of the reasoning you mentioned is why the doAs should go up into CoarseGrainedExecutorBackend (and the other backends), like I mentioned above.

I'll make a patch for this that moves the doAs and post it hopefully later this morning.

@tgravescs
Contributor

This brings up a good question: what setups are we going to support?

on YARN:

  • secure YARN cluster
  • non-secure YARN cluster - daemons run as a superuser (like yarn), users access HDFS as themselves
  • YARN cluster where daemons run as the same user as the running applications

standalone:

  • daemons run as the user who also owns HDFS (no security)
  • daemons run as a superuser, with SPARK_USER set to access HDFS (no security)
  • daemons run as a superuser logged in via keytab and proxy as the user to access secure HDFS (not sure on this one, I thought someone was using this setup)?

mesos:

  • I assume this is the same as standalone?

@aarondav
Contributor Author

aarondav commented May 2, 2014

Ah, your point about making sure we clean up if we don't cache the UGIs is definitely correct, no reason not to do that. For your last question, you might note that my solution is very careful in the sense that I have no idea about Hadoop security and wanted to avoid changing the semantics for that reason.

Do you have a workaround for the fact that the Executor is created in one place, but used by Akka in a different place? I'd rather not leak the whole UGI/Hadoop security through to the actual SchedulerBackend abstraction if possible.


@tgravescs
Contributor

What is your concern with moving it up into the ExecutorBackend?

I'd be OK with using the cache and closeAll (if the config is off) if the consensus is that moving it up is too risky for 1.0. I looked more at the cache concern mentioned in HDFS-3545 and I believe the concern there was with Hive caching it across jobs. In the Spark case, we have a single set of tokens/credentials per backend that won't be replaced. On YARN the RM deals with renewing the tokens. They eventually expire after a week (or so) and you can't currently run anything longer than that. I assume if someone is using tokens on Mesos/standalone they have their own way to refresh, or don't run for more than 24 hours, or have the config set much longer than the default.

I do still believe the better thing to do is move it up, though. We wouldn't have to keep our own cached version, since the UGI machinery does it for you, which I see as less maintenance; it's less prone to someone adding code in a place that should be within the doAs, and in general it seems more secure and fits a normal authentication flow better.

asfgit pushed a commit that referenced this pull request May 3, 2014
…leak

Move the doAs in Executor higher up so that we only have 1 ugi and aren't leaking filesystems.
Fix spark on yarn to work when the cluster is running as user "yarn" but the clients are launched as the user and want to read/write to hdfs as the user.

Note this hasn't been fully tested yet.  Need to test in standalone mode.

Putting this up for people to look at and possibly test.  I don't have access to a mesos cluster.

This is alternative to #607

Author: Thomas Graves <tgraves@apache.org>

Closes #621 from tgravescs/SPARK-1676 and squashes the following commits:

244d55a [Thomas Graves] fix line length
44163d4 [Thomas Graves] Rework
9398853 [Thomas Graves] change to have doAs in executor higher up.

(cherry picked from commit 3d0a02d)
Signed-off-by: Aaron Davidson <aaron@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
	yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
	yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala
rvesse pushed a commit to rvesse/spark that referenced this pull request Mar 2, 2018
* Add message to redirect PRs upstream if possible

We want to redirect community dev upstream as much as possible. However, some contributions impact components (e.g. shuffle server) that do not yet exist upstream. To handle this, we decided to add this message and leave it up to developers, but encourage them to submit upstream unless it isn't feasible.

* Add dev mailing list and JIRA links
bzhaoopenstack pushed a commit to bzhaoopenstack/spark that referenced this pull request Sep 11, 2019
k8s test-infra now use python3 by default. Update related jobs to py3 as
well.