Commit 12c11a7

Resolved merge conflict

Ilya Ganelin committed Jul 8, 2015
2 parents 61a4c8e + 8f3cd93
Showing 181 changed files with 3,545 additions and 1,410 deletions.
2 changes: 1 addition & 1 deletion R/README.md
@@ -6,7 +6,7 @@ SparkR is an R package that provides a light-weight frontend to use Spark from R

#### Build Spark

Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-PsparkR` profile to build the R package. For example to use the default Hadoop versions you can run
Build Spark with [Maven](http://spark.apache.org/docs/latest/building-spark.html#building-with-buildmvn) and include the `-Psparkr` profile to build the R package. For example to use the default Hadoop versions you can run
```
build/mvn -DskipTests -Psparkr package
```
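For orientation, a minimal SparkR session after a successful build might look like the sketch below; it assumes the built SparkR package is on the R library path and that `SPARK_HOME` points at the build (the `sparkR.init()`/`sparkR.stop()` entry points appear in `R/pkg/R/sparkR.R` later in this diff).

```r
# Sketch only: assumes SparkR is installed and SPARK_HOME is set.
library(SparkR)

# Start a local Spark context via the entry point in R/pkg/R/sparkR.R.
sc <- sparkR.init(master = "local[2]", appName = "sparkr-smoke-test")

# ... run SparkR operations against sc ...

sparkR.stop()
```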
3 changes: 2 additions & 1 deletion R/pkg/R/generics.R
@@ -20,7 +20,8 @@
# @rdname aggregateRDD
# @seealso reduce
# @export
setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
setGeneric("aggregateRDD",
function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })

# @rdname cache-methods
# @export
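The `aggregateRDD` generic reformatted above follows Spark's usual aggregate contract: a zero value, a per-partition `seqOp`, and a cross-partition `combOp`. A small sketch of that contract, assuming access to SparkR's RDD-level API (as in the package's own tests) and an existing context `sc`:

```r
# Compute (sum, count) of an RDD's elements in one pass -- illustrative only.
rdd <- parallelize(sc, list(1, 2, 3, 4))
zeroValue <- list(0, 0)
seqOp  <- function(acc, e) { list(acc[[1]] + e, acc[[2]] + 1) }      # fold elements within a partition
combOp <- function(a, b) { list(a[[1]] + b[[1]], a[[2]] + b[[2]]) }  # merge per-partition results
aggregateRDD(rdd, zeroValue, seqOp, combOp)                          # expected: list(10, 4)
```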
12 changes: 6 additions & 6 deletions R/pkg/R/pairRDD.R
@@ -560,8 +560,8 @@ setMethod("join",
# Left outer join two RDDs
#
# @description
# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{leftouterjoin} This function left-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
@@ -597,8 +597,8 @@ setMethod("leftOuterJoin",
# Right outer join two RDDs
#
# @description
# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{rightouterjoin} This function right-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
@@ -634,8 +634,8 @@ setMethod("rightOuterJoin",
# Full outer join two RDDs
#
# @description
# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of the form list(K, V).
# The key types of the two RDDs should be the same.
# \code{fullouterjoin} This function full-outer-joins two RDDs where every element is of
# the form list(K, V). The key types of the two RDDs should be the same.
#
# @param x An RDD to be joined. Should be an RDD where each element is
# list(K, V).
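The join semantics described in these comments are exercised in `R/pkg/inst/tests/test_rdd.R` further down in this diff; the sketch below restates one of those cases, assuming a SparkContext `sc` as in the tests.

```r
rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))

# fullOuterJoin keeps keys from both sides; unmatched positions are padded with NULL.
collect(fullOuterJoin(rdd1, rdd2, 2L))
# list(list(1, list(2, 1)), list(1, list(3, 1)),
#      list(2, list(NULL, 4)), list(3, list(3, NULL)))

# leftOuterJoin keeps only keys from rdd1; keys that appear only in rdd2 are dropped.
collect(leftOuterJoin(rdd1, rdd2, 2L))
```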
9 changes: 6 additions & 3 deletions R/pkg/R/sparkR.R
@@ -105,7 +105,8 @@ sparkR.init <- function(
sparkPackages = "") {

if (exists(".sparkRjsc", envir = .sparkREnv)) {
cat("Re-using existing Spark Context. Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n")
cat(paste("Re-using existing Spark Context.",
"Please stop SparkR with sparkR.stop() or restart R to create a new Spark Context\n"))
return(get(".sparkRjsc", envir = .sparkREnv))
}

@@ -180,14 +181,16 @@ sparkR.init <- function(

sparkExecutorEnvMap <- new.env()
if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
}
for (varname in names(sparkExecutorEnv)) {
sparkExecutorEnvMap[[varname]] <- sparkExecutorEnv[[varname]]
}

nonEmptyJars <- Filter(function(x) { x != "" }, jars)
localJarPaths <- sapply(nonEmptyJars, function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })
localJarPaths <- sapply(nonEmptyJars,
function(j) { utils::URLencode(paste("file:", uriSep, j, sep = "")) })

# Set the start time to identify jobjs
# Seconds resolution is good enough for this purpose, so use ints
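The code paths touched here (executor environment, extra jars, and the "Re-using existing Spark Context" warning) all hang off `sparkR.init()`. A hedged sketch of a call that exercises them, assuming the 1.4-era argument names `sparkExecutorEnv` and `sparkJars`; the paths are hypothetical:

```r
sc <- sparkR.init(master = "local[2]",
                  appName = "example",
                  # copied into sparkExecutorEnvMap (LD_LIBRARY_PATH gets a default if not set here)
                  sparkExecutorEnv = list(SPARK_LOG_DIR = "/tmp/spark-logs"),
                  # URL-encoded into localJarPaths as file: URIs (hypothetical jar path)
                  sparkJars = "/path/to/extra.jar")

# Calling sparkR.init() again without sparkR.stop() returns the existing context
# and prints the "Re-using existing Spark Context" message shown above.
sc2 <- sparkR.init()
```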
31 changes: 18 additions & 13 deletions R/pkg/R/utils.R
@@ -334,18 +334,21 @@ getStorageLevel <- function(newLevel = c("DISK_ONLY",
"MEMORY_ONLY_SER_2",
"OFF_HEAP")) {
match.arg(newLevel)
storageLevelClass <- "org.apache.spark.storage.StorageLevel"
storageLevel <- switch(newLevel,
"DISK_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY"),
"DISK_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "DISK_ONLY_2"),
"MEMORY_AND_DISK" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK"),
"MEMORY_AND_DISK_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_2"),
"MEMORY_AND_DISK_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER"),
"MEMORY_AND_DISK_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_AND_DISK_SER_2"),
"MEMORY_ONLY" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY"),
"MEMORY_ONLY_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_2"),
"MEMORY_ONLY_SER" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER"),
"MEMORY_ONLY_SER_2" = callJStatic("org.apache.spark.storage.StorageLevel", "MEMORY_ONLY_SER_2"),
"OFF_HEAP" = callJStatic("org.apache.spark.storage.StorageLevel", "OFF_HEAP"))
"DISK_ONLY" = callJStatic(storageLevelClass, "DISK_ONLY"),
"DISK_ONLY_2" = callJStatic(storageLevelClass, "DISK_ONLY_2"),
"MEMORY_AND_DISK" = callJStatic(storageLevelClass, "MEMORY_AND_DISK"),
"MEMORY_AND_DISK_2" = callJStatic(storageLevelClass, "MEMORY_AND_DISK_2"),
"MEMORY_AND_DISK_SER" = callJStatic(storageLevelClass,
"MEMORY_AND_DISK_SER"),
"MEMORY_AND_DISK_SER_2" = callJStatic(storageLevelClass,
"MEMORY_AND_DISK_SER_2"),
"MEMORY_ONLY" = callJStatic(storageLevelClass, "MEMORY_ONLY"),
"MEMORY_ONLY_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_2"),
"MEMORY_ONLY_SER" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER"),
"MEMORY_ONLY_SER_2" = callJStatic(storageLevelClass, "MEMORY_ONLY_SER_2"),
"OFF_HEAP" = callJStatic(storageLevelClass, "OFF_HEAP"))
}

# Utility function for functions where an argument needs to be integer but we want to allow
@@ -545,9 +548,11 @@ mergePartitions <- function(rdd, zip) {
lengthOfKeys <- part[[len - lengthOfValues]]
stopifnot(len == lengthOfKeys + lengthOfValues)

# For zip operation, check if corresponding partitions of both RDDs have the same number of elements.
# For zip operation, check if corresponding partitions
# of both RDDs have the same number of elements.
if (zip && lengthOfKeys != lengthOfValues) {
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
stop(paste("Can only zip RDDs with same number of elements",
"in each pair of corresponding partitions."))
}

if (lengthOfKeys > 1) {
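The level names handled by `getStorageLevel()` are the same strings user code passes when persisting an RDD; a sketch, assuming SparkR's RDD-level `persist()`/`count()` API as used elsewhere in this release:

```r
numbers <- parallelize(sc, 1:100, 2L)
persisted <- persist(numbers, "MEMORY_AND_DISK_SER")  # the string is resolved via getStorageLevel()
count(persisted)                                      # materializes and caches the partitions
```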
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_includeJAR.R
@@ -18,8 +18,8 @@ context("include an external JAR in SparkContext")

runScript <- function() {
sparkHome <- Sys.getenv("SPARK_HOME")
jarPath <- paste("--jars",
shQuote(file.path(sparkHome, "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar")))
sparkTestJarPath <- "R/lib/SparkR/test_support/sparktestjar_2.10-1.0.jar"
jarPath <- paste("--jars", shQuote(file.path(sparkHome, sparkTestJarPath)))
scriptPath <- file.path(sparkHome, "R/lib/SparkR/tests/jarTest.R")
submitPath <- file.path(sparkHome, "bin/spark-submit")
res <- system2(command = submitPath,
12 changes: 8 additions & 4 deletions R/pkg/inst/tests/test_rdd.R
@@ -669,27 +669,31 @@ test_that("fullOuterJoin() on pairwise RDDs", {
rdd1 <- parallelize(sc, list(list(1,2), list(1,3), list(3,3)))
rdd2 <- parallelize(sc, list(list(1,1), list(2,4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)), list(3, list(3, NULL)))
expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
list(2, list(NULL, 4)), list(3, list(3, NULL)))
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

rdd1 <- parallelize(sc, list(list("a",2), list("a",3), list("c", 1)))
rdd2 <- parallelize(sc, list(list("a",1), list("b",4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)), list("c", list(1, NULL)))
expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
list("a", list(3, 1)), list("c", list(1, NULL)))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(expected))

rdd1 <- parallelize(sc, list(list(1,1), list(2,2)))
rdd2 <- parallelize(sc, list(list(3,3), list(4,4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)), list(3, list(NULL, 3)), list(4, list(NULL, 4)))))
sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
list(3, list(NULL, 3)), list(4, list(NULL, 4)))))

rdd1 <- parallelize(sc, list(list("a",1), list("b",2)))
rdd2 <- parallelize(sc, list(list("c",3), list("d",4)))
actual <- collect(fullOuterJoin(rdd1, rdd2, 2L))
expect_equal(sortKeyValueList(actual),
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)), list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("sortByKey() on pairwise RDDs", {
11 changes: 9 additions & 2 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -391,7 +391,7 @@ test_that("collect() and take() on a DataFrame return the same number of rows an
expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})

test_that("multiple pipeline transformations starting with a DataFrame result in an RDD with the correct values", {
test_that("multiple pipeline transformations result in an RDD with the correct values", {
df <- jsonFile(sqlContext, jsonPath)
first <- lapply(df, function(row) {
row$age <- row$age + 5
@@ -756,7 +756,14 @@ test_that("toJSON() returns an RDD of the correct values", {
test_that("showDF()", {
df <- jsonFile(sqlContext, jsonPath)
s <- capture.output(showDF(df))
expect_output(s , "+----+-------+\n| age| name|\n+----+-------+\n|null|Michael|\n| 30| Andy|\n| 19| Justin|\n+----+-------+\n")
expected <- paste("+----+-------+\n",
"| age| name|\n",
"+----+-------+\n",
"|null|Michael|\n",
"| 30| Andy|\n",
"| 19| Justin|\n",
"+----+-------+\n", sep="")
expect_output(s , expected)
})

test_that("isLocal()", {

@@ -76,12 +76,13 @@ private[deploy] object JsonProtocol {
}

def writeMasterState(obj: MasterStateResponse): JObject = {
val aliveWorkers = obj.workers.filter(_.isAlive())
("url" -> obj.uri) ~
("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~
("cores" -> obj.workers.map(_.cores).sum) ~
("coresused" -> obj.workers.map(_.coresUsed).sum) ~
("memory" -> obj.workers.map(_.memory).sum) ~
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("cores" -> aliveWorkers.map(_.cores).sum) ~
("coresused" -> aliveWorkers.map(_.coresUsed).sum) ~
("memory" -> aliveWorkers.map(_.memory).sum) ~
("memoryused" -> aliveWorkers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
("activedrivers" -> obj.activeDrivers.toList.map(writeDriverInfo)) ~

@@ -105,4 +105,6 @@ private[spark] class WorkerInfo(
def setState(state: WorkerState.Value): Unit = {
this.state = state
}

def isAlive(): Boolean = this.state == WorkerState.ALIVE
}

@@ -18,18 +18,18 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{Collections, List => JList}
import java.util.{List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -66,6 +66,10 @@ private[spark] class CoarseMesosSchedulerBackend(

val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)

// Offer constraints
private val slaveOfferConstraints =
parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))

var nextMesosTaskId = 0

@volatile var appId: String = _
@@ -170,13 +174,16 @@ private[spark] class CoarseMesosSchedulerBackend(
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()

for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
if (totalCoresAcquired < maxCores &&
mem >= MemoryUtils.calculateTotalMemory(sc) &&
val id = offer.getId.getValue
if (meetsConstraints &&
totalCoresAcquired < maxCores &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
@@ -193,33 +200,25 @@
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem",
MemoryUtils.calculateTotalMemory(sc)))
.addResources(createResource("mem", calculateTotalMemory(sc)))

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
}

d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters)
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.launchTasks(List(offer.getId), List(task.build()), filters)
} else {
// Filter it out
d.launchTasks(
Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.declineOffer(offer.getId)
}
}
}
}

/** Build a Mesos resource protobuf object */
private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
Resource.newBuilder()
.setName(resourceName)
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
.build()
}

override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt

@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.Environment.Variable
import org.apache.mesos.Protos.TaskStatus.Reason
import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler, SchedulerDriver}

import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse}
import org.apache.spark.metrics.MetricsSystem