
Commit 4f5ac09

Merge branch 'master' of github.com:apache/spark into R5
Davies Liu committed Apr 14, 2015
2 parents 41f8184 + 25998e4 commit 4f5ac09
Showing 325 changed files with 7,187 additions and 2,302 deletions.
18 changes: 5 additions & 13 deletions R/pkg/R/RDD.R
@@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)

if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
# This transformation is the first in its stage:
.Object@func <- func
.Object@func <- cleanClosure(func)
.Object@prev_jrdd <- getJRDD(prev)
.Object@env$prev_serializedMode <- prev@env$serializedMode
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
@@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
pipelinedFunc <- function(split, iterator) {
func(split, prev@func(split, iterator))
}
.Object@func <- pipelinedFunc
.Object@func <- cleanClosure(pipelinedFunc)
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
# Get the serialization mode of the parent RDD
.Object@env$prev_serializedMode <- prev@env$prev_serializedMode
@@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
return(rdd@env$jrdd_val)
}

computeFunc <- function(split, part) {
rdd@func(split, part)
}

packageNamesArr <- serialize(.sparkREnv[[".packages"]],
connection = NULL)

broadcastArr <- lapply(ls(.broadcastNames),
function(name) { get(name, .broadcastNames) })

serializedFuncArr <- serialize(computeFunc, connection = NULL)
serializedFuncArr <- serialize(rdd@func, connection = NULL)

prev_jrdd <- rdd@prev_jrdd

@@ -279,7 +275,7 @@ setMethod("unpersist",
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setCheckpointDir(sc, "checkpoints")
#' setCheckpointDir(sc, "checkpoint")
#' rdd <- parallelize(sc, 1:10, 2L)
#' checkpoint(rdd)
#'}
@@ -551,11 +547,7 @@ setMethod("mapPartitions",
setMethod("lapplyPartitionsWithIndex",
signature(X = "RDD", FUN = "function"),
function(X, FUN) {
FUN <- cleanClosure(FUN)
closureCapturingFunc <- function(split, part) {
FUN(split, part)
}
PipelinedRDD(X, closureCapturingFunc)
PipelinedRDD(X, FUN)
})

#' @rdname lapplyPartitionsWithIndex
2 changes: 1 addition & 1 deletion R/pkg/R/context.R
@@ -216,7 +216,7 @@ broadcast <- function(sc, object) {
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setCheckpointDir(sc, "~/checkpoints")
#' setCheckpointDir(sc, "~/checkpoint")
#' rdd <- parallelize(sc, 1:2, 2L)
#' checkpoint(rdd)
#'}
4 changes: 0 additions & 4 deletions R/pkg/R/pairRDD.R
@@ -694,10 +694,6 @@ setMethod("cogroup",
for (i in 1:rddsLen) {
rdds[[i]] <- lapply(rdds[[i]],
function(x) { list(x[[1]], list(i, x[[2]])) })
# TODO(hao): As issue [SparkR-142] mentions, the right value of i
# will not be captured into UDF if getJRDD is not invoked.
# It should be resolved together with that issue.
getJRDD(rdds[[i]]) # Capture the closure.
}
union.rdd <- Reduce(unionRDD, rdds)
group.func <- function(vlist) {
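
Note: the workaround removed above existed because R closures capture their enclosing environment by reference, so every UDF built in this loop would otherwise have seen the final value of i; forcing getJRDD serialized each closure while i still held the intended value. With cleanClosure now applied when a PipelinedRDD is constructed (see the RDD.R change above), free variables are snapshotted eagerly and the forced evaluation is no longer needed. A rough Scala analogue of the underlying pitfall, illustrative only and not SparkR code:

// Illustrative sketch: closures created in a loop over a mutable variable all
// observe its final value unless the current value is copied into a fresh val.
object LoopCaptureSketch extends App {
  val lazyClosures  = scala.collection.mutable.ArrayBuffer.empty[() => Int]
  val eagerClosures = scala.collection.mutable.ArrayBuffer.empty[() => Int]
  var i = 0
  while (i < 3) {
    lazyClosures  += (() => i)          // captures the variable i itself
    val captured = i                    // snapshot the current value
    eagerClosures += (() => captured)   // captures the snapshot
    i += 1
  }
  println(lazyClosures.map(f => f()))   // ArrayBuffer(3, 3, 3)
  println(eagerClosures.map(f => f()))  // ArrayBuffer(0, 1, 2)
}
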
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_binaryFile.R
@@ -27,7 +27,7 @@ test_that("saveAsObjectFile()/objectFile() following textFile() works", {
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName1)

rdd <- textFile(sc, fileName1)
rdd <- textFile(sc, fileName1, 1)
saveAsObjectFile(rdd, fileName2)
rdd <- objectFile(sc, fileName2)
expect_equal(collect(rdd), as.list(mockFile))
@@ -40,7 +40,7 @@ test_that("saveAsObjectFile()/objectFile() works on a parallelized list", {
fileName <- tempfile(pattern="spark-test", fileext=".tmp")

l <- list(1, 2, 3)
rdd <- parallelize(sc, l)
rdd <- parallelize(sc, l, 1)
saveAsObjectFile(rdd, fileName)
rdd <- objectFile(sc, fileName)
expect_equal(collect(rdd), l)
5 changes: 3 additions & 2 deletions R/pkg/inst/tests/test_rdd.R
@@ -141,7 +141,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
unpersist(rdd2)
expect_false(rdd2@env$isCached)

setCheckpointDir(sc, "checkpoints")
tempDir <- tempfile(pattern = "checkpoint")
setCheckpointDir(sc, tempDir)
checkpoint(rdd2)
expect_true(rdd2@env$isCheckpointed)

@@ -152,7 +153,7 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp
# make sure the data is collectable
collect(rdd2)

unlink("checkpoints")
unlink(tempDir)
})

test_that("reduce on RDD", {
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_textFile.R
@@ -81,7 +81,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
fileName2 <- tempfile(pattern="spark-test", fileext=".tmp")
writeLines(mockFile, fileName1)

rdd <- textFile(sc, fileName1)
rdd <- textFile(sc, fileName1, 1L)
saveAsTextFile(rdd, fileName2)
rdd <- textFile(sc, fileName2)
expect_equal(collect(rdd), as.list(mockFile))
@@ -93,7 +93,7 @@ test_that("textFile() followed by a saveAsTextFile() returns the same content",
test_that("saveAsTextFile() on a parallelized list works as expected", {
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
l <- list(1, 2, 3)
rdd <- parallelize(sc, l)
rdd <- parallelize(sc, l, 1L)
saveAsTextFile(rdd, fileName)
rdd <- textFile(sc, fileName)
expect_equal(collect(rdd), lapply(l, function(x) {toString(x)}))
2 changes: 1 addition & 1 deletion bagel/src/test/resources/log4j.properties
@@ -24,4 +24,4 @@ log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.spark-project.jetty=WARN
5 changes: 3 additions & 2 deletions bin/load-spark-env.sh
@@ -20,6 +20,7 @@
# This script loads spark-env.sh if it exists, and ensures it is only loaded once.
# spark-env.sh is loaded from SPARK_CONF_DIR if set, or within the current directory's
# conf/ subdirectory.
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

if [ -z "$SPARK_ENV_LOADED" ]; then
export SPARK_ENV_LOADED=1
@@ -41,8 +42,8 @@ fi

if [ -z "$SPARK_SCALA_VERSION" ]; then

ASSEMBLY_DIR2="$SPARK_HOME/assembly/target/scala-2.11"
ASSEMBLY_DIR1="$SPARK_HOME/assembly/target/scala-2.10"
ASSEMBLY_DIR2="$FWDIR/assembly/target/scala-2.11"
ASSEMBLY_DIR1="$FWDIR/assembly/target/scala-2.10"

if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
echo -e "Presence of build for both scala versions(SCALA 2.10 and SCALA 2.11) detected." 1>&2
4 changes: 2 additions & 2 deletions conf/log4j.properties.template
@@ -6,7 +6,7 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
(another log4j.properties file; path not shown in this view)
@@ -6,7 +6,7 @@ log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Settings to quiet third party logs that are too verbose
log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.spark-project.jetty=WARN
log4j.logger.org.spark-project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
44 changes: 32 additions & 12 deletions core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -22,7 +22,7 @@ import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{RDDCheckpointData, RDD}
import org.apache.spark.util.Utils

/**
@@ -33,6 +33,7 @@ private case class CleanRDD(rddId: Int) extends CleanupTask
private case class CleanShuffle(shuffleId: Int) extends CleanupTask
private case class CleanBroadcast(broadcastId: Long) extends CleanupTask
private case class CleanAccum(accId: Long) extends CleanupTask
private case class CleanCheckpoint(rddId: Int) extends CleanupTask

/**
* A WeakReference associated with a CleanupTask.
@@ -94,12 +95,12 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
@volatile private var stopped = false

/** Attach a listener object to get information of when objects are cleaned. */
def attachListener(listener: CleanerListener) {
def attachListener(listener: CleanerListener): Unit = {
listeners += listener
}

/** Start the cleaner. */
def start() {
def start(): Unit = {
cleaningThread.setDaemon(true)
cleaningThread.setName("Spark Context Cleaner")
cleaningThread.start()
@@ -108,7 +109,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
/**
* Stop the cleaning thread and wait until the thread has finished running its current task.
*/
def stop() {
def stop(): Unit = {
stopped = true
// Interrupt the cleaning thread, but wait until the current task has finished before
// doing so. This guards against the race condition where a cleaning thread may
@@ -121,7 +122,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Register a RDD for cleanup when it is garbage collected. */
def registerRDDForCleanup(rdd: RDD[_]) {
def registerRDDForCleanup(rdd: RDD[_]): Unit = {
registerForCleanup(rdd, CleanRDD(rdd.id))
}

@@ -130,17 +131,22 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Register a ShuffleDependency for cleanup when it is garbage collected. */
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]) {
def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
}

/** Register a Broadcast for cleanup when it is garbage collected. */
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]) {
def registerBroadcastForCleanup[T](broadcast: Broadcast[T]): Unit = {
registerForCleanup(broadcast, CleanBroadcast(broadcast.id))
}

/** Register a RDDCheckpointData for cleanup when it is garbage collected. */
def registerRDDCheckpointDataForCleanup[T](rdd: RDD[_], parentId: Int): Unit = {
registerForCleanup(rdd, CleanCheckpoint(parentId))
}

/** Register an object for cleanup. */
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask) {
private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
referenceBuffer += new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)
}

@@ -164,6 +170,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
case CleanAccum(accId) =>
doCleanupAccum(accId, blocking = blockOnCleanupTasks)
case CleanCheckpoint(rddId) =>
doCleanCheckpoint(rddId)
}
}
}
@@ -175,7 +183,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform RDD cleanup. */
def doCleanupRDD(rddId: Int, blocking: Boolean) {
def doCleanupRDD(rddId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning RDD " + rddId)
sc.unpersistRDD(rddId, blocking)
@@ -187,7 +195,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform shuffle cleanup, asynchronously. */
def doCleanupShuffle(shuffleId: Int, blocking: Boolean) {
def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
try {
logDebug("Cleaning shuffle " + shuffleId)
mapOutputTrackerMaster.unregisterShuffle(shuffleId)
@@ -200,7 +208,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform broadcast cleanup. */
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean) {
def doCleanupBroadcast(broadcastId: Long, blocking: Boolean): Unit = {
try {
logDebug(s"Cleaning broadcast $broadcastId")
broadcastManager.unbroadcast(broadcastId, true, blocking)
@@ -212,7 +220,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}

/** Perform accumulator cleanup. */
def doCleanupAccum(accId: Long, blocking: Boolean) {
def doCleanupAccum(accId: Long, blocking: Boolean): Unit = {
try {
logDebug("Cleaning accumulator " + accId)
Accumulators.remove(accId)
@@ -223,6 +231,18 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
}
}

/** Perform checkpoint cleanup. */
def doCleanCheckpoint(rddId: Int): Unit = {
try {
logDebug("Cleaning rdd checkpoint data " + rddId)
RDDCheckpointData.clearRDDCheckpointData(sc, rddId)
logInfo("Cleaned rdd checkpoint data " + rddId)
}
catch {
case e: Exception => logError("Error cleaning rdd checkpoint data " + rddId, e)
}
}

private def blockManagerMaster = sc.env.blockManager.master
private def broadcastManager = sc.env.broadcastManager
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
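
Note: the new CleanCheckpoint task plugs into the same mechanism the cleaner already uses for RDDs, shuffles, broadcasts and accumulators. Each tracked object is paired with its cleanup task through a CleanupTaskWeakReference on a shared ReferenceQueue, and once the object is garbage collected the queued task is dispatched, here to RDDCheckpointData.clearRDDCheckpointData. A minimal, self-contained sketch of that weak-reference pattern follows; the names are modeled on the diff but this is not the actual Spark implementation, and it omits the daemon cleaning thread and error handling the real class has.

import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable.ArrayBuffer

sealed trait CleanupTask
case class CleanRDD(rddId: Int) extends CleanupTask
case class CleanCheckpoint(rddId: Int) extends CleanupTask

// Pairs a cleanup task with a weak reference to the object that owns it.
class CleanupTaskWeakReference(val task: CleanupTask, referent: AnyRef, queue: ReferenceQueue[AnyRef])
  extends WeakReference[AnyRef](referent, queue)

object CleanerSketch {
  private val referenceQueue = new ReferenceQueue[AnyRef]
  // Hold the wrappers strongly so they survive until their referents are collected.
  private val referenceBuffer = ArrayBuffer.empty[CleanupTaskWeakReference]

  def registerForCleanup(obj: AnyRef, task: CleanupTask): Unit =
    referenceBuffer += new CleanupTaskWeakReference(task, obj, referenceQueue)

  // One pass of the cleaning loop: wait briefly for a collected referent, then dispatch its task.
  def keepCleaningOnce(): Unit = {
    val ref = referenceQueue.remove(100).asInstanceOf[CleanupTaskWeakReference]
    if (ref != null) {
      referenceBuffer -= ref
      ref.task match {
        case CleanRDD(id)        => println(s"would unpersist RDD " + id)
        case CleanCheckpoint(id) => println(s"would clear checkpoint data written by RDD " + id)
      }
    }
  }
}
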
