Skip to content

Commit

Permalink
[SPARK-21376][YARN] Fix yarn client token expire issue when cleaning …
Browse files Browse the repository at this point in the history
…the staging files in long running scenario

## What changes were proposed in this pull request?

This issue happens in long-running applications using YARN cluster mode: the YARN client does not sync tokens with the AM, so it always keeps the initial token. That token may expire in a long-running scenario, so when the YARN client tries to clean up the staging directory after the application finishes, it uses the expired token and hits a token-expiry error.

## How was this patch tested?

Manual verification in a secure cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #18617 from jerryshao/SPARK-21376.
  • Loading branch information
jerryshao authored and Marcelo Vanzin committed Jul 13, 2017
1 parent 5c8edfc commit cb8d5cc
Showing 1 changed file with 26 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.PrivilegedExceptionAction
import java.util.{Locale, Properties, UUID}
import java.util.zip.{ZipEntry, ZipOutputStream}

Expand Down Expand Up @@ -192,16 +193,32 @@ private[spark] class Client(
/**
 * Cleanup application staging directory.
 *
 * In cluster mode, when a principal and keytab are available, the delete is performed
 * under a freshly logged-in UGI: per SPARK-21376 the long-lived YARN client never syncs
 * tokens with the AM, so its original delegation token may have expired by the time the
 * application finishes. Re-logging in from the keytab guarantees valid credentials for
 * the filesystem call.
 */
private def cleanupStagingDir(appId: ApplicationId): Unit = {
  // Nothing to do when the user asked to keep the staging files around.
  if (!sparkConf.get(PRESERVE_STAGING_FILES)) {
    // Performs the actual delete. An IOException is logged and swallowed on purpose:
    // a failed cleanup should not fail an otherwise finished application.
    def cleanupStagingDirInternal(): Unit = {
      val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
      try {
        val fs = stagingDirPath.getFileSystem(hadoopConf)
        if (fs.delete(stagingDirPath, true)) {
          logInfo(s"Deleted staging directory $stagingDirPath")
        }
      } catch {
        case ioe: IOException =>
          logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
      }
    }

    if (isClusterMode && principal != null && keytab != null) {
      // Log in again from the keytab so the delete uses fresh credentials instead of
      // the client's possibly-expired cached token.
      val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
      newUgi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = {
          cleanupStagingDirInternal()
        }
      })
    } else {
      cleanupStagingDirInternal()
    }
  }
}

Expand Down

0 comments on commit cb8d5cc

Please sign in to comment.