
Commit

Merge branch 'master' of https://github.com/apache/spark
hujy committed Jul 23, 2015
2 parents e2dec2c + 2f5cbd8 commit ded1a14
Showing 149 changed files with 4,752 additions and 764 deletions.
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -26,6 +26,7 @@ exportMethods("arrange",
"collect",
"columns",
"count",
"crosstab",
"describe",
"distinct",
"dropna",
28 changes: 28 additions & 0 deletions R/pkg/R/DataFrame.R
@@ -1554,3 +1554,31 @@ setMethod("fillna",
}
dataFrame(sdf)
})

#' crosstab
#'
#' Computes a pair-wise frequency table of the given columns. Also known as a contingency
#' table. The number of distinct values for each column should be less than 1e4. At most 1e6
#' non-zero pair frequencies will be returned.
#'
#' @param col1 name of the first column. Distinct items will make the first item of each row.
#' @param col2 name of the second column. Distinct items will make the column names of the output.
#' @return a local R data.frame representing the contingency table. The first column of each row
#' will be the distinct values of `col1` and the column names will be the distinct values
#' of `col2`. The name of the first column will be `$col1_$col2`. Pairs that have no
#' occurrences will have `null` as their counts.
#'
#' @rdname statfunctions
#' @export
#' @examples
#' \dontrun{
#' df <- jsonFile(sqlCtx, "/path/to/file.json")
#' ct = crosstab(df, "title", "gender")
#' }
setMethod("crosstab",
signature(x = "DataFrame", col1 = "character", col2 = "character"),
function(x, col1, col2) {
statFunctions <- callJMethod(x@sdf, "stat")
sct <- callJMethod(statFunctions, "crosstab", col1, col2)
collect(dataFrame(sct))
})
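For context, the SparkR method above reaches the JVM DataFrame's stat functions through callJMethod; a minimal Scala sketch of the equivalent direct call (illustrative only, with a hypothetical local setup and the same "title"/"gender" columns as the R example) would look like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Hypothetical local setup; any existing SQLContext works the same way.
val sc = new SparkContext(new SparkConf().setAppName("crosstab-sketch").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

val df = sqlContext.read.json("/path/to/file.json")

// DataFrameStatFunctions.crosstab is what the R wrapper calls via
// callJMethod(x@sdf, "stat") followed by "crosstab"; the result is a
// DataFrame whose first column is named "title_gender" and whose remaining
// columns are the distinct values of "gender".
val ct = df.stat.crosstab("title", "gender")
ct.show()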
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -59,6 +59,10 @@ setGeneric("count", function(x) { standardGeneric("count") })
# @export
setGeneric("countByValue", function(x) { standardGeneric("countByValue") })

# @rdname statfunctions
# @export
setGeneric("crosstab", function(x, col1, col2) { standardGeneric("crosstab") })

# @rdname distinct
# @export
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
13 changes: 13 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -987,6 +987,19 @@ test_that("fillna() on a DataFrame", {
expect_identical(expected, actual)
})

test_that("crosstab() on a DataFrame", {
rdd <- lapply(parallelize(sc, 0:3), function(x) {
list(paste0("a", x %% 3), paste0("b", x %% 2))
})
df <- toDF(rdd, list("a", "b"))
ct <- crosstab(df, "a", "b")
ordered <- ct[order(ct$a_b),]
row.names(ordered) <- NULL
expected <- data.frame("a_b" = c("a0", "a1", "a2"), "b0" = c(1, 0, 1), "b1" = c(1, 1, 0),
stringsAsFactors = FALSE, row.names = NULL)
expect_identical(expected, ordered)
})

unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)
4 changes: 2 additions & 2 deletions bin/spark-shell
@@ -47,11 +47,11 @@ function main() {
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main "$@"
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
fi
}

2 changes: 1 addition & 1 deletion bin/spark-shell2.cmd
@@ -32,4 +32,4 @@ if "x%SPARK_SUBMIT_OPTS%"=="x" (
set SPARK_SUBMIT_OPTS="%SPARK_SUBMIT_OPTS% -Dscala.usejavacp=true"

:run_shell
%SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main %*
%SPARK_HOME%\bin\spark-submit2.cmd --class org.apache.spark.repl.Main --name "Spark shell" %*
8 changes: 6 additions & 2 deletions build/sbt-launch-lib.bash
@@ -51,9 +51,13 @@ acquire_sbt_jar () {
printf "Attempting to fetch sbt\n"
JAR_DL="${JAR}.part"
if [ $(command -v curl) ]; then
(curl --silent ${URL1} > "${JAR_DL}" || curl --silent ${URL2} > "${JAR_DL}") && mv "${JAR_DL}" "${JAR}"
(curl --fail --location --silent ${URL1} > "${JAR_DL}" ||\
(rm -f "${JAR_DL}" && curl --fail --location --silent ${URL2} > "${JAR_DL}")) &&\
mv "${JAR_DL}" "${JAR}"
elif [ $(command -v wget) ]; then
(wget --quiet ${URL1} -O "${JAR_DL}" || wget --quiet ${URL2} -O "${JAR_DL}") && mv "${JAR_DL}" "${JAR}"
(wget --quiet ${URL1} -O "${JAR_DL}" ||\
(rm -f "${JAR_DL}" && wget --quiet ${URL2} -O "${JAR_DL}")) &&\
mv "${JAR_DL}" "${JAR}"
else
printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
exit -1
88 changes: 67 additions & 21 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -471,7 +471,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
.orElse(Option(System.getenv("SPARK_MEM"))
.map(warnSparkMem))
.map(Utils.memoryStringToMb)
.getOrElse(512)
.getOrElse(1024)

// Convert java options to env vars as a work around
// since we can't set env vars directly in sbt.
@@ -1758,16 +1758,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli

/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark. The allowLocal
* flag specifies whether the scheduler can run the computation on the driver rather than
* shipping it out to the cluster, for short actions like first().
* handler function. This is the main entry point for all actions in Spark.
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
@@ -1777,54 +1774,104 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
resultHandler, localProperties.get)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}

/**
* Run a function on a given set of partitions in an RDD and return the results as an array. The
* allowLocal flag specifies whether the scheduler can run the computation on the driver rather
* than shipping it out to the cluster, for short actions like first().
* Run a function on a given set of partitions in an RDD and return the results as an array.
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
results
}

/**
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
}


/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
*
* The allowLocal flag is deprecated as of Spark 1.5.0+.
*/
@deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit): Unit = {
if (allowLocal) {
logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
}
runJob(rdd, func, partitions, resultHandler)
}

/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
*
* The allowLocal flag is deprecated as of Spark 1.5.0+.
*/
@deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
val results = new Array[U](partitions.size)
runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
results
if (allowLocal) {
logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
}
runJob(rdd, func, partitions)
}

/**
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
*
* The allowLocal argument is deprecated as of Spark 1.5.0+.
*/
@deprecated("use the version of runJob without the allowLocal parameter", "1.5.0")
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int],
allowLocal: Boolean
): Array[U] = {
val cleanedFunc = clean(func)
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions, allowLocal)
if (allowLocal) {
logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+")
}
runJob(rdd, func, partitions)
}

/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
runJob(rdd, func, 0 until rdd.partitions.length)
}

/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
runJob(rdd, func, 0 until rdd.partitions.length)
}

/**
@@ -1835,7 +1882,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit)
{
runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler)
runJob[T, U](rdd, processPartition, 0 until rdd.partitions.length, resultHandler)
}

/**
@@ -1847,7 +1894,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
resultHandler: (Int, U) => Unit)
{
val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
}

/**
@@ -1892,7 +1939,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
(context: TaskContext, iter: Iterator[T]) => cleanF(iter),
partitions,
callSite,
allowLocal = false,
resultHandler,
localProperties.get)
new SimpleFutureAction(waiter, resultFunc)
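For context, callers of the deprecated overloads above migrate simply by dropping the allowLocal argument; a minimal sketch (illustrative, with a hypothetical local SparkContext) of the before and after:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical app name and master; any SparkContext behaves the same.
val sc = new SparkContext(new SparkConf().setAppName("runJob-sketch").setMaster("local[2]"))
val rdd = sc.parallelize(1 to 100, 4)

// Deprecated form: the allowLocal flag used to let the scheduler run short
// jobs directly on the driver.
//   sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1), allowLocal = false)

// Current form: identical except the flag is dropped.
val partialSums: Array[Int] = sc.runJob(rdd, (it: Iterator[Int]) => it.sum, Seq(0, 1))
partialSums.foreach(println)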
core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -364,7 +364,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
// This is useful for implementing `take` from other language frontends
// like Python where the data is serialized.
import scala.collection.JavaConversions._
val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds, true)
val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds)
res.map(x => new java.util.ArrayList(x.toSeq)).toArray
}

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -358,12 +358,11 @@ private[spark] object PythonRDD extends Logging {
def runJob(
sc: SparkContext,
rdd: JavaRDD[Array[Byte]],
partitions: JArrayList[Int],
allowLocal: Boolean): Int = {
partitions: JArrayList[Int]): Int = {
type ByteArray = Array[Byte]
type UnrolledPartition = Array[ByteArray]
val allPartitions: Array[UnrolledPartition] =
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions)
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
serveIterator(flattenedPartition.iterator,
s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}")
core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala
@@ -20,6 +20,7 @@ package org.apache.spark.api.r
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

import scala.collection.mutable.HashMap
import scala.language.existentials

import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler}
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -215,8 +215,6 @@ private[spark] class Executor(
attemptNumber = attemptNumber,
metricsSystem = env.metricsSystem)
} finally {
// Note: this memory freeing logic is duplicated in DAGScheduler.runLocallyWithinThread;
// when changing this, make sure to update both copies.
val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
if (freedMemory > 0) {
val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -94,13 +94,14 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
}

override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
rdds.map { rdd: RDD[_] =>
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer)
new ShuffleDependency[K, Any, CoGroupCombiner](
rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
@@ -133,7 +134,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
// A list of (rdd iterator, dependency number) pairs
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
case oneToOneDependency: OneToOneDependency[Product2[K, Any]] =>
case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
val dependencyPartition = split.narrowDeps(depNum).get.split
// Read them from the parent
val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
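For context, the @unchecked annotation added in the CoGroupedRDD match above is needed because JVM type erasure prevents the runtime from verifying the Product2[K, Any] type argument; a self-contained sketch of the same pattern (illustrative only, not from this commit):

// The element type of a List is erased at runtime, so the compiler cannot
// verify the String parameter in the pattern below; @unchecked acknowledges
// that and suppresses the "unchecked" warning.
def firstString(xs: Any): Option[String] = xs match {
  case ss: List[String] @unchecked => ss.headOption
  case _ => None
}

firstString(List("a", "b"))  // Some("a")
firstString(42)              // None
// firstString(List(1, 2)) still matches because of erasure and only fails
// (with a ClassCastException) once an element is actually used as a String.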
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -881,7 +881,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}
buf
} : Seq[V]
val res = self.context.runJob(self, process, Array(index), false)
val res = self.context.runJob(self, process, Array(index))
res(0)
case None =>
self.filter(_._1 == key).map(_._2).collect()
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -897,7 +897,7 @@ abstract class RDD[T: ClassTag](
*/
def toLocalIterator: Iterator[T] = withScope {
def collectPartition(p: Int): Array[T] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p), allowLocal = false).head
sc.runJob(this, (iter: Iterator[T]) => iter.toArray, Seq(p)).head
}
(0 until partitions.length).iterator.flatMap(i => collectPartition(i))
}
@@ -1273,7 +1273,7 @@ abstract class RDD[T: ClassTag](

val left = num - buf.size
val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)
val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)

res.foreach(buf ++= _.take(num - buf.size))
partsScanned += numPartsToTry
core/src/main/scala/org/apache/spark/rdd/ZippedWithIndexRDD.scala
@@ -50,8 +50,7 @@ class ZippedWithIndexRDD[T: ClassTag](@transient prev: RDD[T]) extends RDD[(T, L
prev.context.runJob(
prev,
Utils.getIteratorSize _,
0 until n - 1, // do not need to count the last partition
allowLocal = false
0 until n - 1 // do not need to count the last partition
).scanLeft(0L)(_ + _)
}
}
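For context, the scanLeft above folds per-partition element counts into the start offset of each partition; a tiny standalone illustration with hypothetical counts:

// Hypothetical element counts for the first n - 1 of 4 partitions.
val counts = Seq(3L, 5L, 2L)

// scanLeft(0L)(_ + _) turns them into the start index of every partition:
// List(0, 3, 8, 10) -- which is why the last partition never needs counting.
val startIndices = counts.scanLeft(0L)(_ + _)
println(startIndices)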