Commit
Merge branch 'master' into issues/SPARK-25525/not_update_existing_conf
ueshin committed Sep 27, 2018
2 parents ac0243a + c3c45cb commit d8c60cb
Showing 58 changed files with 552 additions and 684 deletions.
17 changes: 15 additions & 2 deletions R/pkg/R/DataFrame.R
@@ -2954,6 +2954,9 @@ setMethod("exceptAll",
#' @param source a name for external data source.
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#' save mode (it is 'error' by default)
#' @param partitionBy a name or a list of names of columns to partition the output by on the file
#' system. If specified, the output is laid out on the file system similar
#' to Hive's partitioning scheme.
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
@@ -2965,13 +2968,13 @@ setMethod("exceptAll",
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' write.df(df, "myfile", "parquet", "overwrite")
#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2"))
#' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
#' }
#' @note write.df since 1.4.0
setMethod("write.df",
signature(df = "SparkDataFrame"),
function(df, path = NULL, source = NULL, mode = "error", ...) {
function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) {
if (!is.null(path) && !is.character(path)) {
stop("path should be character, NULL or omitted.")
}
@@ -2985,8 +2988,18 @@ setMethod("write.df",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
cols <- NULL
if (!is.null(partitionBy)) {
if (!all(sapply(partitionBy, function(c) is.character(c)))) {
stop("All partitionBy column names should be characters.")
}
cols <- as.list(partitionBy)
}
write <- callJMethod(df@sdf, "write")
write <- callJMethod(write, "format", source)
if (!is.null(cols)) {
write <- callJMethod(write, "partitionBy", cols)
}
write <- setWriteOptions(write, path = path, mode = mode, ...)
write <- handledCallJMethod(write, "save")
})
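For reference, the R wrapper above just forwards to the JVM-side DataFrameWriter via callJMethod, so the new partitionBy argument maps onto the existing Scala API. A minimal Scala sketch of the equivalent call (the column names and output path are placeholders):

// Scala-side equivalent of write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2"))
df.write
  .format("parquet")
  .partitionBy("col1", "col2")   // the same method the R code reaches through callJMethod
  .mode("overwrite")
  .save("myfile")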
5 changes: 3 additions & 2 deletions R/pkg/R/functions.R
@@ -198,8 +198,9 @@ NULL
#' }
#' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
#' additional named properties to control how it is converted, accepts the same
#' options as the JSON data source. In \code{arrays_zip}, this contains additional
#' Columns of arrays to be merged.
#' options as the JSON data source. Additionally \code{to_json} supports the "pretty"
#' option which enables pretty JSON generation. In \code{arrays_zip}, this contains
#' additional Columns of arrays to be merged.
#' @name column_collection_functions
#' @rdname column_collection_functions
#' @family collection functions
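The newly documented "pretty" option is passed through to the JSON writer like any other data-source option. A hedged Scala sketch of the same call (the DataFrame df and its columns c1 and c2 are assumed):

import org.apache.spark.sql.functions.{struct, to_json}

// Generate multi-line ("pretty") JSON instead of the default compact form.
val prettified = df.select(to_json(struct(df("c1"), df("c2")), Map("pretty" -> "true")))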
2 changes: 2 additions & 0 deletions R/pkg/R/sparkR.R
@@ -626,6 +626,8 @@ sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-pat
sparkConfToSubmitOps[["spark.master"]] <- "--master"
sparkConfToSubmitOps[["spark.yarn.keytab"]] <- "--keytab"
sparkConfToSubmitOps[["spark.yarn.principal"]] <- "--principal"
sparkConfToSubmitOps[["spark.kerberos.keytab"]] <- "--keytab"
sparkConfToSubmitOps[["spark.kerberos.principal"]] <- "--principal"


# Utility function that returns Spark Submit arguments as a string
8 changes: 8 additions & 0 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -2701,8 +2701,16 @@ test_that("read/write text files", {
expect_equal(colnames(df2), c("value"))
expect_equal(count(df2), count(df) * 2)

df3 <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
schema = c("key", "value"))
textPath3 <- tempfile(pattern = "textPath3", fileext = ".txt")
write.df(df3, textPath3, "text", mode = "overwrite", partitionBy = "key")
df4 <- read.df(textPath3, "text")
expect_equal(count(df3), count(df4))

unlink(textPath)
unlink(textPath2)
unlink(textPath3)
})

test_that("read/write text files - compression option", {
4 changes: 2 additions & 2 deletions R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -157,8 +157,8 @@ Property Name | Property group | spark-submit equivalent
`spark.driver.extraClassPath` | Runtime Environment | `--driver-class-path`
`spark.driver.extraJavaOptions` | Runtime Environment | `--driver-java-options`
`spark.driver.extraLibraryPath` | Runtime Environment | `--driver-library-path`
`spark.yarn.keytab` | Application Properties | `--keytab`
`spark.yarn.principal` | Application Properties | `--principal`
`spark.kerberos.keytab` | Application Properties | `--keytab`
`spark.kerberos.principal` | Application Properties | `--principal`

**For Windows users**: Due to different file prefixes across operating systems, to avoid the issue of potential wrong prefix, a current workaround is to specify `spark.sql.warehouse.dir` when starting the `SparkSession`.

6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -726,7 +726,11 @@ private[spark] object SparkConf extends Logging {
DRIVER_MEMORY_OVERHEAD.key -> Seq(
AlternateConfig("spark.yarn.driver.memoryOverhead", "2.3")),
EXECUTOR_MEMORY_OVERHEAD.key -> Seq(
AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3"))
AlternateConfig("spark.yarn.executor.memoryOverhead", "2.3")),
KEYTAB.key -> Seq(
AlternateConfig("spark.yarn.keytab", "2.5")),
PRINCIPAL.key -> Seq(
AlternateConfig("spark.yarn.principal", "2.5"))
)
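Registering the old keys as AlternateConfigs is meant to keep existing jobs working: a read through the new name falls back to a value set under the legacy key, and setting the legacy key logs a deprecation warning. A minimal sketch of that behavior (the keytab path is a placeholder):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.keytab", "/path/to/user.keytab") // legacy key; logs a deprecation warning
// The read through the new name resolves via the registered alternate.
assert(conf.get("spark.kerberos.keytab") == "/path/to/user.keytab")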

/**
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -520,6 +520,10 @@ private[spark] class SparkSubmit extends Logging {
confKey = "spark.driver.extraJavaOptions"),
OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = "spark.driver.extraLibraryPath"),
OptionAssigner(args.principal, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = PRINCIPAL.key),
OptionAssigner(args.keytab, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
confKey = KEYTAB.key),

// Propagate attributes for dependency resolution at the driver side
OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
@@ -537,8 +541,6 @@
OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

// Other options
OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
@@ -199,8 +199,14 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
numExecutors = Option(numExecutors)
.getOrElse(sparkProperties.get("spark.executor.instances").orNull)
queue = Option(queue).orElse(sparkProperties.get("spark.yarn.queue")).orNull
keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull
keytab = Option(keytab)
.orElse(sparkProperties.get("spark.kerberos.keytab"))
.orElse(sparkProperties.get("spark.yarn.keytab"))
.orNull
principal = Option(principal)
.orElse(sparkProperties.get("spark.kerberos.principal"))
.orElse(sparkProperties.get("spark.yarn.principal"))
.orNull
dynamicAllocationEnabled =
sparkProperties.get("spark.dynamicAllocation.enabled").exists("true".equalsIgnoreCase)

@@ -19,7 +19,6 @@ package org.apache.spark.deploy.history

import java.io.{File, FileNotFoundException, IOException}
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions
import java.util.{Date, ServiceLoader}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}
@@ -133,9 +132,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
val perms = PosixFilePermissions.fromString("rwx------")
val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath(),
PosixFilePermissions.asFileAttribute(perms)).toFile()
val dbPath = Files.createDirectories(new File(path, "listing.ldb").toPath()).toFile()
Utils.chmod700(dbPath)

val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION,
AppStatusStore.CURRENT_VERSION, logDir.toString())
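Utils.chmod700 achieves the same owner-only permissions through java.io.File setters rather than PosixFilePermissions, so it does not require a POSIX file attribute view. Roughly what that utility does (a sketch from memory, not part of this patch):

// Clear each permission for everyone, then re-grant it to the owner only.
def chmod700(file: java.io.File): Boolean = {
  file.setReadable(false, false) && file.setReadable(true, true) &&
  file.setWritable(false, false) && file.setWritable(true, true) &&
  file.setExecutable(false, false) && file.setExecutable(true, true)
}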
@@ -18,8 +18,6 @@
package org.apache.spark.deploy.history

import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions
import java.util.concurrent.atomic.AtomicLong

import scala.collection.JavaConverters._
@@ -107,9 +105,8 @@ private class HistoryServerDiskManager(
val needed = approximateSize(eventLogSize, isCompressed)
makeRoom(needed)

val perms = PosixFilePermissions.fromString("rwx------")
val tmp = Files.createTempDirectory(tmpStoreDir.toPath(), "appstore",
PosixFilePermissions.asFileAttribute(perms)).toFile()
val tmp = Utils.createTempDir(tmpStoreDir.getPath(), "appstore")
Utils.chmod700(tmp)

updateUsage(needed)
val current = currentUsage.get()
@@ -152,11 +152,11 @@ package object config {
private[spark] val SHUFFLE_SERVICE_PORT =
ConfigBuilder("spark.shuffle.service.port").intConf.createWithDefault(7337)

private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
private[spark] val KEYTAB = ConfigBuilder("spark.kerberos.keytab")
.doc("Location of user's keytab.")
.stringConf.createOptional

private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
private[spark] val PRINCIPAL = ConfigBuilder("spark.kerberos.principal")
.doc("Name of the Kerberos principal.")
.stringConf.createOptional
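Both entries are built with createOptional, so callers read them as Option[String]. A small sketch of typed access, assuming a SparkConf named conf is in scope:

// None unless the corresponding key was configured.
val keytab: Option[String] = conf.get(KEYTAB)
val principal: Option[String] = conf.get(PRINCIPAL)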

12 changes: 9 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -49,10 +49,16 @@ private[spark] sealed trait MapStatus {

private[spark] object MapStatus {

/**
* Min partition number to use [[HighlyCompressedMapStatus]]. A bit ugly here because in test
* code we can't assume SparkEnv.get exists.
*/
private lazy val minPartitionsToUseHighlyCompressMapStatus = Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)

def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
if (uncompressedSizes.length > Option(SparkEnv.get)
.map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
.getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
if (uncompressedSizes.length > minPartitionsToUseHighlyCompressMapStatus) {
HighlyCompressedMapStatus(loc, uncompressedSizes)
} else {
new CompressedMapStatus(loc, uncompressedSizes)
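Hoisting the threshold into a lazy val means the SparkEnv lookup and conf read happen at most once, on the first MapStatus construction, instead of on every call. A tiny sketch of the lazy-val semantics being relied on (2000 is the entry's default value, cited from memory):

object LazyThresholdDemo {
  lazy val threshold: Int = {
    println("resolving threshold") // the initializer runs only on first access
    2000
  }
  def statusKind(numPartitions: Int): String =
    if (numPartitions > threshold) "highly compressed" else "compressed"
}
// LazyThresholdDemo.statusKind(5000); LazyThresholdDemo.statusKind(10)
// -> "resolving threshold" is printed just once.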
@@ -388,10 +388,11 @@ private[spark] class AppStatusListener(

job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
update(job, now, last = true)
if (job.status == JobExecutionStatus.SUCCEEDED) {
appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
kvstore.write(appSummary)
}
}

appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
kvstore.write(appSummary)
}

override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@@ -653,13 +654,14 @@ private[spark] class AppStatusListener(
if (removeStage) {
liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
}
if (stage.status == v1.StageStatus.COMPLETE) {
appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
kvstore.write(appSummary)
}
}

// remove any dead executors that were not running for any currently active stages
deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))

appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
kvstore.write(appSummary)
}

private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = {
13 changes: 5 additions & 8 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -438,10 +438,8 @@ private[spark] class BlockManager(
// stream.
channel.close()
// TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
// using a lot of memory here. With encryption, we'll read the whole file into a regular
// byte buffer and OOM. Without encryption, we'll memory map the file and won't get a jvm
// OOM, but might get killed by the OS / cluster manager. We could at least read the tmp
// file as a stream in both cases.
// using a lot of memory here. We'll read the whole file into a regular
// byte buffer and OOM. We could at least read the tmp file as a stream.
val buffer = securityManager.getIOEncryptionKey() match {
case Some(key) =>
// we need to pass in the size of the unencrypted block
@@ -453,7 +451,7 @@
new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)

case None =>
ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
}
putBytes(blockId, buffer, level)(classTag)
tmpFile.delete()
@@ -726,10 +724,9 @@
*/
def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
// TODO if we change this method to return the ManagedBuffer, then getRemoteValues
// could just use the inputStream on the temp file, rather than memory-mapping the file.
// could just use the inputStream on the temp file, rather than reading the file into memory.
// Until then, replication can cause the process to use too much memory and get killed
// by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though
// we've read the data to disk.
// even though we've read the data to disk.
logDebug(s"Getting remote block $blockId")
require(blockId != null, "BlockId is null")
var runningFailureCount = 0
@@ -448,35 +448,35 @@ final class ShuffleBlockFetcherIterator(
buf.release()
throwFetchFailedException(blockId, address, e)
}

input = streamWrapper(blockId, in)
// Only copy the stream if it's wrapped by compression or encryption, also the size of
// block is small (the decompressed block is smaller than maxBytesInFlight)
if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
val originalInput = input
val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
try {
var isStreamCopied: Boolean = false
try {
input = streamWrapper(blockId, in)
// Only copy the stream if it's wrapped by compression or encryption, also the size of
// block is small (the decompressed block is smaller than maxBytesInFlight)
if (detectCorrupt && !input.eq(in) && size < maxBytesInFlight / 3) {
isStreamCopied = true
val out = new ChunkedByteBufferOutputStream(64 * 1024, ByteBuffer.allocate)
// Decompress the whole block at once to detect any corruption, which could increase
// the memory usage and potentially increase the chance of OOM.
// TODO: manage the memory used here, and spill it into disk in case of OOM.
Utils.copyStream(input, out)
out.close()
Utils.copyStream(input, out, closeStreams = true)
input = out.toChunkedByteBuffer.toInputStream(dispose = true)
} catch {
case e: IOException =>
buf.release()
if (buf.isInstanceOf[FileSegmentManagedBuffer]
|| corruptedBlocks.contains(blockId)) {
throwFetchFailedException(blockId, address, e)
} else {
logWarning(s"got an corrupted block $blockId from $address, fetch again", e)
corruptedBlocks += blockId
fetchRequests += FetchRequest(address, Array((blockId, size)))
result = null
}
} finally {
// TODO: release the buf here to free memory earlier
originalInput.close()
}
} catch {
case e: IOException =>
buf.release()
if (buf.isInstanceOf[FileSegmentManagedBuffer]
|| corruptedBlocks.contains(blockId)) {
throwFetchFailedException(blockId, address, e)
} else {
logWarning(s"got an corrupted block $blockId from $address, fetch again", e)
corruptedBlocks += blockId
fetchRequests += FetchRequest(address, Array((blockId, size)))
result = null
}
} finally {
// TODO: release the buf here to free memory earlier
if (isStreamCopied) {
in.close()
}
}