Commit 5975560

Merge branch 'master' of github.com:apache/spark into existing_rdd
Davies Liu committed Mar 11, 2016
2 parents b482d2c + 2ef4c59 commit 5975560
Showing 321 changed files with 5,144 additions and 3,236 deletions.
22 changes: 21 additions & 1 deletion R/pkg/R/DataFrame.R
@@ -303,8 +303,28 @@ setMethod("colnames",
#' @rdname columns
#' @name colnames<-
setMethod("colnames<-",
signature(x = "DataFrame", value = "character"),
signature(x = "DataFrame"),
function(x, value) {

# Check parameter integrity
if (class(value) != "character") {
stop("Invalid column names.")
}

if (length(value) != ncol(x)) {
stop(
"Column names must have the same length as the number of columns in the dataset.")
}

if (any(is.na(value))) {
stop("Column names cannot be NA.")
}

# Check that none of the column names contains '.'
if (any(regexec(".", value, fixed=TRUE)[[1]][1] != -1)) {
stop("Colum names cannot contain the '.' symbol.")
}

sdf <- callJMethod(x@sdf, "toDF", as.list(value))
dataFrame(sdf)
})
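
The setter above validates value on the R side and then delegates the actual rename to the JVM via toDF. A minimal Scala sketch of the equivalent operation, assuming a DataFrame df is already in scope (the helper name renameAll is illustrative, not part of this change):

import org.apache.spark.sql.DataFrame

// Rename all columns at once; this is the JVM call behind callJMethod(x@sdf, "toDF", as.list(value)).
def renameAll(df: DataFrame, names: Seq[String]): DataFrame = {
  require(names.length == df.columns.length,
    "Column names must have the same length as the number of columns in the dataset.")
  require(names.forall(n => !n.contains(".")),
    "Column names cannot contain the '.' symbol.")
  df.toDF(names: _*)
}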
40 changes: 32 additions & 8 deletions R/pkg/R/functions.R
@@ -536,15 +536,27 @@ setMethod("factorial",
#'
#' Aggregate function: returns the first value in a group.
#'
#' The function by default returns the first value it sees. It will return the first non-missing
#' value it sees when na.rm is set to TRUE. If all values are missing, then NA is returned.
#'
#' @rdname first
#' @name first
#' @family agg_funcs
#' @export
#' @examples \dontrun{first(df$c)}
#' @examples
#' \dontrun{
#' first(df$c)
#' first(df$c, TRUE)
#' }
setMethod("first",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "first", x@jc)
signature(x = "characterOrColumn"),
function(x, na.rm = FALSE) {
col <- if (class(x) == "Column") {
x@jc
} else {
x
}
jc <- callJStatic("org.apache.spark.sql.functions", "first", col, na.rm)
column(jc)
})
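
The na.rm flag added above is passed straight through to the JVM aggregate. On the Scala side the same behavior is exposed by the two-argument first overload in org.apache.spark.sql.functions; a minimal sketch, assuming a DataFrame df with a nullable column "age":

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, first}

// first(col)        may return null when the first value in the group is null
// first(col, true)  skips nulls, matching na.rm = TRUE in the R wrapper
def firstAge(df: DataFrame): DataFrame =
  df.agg(first(col("age")), first(col("age"), true))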

@@ -663,15 +675,27 @@ setMethod("kurtosis",
#'
#' Aggregate function: returns the last value in a group.
#'
#' The function by default returns the last value it sees. It will return the last non-missing
#' value it sees when na.rm is set to TRUE. If all values are missing, then NA is returned.
#'
#' @rdname last
#' @name last
#' @family agg_funcs
#' @export
#' @examples \dontrun{last(df$c)}
#' @examples
#' \dontrun{
#' last(df$c)
#' last(df$c, TRUE)
#' }
setMethod("last",
signature(x = "Column"),
function(x) {
jc <- callJStatic("org.apache.spark.sql.functions", "last", x@jc)
signature(x = "characterOrColumn"),
function(x, na.rm = FALSE) {
col <- if (class(x) == "Column") {
x@jc
} else {
x
}
jc <- callJStatic("org.apache.spark.sql.functions", "last", col, na.rm)
column(jc)
})
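
last mirrors first; a short sketch of the same flag in a grouped aggregation (the column and grouping names are illustrative):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, last}

// With the second argument set to true, nulls are skipped and the last
// non-null value per group is returned, the JVM counterpart of na.rm = TRUE.
def lastAgePerName(df: DataFrame): DataFrame =
  df.groupBy(col("name")).agg(last(col("age"), true))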

4 changes: 2 additions & 2 deletions R/pkg/R/generics.R
@@ -84,7 +84,7 @@ setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") })

# @rdname first
# @export
setGeneric("first", function(x) { standardGeneric("first") })
setGeneric("first", function(x, ...) { standardGeneric("first") })

# @rdname flatMap
# @export
@@ -889,7 +889,7 @@ setGeneric("lag", function(x, ...) { standardGeneric("lag") })

#' @rdname last
#' @export
setGeneric("last", function(x) { standardGeneric("last") })
setGeneric("last", function(x, ...) { standardGeneric("last") })

#' @rdname last_day
#' @export
22 changes: 22 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -691,6 +691,17 @@ test_that("names() colnames() set the column names", {
colnames(df) <- c("col3", "col4")
expect_equal(names(df)[1], "col3")

expect_error(colnames(df) <- c("sepal.length", "sepal_width"),
"Colum names cannot contain the '.' symbol.")
expect_error(colnames(df) <- c(1, 2), "Invalid column names.")
expect_error(colnames(df) <- c("a"),
"Column names must have the same length as the number of columns in the dataset.")
expect_error(colnames(df) <- c("1", NA), "Column names cannot be NA.")

# Note: if this test breaks, remove the check for the "." character in the colnames<- method
irisDF <- suppressWarnings(createDataFrame(sqlContext, iris))
expect_equal(names(irisDF)[1], "Sepal_Length")

# Test base::colnames base::names
m2 <- cbind(1, 1:4)
expect_equal(colnames(m2, do.NULL = FALSE), c("col1", "col2"))
@@ -1065,6 +1076,17 @@ test_that("column functions", {
result <- collect(select(df, encode(df$a, "utf-8"), decode(df$c, "utf-8")))
expect_equal(result[[1]][[1]], bytes)
expect_equal(result[[2]], markUtf8("大千世界"))

# Test first(), last()
df <- read.json(sqlContext, jsonPath)
expect_equal(collect(select(df, first(df$age)))[[1]], NA)
expect_equal(collect(select(df, first(df$age, TRUE)))[[1]], 30)
expect_equal(collect(select(df, first("age")))[[1]], NA)
expect_equal(collect(select(df, first("age", TRUE)))[[1]], 30)
expect_equal(collect(select(df, last(df$age)))[[1]], 19)
expect_equal(collect(select(df, last(df$age, TRUE)))[[1]], 19)
expect_equal(collect(select(df, last("age")))[[1]], 19)
expect_equal(collect(select(df, last("age", TRUE)))[[1]], 19)
})

test_that("column binary mathfunctions", {
core/src/main/resources/org/apache/spark/ui/static/historypage-template.html
@@ -67,7 +67,7 @@
<td class="rowGroupColumn"><span title="{{id}}"><a href="{{url}}">{{id}}</a></span></td>
<td class="rowGroupColumn">{{name}}</td>
{{#attempts}}
<td class="attemptIDSpan"><a href="/history/{{id}}/{{attemptId}}/">{{attemptId}}</a></td>
<td class="attemptIDSpan"><a href="history/{{id}}/{{attemptId}}/">{{attemptId}}</a></td>
<td>{{startTime}}</td>
<td>{{endTime}}</td>
<td><span title="{{duration}}" class="durationClass">{{duration}}</span></td>
core/src/main/resources/org/apache/spark/ui/static/historypage.js
@@ -110,7 +110,7 @@ $(document).ready(function() {
requestedIncomplete = getParameterByName("showIncomplete", searchString);
requestedIncomplete = (requestedIncomplete == "true" ? true : false);

$.getJSON("/api/v1/applications", function(response,status,jqXHR) {
$.getJSON("api/v1/applications", function(response,status,jqXHR) {
var array = [];
var hasMultipleAttempts = false;
for (i in response) {
@@ -139,9 +139,9 @@ $(document).ready(function() {

var url = null
if (maxAttemptId == null) {
url = "/history/" + id + "/"
url = "history/" + id + "/"
} else {
url = "/history/" + id + "/" + maxAttemptId + "/"
url = "history/" + id + "/" + maxAttemptId + "/"
}

var app_clone = {"id" : id, "name" : name, "url" : url, "attempts" : [attempt]};
@@ -150,7 +150,7 @@ $(document).ready(function() {
}

var data = {"applications": array}
$.get("/static/historypage-template.html", function(template) {
$.get("static/historypage-template.html", function(template) {
historySummary.append(Mustache.render($(template).filter("#history-summary-template").html(),data));
var selector = "#history-summary-table";
var conf = {
@@ -165,7 +165,7 @@ $(document).ready(function() {
{name: 'eighth'},
],
"autoWidth": false,
"order": [[ 0, "desc" ]]
"order": [[ 4, "desc" ]]
};

var rowGroupConf = {
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -388,7 +388,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
def getAppId: String = get("spark.app.id")

/** Does the configuration contain a given parameter? */
def contains(key: String): Boolean = settings.containsKey(key)
def contains(key: String): Boolean = {
settings.containsKey(key) ||
configsWithAlternatives.get(key).toSeq.flatten.exists { alt => contains(alt.key) }
}

/** Copy this object */
override def clone: SparkConf = {
@@ -653,7 +656,9 @@ private[spark] object SparkConf extends Logging {
"spark.memory.offHeap.enabled" -> Seq(
AlternateConfig("spark.unsafe.offHeap", "1.6")),
"spark.rpc.message.maxSize" -> Seq(
AlternateConfig("spark.akka.frameSize", "1.6"))
AlternateConfig("spark.akka.frameSize", "1.6")),
"spark.yarn.jars" -> Seq(
AlternateConfig("spark.yarn.jar", "2.0"))
)

/**
@@ -715,7 +720,7 @@ private[spark] object SparkConf extends Logging {
allAlternatives.get(key).foreach { case (newKey, cfg) =>
logWarning(
s"The configuration key '$key' has been deprecated as of Spark ${cfg.version} and " +
s"and may be removed in the future. Please use the new key '$newKey' instead.")
s"may be removed in the future. Please use the new key '$newKey' instead.")
return
}
if (key.startsWith("spark.akka") || key.startsWith("spark.ssl.akka")) {
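With the contains change above, a lookup by a new key now also succeeds when only a deprecated alternate is set, and the new spark.yarn.jars -> spark.yarn.jar entry participates in the same mechanism. A minimal sketch of the observable behavior, using key names from the table above:

import org.apache.spark.SparkConf

object ConfContainsExample {
  def main(args: Array[String]): Unit = {
    // Only the deprecated key is set explicitly.
    val conf = new SparkConf(false).set("spark.akka.frameSize", "128")
    // Before this change the first lookup returned false; now both return true.
    println(conf.contains("spark.rpc.message.maxSize"))
    println(conf.contains("spark.akka.frameSize"))
  }
}
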
core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -173,7 +173,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
TorrentBroadcast.synchronized {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
blockManager.getLocal(broadcastId).map(_.data.next()) match {
blockManager.getLocalValues(broadcastId).map(_.data.next()) match {
case Some(x) =>
releaseLock(broadcastId)
x.asInstanceOf[T]
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -64,6 +64,16 @@ private[deploy] object DeployMessages {
case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
driverIds: Seq[String])

/**
* A worker sends this message to the master when it registers. The master then compares the
* reported executors and drivers against its own records and tells the worker to kill any
* that it does not recognize.
*/
case class WorkerLatestState(
id: String,
executors: Seq[ExecutorDescription],
driverIds: Seq[String]) extends DeployMessage

case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage

// Master to Worker
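The handler for WorkerLatestState appears in the Master.scala hunk further down. As a simplified sketch (not the actual Master internals), the reconciliation it describes is a filter of the worker's report against the master's known (appId, execId) pairs and driver ids:

// Simplified model of the reconciliation described above.
case class ReportedExecutor(appId: String, execId: Int)

def executorsToKill(
    known: Set[(String, Int)],
    reported: Seq[ReportedExecutor]): Seq[ReportedExecutor] =
  reported.filterNot(e => known.contains((e.appId, e.execId)))

def driversToKill(knownDrivers: Set[String], reportedDrivers: Seq[String]): Seq[String] =
  reportedDrivers.filterNot(knownDrivers.contains)
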
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -626,7 +626,6 @@ object SparkSubmit {
val pathConfigs = Seq(
"spark.jars",
"spark.files",
"spark.yarn.jar",
"spark.yarn.dist.files",
"spark.yarn.dist.archives")
pathConfigs.foreach { config =>
25 changes: 25 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -368,6 +368,30 @@ private[deploy] class Master(
if (canCompleteRecovery) { completeRecovery() }
}

case WorkerLatestState(workerId, executors, driverIds) =>
idToWorker.get(workerId) match {
case Some(worker) =>
for (exec <- executors) {
val executorMatches = worker.executors.exists {
case (_, e) => e.application.id == exec.appId && e.id == exec.execId
}
if (!executorMatches) {
// master doesn't recognize this executor. So just tell worker to kill it.
worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
}
}

for (driverId <- driverIds) {
val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
if (!driverMatches) {
// master doesn't recognize this driver. So just tell worker to kill it.
worker.endpoint.send(KillDriver(driverId))
}
}
case None =>
logWarning("Worker state from unknown worker: " + workerId)
}

case UnregisterApplication(applicationId) =>
logInfo(s"Received unregister request from application $applicationId")
idToApp.get(applicationId).foreach(finishApplication)
@@ -763,6 +787,7 @@ private[deploy] class Master(
logInfo("Telling app of lost executor: " + exec.id)
exec.application.driver.send(ExecutorUpdated(
exec.id, ExecutorState.LOST, Some("worker lost"), None))
exec.state = ExecutorState.LOST
exec.application.removeExecutor(exec)
}
for (driver <- worker.drivers.values) {
core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -73,7 +73,7 @@ private[mesos] class MesosClusterDispatcher(

def start(): Unit = {
webUi.bind()
scheduler.frameworkUrl = webUi.activeWebUiUrl
scheduler.frameworkUrl = conf.get("spark.mesos.dispatcher.webui.url", webUi.activeWebUiUrl)
scheduler.start()
server.start()
}
core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcherArguments.scala
@@ -47,7 +47,7 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
port = value
parse(tail)

case ("--webui-port" | "-p") :: IntParam(value) :: tail =>
case ("--webui-port") :: IntParam(value) :: tail =>
webUiPort = value
parse(tail)

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -374,6 +374,11 @@ private[deploy] class Worker(
}, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
}

val execs = executors.values.map { e =>
new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
}
masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))

case RegisterWorkerFailed(message) =>
if (!registered) {
logError("Worker registration failed: " + message)
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -97,9 +97,11 @@ private[spark] class Executor(
// Set the classloader for serializer
env.serializer.setDefaultClassLoader(replClassLoader)

// Max RPC message size. If task result is bigger than this, we use the block manager
// Max size of direct result. If task result is bigger than this, we use the block manager
// to send the result back.
private val maxRpcMessageSize = RpcUtils.maxMessageSizeBytes(conf)
private val maxDirectResultSize = Math.min(
conf.getSizeAsBytes("spark.task.maxDirectResultSize", 1L << 20),
RpcUtils.maxMessageSizeBytes(conf))

// Limit of bytes for total size of results (default is 1GB)
private val maxResultSize = Utils.getMaxResultSize(conf)
@@ -279,6 +281,7 @@ private[spark] class Executor(

// Note: accumulator updates must be collected after TaskMetrics is updated
val accumUpdates = task.collectAccumulatorUpdates()
// TODO: do not serialize value twice
val directResult = new DirectTaskResult(valueBytes, accumUpdates)
val serializedDirectResult = ser.serialize(directResult)
val resultSize = serializedDirectResult.limit
@@ -290,7 +293,7 @@
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= maxRpcMessageSize) {
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
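Taken together with the size checks earlier in this hunk, the executor now routes a task result one of three ways. A simplified sketch of that decision, where maxDirectResultSize is the minimum of spark.task.maxDirectResultSize and the max RPC message size as in the diff (this is not the actual Executor code):

sealed trait ResultRoute
case object DroppedTooLarge extends ResultRoute   // bigger than spark.driver.maxResultSize
case object ViaBlockManager extends ResultRoute   // bigger than maxDirectResultSize
case object DirectInRpc extends ResultRoute       // small enough to inline in the RPC reply

def routeResult(resultSize: Long, maxResultSize: Long, maxDirectResultSize: Long): ResultRoute =
  if (maxResultSize > 0 && resultSize > maxResultSize) DroppedTooLarge
  else if (resultSize > maxDirectResultSize) ViaBlockManager
  else DirectInRpc
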
core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.storage.{BlockId, MemoryStore}
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.memory.MemoryStore
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.MemoryAllocator

@@ -113,6 +114,7 @@ private[spark] abstract class MemoryManager(

/**
* Release all memory for the given task and mark it as inactive (e.g. when a task ends).
*
* @return the number of bytes freed.
*/
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -20,7 +20,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy

import org.apache.spark.Logging
import org.apache.spark.storage.{BlockId, MemoryStore}
import org.apache.spark.storage.BlockId
import org.apache.spark.storage.memory.MemoryStore

/**
* Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
@@ -55,6 +56,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w

/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
*
* @return whether all N bytes were successfully granted.
*/
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {