Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -706,6 +706,22 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

// Internal bookkeeping entry: the driver records every URI it uploaded to the
// KUBERNETES_FILE_UPLOAD_PATH so they can be removed when the application ends.
// NOTE(review): "FAILS" in the val name looks like a typo for "FILES"; it is kept
// as-is here because other code in this file references this val by that name.
val KUBERNETES_FILE_UPLOADED_FAILS =
  ConfigBuilder("spark.kubernetes.uploaded.files")
    .internal()
    .doc("Remembers all URIs uploaded by the Spark client, used to delete those URIs " +
      "when the application finishes.")
    .version("3.5.0")
    .stringConf
    .toSequence
    .createWithDefault(Nil)

// User-facing toggle: when true, files uploaded by the Spark client are deleted
// on application termination. Defaults to false to preserve existing behavior.
val KUBERNETES_UPLOAD_FILE_DELETE_ON_TERMINATION =
  ConfigBuilder("spark.kubernetes.uploaded.file.delete.on.termination")
    .doc("Whether to delete the files uploaded by the Spark client when the " +
      "application finishes.")
    .version("3.5.0")
    .booleanConf
    .createWithDefault(false)

val KUBERNETES_EXECUTOR_CHECK_ALL_CONTAINERS =
ConfigBuilder("spark.kubernetes.executor.checkAllContainers")
.doc("If set to true, all containers in the executor pod will be checked when reporting" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,25 @@ object KubernetesUtils extends Logging {
}
}

/**
 * Deletes a previously uploaded URI (recursively) from its Hadoop-compatible
 * file system. A best-effort no-op (with a log line) if the path does not exist.
 *
 * @param uri  the URI to delete, as recorded at upload time
 * @param conf Spark configuration used to build the Hadoop configuration
 * @throws SparkException if the deletion attempt fails
 */
def deleteFileUri(uri: String, conf: SparkConf): Unit = {
  logInfo(s"Try to delete uploaded uri: $uri")
  // Resolve once and use the same resolved URI both to pick the file system
  // and to build the Path, so they cannot disagree on scheme/authority.
  val fileUri = Utils.resolveURI(uri)
  try {
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    val fs = getHadoopFileSystem(fileUri, hadoopConf)
    val path = new Path(fileUri)
    if (fs.exists(path)) {
      // recursive = true: uploads may be directories.
      fs.delete(path, true)
      logInfo(s"Deleted uploaded uri: $uri")
    } else {
      logInfo(s"Uploaded uri: $uri does not exist.")
    }
  } catch {
    case e: Exception =>
      throw new SparkException(s"Deleting file ${fileUri.getPath} failed...", e)
  }
}

@Since("3.0.0")
def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
conf match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
MEMORY_OVERHEAD_FACTOR.key -> defaultOverheadFactor.toString)
// try upload local, resolvable files to a hadoop compatible file system
var uploadedUris: mutable.Set[String] = mutable.Set()
Seq(JARS, FILES, ARCHIVES, SUBMIT_PYTHON_FILES).foreach { key =>
val (localUris, remoteUris) =
conf.get(key).partition(uri => KubernetesUtils.isLocalAndResolvable(uri))
Expand All @@ -187,6 +188,14 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
resolved
}
additionalProps.put(key.key, (resolvedValue ++ remoteUris).mkString(","))
resolvedValue.foreach(uri =>
uploadedUris.add(uri.substring(0, uri.lastIndexOf("/")))
)
}
}
if (conf.sparkConf.get(KUBERNETES_FILE_UPLOAD_PATH).isDefined) {
if (uploadedUris.nonEmpty) {
additionalProps.put(KUBERNETES_FILE_UPLOADED_FAILS.key, uploadedUris.mkString(","))
}
}
additionalProps.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ private[spark] class KubernetesClusterSchedulerBackend(

private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)

private val shouldDeleteUploadFile = conf.get(KUBERNETES_UPLOAD_FILE_DELETE_ON_TERMINATION)

private val defaultProfile = scheduler.sc.resourceProfileManager.defaultResourceProfile

private val namespace = conf.get(KUBERNETES_NAMESPACE)
Expand Down Expand Up @@ -169,6 +171,18 @@ private[spark] class KubernetesClusterSchedulerBackend(
}
}

Utils.tryLogNonFatalError {
if (shouldDeleteUploadFile) {
val uploadedUris = conf.get(KUBERNETES_FILE_UPLOADED_FAILS.key, "")
if (!"".equals(uploadedUris)) {
logInfo(s"Try to delete uris: ${uploadedUris}")
uploadedUris.split(",").foreach(uri =>
KubernetesUtils.deleteFileUri(uri, conf)
)
}
}
}

Utils.tryLogNonFatalError {
ThreadUtils.shutdown(executorService)
}
Expand Down