Merge branch 'master' into SPARK-10117
Lewuathe committed Sep 4, 2015
2 parents 2c12894 + cf42138 commit b56a948
Showing 20 changed files with 199 additions and 53 deletions.
6 changes: 4 additions & 2 deletions R/pkg/R/deserialize.R
@@ -57,8 +57,10 @@ readTypedObject <- function(con, type) {

readString <- function(con) {
stringLen <- readInt(con)
string <- readBin(con, raw(), stringLen, endian = "big")
rawToChar(string)
raw <- readBin(con, raw(), stringLen, endian = "big")
string <- rawToChar(raw)
Encoding(string) <- "UTF-8"
string
}

readInt <- function(con) {
2 changes: 1 addition & 1 deletion R/pkg/R/serialize.R
@@ -79,7 +79,7 @@ writeJobj <- function(con, value) {
writeString <- function(con, value) {
utfVal <- enc2utf8(value)
writeInt(con, as.integer(nchar(utfVal, type = "bytes") + 1))
writeBin(utfVal, con, endian = "big")
writeBin(utfVal, con, endian = "big", useBytes=TRUE)
}

writeInt <- function(con, value) {
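
For context on the framing above: writeBin() on an R character vector writes the string's bytes followed by a trailing NUL, which is why the declared length is nchar(utfVal, type = "bytes") + 1, and useBytes = TRUE keeps R from re-encoding the UTF-8 bytes. A minimal Scala sketch of a reader for this R-to-JVM framing (an illustration only, not the actual SerDe.readString) might look like:

    import java.io.DataInputStream
    import java.nio.charset.StandardCharsets

    // Hypothetical reader: a 4-byte big-endian length that counts the UTF-8
    // payload plus the trailing \0 appended by R's writeBin().
    def readNulTerminatedUtf8(in: DataInputStream): String = {
      val len = in.readInt()
      val buf = new Array[Byte](len)
      in.readFully(buf)
      new String(buf, 0, len - 1, StandardCharsets.UTF_8) // drop the trailing \0
    }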
26 changes: 26 additions & 0 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -431,6 +431,32 @@ test_that("collect() and take() on a DataFrame return the same number of rows an
expect_equal(ncol(collect(df)), ncol(take(df, 10)))
})

test_that("collect() support Unicode characters", {
markUtf8 <- function(s) {
Encoding(s) <- "UTF-8"
s
}

lines <- c("{\"name\":\"안녕하세요\"}",
"{\"name\":\"您好\", \"age\":30}",
"{\"name\":\"こんにちは\", \"age\":19}",
"{\"name\":\"Xin chào\"}")

jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
writeLines(lines, jsonPath)

df <- read.df(sqlContext, jsonPath, "json")
rdf <- collect(df)
expect_true(is.data.frame(rdf))
expect_equal(rdf$name[1], markUtf8("안녕하세요"))
expect_equal(rdf$name[2], markUtf8("您好"))
expect_equal(rdf$name[3], markUtf8("こんにちは"))
expect_equal(rdf$name[4], markUtf8("Xin chào"))

df1 <- createDataFrame(sqlContext, rdf)
expect_equal(collect(where(df1, df1$name == markUtf8("您好")))$name, markUtf8("您好"))
})

test_that("multiple pipeline transformations result in an RDD with the correct values", {
df <- jsonFile(sqlContext, jsonPath)
first <- lapply(df, function(row) {
8 changes: 8 additions & 0 deletions core/pom.xml
@@ -272,6 +272,14 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -329,12 +329,11 @@ private[spark] object SerDe {
out.writeDouble((value.getTime / 1000).toDouble + value.getNanos.toDouble / 1e9)
}

// NOTE: Only works for ASCII right now
def writeString(out: DataOutputStream, value: String): Unit = {
val len = value.length
out.writeInt(len + 1) // For the \0
out.writeBytes(value)
out.writeByte(0)
val utf8 = value.getBytes("UTF-8")
val len = utf8.length
out.writeInt(len)
out.write(utf8, 0, len)
}

def writeBytes(out: DataOutputStream, value: Array[Byte]): Unit = {
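
A self-contained sketch of the JVM-to-R framing this hunk switches to: a length prefix followed by raw UTF-8 bytes, with no trailing \0. The helper names are made up; R's readString() above decodes the same layout with readBin() and rawToChar().

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
    import java.nio.charset.StandardCharsets

    // Write a 4-byte big-endian length followed by the raw UTF-8 bytes.
    def writeUtf8(out: DataOutputStream, value: String): Unit = {
      val utf8 = value.getBytes(StandardCharsets.UTF_8)
      out.writeInt(utf8.length)
      out.write(utf8, 0, utf8.length)
    }

    // Read the same layout back; this mirrors what deserialize.R now does.
    def readUtf8(in: DataInputStream): String = {
      val len = in.readInt()
      val buf = new Array[Byte](len)
      in.readFully(buf)
      new String(buf, StandardCharsets.UTF_8)
    }

    val out = new ByteArrayOutputStream()
    writeUtf8(new DataOutputStream(out), "안녕하세요")
    assert(readUtf8(new DataInputStream(new ByteArrayInputStream(out.toByteArray))) == "안녕하세요")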
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -322,6 +322,9 @@ object SparkSubmit {
case (MESOS, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on Mesos clusters.")
case (MESOS, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on Mesos clusters.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -392,15 +392,14 @@ private[spark] object RestSubmissionClient {
mainClass: String,
appArgs: Array[String],
conf: SparkConf,
env: Map[String, String] = sys.env): SubmitRestProtocolResponse = {
env: Map[String, String] = Map()): SubmitRestProtocolResponse = {
val master = conf.getOption("spark.master").getOrElse {
throw new IllegalArgumentException("'spark.master' must be set.")
}
val sparkProperties = conf.getAll.toMap
val environmentVariables = env.filter { case (k, _) => k.startsWith("SPARK_") }
val client = new RestSubmissionClient(master)
val submitRequest = client.constructSubmitRequest(
appResource, mainClass, appArgs, sparkProperties, environmentVariables)
appResource, mainClass, appArgs, sparkProperties, env)
client.createSubmission(submitRequest)
}

@@ -413,6 +412,16 @@
val mainClass = args(1)
val appArgs = args.slice(2, args.size)
val conf = new SparkConf
run(appResource, mainClass, appArgs, conf)
val env = filterSystemEnvironment(sys.env)
run(appResource, mainClass, appArgs, conf, env)
}

/**
* Filter non-spark environment variables from any environment.
*/
private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
env.filter { case (k, _) =>
(k.startsWith("SPARK_") && k != "SPARK_ENV_LOADED") || k.startsWith("MESOS_")
}
}
}
core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOr
import com.fasterxml.jackson.annotation.JsonInclude.Include
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.google.common.base.Objects

import org.apache.spark.{Logging, SparkContext}

@@ -67,6 +68,8 @@ private[spark] class RDDOperationScope(
}
}

override def hashCode(): Int = Objects.hashCode(id, name, parent)

override def toString: String = toJson
}

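
Guava's Objects.hashCode(...) delegates to java.util.Arrays.hashCode over its varargs, so two scopes with the same (id, name, parent) values hash identically, keeping hashCode consistent with the field-based equals. A quick illustrative check with made-up values:

    import com.google.common.base.Objects

    // Equal field values combine to equal hashes, so equal scopes behave
    // correctly as keys in hash-based collections.
    val h1 = Objects.hashCode("1", "scope1", None)
    val h2 = Objects.hashCode("1", "scope1", None)
    assert(h1 == h2)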
core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -34,9 +34,15 @@ class AccumulableInfo private[spark] (
override def equals(other: Any): Boolean = other match {
case acc: AccumulableInfo =>
this.id == acc.id && this.name == acc.name &&
this.update == acc.update && this.value == acc.value
this.update == acc.update && this.value == acc.value &&
this.internal == acc.internal
case _ => false
}

override def hashCode(): Int = {
val state = Seq(id, name, update, value, internal)
state.map(_.hashCode).reduceLeft(31 * _ + _)
}
}

object AccumulableInfo {
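
The added hashCode folds the field hash codes with the standard 31-based combining step, so it stays consistent with the extended equals (which now also compares the internal flag). A small sketch of the same scheme with hypothetical field values:

    // Fold each field's hashCode with h = 31 * h + next, the usual JVM recipe.
    def combineHashes(fields: Seq[Any]): Int =
      fields.map(_.hashCode).reduceLeft(31 * _ + _)

    // Equal field sequences always produce the same combined hash.
    assert(combineHashes(Seq(1L, Some("delta1"), "val1", true)) ==
           combineHashes(Seq(1L, Some("delta1"), "val1", true)))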
46 changes: 9 additions & 37 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -250,11 +250,12 @@ class DAGScheduler(
case Some(stage) => stage
case None =>
// We are going to register ancestor shuffle dependencies
registerShuffleDependencies(shuffleDep, firstJobId)
getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)
}
// Then register current shuffleDep
val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
shuffleToMapStage(shuffleDep.shuffleId) = stage

stage
}
}
@@ -365,16 +366,6 @@
parents.toList
}

/** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
while (parentsWithNoMapStage.nonEmpty) {
val currentShufDep = parentsWithNoMapStage.pop()
val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
shuffleToMapStage(currentShufDep.shuffleId) = stage
}
}

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
val parents = new Stack[ShuffleDependency[_, _, _]]
@@ -391,11 +382,9 @@
if (!shuffleToMapStage.contains(shufDep.shuffleId)) {
parents.push(shufDep)
}

waitingForVisit.push(shufDep.rdd)
case _ =>
waitingForVisit.push(dep.rdd)
}
waitingForVisit.push(dep.rdd)
}
}
}
@@ -1052,10 +1041,11 @@
// we registered these map outputs.
mapOutputTracker.registerMapOutputs(
shuffleStage.shuffleDep.shuffleId,
shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head),
shuffleStage.outputLocs.map(_.headOption.orNull),
changeEpoch = true)

clearCacheLocs()

if (shuffleStage.outputLocs.contains(Nil)) {
// Some tasks had failed; let's resubmit this shuffleStage
// TODO: Lower-level scheduler should also deal with this
@@ -1064,27 +1054,9 @@
shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
.map(_._2).mkString(", "))
submitStage(shuffleStage)
} else {
val newlyRunnable = new ArrayBuffer[Stage]
for (shuffleStage <- waitingStages) {
logInfo("Missing parents for " + shuffleStage + ": " +
getMissingParentStages(shuffleStage))
}
for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
{
newlyRunnable += shuffleStage
}
waitingStages --= newlyRunnable
runningStages ++= newlyRunnable
for {
shuffleStage <- newlyRunnable.sortBy(_.id)
jobId <- activeJobForStage(shuffleStage)
} {
logInfo("Submitting " + shuffleStage + " (" +
shuffleStage.rdd + "), which is now runnable")
submitMissingTasks(shuffleStage, jobId)
}
}

// Note: newly runnable stages will be submitted below when we submit waiting stages
}
}

@@ -1186,7 +1158,7 @@
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head)
val locs = stage.outputLocs.map(_.headOption.orNull)
mapOutputTracker.registerMapOutputs(shuffleId, locs, changeEpoch = true)
}
if (shuffleToMapStage.isEmpty) {
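
One of the smaller cleanups above replaces "list => if (list.isEmpty) null else list.head" with "_.headOption.orNull", which is equivalent for reference element types such as MapStatus. A tiny check with String standing in for MapStatus:

    // headOption.orNull yields the first element, or null for an empty list.
    val outputLocs: Seq[List[String]] = Seq(List("a", "b"), Nil)
    assert(outputLocs.map(_.headOption.orNull) == Seq("a", null))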
core/src/main/scala/org/apache/spark/storage/BlockFetchException.scala
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.storage

import org.apache.spark.SparkException

private[spark]
case class BlockFetchException(messages: String, throwable: Throwable)
extends SparkException(messages, throwable)
21 changes: 19 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -23,6 +23,7 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.{ExecutionContext, Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal
import scala.util.Random

import sun.nio.ch.DirectBuffer
@@ -600,10 +601,26 @@ private[spark] class BlockManager(
private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
require(blockId != null, "BlockId is null")
val locations = Random.shuffle(master.getLocations(blockId))
var numFetchFailures = 0
for (loc <- locations) {
logDebug(s"Getting remote block $blockId from $loc")
val data = blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
val data = try {
blockTransferService.fetchBlockSync(
loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()
} catch {
case NonFatal(e) =>
numFetchFailures += 1
if (numFetchFailures == locations.size) {
// An exception is thrown while fetching this block from all locations
throw new BlockFetchException(s"Failed to fetch block from" +
s" ${locations.size} locations. Most recent failure cause:", e)
} else {
// This location failed, so we retry fetch from a different one by returning null here
logWarning(s"Failed to fetch remote block $blockId " +
s"from $loc (failed attempt $numFetchFailures)", e)
null
}
}

if (data != null) {
if (asBlockResult) {
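
The doGetRemote change above retries the fetch across the block's locations and only gives up once every location has failed, wrapping the most recent error in the new BlockFetchException. A distilled, generic sketch of that pattern (hypothetical helper, not the Spark API):

    import scala.util.control.NonFatal

    // Try each location in turn; surface a failure only after all of them failed.
    def fetchFirstSuccessful[A](locations: Seq[String], fetch: String => A): A = {
      var result: Option[A] = None
      var lastError: Throwable = null
      val it = locations.iterator
      while (result.isEmpty && it.hasNext) {
        val loc = it.next()
        try {
          result = Some(fetch(loc))
        } catch {
          case NonFatal(e) => lastError = e // remember the failure, try the next location
        }
      }
      result.getOrElse(throw new RuntimeException(
        s"Failed to fetch from ${locations.size} locations", lastError))
    }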
core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -366,6 +366,18 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(conn3.getResponseCode === HttpServletResponse.SC_INTERNAL_SERVER_ERROR)
}

test("client does not send 'SPARK_ENV_LOADED' env var by default") {
val environmentVariables = Map("SPARK_VAR" -> "1", "SPARK_ENV_LOADED" -> "1")
val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables)
assert(filteredVariables == Map("SPARK_VAR" -> "1"))
}

test("client includes mesos env vars") {
val environmentVariables = Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1", "OTHER_VAR" -> "1")
val filteredVariables = RestSubmissionClient.filterSystemEnvironment(environmentVariables)
assert(filteredVariables == Map("SPARK_VAR" -> "1", "MESOS_VAR" -> "1"))
}

/* --------------------- *
| Helper methods |
* --------------------- */
core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -286,6 +286,10 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
private def runAndReturnMetrics(job: => Unit,
collector: (SparkListenerTaskEnd) => Option[Long]): Long = {
val taskMetrics = new ArrayBuffer[Long]()

// Avoid receiving earlier taskEnd events
sc.listenerBus.waitUntilEmpty(500)

sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
collector(taskEnd).foreach(taskMetrics += _)
core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -38,6 +38,13 @@ class RDDOperationScopeSuite extends SparkFunSuite with BeforeAndAfter {
sc.stop()
}

test("equals and hashCode") {
val opScope1 = new RDDOperationScope("scope1", id = "1")
val opScope2 = new RDDOperationScope("scope1", id = "1")
assert(opScope1 === opScope2)
assert(opScope1.hashCode() === opScope2.hashCode())
}

test("getAllScopes") {
assert(scope1.getAllScopes === Seq(scope1))
assert(scope2.getAllScopes === Seq(scope1, scope2))
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -285,6 +285,15 @@ class DAGSchedulerSuite
assertDataStructuresEmpty()
}

test("equals and hashCode AccumulableInfo") {
val accInfo1 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, true)
val accInfo2 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
val accInfo3 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
assert(accInfo1 !== accInfo2)
assert(accInfo2 === accInfo3)
assert(accInfo2.hashCode() === accInfo3.hashCode())
}

test("cache location preferences w/ dependency") {
val baseRdd = new MyRDD(sc, 1, Nil).cache()
val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))
(The remaining changed files in this commit were not loaded in this view.)
