From 11a14a36734806c150b2f7855badc5f18e30490c Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Wed, 12 Jul 2017 21:00:00 -0700
Subject: [PATCH 1/2] Make sure the client process can clean up the staging files without hitting token expiry issues

Change-Id: I851f829d38ab93a4caafb7ca029d34da76f92f76
---
 .../org/apache/spark/deploy/yarn/Client.scala | 32 +++++++++++++------
 1 file changed, 23 insertions(+), 9 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1dd0715918042..7e3ca9a4dc0f0 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,6 +21,7 @@ import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.security.PrivilegedExceptionAction
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -192,16 +193,29 @@ private[spark] class Client(
    * Cleanup application staging directory.
    */
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
-    val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
-    try {
-      val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val fs = stagingDirPath.getFileSystem(hadoopConf)
-      if (!preserveFiles && fs.delete(stagingDirPath, true)) {
-        logInfo(s"Deleted staging directory $stagingDirPath")
+    def cleanupStagingDirInternal(): Unit = {
+      val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
+      try {
+        val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
+        val fs = stagingDirPath.getFileSystem(hadoopConf)
+        if (!preserveFiles && fs.delete(stagingDirPath, true)) {
+          logInfo(s"Deleted staging directory $stagingDirPath")
+        }
+      } catch {
+        case ioe: IOException =>
+          logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
       }
-    } catch {
-      case ioe: IOException =>
-        logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
+    }
+
+    if (isClusterMode && principal != null && keytab != null) {
+      val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+      newUgi.doAs(new PrivilegedExceptionAction[Unit] {
+        override def run(): Unit = {
+          cleanupStagingDirInternal()
+        }
+      })
+    } else {
+      cleanupStagingDirInternal()
     }
   }
 

From c83d426d3d40dda22e4ff339213e7903a2b3b265 Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Thu, 13 Jul 2017 14:53:53 -0700
Subject: [PATCH 2/2] Address the comments

Change-Id: I1b61dd676a5344c8e69a3d7d5268aa343bd1bf75
---
 .../main/scala/org/apache/spark/deploy/yarn/Client.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7e3ca9a4dc0f0..208b6de550e8b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -193,12 +193,15 @@ private[spark] class Client(
    * Cleanup application staging directory.
    */
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
+    if (sparkConf.get(PRESERVE_STAGING_FILES)) {
+      return
+    }
+
     def cleanupStagingDirInternal(): Unit = {
       val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
       try {
-        val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
         val fs = stagingDirPath.getFileSystem(hadoopConf)
-        if (!preserveFiles && fs.delete(stagingDirPath, true)) {
+        if (fs.delete(stagingDirPath, true)) {
           logInfo(s"Deleted staging directory $stagingDirPath")
         }
       } catch {
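
Note on the approach in [PATCH 1/2]: the fix is the "re-login from the keytab, then run the HDFS call inside doAs" pattern, so the staging-directory deletion no longer depends on the client's possibly expired delegation tokens. The standalone sketch below shows only that pattern in isolation; the object name, the hard-coded staging path, and the command-line argument handling are illustrative placeholders, not code from Client.scala.

// Standalone sketch (not Spark code): re-login from a keytab and perform the
// staging-directory deletion as the freshly logged-in user via doAs.
import java.io.IOException
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.UserGroupInformation

object StagingCleanupSketch {

  // Mirrors cleanupStagingDirInternal: delete recursively, log-and-swallow IO errors.
  private def deleteStagingDir(stagingDir: Path, hadoopConf: Configuration): Unit = {
    try {
      val fs = stagingDir.getFileSystem(hadoopConf)
      if (fs.delete(stagingDir, true)) {
        println(s"Deleted staging directory $stagingDir")
      }
    } catch {
      case ioe: IOException =>
        println(s"Failed to cleanup staging dir $stagingDir: ${ioe.getMessage}")
    }
  }

  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // Placeholder path; the patch builds it from appStagingBaseDir and getAppStagingDir(appId).
    val stagingDir = new Path("/tmp/example-staging-dir")

    // Hypothetical CLI arguments standing in for the principal/keytab configuration.
    val principal = if (args.length > 0) args(0) else null
    val keytab = if (args.length > 1) args(1) else null

    if (principal != null && keytab != null) {
      // The original delegation tokens may have expired while a long-running application
      // was executing, so authenticate again from the keytab and run the deletion under
      // that new UGI.
      val newUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
      newUgi.doAs(new PrivilegedExceptionAction[Unit] {
        override def run(): Unit = deleteStagingDir(stagingDir, hadoopConf)
      })
    } else {
      // No Kerberos credentials supplied: just act as the current user.
      deleteStagingDir(stagingDir, hadoopConf)
    }
  }
}

Client.scala itself takes the relogin branch only when isClusterMode is true and both principal and keytab are set; otherwise it calls cleanupStagingDirInternal() directly, presumably because only a keytab-equipped cluster-mode client can re-authenticate after its original tokens expire.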